Introduction
In the previous mutex
t1
This lesson discusses the tools that we can use to achieve such behavior efficiently condition variables
Why do we need condition variables
?
The solution the problem stated above, i.e. waiting for a condition to be true could be easily solvable using a mutex
protecting a flag variable which indicates whether the task is completed or not. That is it, nothing else is needed to make it work.
A t1
false
true
(in a critical section as we learned because it is shared) t1
The problems with this approach are:
- is it highly inefficient because
t1
is keeping the CPU busy, - the flag is shared and protected by a
mutex
, meaning that only one thread at the time can operate on it. - since
t1
is using running, the other threads will have less available resources for doing their work and eventually setting the shared variable.
The standard library provides a basic and useful mechanism for waiting for a certain event to occur: the condition variable
.
Each condition variable
associated with a certain event, and a thread can ask such variable to wait for the event to happen. During this wait the thread is asleep i.e. not using any CPU, and whenever the event actually happen the condition variable
is able to wake the threads up that are waiting for it!
The standard library provides a basic and useful mechanism for waiting for a certain event to occur: condition variable
condition variable
condition variable
Condition variables
A condition variable can be used to block (without using any resources) one or more threads until another threads modifies the state of the shared variable (protected by the lock) and notifies one or all the blocked threads threads about the change in state.
Condition variables work in tandem with mutex
, meaning that you need to use both to achieve the desired behavior. Each thread that intend to operate on the shared data has to follow the following pattern:
- acquire
a std::unique_lock<std::mutex>
, on the same mutex as used to protect the shared variable execute wait
, wait_for
, or wait_until
on the condition variable. The wait operations atomically release the mutex and suspend the execution of the thread.- When the condition variable is notified, a timeout expires, or a spurious wakeup occurs, the thread is awakened, and the mutex is atomically reacquired. The thread should then check the condition and resume waiting if the wake up was spurious (more on that later).
In order to showcase the its usage let's consider the following scenario:
Imagine you have a queue of fixed size S
and two sets of threads:
- Producers, generating data that will put in the queue only if the latter is not full yet
- Consumer, popping elements from the queue, if of course any is present.
As you can see, the producers might need to stop their work and wait for the queue to have some free space, and the consumer, on the other hand, might have to wait for the queue to stop being empty. Whenever a producer pushes into the queue, it can notify any of the waiting consumers that the queue is not empty anymore. Similarly, whenever a consumer successfully pops an element out of the queue it can notify any of the producers that the queue is not full anymore.
We will write a std::queue
safe_queue
class contains two methods:
enqueue()
dequeue()
and four private fields:
max_size
is the maximum size of the queue.a mutable mutex
. I should remind you that mutableallows for the modification by theconst qualified member functions, ofthe mutable fields . Why do we need that? Well, we still need to acquire the lock when we readthe size
of the queue for instance, as other threads might as well as be writing to it. Even if we are not really modifying the queue, we still need to modify the lock.condition_variable
std::queue
The following is the entire code for the thread safe queue.
template <typename T> struct safe_queue { safe_queue(const size_t _max_size) : max_size(_max_size), mtx(), condition(), queue() {} size_t empty() const { std::unique_lock<std::mutex> l(mtx); return queue.empty(); } size_t size() const { std::unique_lock<std::mutex> l(mtx); return queue.size(); } void enqueue(T t) { std::unique_lock<std::mutex> l(mtx); while (queue.size() >= max_size) { condition.wait(l); } queue.push(t); l.unlock(); condition.notify_all(); } T dequeue() { std::unique_lock<std::mutex> l(mtx); while (queue.empty() ) { condition.wait(l); } T val = queue.front(); queue.pop(); l.unlock(); condition.notify_all(); return val; } private: uint max_size; mutable std::mutex mtx; std::condition_variable condition; std::queue<T> queue; };
Code Analysis
The first thing that we should notice is that now we are std::unique_lock<std::mutex> l(mtx);
unique_lock
(as well as std::lock_guard
) locks the mutex as soon as it is constructed and unlocks it as soon as it is destructed. It manipulates the mutex in a Resource acquisition is initialization (RAII) style.
enqueue()
eneueue()
lock
(we need to make sure we do not modify the queue while other threads are reading or writing to it) and then lock
wait()
can also
That is why the wait
is embedded in a while loop as shown in the following:
while (queue.size() >= max_size) { condition.wait(l); }
Whenever the thread is awakened, its
Finally when the queue has room for a new element we can proceed to the next instructions (we are still owning the lock at this moment) i.e. pushing one element in the queue, releasing the lock (we completed our critical section at this point) and notify all other threads that might be sleeping on the condition
variable.
queue.push(t); l.unlock(); condition.notify_all(); return;
dequeue()
The dequeue()
function works similarly, and applies the same patter we saw in enqueue()
i.e. acquire the lock first, check the if we have the right preconditions to be able to pop one element out of the queue and if not go to sleep using the condition variable.
Once out of the condition variable loop, we own the lock and we can keep operating on the queue thus finally popping out one element, and notifying all the other threads that things have changes in the queue.
There is another way of waiting, yet as idiomatic as the one we have seen so far, for a condition to be true. condition_variable
offers the wait
function that has the following signature:
template< class Predicate > void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
A call to wait(pred)
using the predicate (any callable object returning true or false, usually a lambda function) pred
is equivalent to (note the negation):
while (!pred()) { wait(lock); }
This mean that we can rewrite our queue
and dequeue
function as follows:
void enqueue(T t) { std::unique_lock<std::mutex> l(mtx); condition.wait(l , [this](){ return queue.size() < max_size; }); queue.push(t); l.unlock(); condition.notify_all(); } T dequeue() { std::unique_lock<std::mutex> l(mtx); condition.wait(l , [this](){ return !queue.empty; }); T val = queue.front(); queue.pop(); l.unlock(); condition.notify_all(); return val; }
Note pred
while
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
pred