Intel® Moderncode for Parallel Architectures
Support for developing parallel programming applications on Intel® Architecture.

an eventcount algorithm...

Chris_M__Thomasson
New Contributor I
473 Views
I am presenting an eventcount algorithm I invented. I know that Dmitriy Vyukov has already submitted one to the TBB, but his algorithm is based on TLS/TSD. I just wanted to show an alternative technique that does not require TLS/TSD. Here is code which is based on Dmitriy excellent Relacy Race Detector package:



[cpp]#define RL_GC
#include 
#include 



class eventcount {
public:
  typedef unsigned long key_type;


private:
  mutable rl::atomic m_count;
  rl::mutex m_mutex;
  rl::condition_variable m_cond;


  void prv_signal(key_type key) {
    if (key & 1) {
      m_mutex.lock($);
      while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1, 
        rl::memory_order_seq_cst));
      m_mutex.unlock($);
      m_cond.notify_all($);
    }
  }


public:
  eventcount() {
    m_count($).store(0, rl::memory_order_relaxed);
  }


public:
  key_type get() const {
    return m_count($).fetch_or(1, rl::memory_order_acquire);
  }


  void signal() {
    prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
  }


  void signal_relaxed() {
    prv_signal(m_count($).load(rl::memory_order_relaxed));
  }


  void wait(key_type cmp) {
    m_mutex.lock($);
    if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
      m_cond.wait(m_mutex, $);
    }
    m_mutex.unlock($);
  }
};




template
class mpmcq {
  struct node {
    rl::atomic m_next;
    T volatile m_state;
  };


  rl::atomic m_head;
  rl::atomic m_tail;


public:
  mpmcq() {
    node* n = RL_NEW(node);
    n->m_next($).store(NULL, rl::memory_order_relaxed);
    m_head($).store(n, rl::memory_order_relaxed);
    m_tail($).store(n, rl::memory_order_relaxed);
  }


  ~mpmcq() {
    RL_ASSERT(m_head($).load(rl::memory_order_relaxed) ==
              m_tail($).load(rl::memory_order_relaxed));
  }


public:
  void push(T& state) {
    node* n = RL_NEW(node);
    n->m_next($).store(NULL, rl::memory_order_relaxed);
    n->m_state = state;
    node* p = m_head($).exchange(n, rl::memory_order_seq_cst);
    p->m_next($).store(n, rl::memory_order_seq_cst);
  }


  bool pop(T& state) {
    node* n;
    node* t = m_tail($).load(rl::memory_order_seq_cst);
    do {
      n = t->m_next($).load(rl::memory_order_seq_cst);
      if (! n) return false;
      state = n->m_state;
    } while (! m_tail($).compare_exchange_weak(t, n, rl::memory_order_seq_cst));
    return true;
  }
};





#define PRODUCERS 2
#define CONSUMERS 2
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 6


struct mpmcq_test : rl::test_suite {
  eventcount m_ecount;
  mpmcq m_queue;
  std::atomic m_count;


  void before() {
    m_count($).store(PRODUCERS * ITERS, rl::memory_order_relaxed);
  }


  void invariant() {
    RL_ASSERT(m_count($).load(rl::memory_order_relaxed) > -1);
  }


  void after() {
    RL_ASSERT(! m_count($).load(rl::memory_order_relaxed));
  }


  void thread(unsigned tidx) {
    int i;
    if (tidx < PRODUCERS) {
      for (i = 0; i < ITERS; ++i) {
        m_queue.push(i);
        m_ecount.signal();
      }
    } else {
      do {
        while (! m_queue.pop(i)) {
          eventcount::key_type key = m_ecount.get();
          if (m_queue.pop(i)) break;
          m_ecount.wait(key);
        }
      } while (i != -666 &&
               m_count($).fetch_add(-1, rl::memory_order_relaxed) != 1);
      if (i != -666) {
        for (i = 1; i < CONSUMERS; ++i) {
          int x = -666;
          m_queue.push(x);
        }
        m_ecount.signal();
      }
    }
  }
};




int main() {
  rl::test_params params;
  params.iteration_count = 99999999;
  //params.search_type = rl::fair_full_search_scheduler_type;
  //params.search_type = rl::fair_context_bound_scheduler_type;
  rl::simulate(params);
  std::puts("nnn____________________________________n"
            "DONE!!!!npress  to exit...");
  std::fflush(stdin);
  std::fflush(stdout);
  std::getchar();
  return 0;
}[/cpp]




Here is an implementation of the algorithm in C#:

http://cpt.pastebin.com/f72cc3cc1

You can obtain Relacy here:

http://groups.google.com/group/relacy


The code shows how to use an eventcount which is basically like a condition variable for non-blocking, or even blocking, algorithms. Its an extremely usefull algorithm wrt adding conditional waiting ability to basically any non-blocking algorithm. If you have any questions on how to use it, please feel free to ask!

:^)
0 Kudos
0 Replies
Reply