Modern C++ Concurrency - Synchronizing threads - Condition Variables

Introduction

In the previous lesson we saw how data can be protected using a mutex. We now know how to make threads do their work concurrently without interfering with shared resources and data. But sometimes we need to synchronize their actions in time, meaning that we might want a thread t1 to wait until a certain condition is true before allowing it to continue its execution.

This lesson discusses the tools that we can use to achieve such behavior efficiently using condition variables.

Why do we need condition variables?

The problem stated above, i.e. waiting for a condition to become true, could easily be solved with a mutex protecting a flag variable that indicates whether the task is completed or not. Nothing else is needed to make it work:
a thread t1 could loop, constantly and continuously checking the flag while it holds the value false. When some other thread sets it to true (in a critical section, as we learned, because the flag is shared) then t1 is off the hook and can continue its execution.

The problems with this approach are:

  1. it is highly inefficient, because t1 keeps the CPU busy,
  2. the flag is shared and protected by a mutex, meaning that only one thread at a time can operate on it,
  3. since t1 is busy running, the other threads have fewer resources available for doing their work and eventually setting the shared variable.

The standard library provides a basic and useful mechanism for waiting for a certain event to occur: the condition variable.
Each condition variable is associated with a certain event, and a thread can ask such a variable to wait for the event to happen. During this wait the thread is asleep, hence not using any CPU, and whenever the event actually happens, the condition variable is able to wake up the threads that are waiting for it!

Condition variables

A condition variable can be used to block (without using any resources) one or more threads until another thread modifies the state of the shared variable (protected by the lock) and notifies one or all of the blocked threads about the change in state.
Condition variables work in tandem with a mutex, meaning that you need to use both to achieve the desired behavior. Each thread that intends to operate on the shared data has to follow this pattern (sketched in code right after the list):

  1. acquire a std::unique_lock<std::mutex> on the same mutex as used to protect the shared variable,
  2. execute wait, wait_for, or wait_until on the condition variable. The wait operations atomically release the mutex and suspend the execution of the thread.
  3. When the condition variable is notified, a timeout expires, or a spurious wakeup occurs, the thread is awakened, and the mutex is atomically reacquired. The thread should then check the condition and resume waiting if the wakeup was spurious (more on that later).
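
In code, the pattern looks roughly as follows. This is only a minimal sketch, with mtx, cv and ready as placeholder names for the mutex, the condition variable and the shared flag:

#include <condition_variable>
#include <mutex>

std::mutex mtx;                 // protects the shared state
std::condition_variable cv;     // signals changes to the shared state
bool ready = false;             // the shared state itself

void waiter() {
  std::unique_lock<std::mutex> lock(mtx);  // 1. acquire the lock
  while (!ready) {                         // 3. re-check the condition after every wakeup
    cv.wait(lock);                         // 2. atomically release the lock and sleep
  }
  // Here we own the lock again and ready is true.
}

void notifier() {
  {
    std::lock_guard<std::mutex> lock(mtx);
    ready = true;                          // modify the shared state under the lock
  }
  cv.notify_one();                         // wake up one waiting thread
}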

In order to showcase its usage, let's consider the following scenario:
imagine you have a queue of fixed size S and two sets of threads:

  1. Producers, generating data that will be put in the queue only if the latter is not full yet
  2. Consumers, popping elements from the queue, if of course any element is present.

As you can see, the producers might need to stop their work and wait for the queue to have some free space, and the consumers, on the other hand, might have to wait for the queue to stop being empty. Whenever a producer pushes into the queue, it can notify any of the waiting consumers that the queue is not empty anymore. Similarly, whenever a consumer successfully pops an element out of the queue, it can notify any of the producers that the queue is not full anymore.

We will write a thread-safe wrapper for std::queue allowing the behavior described above. The following safe_queue class contains two methods:

  1. enqueue()
  2. dequeue()

and four private fields:

  1. max_size is the maximum size of the queue.
  2. a mutable mutex. I should remind you that mutable allows const-qualified member functions to modify the fields marked as mutable. Why do we need that? Well, we still need to acquire the lock when we read the size of the queue, for instance, as other threads might well be writing to it. Even if we are not really modifying the queue, we still need to lock and unlock the mutex.
  3. a condition_variable
  4. a std::queue

The following is the entire code for the thread safe queue.

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

template <typename T>
struct safe_queue {
  safe_queue(const size_t _max_size)
      : max_size(_max_size), mtx(), condition(), queue() {}

  bool empty() const {
    std::unique_lock<std::mutex> l(mtx);
    return queue.empty();
  }

  size_t size() const {
    std::unique_lock<std::mutex> l(mtx);
    return queue.size();
  }

  void enqueue(T t) {
    std::unique_lock<std::mutex> l(mtx);

    // Sleep until the queue has room for another element.
    while (queue.size() >= max_size) {
      condition.wait(l);
    }

    queue.push(t);
    l.unlock();
    condition.notify_all();
  }

  T dequeue() {
    std::unique_lock<std::mutex> l(mtx);

    // Sleep until the queue contains at least one element.
    while (queue.empty()) {
      condition.wait(l);
    }

    T val = queue.front();
    queue.pop();

    l.unlock();
    condition.notify_all();
    return val;
  }

 private:
  size_t max_size;                    // maximum number of elements in the queue
  mutable std::mutex mtx;             // protects queue; mutable so const members can lock it
  std::condition_variable condition;  // signals "not full" and "not empty"
  std::queue<T> queue;                // the wrapped, non-thread-safe queue
};
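
To give an idea of how the class might be used, here is a minimal sketch (not part of the lesson's code above) with one producer and one consumer exchanging integers through the queue:

#include <iostream>
#include <thread>

int main() {
  safe_queue<int> q(10);  // at most 10 elements in flight

  std::thread producer([&q] {
    for (int i = 0; i < 100; ++i) {
      q.enqueue(i);  // blocks while the queue is full
    }
  });

  std::thread consumer([&q] {
    for (int i = 0; i < 100; ++i) {
      std::cout << q.dequeue() << '\n';  // blocks while the queue is empty
    }
  });

  producer.join();
  consumer.join();
}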

Code Analysis

The first thing that we should notice is that we are now using std::unique_lock<std::mutex> l(mtx); as the locking mechanism. unique_lock (just like std::lock_guard) locks the mutex as soon as it is constructed and unlocks it as soon as it is destructed: it manages the mutex in the Resource Acquisition Is Initialization (RAII) style. Unlike lock_guard, however, a unique_lock can also be unlocked and re-locked after construction, which is exactly what the condition variable needs to do while the thread sleeps; this is why wait requires a unique_lock.
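
As a quick illustration of the RAII behavior (a minimal sketch, with m as a placeholder mutex):

#include <mutex>

std::mutex m;

void critical_section() {
  std::unique_lock<std::mutex> lock(m);  // the mutex is locked here
  // ... work on the shared data ...
}                                        // the mutex is automatically unlocked when lock is destroyed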

enqueue()

The enqueue() method is used by the producers and it is really a simple one: a producer first acquires the lock (we need to make sure we do not modify the queue while other threads are reading or writing to it) and then it waits until the queue has room for another element. The key part to remember here is that when the thread is put to sleep by the condition variable, the lock is released so other threads can operate on the queue (dequeuing, for instance). The other important fact is that whenever the thread is awakened, it is guaranteed that it owns the lock.

wait() can also wake up the thread spuriously, meaning that a thread can be woken up when the condition is not satisfied, that is, a thread might be awakened from its waiting state even though no other thread signaled the condition variable.
That is why the wait is embedded in a while loop, as shown in the following:

while (queue.size() >= max_size) {
  condition.wait(l);
}

Whenever the thread is awakened, it owns the lock (so we are sure no one else is touching our queue) and we can safely check whether the queue has room for a new element; if not, it means we have been awakened by a spurious notification and we go back to sleep.

Finally, when the queue has room for a new element, we can proceed to the next instructions (we still own the lock at this moment), i.e. pushing one element into the queue, releasing the lock (we have completed our critical section at this point) and notifying all the other threads that might be sleeping on the condition variable.

queue.push(t);
l.unlock();
condition.notify_all();

dequeue()

The dequeue() function works similarly, and applies the same pattern we saw in enqueue(), i.e. acquire the lock first, check whether we have the right preconditions to pop one element out of the queue and, if not, go to sleep using the condition variable.
Once out of the condition variable loop, we own the lock and we can keep operating on the queue, thus finally popping out one element and notifying all the other threads that things have changed in the queue.

There is another way of waiting for a condition to be true, just as idiomatic as the one we have seen so far. condition_variable offers an overload of the wait function with the following signature:

template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

A call to wait(lock, pred) using the predicate pred (any callable object returning true or false, usually a lambda) is equivalent to (note the negation):

while (!pred()) {
    wait(lock);
}

This means that we can rewrite our enqueue and dequeue functions as follows:

void enqueue(T t) {
    std::unique_lock<std::mutex> l(mtx);

    // Sleep until there is room for another element; the predicate
    // is re-checked automatically after every wakeup.
    condition.wait(l, [this]() {
      return queue.size() < max_size;
    });

    queue.push(t);
    l.unlock();
    condition.notify_all();
  }

  T dequeue() {
    std::unique_lock<std::mutex> l(mtx);

    // Sleep until the queue contains at least one element.
    condition.wait(l, [this]() {
      return !queue.empty();
    });

    T val = queue.front();
    queue.pop();

    l.unlock();
    condition.notify_all();

    return val;
  }

Note that pred is the negation of the condition that we had in the previous version (with the explicit while loop). Also note that by using the predicate overload of wait we are relieved from the burden of checking for spurious wakeups ourselves: whenever we return from this call, we are sure that pred is true.
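
The timed variants mentioned earlier (wait_for and wait_until) accept the same kind of predicate. As a minimal sketch (mtx, cv and ready are placeholder names, not part of the queue example), the predicate overload of wait_for returns true if the condition became true within the timeout and false otherwise:

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

bool wait_up_to_one_second() {
  std::unique_lock<std::mutex> lock(mtx);
  // Returns true if ready became true within one second,
  // false if the timeout expired with ready still false.
  return cv.wait_for(lock, std::chrono::seconds(1), [] { return ready; });
}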
