I've implemented a sketch of an eventcount for TBB. It can be used as a replacement for Gate in the current TBB scheduler, and/or as the blocking/signalling logic in concurrent_queue, and/or it can be exposed as a public API, and/or a portable condition variable can be implemented on top of it and exposed as a public API. I'd like to know what you think about the implementation and these usages, and whether it's worth finishing the implementation and submitting it as an official contribution.
The eventcount is portable, fine-grained, fairly efficient, general-purpose and reusable.
Portable in the sense that it requires only semaphore, mutex and full memory fence primitives: no Win32 events or futexes.
Fine-grained in the sense that it can notify an arbitrary set of waiting threads selected by a predicate, not only the coarse-grained notify_one() and notify_all().
Fairly efficient in the sense that on the fast path (no waiters) the producer's overhead is a single load, a single test, a single conditional jump, and possibly a single full memory fence (controlled by the user, depending on the algorithm), while the consumer's fast path (data already available) does not touch the eventcount at all.
General-purpose in the sense that it does not manage any user state; this simplifies reasoning and implementation and allows reuse.
Reusable in the sense that it can be used with basically any predicate (user algorithm).
The implementation takes Arch's requirements into account:
http://software.intel.com/en-us/forums/showpost.php?p=71895
I've attached the implementation and will post it, along with several examples, in the following posts.
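To make the fast-path claims concrete, here is a minimal sketch under stated assumptions: it uses C++11 std::atomic, std::mutex and std::condition_variable instead of the semaphore/mutex/fence helpers, and the names mini_eventcount, wait_until_set and run_handoff are hypothetical. It is deliberately coarse-grained (one epoch, a waiter counter, wake-all only), so it shows the two-phase protocol and the producer fast path, not the fine-grained waitset of the real implementation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical coarse-grained eventcount (C++11). No per-thread semaphores
// and no waitset: every notification wakes all waiters.
class mini_eventcount {
public:
    // Phase 1: register as a waiter and snapshot the epoch.
    unsigned prepare_wait() {
        std::lock_guard<std::mutex> lk(mtx_);
        waiters_.fetch_add(1, std::memory_order_relaxed);
        unsigned ep = epoch_;
        // Store-load fence: the caller's re-check of its own state must not
        // be ordered before the registration above.
        std::atomic_thread_fence(std::memory_order_seq_cst);
        return ep;
    }
    // Phase 2a: the re-check found work, so do not block.
    void cancel_wait() { waiters_.fetch_sub(1, std::memory_order_relaxed); }
    // Phase 2b: block until a notification advances the epoch.
    void commit_wait(unsigned ep) {
        std::unique_lock<std::mutex> lk(mtx_);
        while (epoch_ == ep) cv_.wait(lk);
        waiters_.fetch_sub(1, std::memory_order_relaxed);
    }
    void notify_all() {
        // Full fence (a "relaxed" variant would leave it to the caller).
        std::atomic_thread_fence(std::memory_order_seq_cst);
        // Fast path: one load, one test, one branch when nobody waits.
        if (waiters_.load(std::memory_order_relaxed) == 0) return;
        {
            std::lock_guard<std::mutex> lk(mtx_);
            ++epoch_;
        }
        cv_.notify_all();
    }
private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::atomic<unsigned> waiters_{0};
    unsigned epoch_ = 0;  // guarded by mtx_
};

// Consumer: if the flag is already set, the eventcount is never touched.
inline void wait_until_set(std::atomic<bool>& flag, mini_eventcount& ec) {
    if (flag.load(std::memory_order_acquire)) return;
    for (;;) {
        unsigned ep = ec.prepare_wait();
        if (flag.load(std::memory_order_acquire)) { ec.cancel_wait(); return; }
        ec.commit_wait(ep);
        if (flag.load(std::memory_order_acquire)) return;
    }
}

// Hypothetical demo: one producer hands a flag to one (possibly blocked) consumer.
inline bool run_handoff() {
    std::atomic<bool> flag{false};
    mini_eventcount ec;
    ec.notify_all();  // no waiters yet: returns on the fast path
    bool seen = false;
    std::thread consumer([&] { wait_until_set(flag, ec); seen = flag.load(); });
    flag.store(true, std::memory_order_release);
    ec.notify_all();
    consumer.join();
    return seen;
}
```

The two seq_cst fences (one after registration, one before the waiter check) form the store-load pairing that rules out a lost wakeup: either the consumer's re-check sees the flag, or the producer sees a nonzero waiter count.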
First of all, several helper components: semaphore, mutex, full memory fence and a doubly-linked list (just to make the implementation self-contained):
[cpp]/** Fine-grained Eventcount
 * Copyright (C) 2008 Dmitriy S. V'jukov
 */

#include <stddef.h>   // size_t, offsetof

#if defined(_WIN32) && defined(_MSC_VER)

#include <windows.h>
#include <limits.h>   // LONG_MAX
#include <emmintrin.h> // _mm_mfence

class semaphore
{
public:
    semaphore() { h_ = CreateSemaphore(0, 0, LONG_MAX, 0); }
    ~semaphore() { CloseHandle(h_); }
    void wait() { WaitForSingleObject(h_, INFINITE); }
    void post() { ReleaseSemaphore(h_, 1, 0); }
private:
    HANDLE h_;
    semaphore(semaphore const&);
    semaphore& operator = (semaphore const&);
};

class mutex
{
public:
    mutex() { InitializeCriticalSection(&cs_); }
    ~mutex() { DeleteCriticalSection(&cs_); }
    void lock() { EnterCriticalSection(&cs_); }
    void unlock() { LeaveCriticalSection(&cs_); }
private:
    CRITICAL_SECTION cs_;
    mutex(mutex const&);
    mutex& operator = (mutex const&);
};

inline void full_memory_fence() { _mm_mfence(); }

#define THREAD_LOCAL __declspec(thread)

#elif defined(__GNUC__) // POSIX + GCC

#include <semaphore.h>
#include <pthread.h>

class semaphore
{
public:
    semaphore() { sem_init(&sem_, 0, 0); }
    ~semaphore() { sem_destroy(&sem_); }
    void wait() { sem_wait(&sem_); }
    void post() { sem_post(&sem_); }
private:
    sem_t sem_;
    semaphore(semaphore const&);
    semaphore& operator = (semaphore const&);
};

class mutex
{
public:
    mutex() { pthread_mutex_init(&mutex_, 0); }
    ~mutex() { pthread_mutex_destroy(&mutex_); }
    void lock() { pthread_mutex_lock(&mutex_); }
    void unlock() { pthread_mutex_unlock(&mutex_); }
private:
    pthread_mutex_t mutex_;
    mutex(mutex const&);
    mutex& operator = (mutex const&);
};

inline void full_memory_fence() { __sync_synchronize(); }

#define THREAD_LOCAL __thread

#else
#error "unsupported platform"
#endif

class lock
{
public:
    lock(mutex& m) : m_(m) { m.lock(); }
    ~lock() { m_.unlock(); }
private:
    mutex& m_;
    lock(lock const&);
    lock& operator = (lock const&);
};

/** simple single-threaded doubly-linked list
 * nothing interesting
 */
class dlist
{
public:
    struct node
    {
        node* prev_;
        node* next_;
        node() { prev_ = 0; next_ = 0; }
    };

    dlist() { reset(); }

    void push(node* n)
    {
        size_ += 1;
        n->next_ = head_.next_;
        n->prev_ = &head_;
        head_.next_->prev_ = n;
        head_.next_ = n;
    }

    node* pop()
    {
        if (size_ == 0)
            return 0;
        node* n = head_.next_;
        remove(n);
        return n;
    }

    void remove(node* n)
    {
        size_ -= 1;
        n->prev_->next_ = n->next_;
        n->next_->prev_ = n->prev_;
    }

    size_t size() const { return size_; }
    node* begin() { return head_.next_; }

    void flush_to(dlist& target)
    {
        if (size_)
        {
            target.size_ = size_;
            target.head_.next_ = head_.next_;
            target.head_.next_->prev_ = &target.head_;
            target.tail_.prev_ = tail_.prev_;
            target.tail_.prev_->next_ = &target.tail_;
        }
        else
        {
            target.reset();
        }
        reset();
    }

    static bool not_last(node* n) { return n->next_ != 0; }
    static node* get_next(node* n) { return n->next_; }

private:
    size_t volatile size_;
    node head_;
    node tail_;

    void reset()
    {
        size_ = 0;
        head_.next_ = &tail_;
        head_.prev_ = 0;
        tail_.next_ = 0;
        tail_.prev_ = &head_;
    }

    dlist(dlist const&);
    dlist& operator = (dlist const&);
};
[/cpp]
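For comparison, a sketch of C++11 standard-library stand-ins for the same helpers, assuming a C++11 compiler is acceptable (it was not an option in 2008). std::counting_semaphore only arrives in C++20, so this semaphore is built from std::mutex and std::condition_variable; the class names mirror the ones above, and per_thread_counter is a hypothetical example of thread_local.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Counting semaphore built from a mutex and a condition variable.
class semaphore {
public:
    semaphore() : count_(0) {}
    semaphore(semaphore const&) = delete;
    semaphore& operator=(semaphore const&) = delete;

    void wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ > 0; });  // tolerates spurious wakeups
        --count_;
    }
    void post() {
        {
            std::lock_guard<std::mutex> lk(m_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    unsigned count_;
};

// std::mutex and std::lock_guard<std::mutex> play the roles of `mutex` and `lock`.

// A sequentially consistent fence is the portable full (store-load) fence.
inline void full_memory_fence() {
    std::atomic_thread_fence(std::memory_order_seq_cst);
}

// thread_local replaces __declspec(thread) / __thread.
inline unsigned& per_thread_counter() {
    static thread_local unsigned counter = 0;
    return counter;
}
```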
Implementation of eventcount:
[cpp]/** Fine-grained Eventcount
 * Copyright (C) 2008 Dmitriy S. V'jukov
 */

/** per-thread descriptor for eventcount */
struct ec_thread
{
    dlist::node node_;
    semaphore sema_;
    unsigned epoch_;
    bool volatile in_waitset_;
    bool spurious_;
    void* ctx_;

    ec_thread()
    {
        epoch_ = 0;
        in_waitset_ = false;
        spurious_ = false;
        ctx_ = 0;
    }

    ~ec_thread()
    {
        if (spurious_)
            sema_.wait();
    }

    static ec_thread* current()
    {
        static THREAD_LOCAL ec_thread* ec_thread_instance = 0;
        ec_thread* instance = ec_thread_instance;
        if (instance == 0)
        {
            instance = new ec_thread;
            ec_thread_instance = instance;
        }
        return instance;
        // instance must be destroyed in DllMain() callback
        // or in pthread_key_create() callback
    }

private:
    ec_thread(ec_thread const&);
    ec_thread& operator = (ec_thread const&);
};

/** fine-grained eventcount implementation */
class eventcount
{
public:
    eventcount()
    {
        epoch_ = 0;
    }

    void prepare_wait(void* ctx = 0)
    {
        ec_thread* th = ec_thread::current();
        // this is a good place to pump a previous spurious wakeup
        if (th->spurious_)
        {
            th->spurious_ = false;
            th->sema_.wait();
        }
        th->in_waitset_ = true;
        th->ctx_ = ctx;
        {
            lock l (mtx_);
            th->epoch_ = epoch_;
            waitset_.push(&th->node_);
        }
        full_memory_fence();
    }

    void wait()
    {
        ec_thread* th = ec_thread::current();
        // this check is just an optimization
        if (th->epoch_ == epoch_)
            th->sema_.wait();
        else
            retire_wait();
    }

    void retire_wait()
    {
        ec_thread* th = ec_thread::current();
        // spurious wakeup will be pumped in following prepare_wait()
        th->spurious_ = true;
        // try to remove node from waitset
        if (th->in_waitset_)
        {
            lock l (mtx_);
            if (th->in_waitset_)
            {
                // successfully removed from waitset,
                // so there will be no spurious wakeup
                th->in_waitset_ = false;
                th->spurious_ = false;
                waitset_.remove(&th->node_);
            }
        }
    }

    void notify_one()
    {
        full_memory_fence();
        notify_one_relaxed();
    }

    template<typename predicate_t>
    void notify(predicate_t pred)
    {
        full_memory_fence();
        notify_relaxed(pred);
    }

    void notify_all()
    {
        full_memory_fence();
        notify_all_relaxed();
    }

    void notify_one_relaxed()
    {
        if (waitset_.size() == 0)
            return;
        dlist::node* n;
        {
            lock l (mtx_);
            epoch_ += 1;
            n = waitset_.pop();
            if (n)
                to_ec_thread(n)->in_waitset_ = false;
        }
        if (n)
        {
            to_ec_thread(n)->sema_.post();
        }
    }

    template<typename predicate_t>
    void notify_relaxed(predicate_t pred)
    {
        if (waitset_.size() == 0)
            return;
        dlist temp;
        {
            lock l (mtx_);
            epoch_ += 1;
            size_t size = waitset_.size();
            size_t idx = 0;
            dlist::node* n = waitset_.begin();
            while (dlist::not_last(n))
            {
                dlist::node* next = dlist::get_next(n);
                ec_thread* th = to_ec_thread(n);
                if (pred(th->ctx_, size, idx))
                {
                    waitset_.remove(n);
                    temp.push(n);
                    th->in_waitset_ = false;
                }
                n = next;
                idx += 1;
            }
        }
        dlist::node* n = temp.begin();
        while (dlist::not_last(n))
        {
            dlist::node* next = dlist::get_next(n);
            to_ec_thread(n)->sema_.post();
            n = next;
        }
    }

    void notify_all_relaxed()
    {
        if (waitset_.size() == 0)
            return;
        dlist temp;
        {
            lock l (mtx_);
            epoch_ += 1;
            waitset_.flush_to(temp);
            dlist::node* n = temp.begin();
            while (dlist::not_last(n))
            {
                to_ec_thread(n)->in_waitset_ = false;
                n = dlist::get_next(n);
            }
        }
        dlist::node* n = temp.begin();
        while (dlist::not_last(n))
        {
            dlist::node* next = dlist::get_next(n);
            to_ec_thread(n)->sema_.post();
            n = next;
        }
    }

private:
    mutex mtx_;
    dlist waitset_;
    volatile unsigned epoch_;

    ec_thread* to_ec_thread(dlist::node* n)
    {
        return (ec_thread*)((char*)n - offsetof(ec_thread, node_));
    }

    eventcount(eventcount const&);
    eventcount& operator = (eventcount const&);
};
[/cpp]
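The spurious_ flag is the subtle part: if a notifier has already unlinked the node under the mutex, retire_wait() can no longer remove it, and a semaphore post is still on its way. The following hypothetical single-threaded model (the model_* names are illustrative, not part of the eventcount; the semaphore is reduced to a counter) replays that interleaving to show why the next prepare_wait() must absorb one post first.

```cpp
#include <cassert>

// Hypothetical model of one ec_thread: nothing blocks, no threads run.
struct model_thread {
    unsigned pending_posts = 0;  // stands in for the semaphore count
    bool in_waitset = false;
    bool spurious = false;
};

// Mirrors prepare_wait(): first absorb a post owed from an earlier round.
inline void model_prepare_wait(model_thread& th) {
    if (th.spurious) {
        th.spurious = false;
        // The real code blocks in sema_.wait() until the notifier's post
        // lands; this model assumes the post has already happened.
        assert(th.pending_posts > 0);
        --th.pending_posts;
    }
    th.in_waitset = true;
}

// Mirrors retire_wait(): assume a post is owed unless we unlink ourselves.
inline void model_retire_wait(model_thread& th) {
    th.spurious = true;
    if (th.in_waitset) {  // still linked: no notifier has claimed us
        th.in_waitset = false;
        th.spurious = false;
    }
}

// Notifier side: the unlink happens under the mutex, the post after it.
inline void model_notify_unlink(model_thread& th) { th.in_waitset = false; }
inline void model_notify_post(model_thread& th) { ++th.pending_posts; }
```

Without the absorb step, the stale post would make the next wait() return immediately even though no notification was meant for that round.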
Here is how the eventcount can be used in TBB scheduler:
[cpp]/** Fine-grained Eventcount
 * Copyright (C) 2008 Dmitriy S. V'jukov
 */

struct scheduler
{
    struct tbb_thread {};

    eventcount ec_;
    tbb_thread* threads_;
    bool volatile is_permanently_open_;

    void wait_while_pool_is_empty(tbb_thread* th)
    {
        if (is_permanently_open_)
            return;
        ec_.prepare_wait(th);
        if (pool_is_empty())
            ec_.wait();
        else
            ec_.retire_wait();
    }

    void notify_about_new_task_available()
    {
        ec_.notify_one_relaxed();
    }

    // wake the preferred thread if it is waiting, otherwise wake the last waiter
    // (passing a local type as a template argument requires C++11)
    void notify_about_new_task_available_with_preference(tbb_thread* preference)
    {
        struct local
        {
            tbb_thread* preference_;
            bool fired_;

            bool operator () (void* ctx, size_t count, size_t idx)
            {
                tbb_thread* th = (tbb_thread*)ctx;
                if (th == preference_)
                {
                    fired_ = true;
                    return true;
                }
                else if (idx == count - 1 && fired_ == false)
                {
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
        pred = {preference}; // fired_ is value-initialized to false
        ec_.notify_relaxed(pred);
    }

    // wake total_count threads, favouring the ones listed in preferences
    void notify_about_list_of_tasks_available(size_t total_count, size_t preference_count, tbb_thread** preferences)
    {
        struct local
        {
            size_t remain_to_signal_;
            size_t preference_count_;
            tbb_thread** preferences_;

            bool operator () (void* ctx, size_t count, size_t idx)
            {
                tbb_thread* th = (tbb_thread*)ctx;
                size_t remain_in_waitset = count - idx;
                if (remain_in_waitset <= remain_to_signal_)
                {
                    return true;
                }
                // stop preferring once the budget is spent (avoids wrap-around)
                for (size_t i = 0; i != preference_count_ && remain_to_signal_ != 0; ++i)
                {
                    if (preferences_[i] == th)
                    {
                        remain_to_signal_ -= 1;
                        return true;
                    }
                }
                return false;
            }
        }
        pred = {total_count, preference_count, preferences};
        ec_.notify_relaxed(pred);
    }

    bool pool_is_empty() { return true; }
};
[/cpp]
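The predicate contract is easiest to see in isolation. Below, the list-of-tasks predicate is lifted to namespace scope and driven by a hypothetical select_waiters() helper that mimics the scan in notify_relaxed(): one copy of the functor is called once per waiter, in waitset order, with the total count and the current index. Since push() inserts at the head, idx 0 is the most recently registered waiter. The remain_to_signal_ != 0 guard is an assumption added here to stop the counter from wrapping around when more preferred threads are waiting than there are tasks.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct tbb_thread {};  // empty stand-in, as in the scheduler sketch

// notify_about_list_of_tasks_available()'s predicate at namespace scope.
struct list_pred {
    std::size_t remain_to_signal_;
    std::size_t preference_count_;
    tbb_thread** preferences_;

    bool operator()(void* ctx, std::size_t count, std::size_t idx) {
        tbb_thread* th = static_cast<tbb_thread*>(ctx);
        std::size_t remain_in_waitset = count - idx;
        if (remain_in_waitset <= remain_to_signal_)
            return true;  // every remaining waiter is needed
        for (std::size_t i = 0; i != preference_count_ && remain_to_signal_ != 0; ++i) {
            if (preferences_[i] == th) {
                remain_to_signal_ -= 1;
                return true;
            }
        }
        return false;
    }
};

// Hypothetical driver mimicking the scan in notify_relaxed(): one functor
// copy, called once per waiter in waitset order. Returns selected positions.
template <typename Pred>
std::vector<std::size_t> select_waiters(std::vector<tbb_thread*> const& waitset, Pred pred) {
    std::vector<std::size_t> picked;
    for (std::size_t idx = 0; idx != waitset.size(); ++idx)
        if (pred(waitset[idx], waitset.size(), idx))
            picked.push_back(idx);
    return picked;
}
```

With five waiters, total_count = 1 and two preferred threads at positions 0 and 1, only position 0 is selected; without the guard the second match would decrement remain_to_signal_ below zero, wrap around to SIZE_MAX, and wake every remaining waiter.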
Here is how the eventcount can be used in concurrent_queue to eliminate thundering herd:
[cpp]struct queue
{
    int producer_idx_;
    int consumer_idx_;
    void** buffer_;
    eventcount ec_;

    void enqueue(void* data)
    {
        int idx = ++producer_idx_; // atomic
        buffer_[idx] = data;

        // wake only the consumer waiting for ticket idx
        struct local
        {
            int idx_;
            bool operator () (void* ctx, size_t count, size_t idx)
            {
                return idx_ == *(int*)ctx;
            }
        }
        pred = {idx};
        // not relaxed!!! the plain store to buffer_ needs the full fence inside notify()
        ec_.notify(pred);
    }

    void* dequeue()
    {
        int idx = ++consumer_idx_; // atomic
        void* data = buffer_[idx];
        if (data)
            return data;
        for (;;)
        {
            ec_.prepare_wait(&idx); // ctx = pointer to this consumer's ticket
            data = buffer_[idx];
            if (data)
            {
                ec_.retire_wait();
                return data;
            }
            ec_.wait();
            data = buffer_[idx];
            if (data)
            {
                return data;
            }
        }
    }
};
[/cpp]
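Why this avoids the thundering herd: each consumer registers a pointer to its own ticket as ctx, and the enqueue-side predicate selects only the waiter whose ticket matches, so one enqueue wakes at most one consumer instead of all of them. A hypothetical, thread-free check of that selection (woken_by() stands in for the waitset scan inside notify()):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Enqueue-side predicate from the queue sketch, at namespace scope.
struct ticket_pred {
    int idx_;
    bool operator()(void* ctx, std::size_t /*count*/, std::size_t /*idx*/) const {
        return idx_ == *static_cast<int*>(ctx);  // ctx points at a consumer's ticket
    }
};

// Hypothetical stand-in for the waitset scan: counts the waiters selected
// when the item for ticket `produced` is enqueued.
inline std::size_t woken_by(std::vector<int>& parked_tickets, int produced) {
    ticket_pred pred = {produced};
    std::size_t woken = 0;
    for (std::size_t i = 0; i != parked_tickets.size(); ++i)
        if (pred(&parked_tickets[i], parked_tickets.size(), i)) ++woken;
    return woken;
}
```

With consumers parked on tickets 5, 6, 7 and 8, enqueuing item 7 selects exactly one waiter, where notify_all() would have woken all four.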
Here is how the eventcount can be used in a general setup to notify about arbitrary state changes, in conjunction with lock-based, lock-free or even wait-free containers (the eventcount itself is wait-free on the fast path):
[cpp]eventcount ec;

// non-blocking consume
void* consume_impl()
{
    // a plurality of non-blocking containers, polled in priority order
    void* data;
    if ((data = high_prio_queue.dequeue()) != 0) return data;
    if ((data = normal_prio_queue.dequeue()) != 0) return data;
    if ((data = work_stealing_dequeue.pop()) != 0) return data;
    if ((data = low_prio_queue.dequeue()) != 0) return data;
    return global_root_task_queue.dequeue();
}

// blocking consume
void* consume()
{
    void* data = 0;
    if ((data = consume_impl()) != 0)
        return data;
    for (;;)
    {
        ec.prepare_wait();
        if ((data = consume_impl()) != 0)
        {
            ec.retire_wait();
            return data;
        }
        ec.wait();
        if ((data = consume_impl()) != 0)
            return data;
    }
}

void produce(void* data)
{
    some_queue_or_deque.enqueue(data);
    ec.notify_all();
}
[/cpp]
And finally here is how one can build portable condition variable with wait-free fast-path for signalers on top of the eventcount:
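[cpp]class condition_variable
{
    eventcount ec_;

public:
    // as with any condition variable, the caller re-checks its predicate in a loop
    void wait(mutex& mtx)
    {
        // register before releasing the mutex, so a signal sent after unlock() is not lost
        ec_.prepare_wait();
        mtx.unlock();
        ec_.wait();
        mtx.lock();
    }

    void signal()
    {
        ec_.notify_one();
    }

    void broadcast()
    {
        ec_.notify_all();
    }
};
[/cpp]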
A considerable part of the credit must go to Chris Thomasson:
http://software.intel.com/en-us/profile/6002
and Joseph Seigh:
Nice work and well presented! Yes, please contribute it as a TBB submission when you are happy with it.
For clients like the TBB scheduler, we would probably make the ec_thread object a parameter to the wait-related methods instead of using ec_thread::current(), and thus avoid using another TLS slot. For clients like concurrent_queue, your interface seems most sensible.
If the relaxed notification methods are going to be public, they need documentation explaining the fencing rules. The rule is "don't let a notification become visible before the action it is advertising", right?
The waiting logic is a two-phase commit protocol. I suspect two-phase commit will show up in the future, so it would be good to establish a regular naming scheme for two-phase commit now. I like "prepare_wait". I'd prefer the name "cancel_wait" over "retire_wait", because "retire" tends to imply success around here (as in "retiring instructions"). The name for the operation that completes a successful wait should have a longer name than "wait" to make it clear that it is the completion of a two-phase commit. If people have suggestions, I suggest starting them as a separate forum topic on "naming for two-phase commit protocols"
Thanks for taking the time for such a detailed posting.
- Arch
OK. The most notable thing is that I have to run it at least once before contributing :)
What do you mean by "another TLS slot"? __declspec(thread) doesn't use an OS TLS slot of its own: the MSVC runtime uses a single OS TLS slot for ALL __declspec(thread) variables, and they are distinguished by their offsets within that slot's data.
From a technical point of view, the rule for the relaxed notification methods must be something like "don't let the load of the eventcount state become visible before the action it is advertising", because the point is that there must be a store-load fence between the modification of the state and the call to notify_relaxed().
For example, if the state is modified with an atomic RMW that includes a full fence (which must contain a trailing store-load fence), then the signaling can be done with the relaxed version.
Hmm... by the way, another situation is when the user doesn't need a 100% guarantee with respect to notification and can tolerate a small probability of a race between producer and consumer. A good example is a scheduler like TBB's task scheduler. I also used such relaxed notifications in a distributed memory allocator, where I could tolerate a small amount of memory being delayed in the depths of the allocator.
Agree about cancel_wait().
About wait()... maybe... but I don't see better names... commit_wait() is definitely stupid.
And note that it's better to use it with the following wrapper, so that the names prepare_wait() and cancel_wait() are not visible to the end user. Probably the wait-related methods of eventcount should be made private and the blocking class declared a friend. In any case, overlapping prepare_wait-wait/cancel_wait sequences are currently prohibited.
[cpp]#include <cassert>

class eventcount
{
public:
    // ...

    class blocking
    {
    public:
        blocking(eventcount& ec)
            : ec_(ec)
            , wait_(false)
        {
            ec_.prepare_wait();
        }

        void wait()
        {
            assert(false == wait_);
            wait_ = true;
            ec_.wait();
        }

        ~blocking()
        {
            if (false == wait_)
                ec_.cancel_wait();
        }

    private:
        eventcount& ec_;
        bool wait_;
        blocking(blocking const&);
        blocking& operator = (blocking const&);
    };

    // ...
};

void* consume()
{
    void* data = 0;
    if ((data = q.dequeue()) != 0)
        return data;
    for (;;)
    {
        eventcount::blocking block (ec);
        if ((data = q.dequeue()) != 0)
            return data; // ~blocking() cancels the wait
        block.wait();
        if ((data = q.dequeue()) != 0)
            return data;
    }
}
[/cpp]
In my remark about TLS slots, I was thinking in general across OSes. It may not be an issue for Windows. However, __declspec(thread) is broken on pre-Vista OSes. It does not work correctly for explicitly loaded dynamic libraries (see here). We found out the hard way from a customer writing plug-ins with TBB 1.0, and switched to using TlsAlloc.
"commit_wait" is not totally crazy. The thread is committing to wait until woken up. It's like committing to hibernation in science fiction. It's a serious commitment :-)
Another way to wrap the two-phase commit protocol, which is particularly attractive with C++0x lambda expressions, would be to have wait take a functor argument that returns true if wait should commit, and false if it should cancel.
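The functor-based variant can be sketched as a free function template over any type that exposes prepare_wait(), commit_wait() and cancel_wait() (the names discussed in this thread). wait_if and trace_ec are hypothetical names; trace_ec only records the call order, so no real blocking happens here. The functor runs after registration, so a state change that races with it is not missed: true commits to sleeping, false cancels.

```cpp
#include <cassert>
#include <string>

// Hypothetical one-call wrapper for the two-phase protocol.
// should_wait() runs after prepare_wait(): true commits, false cancels.
template <typename EventCount, typename Pred>
bool wait_if(EventCount& ec, Pred should_wait) {
    ec.prepare_wait();
    bool commit;
    try {
        commit = should_wait();
    } catch (...) {
        ec.cancel_wait();  // never leave the thread registered on an exception
        throw;
    }
    if (commit)
        ec.commit_wait();
    else
        ec.cancel_wait();
    return commit;
}

// Recording stand-in for an eventcount: P = prepare, W = commit, C = cancel.
struct trace_ec {
    std::string log;
    void prepare_wait() { log += 'P'; }
    void commit_wait() { log += 'W'; }
    void cancel_wait() { log += 'C'; }
};
```

With lambdas, a call site then reduces to a single line such as `wait_if(ec, [&] { return queue_is_empty(); });`, where queue_is_empty is a placeholder for the user's predicate.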
Oh, OK, I see. So, as long as the user of the eventcount provides the thread descriptor manually, the eventcount must not allocate TLS slots; only once the user calls prepare_wait() without a thread descriptor will the eventcount allocate a TLS slot.
There is another problem: on Windows the eventcount must hook into DllMain to get thread detach notifications.
Another possible solution is to implement the trick described here:
http://www.codeguru.com/Cpp/misc/misc/threadsprocesses/article.php/c6945__2/
It allows one to get thread attach/detach notifications without DllMain, even from statically linked libraries. However, it's quite risky. It worked on my machine with statically and dynamically linked run-times, in debug and release builds, with several versions of MSVC; but I heard from some people that it doesn't work on their machines... although maybe they were testing on MinGW or Cygwin, I don't know.
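On compilers with C++11 support, a thread_local object with a non-trivial destructor gives a portable per-thread cleanup hook: it is destroyed when its thread exits, without DllMain or pthread_key_create(). A sketch with a hypothetical descriptor type standing in for ec_thread; whether this avoids the pre-Vista __declspec(thread) problem with explicitly loaded DLLs depends on the compiler and runtime and is not established here.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for ec_thread; counts live instances.
struct descriptor {
    static std::atomic<int> live;
    descriptor() { live.fetch_add(1); }
    ~descriptor() { live.fetch_sub(1); }  // runs at thread exit
};
std::atomic<int> descriptor::live(0);

// Constructed on first use in each thread, destroyed when that thread ends.
inline descriptor& current_descriptor() {
    thread_local descriptor instance;
    return instance;
}

// Returns the number of live descriptors observed inside a worker thread
// and, through `after_join`, the number left once the worker has exited.
inline int live_inside_worker(int& after_join) {
    int inside = -1;
    std::thread worker([&inside] {
        current_descriptor();
        inside = descriptor::live.load();
    });
    worker.join();
    after_join = descriptor::live.load();
    return inside;
}
```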
OK, "commit_wait" just sounded stupid to me as a non-native speaker. I will rename wait() to commit_wait() in eventcount. But what name should eventcount::blocking use? It has no prepare_wait() or cancel_wait(), only wait()...
Yes, it's possible to make an interface like boost/std::condition_variable has, with:
template
However, I am not sure whether I want that much sugar in low-level synchronization primitives. Note that it would also be possible to add a similar signature for atomic<>::compare_exchange(): it would accept a functor that receives the old value and returns the new one, and such a compare_exchange() would return neither a bool nor the old value.
OK. It's always possible to add those methods later if the need arises.
OK. I will rename it.
Now I must find some time to validate the thing...
Sorry for the delay. Now I've almost recovered from the New Year holidays :)
OK, let's do it this way. I've made minor changes (renamed methods, added a wait_guard class, added an explicit ec_thread parameter) and submitted it as an official contribution (a copy is also attached to this post).
I will still try to validate it, and if I find any bugs I will resubmit or just report them here.
Thanks!
- Arch
A fix for the submitted version: commit_wait() must pass th on to cancel_wait():
[cpp]void commit_wait(ec_thread* th = 0)
{
    if (th == 0)
        th = ec_thread::current();
    // this check is just an optimization
    if (th->epoch_ == epoch_)
        th->sema_.wait();
    else
        cancel_wait(th); // <--- add 'th' parameter here, it's missed in original version
}
[/cpp]
Also, some variables must be replaced with atomics: dlist::size_, ec_thread::in_waitset_ and eventcount::epoch_.
Note that all operations on them can be relaxed (the required ordering comes from the mutex and the explicit full fences), i.e. the increment of the epoch should look like:
[cpp]unsigned ep = epoch_.load(std::memory_order_relaxed);
epoch_.store(ep + 1, std::memory_order_relaxed);[/cpp]
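A sketch of what that change could look like with C++11 std::atomic, assuming (as the post says) that relaxed ordering suffices because ordering comes from the mutex and the explicit full fences; ec_state and its member names are hypothetical. Since every writer of the epoch holds the mutex, a relaxed load followed by a relaxed store cannot lose an update, so no atomic RMW instruction is needed.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical C++11 rendering of the shared fields named above.
struct ec_state {
    std::mutex mtx;
    std::atomic<unsigned> epoch{0};            // eventcount::epoch_
    std::atomic<std::size_t> waitset_size{0};  // dlist::size_
    std::atomic<bool> in_waitset{false};       // ec_thread::in_waitset_ (one thread shown)

    // Caller must hold mtx: writers are serialized, so load + store is safe.
    void bump_epoch_locked() {
        unsigned ep = epoch.load(std::memory_order_relaxed);
        epoch.store(ep + 1, std::memory_order_relaxed);
    }

    // Lock-free fast-path check, as in notify_one_relaxed(). The atomic type
    // only removes the data race; ordering still comes from the fences.
    bool has_waiters() const {
        return waitset_size.load(std::memory_order_relaxed) != 0;
    }
};
```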