A condition variable is a synchronization primitive that blocks one or more threads until another thread signals that a certain condition has been fulfilled, or until a maximum waiting period expires. Condition variables are used together with an associated mutex and are a component of some kinds of monitors.
Overview
Conceptually, a condition variable is a queue of threads, associated with a shared data object, that are waiting for some condition on the state of that data to be fulfilled. Thus, each condition variable $c$ is associated with an assertion $P_c$. While a thread is waiting on a condition variable, it is not considered to own the data, so another thread may modify the shared object and signal the waiting threads once the assertion $P_c$ holds.
Use
The following example illustrates the use of condition variables to synchronize a producer thread and a consumer thread. The producer thread gradually increases the value of a shared variable and, once the maximum value has been reached, signals the thread waiting on the condition variable. The waiting consumer thread checks the value of the shared variable and blocks while the maximum has not been reached. On receiving the signal that the condition holds, the consumer thread "consumes" the shared resource, reducing the value of the shared variable down to, but not below, the permitted minimum.
POSIX threads
In the POSIX Threads library for the C language, condition variables are handled by the functions and data structures with the pthread_cond prefix.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Level the consumer reduces the storage to */
#define STORAGE_MIN 10
/* Level at which the producer signals the consumer */
#define STORAGE_MAX 20

/* Shared resource; read and written only while `mutex` is held */
int storage = STORAGE_MIN;

/* Mutex protecting `storage`; initialised in main() */
pthread_mutex_t mutex;
/* Condition variable the consumer waits on; initialised in main() */
pthread_cond_t condition;
/ * Consumer stream function * /
void * consumer ( void * args )
{
puts ( "[CONSUMER] thread started" );
int toConsume = 0 ;
while ( 1 )
{
pthread_mutex_lock ( & mutex );
/ * If the value of the shared variable is less than the maximum,
* then the stream enters the idle state of reaching the signal
* maximum * /
while ( storage < STORAGE_MAX )
{
pthread_cond_wait ( & condition , & mutex );
}
toConsume = storage - STORAGE_MIN ;
printf ( "[CONSUMER] storage is maximum, consuming% d \ n " , \
toConsume );
/ * "Consumption" of permissible volume from the value of the total
* variable * /
storage - = toConsume ;
printf ( "[CONSUMER] storage =% d \ n " , storage );
pthread_mutex_unlock ( & mutex );
}
return NULL ;
}
/ * Manufacturer stream function * /
void * producer ( void * args )
{
puts ( "[PRODUCER] thread started" );
while ( 1 )
{
usleep ( 200000 );
pthread_mutex_lock ( & mutex );
/ * The manufacturer constantly increases the value of the common variable * /
++ storage ;
printf ( "[PRODUCER] storage =% d \ n " , storage );
/ * If the value of the common variable has reached or exceeded
* maximum, consumer flow is notified about this * /
if ( storage > = STORAGE_MAX )
{
puts ( "[PRODUCER] storage maximum" );
pthread_cond_signal ( & condition );
}
pthread_mutex_unlock ( & mutex );
}
return NULL ;
}
/*
 * Program entry point.
 *
 * Initialises the mutex and the condition variable, starts the producer and
 * consumer threads and waits for both of them.
 *
 * Returns EXIT_SUCCESS after both threads have been joined; exits with
 * EXIT_FAILURE if a thread cannot be created.
 */
int main(int argc, char *argv[])
{
    int res = 0;
    pthread_t thProducer, thConsumer;

    (void)argc; /* unused */
    (void)argv; /* unused */

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&condition, NULL);

    res = pthread_create(&thProducer, NULL, producer, NULL);
    if (res != 0)
    {
        /* pthread_create() returns the error number and does not set errno,
         * so the message is built from the return value, not with perror() */
        fprintf(stderr, "pthread_create: %s\n", strerror(res));
        exit(EXIT_FAILURE);
    }
    res = pthread_create(&thConsumer, NULL, consumer, NULL);
    if (res != 0)
    {
        fprintf(stderr, "pthread_create: %s\n", strerror(res));
        exit(EXIT_FAILURE);
    }

    pthread_join(thProducer, NULL);
    pthread_join(thConsumer, NULL);

    /* Reached only if both threads terminate */
    pthread_cond_destroy(&condition);
    pthread_mutex_destroy(&mutex);
    return EXIT_SUCCESS;
}
C++
The C++11 standard added multithreading support to the language. Condition variables are provided by the facilities declared in the <condition_variable> header.
#include <cstdlib>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
/* Level the consumer reduces the storage to */
constexpr int STORAGE_MIN = 10;
/* Level at which the producer notifies the consumer */
constexpr int STORAGE_MAX = 20;

/* Shared resource; read and written only while globalMutex is held */
int storage = STORAGE_MIN;
/* Mutex protecting storage */
std::mutex globalMutex;
/* Condition variable the consumer waits on */
std::condition_variable condition;
/ * Consumer stream function * /
void consumer ()
{
std :: cout << "[CONSUMER] thread started" << std :: endl ;
int toConsume = 0 ;
while ( true )
{
std :: unique_lock < std :: mutex > lock ( globalMutex );
/ * If the value of the shared variable is less than the maximum,
* then the stream enters the idle state of reaching the signal
* maximum * /
if ( storage < STORAGE_MAX )
{
condition . wait ( lock ); // Atomically _ releases the mutex_ and immediately blocks the thread
toConsume = storage - STORAGE_MIN ;
std :: cout << "[CONSUMER] storage is maximum, consuming"
<< toConsume << std :: endl ;
}
/ * "Consumption" of permissible volume from the value of the total
* variable * /
storage - = toConsume ;
std :: cout << "[CONSUMER] storage =" << storage << std :: endl ;
}
}
/ * Manufacturer stream function * /
void producer ()
{
std :: cout << "[PRODUCER] thread started" << std :: endl ;
while ( true )
{
std :: this_thread :: sleep_for ( std :: chrono :: milliseconds ( 200 ));
std :: unique_lock < std :: mutex > lock ( globalMutex );
++ storage ;
std :: cout << "[PRODUCER] storage =" << storage << std :: endl ;
/ * If the value of the common variable has reached or exceeded
* maximum, consumer flow is notified about this * /
if ( storage > = STORAGE_MAX )
{
std :: cout << "[PRODUCER] storage maximum" << std :: endl ;
condition . notify_one ();
}
}
}
int main ( int argc , char * argv [])
{
std :: thread thProducer ( producer );
std :: thread thConsumer ( consumer );
thProducer . join ();
thConsumer . join ();
return 0 ;
}
Qt 4
cw.h
#ifndef CW_H
#define CW_H
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QDebug>
/* Level the consumer reduces the storage to */
#define STORAGE_MIN 10
/* Level at which the producer wakes the consumer */
#define STORAGE_MAX 20
/* Shared resource, the mutex that Producer and Consumer lock around it,
   and the wait condition the consumer waits on (declared only here) */
extern int storage ;
extern QMutex qmt ;
extern QWaitCondition condition ;
/*
 * Producer thread.
 *
 * run() loops forever: every 200 ms it increments the shared variable under
 * the mutex and, once the value has reached STORAGE_MAX, wakes one thread
 * waiting on the wait condition.
 */
class Producer : public QThread
{
    Q_OBJECT
private:
    void run()
    {
        qDebug() << "[PRODUCER] thread started";
        while (1)
        {
            QThread::msleep(200);
            qmt.lock();
            ++storage;
            qDebug() << "[PRODUCER] storage =" << storage;
            /* If the shared variable has reached or exceeded the maximum,
             * the consumer thread is notified */
            if (storage >= STORAGE_MAX)
            {
                qDebug() << "[PRODUCER] storage maximum";
                condition.wakeOne();
            }
            qmt.unlock();
        }
    }
};
/*
 * Consumer thread.
 *
 * run() loops forever: waits on the wait condition until the shared variable
 * has reached STORAGE_MAX, then "consumes" everything above STORAGE_MIN.
 */
class Consumer : public QThread
{
    Q_OBJECT
private:
    void run()
    {
        qDebug() << "[CONSUMER] thread started";
        while (1)
        {
            qmt.lock();
            /* While the value of the shared variable is below the maximum,
             * the thread waits for the notification that the maximum has
             * been reached. The condition is re-checked in a loop after
             * every wake-up instead of being tested only once. */
            while (storage < STORAGE_MAX)
            {
                condition.wait(&qmt);
            }
            /* Computed after the wait on every iteration, so the amount
             * always reflects the current value of the shared variable */
            const int toConsume = storage - STORAGE_MIN;
            qDebug() << "[CONSUMER] storage is maximum, consuming"
                     << toConsume;
            /* "Consume" the permitted amount from the shared variable */
            storage -= toConsume;
            qDebug() << "[CONSUMER] storage =" << storage;
            qmt.unlock();
        }
    }
};
#endif /* CW_H */
main.cpp
#include <QCoreApplication>
#include "cw.h"
/* Definitions of the objects declared extern in cw.h */
/* Shared resource, starting at the minimum level */
int storage = STORAGE_MIN ;
/* Mutex that Producer and Consumer lock around storage */
QMutex qmt ;
/* Wait condition the consumer waits on and the producer wakes */
QWaitCondition condition ;
int main ( int argc , char * argv [])
{
QCoreApplication app ( argc , argv );
Producer prod ;
Consumer cons ;
prod . start ();
cons . start ();
return app . exec ();
}
Python
In Python, condition variables are implemented as instances of the Condition class of the threading module. In the following example, the same condition variable is used in the producer and consumer threads via the context-manager syntax [1]:
# Consumer thread
with cond_var:  # hold the lock associated with cond_var
    while not an_item_is_available():  # re-check after every wake-up
        cond_var.wait()  # release the lock and block until notified
    get_an_item()  # take the item while still holding the lock

# Producer thread
with cond_var:  # hold the lock associated with cond_var
    make_an_item_available()  # produce an item
    cond_var.notify()  # notify a waiting consumer
Ada '95
The Ada language has no need for condition variables. Monitors that block tasks can be built with protected types.
with Ada.Text_IO;

--  Producer/consumer demonstration built on a protected object.
--  The entry barriers of the protected type play the role that a
--  condition variable plays in the other examples.
procedure Main is

   task Producer;  --  producer task declaration
   task Consumer;  --  consumer task declaration

   type Storage_T is range 10 .. 20;  --  range type for the shared resource

   --  Monitor (protected object) shared by the producer and the consumer
   protected type Storage is
      entry Put;  --  "produce" one unit of the resource
      entry Get;  --  "consume" the permitted amount of the resource
      entry Value (val : out Storage_T);  --  accessor for the current value
   private
      --  Hidden variable, initialised to the minimum of the type's range
      StorageData : Storage_T := Storage_T'First;
   end Storage;

   --  Implementation of the Storage monitor
   --  NOTE(review): Text_IO calls inside protected actions are potentially
   --  blocking operations (Ada RM 9.5.1); kept as in the original example.
   protected body Storage is

      --  Open only while the maximum has not been reached
      entry Put when StorageData < Storage_T'Last is
      begin
         StorageData := StorageData + 1;
         if StorageData >= Storage_T'Last then
            Ada.Text_IO.Put_Line ("[PRODUCER] storage maximum");
         end if;
      end Put;

      --  Open only once the maximum has been reached
      entry Get when StorageData >= Storage_T'Last is
         To_Consume : Storage_T;
      begin
         To_Consume  := StorageData - Storage_T'First;
         StorageData := StorageData - To_Consume;
         Ada.Text_IO.Put_Line ("[CONSUMER] consuming");
      end Get;

      --  Always open
      entry Value (val : out Storage_T) when True is
      begin
         val := StorageData;
      end Value;

   end Storage;

   --  Instance of the Storage monitor
   Storage1 : Storage;

   --  Implementation of the producer task
   task body Producer is
      v : Storage_T;
   begin
      Ada.Text_IO.Put_Line ("[PRODUCER] Task started");
      loop
         delay 0.2;
         Storage1.Put;
         Storage1.Value (v);
         Ada.Text_IO.Put ("[PRODUCER]");
         --  Standard 'Image on the type instead of the GNAT-specific 'Img
         Ada.Text_IO.Put_Line (Storage_T'Image (v));
      end loop;
   end Producer;

   --  Implementation of the consumer task
   task body Consumer is
   begin
      Ada.Text_IO.Put_Line ("[CONSUMER] Task started");
      loop
         Storage1.Get;
      end loop;
   end Consumer;

begin
   null;
end Main;