Back to posts.

Consumer + Producer model with libuv

Simple example of using libuv for threading and using using condition variables. The code below shows a simple example of a producer / consumer model. This model is great if you want to perform some time consuming operation in a separate thread. The worker thread (threaded_func) will wait until the condition variable is signalled. Then it will quickly copy the Work* and start working. The threaded_func will not waste cpu by polling for new data. Note that you need to be aware of spurious wake-ups that are caused by singals!

The main thread signals the condition variable so the worker knows when it needs to start working. The process goes a bit like:

Producer

// adding new work
Work* new_work = new Work();
uv_mutex_lock(&mutex);
{
  work.push(new_work);
}
uv_cond_signal(&cv);
uv_mutex_unlock();

Consumer

while(true) {
  // copying work that has to be done
  uv_mutex_lock(&mutex);
  while(work.size() == 0) {
    uv_cond_wait(&cv, &mutex);
  }
  // do something
  uv_mutex_unlock(&mutex);
}

Here you see some code that shows a bit more, but still very basic, example on how to use libuv threading with condition variables.

Simulation.h

extern "C" {
#  include <uv.h>
}
 
#include <glfw_wrapper/SimulationBase.h>
#include <roxlu/Roxlu.h>
 
// This represents some work that the threaded function needs to process
struct Work {
  std::string name;
};
 
void threaded_func(void *arg);
 
class Simulation : public SimulationBase {
 public:
  Simulation();
  ~Simulation();
  void setup();
  void onKeyDown(int key, int scancode, int mods);
 
 public:
  uv_cond_t cv;
  uv_mutex_t mutex;
  uv_thread_t work_thread;
  std::vector<Work*> work_todo;
};

Simulation.cpp

#include <Simulation.h>
 
void threaded_func(void* arg) {
  Simulation* sim = static_cast<Simulation*>(arg);
  std::vector<Work*> consumer_work;
  while(!sim->must_stop) {
 
    // 1. Copy the work to our local queue
    // -----------------------------------
    uv_mutex_lock(&sim->mutex);
    {
       while(sim->work_todo.size() == 0) { 
          uv_cond_wait(&sim->cv, &sim->mutex);
       }
 
      printf("Got some work.\n");
      std::copy(sim->work_todo.begin(), sim->work_todo.end(), std::back_inserter(consumer_work));
      sim->work_todo.clear();
    }
    uv_mutex_unlock(&sim->mutex);
 
    // 2. Once copied, do something with it
    // -----------------------------------
    for(std::vector<Work*>::iterator it = consumer_work.begin(); it != consumer_work.end(); ++it) {
      Work* w = *it;
 
      printf("Working ...\n");
      rx_sleep_millis(200); 
 
      delete w;
      w = NULL;
    }
 
    consumer_work.clear();
  }
}
 
 
// --------------------------------------------------------------------
 
Simulation::Simulation()
  :SimulationBase()
  ,must_stop(false)
{
  uv_mutex_init(&mutex);
  uv_cond_init(&cv);
}
 
Simulation::~Simulation() {
  must_stop = true;
  uv_thread_join(&work_thread);
  uv_mutex_destroy(&mutex);
  uv_cond_destroy(&cv);
}
 
 
void Simulation::setup() {
  uv_thread_create(&work_thread, threaded_func, this);
}
 
void Simulation::onKeyDown(int key, int scancode, int mods) {
  if(key == GLFW_KEY_ESCAPE) {
    closeWindow();
  }
  else if(key == GLFW_KEY_SPACE) {
    Work* w = new Work();
 
    uv_mutex_lock(&mutex);
    {
      work_todo.push_back(w);
    }
    uv_cond_signal(&cv);
    uv_mutex_unlock(&mutex);
  }
}