Clever Geek Handbook

Condition variable

A condition variable is a synchronization primitive that blocks one or more threads until another thread signals that a certain condition has been fulfilled, or until a maximum waiting time expires. Condition variables are used together with an associated mutex and are a component of some kinds of monitors.
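The two ways a wait can end (a signal from another thread, or expiry of the timeout) can be sketched in Python with the standard threading.Condition class. This is only a minimal illustration: the Gate class and its method names are hypothetical and do not come from any library.

```python
import threading


class Gate:
    """Hypothetical helper: threads block in wait() until open() is called."""

    def __init__(self):
        self._cond = threading.Condition()  # condition variable plus its mutex
        self._opened = False                # the condition being waited for

    def wait(self, timeout=None):
        """Block until the gate opens; return False if the timeout expires first."""
        with self._cond:
            # wait_for re-checks the predicate after every wake-up and
            # stops waiting once the timeout has elapsed.
            return self._cond.wait_for(lambda: self._opened, timeout=timeout)

    def open(self):
        """Fulfil the condition and signal every waiting thread."""
        with self._cond:
            self._opened = True
            self._cond.notify_all()


gate = Gate()
print(gate.wait(timeout=0.1))             # no signal arrives: False
threading.Timer(0.1, gate.open).start()   # another thread signals later
print(gate.wait(timeout=5.0))             # woken by the signal: True
```

The shared flag is read and written only while the mutex associated with the condition variable is held, which is why both methods wrap their bodies in the with statement.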

Overview

Conceptually, a condition variable is a queue of threads, associated with a shared data object, that are waiting for some condition on the state of the data to be fulfilled. Thus, each condition variable $c$ is associated with an assertion $P_c$. While a thread is waiting on a condition variable, it is not considered to own the data, so another thread may modify the shared object and signal the waiting threads if the assertion $P_c$ holds.
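A minimal Python sketch of this model follows. The function run_demo, the dictionary state and the choice of the assertion ("the value has reached a threshold") are assumptions made for the illustration only.

```python
import threading


def run_demo(n_waiters=3, threshold=5):
    cond = threading.Condition()   # the condition variable c with its mutex
    state = {"value": 0}           # the shared data object
    observed = []                  # values seen by the waiters after waking up

    def assertion():               # the assertion P_c on the state of the data
        return state["value"] >= threshold

    def waiter():
        with cond:
            while not assertion():     # re-check P_c after every wake-up
                cond.wait()            # releases the mutex while queued on c
            observed.append(state["value"])

    threads = [threading.Thread(target=waiter) for _ in range(n_waiters)]
    for t in threads:
        t.start()

    # The waiters do not own the data while they wait, so this thread
    # can take the mutex and modify the shared object.
    for _ in range(threshold):
        with cond:
            state["value"] += 1
            if assertion():
                cond.notify_all()      # P_c holds: wake the whole queue

    for t in threads:
        t.join()
    return observed


print(run_demo())   # [5, 5, 5]
```

Each waiter re-evaluates the assertion in a loop instead of trusting the wake-up itself, so the sketch stays correct even if a waiter starts only after the signal has already been sent.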

Use

The following example illustrates the use of a condition variable to synchronize a producer thread and a consumer thread. The producer thread gradually increments a shared variable and signals the thread waiting on the condition variable once the assertion that the maximum value has been reached holds. The consumer thread checks the shared variable and blocks while the maximum has not been reached. On receiving the signal that the assertion is true, it "consumes" the shared resource, decreasing the shared variable so that it does not fall below the permitted minimum.

POSIX threads

In the POSIX threads library for C, condition variables are handled by the functions and data structures with the pthread_cond prefix.

C source code using POSIX Threads
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>

    #include <unistd.h>
    #include <pthread.h>

    #define STORAGE_MIN 10
    #define STORAGE_MAX 20

    /* Shared resource */
    int storage = STORAGE_MIN;

    pthread_mutex_t mutex;
    pthread_cond_t condition;

    /* Consumer thread function */
    void *consumer(void *args)
    {
        puts("[CONSUMER] thread started");
        int toConsume = 0;

        while (1)
        {
            pthread_mutex_lock(&mutex);
            /* While the shared variable is below the maximum, the thread
             * waits for the signal that the maximum has been reached */
            while (storage < STORAGE_MAX)
            {
                pthread_cond_wait(&condition, &mutex);
            }
            toConsume = storage - STORAGE_MIN;
            printf("[CONSUMER] storage is maximum, consuming %d\n",
                   toConsume);
            /* "Consume" the permitted amount from the shared variable */
            storage -= toConsume;
            printf("[CONSUMER] storage = %d\n", storage);
            pthread_mutex_unlock(&mutex);
        }

        return NULL;
    }

    /* Producer thread function */
    void *producer(void *args)
    {
        puts("[PRODUCER] thread started");

        while (1)
        {
            usleep(200000);
            pthread_mutex_lock(&mutex);
            /* The producer keeps incrementing the shared variable */
            ++storage;
            printf("[PRODUCER] storage = %d\n", storage);
            /* Once the shared variable has reached or exceeded the
             * maximum, the consumer thread is notified */
            if (storage >= STORAGE_MAX)
            {
                puts("[PRODUCER] storage maximum");
                pthread_cond_signal(&condition);
            }
            pthread_mutex_unlock(&mutex);
        }
        return NULL;
    }

    int main(int argc, char *argv[])
    {
        int res = 0;
        pthread_t thProducer, thConsumer;

        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&condition, NULL);

        /* pthread_create returns the error number instead of setting errno */
        res = pthread_create(&thProducer, NULL, producer, NULL);
        if (res != 0)
        {
            fprintf(stderr, "pthread_create: %s\n", strerror(res));
            exit(EXIT_FAILURE);
        }

        res = pthread_create(&thConsumer, NULL, consumer, NULL);
        if (res != 0)
        {
            fprintf(stderr, "pthread_create: %s\n", strerror(res));
            exit(EXIT_FAILURE);
        }

        pthread_join(thProducer, NULL);
        pthread_join(thConsumer, NULL);

        return EXIT_SUCCESS;
    }

C++

The C++11 standard added multithreading support to the language. Condition variables are provided by the facilities declared in the condition_variable header.

C++ source code (C++11)
    #include <cstdlib>
    #include <iostream>

    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <chrono>

    #define STORAGE_MIN 10
    #define STORAGE_MAX 20

    int storage = STORAGE_MIN;

    std::mutex globalMutex;
    std::condition_variable condition;

    /* Consumer thread function */
    void consumer()
    {
        std::cout << "[CONSUMER] thread started" << std::endl;
        int toConsume = 0;

        while (true)
        {
            std::unique_lock<std::mutex> lock(globalMutex);
            /* While the shared variable is below the maximum, the thread
             * waits for the signal that the maximum has been reached;
             * the loop re-checks the condition after a spurious wake-up */
            while (storage < STORAGE_MAX)
            {
                condition.wait(lock); // Atomically releases the mutex and blocks the thread
            }
            toConsume = storage - STORAGE_MIN;
            std::cout << "[CONSUMER] storage is maximum, consuming "
                      << toConsume << std::endl;
            /* "Consume" the permitted amount from the shared variable */
            storage -= toConsume;
            std::cout << "[CONSUMER] storage = " << storage << std::endl;
        }
    }

    /* Producer thread function */
    void producer()
    {
        std::cout << "[PRODUCER] thread started" << std::endl;

        while (true)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));

            std::unique_lock<std::mutex> lock(globalMutex);
            ++storage;
            std::cout << "[PRODUCER] storage = " << storage << std::endl;
            /* Once the shared variable has reached or exceeded the
             * maximum, the consumer thread is notified */
            if (storage >= STORAGE_MAX)
            {
                std::cout << "[PRODUCER] storage maximum" << std::endl;
                condition.notify_one();
            }
        }
    }

    int main(int argc, char *argv[])
    {
        std::thread thProducer(producer);
        std::thread thConsumer(consumer);

        thProducer.join();
        thConsumer.join();

        return 0;
    }

Qt 4

C++ source code using the Qt libraries

cw.h

    #ifndef CW_H
    #define CW_H

    #include <QThread>
    #include <QMutex>
    #include <QWaitCondition>
    #include <QDebug>

    #define STORAGE_MIN 10
    #define STORAGE_MAX 20

    extern int storage;
    extern QMutex qmt;
    extern QWaitCondition condition;

    class Producer : public QThread
    {
        Q_OBJECT
    private:
        void run()
        {
            qDebug() << "[PRODUCER] thread started";

            while (1)
            {
                QThread::msleep(200);

                qmt.lock();
                ++storage;
                qDebug() << "[PRODUCER] storage =" << storage;
                /* Once the shared variable has reached or exceeded the
                 * maximum, the consumer thread is notified */
                if (storage >= STORAGE_MAX)
                {
                    qDebug() << "[PRODUCER] storage maximum";
                    condition.wakeOne();
                }
                qmt.unlock();
            }
        }
    };

    class Consumer : public QThread
    {
        Q_OBJECT
    private:
        void run()
        {
            qDebug() << "[CONSUMER] thread started";

            int toConsume = 0;
            while (1)
            {
                qmt.lock();
                /* While the shared variable is below the maximum, the
                 * thread waits for the signal that the maximum has been
                 * reached; the loop re-checks the condition on wake-up */
                while (storage < STORAGE_MAX)
                {
                    condition.wait(&qmt);
                }
                toConsume = storage - STORAGE_MIN;
                qDebug() << "[CONSUMER] storage is maximum, consuming"
                         << toConsume;
                /* "Consume" the permitted amount from the shared variable */
                storage -= toConsume;
                qDebug() << "[CONSUMER] storage =" << storage;
                qmt.unlock();
            }
        }
    };

    #endif /* CW_H */

main.cpp

    #include <QCoreApplication>
    #include "cw.h"

    int storage = STORAGE_MIN;
    QMutex qmt;
    QWaitCondition condition;

    int main(int argc, char *argv[])
    {
        QCoreApplication app(argc, argv);

        Producer prod;
        Consumer cons;

        prod.start();
        cons.start();

        return app.exec();
    }

Python

In Python, condition variables are implemented as instances of the Condition class of the threading module. In the following example, the same condition variable is used in the producer and consumer threads through the context manager syntax [1]:

    # Consumer thread
    with cond_var:                          # within the context of the cond_var condition
        while not an_item_is_available():   # while no item is available
            cond_var.wait()                 # wait
        get_an_item()                       # take the item

    # Producer thread
    with cond_var:                          # within the context of the cond_var condition
        make_an_item_available()            # produce an item
        cond_var.notify()                   # notify a waiting consumer
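The fragment above leaves its helper functions undefined. A runnable sketch could fill them in as follows. The deque used as a buffer, the transfer function and the bodies of an_item_is_available, get_an_item and make_an_item_available are assumptions made for the illustration, and make_an_item_available takes the item as an argument here, unlike in the fragment.

```python
import threading
from collections import deque


def transfer(values):
    """Pass `values` from a producer thread to a consumer thread, one by one."""
    cond_var = threading.Condition()
    items = deque()      # hypothetical shared buffer guarded by cond_var
    received = []

    def an_item_is_available():
        return len(items) > 0

    def get_an_item():
        return items.popleft()

    def make_an_item_available(item):
        items.append(item)

    def consumer():
        for _ in range(len(values)):
            with cond_var:
                while not an_item_is_available():
                    cond_var.wait()
                received.append(get_an_item())

    def producer():
        for value in values:
            with cond_var:
                make_an_item_available(value)
                cond_var.notify()

    threads = [threading.Thread(target=consumer),
               threading.Thread(target=producer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


print(transfer([1, 2, 3]))   # [1, 2, 3]
```

With a single producer and a single consumer the items arrive in the order they were produced, because the buffer is a first-in, first-out queue and every access to it happens under the lock of cond_var.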

Ada 95

Ada has no need for condition variables. Monitors that block tasks can be built with protected types.

Ada 95 source code
    with Ada.Text_IO;

    procedure Main is

       task Producer;  -- producer task declaration
       task Consumer;  -- consumer task declaration

       type Storage_T is range 10 .. 20;  -- range type for the shared resource

       -- monitor (protected object) shared by the producer and the consumer
       protected type Storage is
          entry Put;  -- "produce" one unit of the resource
          entry Get;  -- "consume" the permitted amount of the resource
          entry Value (Val : out Storage_T);  -- accessor for the variable's value
       private
          -- hidden variable, initialised to the minimum value of the type's range
          StorageData : Storage_T := Storage_T'First;
       end Storage;

       -- implementation of the Storage monitor
       protected body Storage is
          entry Put when StorageData < Storage_T'Last is
          begin
             StorageData := StorageData + 1;
             if StorageData >= Storage_T'Last then
                Ada.Text_IO.Put_Line ("[PRODUCER] storage maximum");
             end if;
          end Put;

          entry Get when StorageData >= Storage_T'Last is
             To_Consume : Storage_T;
          begin
             To_Consume := StorageData - Storage_T'First;
             StorageData := StorageData - To_Consume;
             Ada.Text_IO.Put_Line ("[CONSUMER] consuming");
          end Get;

          entry Value (Val : out Storage_T) when True is
          begin
             Val := StorageData;
          end Value;

       end Storage;

       -- instance of the Storage monitor
       Storage1 : Storage;

       -- implementation of the producer task
       task body Producer is
          V : Storage_T;
       begin
          Ada.Text_IO.Put_Line ("[PRODUCER] Task started");
          loop
             delay 0.2;
             Storage1.Put;
             Storage1.Value (V);
             Ada.Text_IO.Put ("[PRODUCER]");
             Ada.Text_IO.Put_Line (Storage_T'Image (V));
          end loop;
       end Producer;

       -- implementation of the consumer task
       task body Consumer is
       begin
          Ada.Text_IO.Put_Line ("[CONSUMER] Task started");
          loop
             Storage1.Get;
          end loop;
       end Consumer;

    begin
       null;
    end Main;
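For comparison, the guarded entries of the protected type above can be approximated in a language that relies on explicit condition variables. The following Python sketch is an illustration only: the StorageMonitor class is hypothetical, and each entry barrier is modelled, as an assumption, by waiting on a single threading.Condition until the corresponding predicate holds.

```python
import threading


class StorageMonitor:
    """Hypothetical Python counterpart of the protected type Storage."""

    FIRST, LAST = 10, 20   # bounds of Storage_T

    def __init__(self):
        self._cond = threading.Condition()   # one lock and queue for all entries
        self._data = self.FIRST

    def put(self):
        """Counterpart of: entry Put when StorageData < Storage_T'Last."""
        with self._cond:
            self._cond.wait_for(lambda: self._data < self.LAST)
            self._data += 1
            self._cond.notify_all()   # the state changed: let waiters re-check

    def get(self):
        """Counterpart of: entry Get when StorageData >= Storage_T'Last."""
        with self._cond:
            self._cond.wait_for(lambda: self._data >= self.LAST)
            to_consume = self._data - self.FIRST
            self._data -= to_consume
            self._cond.notify_all()
            return to_consume

    def value(self):
        """Counterpart of the Value accessor."""
        with self._cond:
            return self._data


monitor = StorageMonitor()
producer = threading.Thread(target=lambda: [monitor.put() for _ in range(10)])
producer.start()
print(monitor.get())     # blocks until ten units have been produced: 10
producer.join()
print(monitor.value())   # 10
```

In the Ada version the barriers are re-evaluated by the runtime whenever a protected action completes, whereas here every method that changes the state has to call notify_all itself so that blocked callers re-check their predicates.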

Notes

  1. The Python Standard Library, 16.2. threading: Higher-level threading interface
Source - https://ru.wikipedia.org/w/index.php?title=Condition_Variable&oldid=92708578

