
Eventcount (gate) proposal

Dmitry_Vyukov
Valued Contributor I

I've implemented a sketch of an eventcount for TBB. It can be used as a replacement for the Gate in the current TBB scheduler, and/or as the blocking/signalling logic in concurrent_queue, and/or it can be exposed as a public API, and/or a portable condition variable can be implemented on top of it and exposed as a public API. I want to know what you think about the implementation and these usages, and whether it's worth it for me to finish the implementation and submit it as an official contribution.

The eventcount is portable, fine-grained, fairly efficient, general-purpose and reusable.
Portable in the sense that it requires only semaphore, mutex and full memory fence primitives. No Win32 events and no futexes.
Fine-grained in the sense that it supports notification of an interesting subset of threads, not only the coarse-grained notify_one() and notify_all() (see the small example below).
Fairly efficient in the sense that the producer overhead on the fast path is a single load, a single test, a single conditional jump, and possibly a single full memory fence (controlled by the user depending on the algorithm); the consumer overhead on the fast path is a no-op.
General-purpose in the sense that it includes no management of user state; this simplifies reasoning and implementation and allows reuse.
Reusable in the sense that it can be used with basically any predicate (user algorithm).
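For example (using the predicate signature from the implementation posted below), waking at most two waiters looks like this:

[cpp]// Illustration of fine-grained notification: the predicate is called for every
// blocked thread with the ctx passed to prepare_wait(), the waitset size and
// the waiter's index, and returns true for each thread that should be woken.
struct wake_at_most_two
{
    bool operator () (void* ctx, size_t count, size_t idx)
    {
        return idx < 2;
    }
};

// ec is an eventcount instance:
//   ec.notify(wake_at_most_two());[/cpp]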

Implementation takes into account Arch's requirements:
http://software.intel.com/en-us/forums/showpost.php?p=71895

I've attached the implementation and will post it along with several examples in following posts.

Dmitry_Vyukov
Valued Contributor I

First of all, several helper components: semaphore, mutex, full memory fence and a doubly-linked list (just to make the implementation self-contained):

[cpp]/** Fine-grained Eventcount
 *  Copyright (C) 2008  Dmitriy S. V'jukov
 */

#include <stddef.h> // size_t, offsetof


#if defined(WIN32) && defined(_MSC_VER)

#include <windows.h>
#include <intrin.h>  // _mm_mfence
#include <limits.h>  // LONG_MAX

class semaphore
{
public:
    semaphore()
    {
        h_ = CreateSemaphore(0, 0, LONG_MAX, 0);
    }

    ~semaphore()
    {
        CloseHandle(h_);
    }

    void wait()
    {
        WaitForSingleObject(h_, INFINITE);
    }

    void post()
    {
        ReleaseSemaphore(h_, 1, 0);
    }

private:
    HANDLE h_;

    semaphore(semaphore const&);
    semaphore& operator = (semaphore const&);
};

class mutex
{
public:
    mutex()
    {
        InitializeCriticalSection(&cs_);
    }

    ~mutex()
    {
        DeleteCriticalSection(&cs_);
    }

    void lock()
    {
        EnterCriticalSection(&cs_);
    }

    void unlock()
    {
        LeaveCriticalSection(&cs_);
    }

private:
    CRITICAL_SECTION    cs_;

    mutex(mutex const&);
    mutex& operator = (mutex const&);
};

void full_memory_fence()
{
    _mm_mfence();
}

#define THREAD_LOCAL __declspec(thread)

#elif defined(POSIX) && defined(GCC)

#include <semaphore.h>  // sem_t
#include <pthread.h>    // pthread_mutex_t

class semaphore
{
public:
    semaphore()
    {
        sem_init(&sem_, 0, 0);
    }

    ~semaphore()
    {
        sem_destroy(&sem_);
    }

    void wait()
    {
        sem_wait(&sem_);
    }

    void post()
    {
        sem_post(&sem_);
    }

private:
    sem_t               sem_;

    semaphore(semaphore const&);
    semaphore& operator = (semaphore const&);
};

class mutex
{
public:
    mutex()
    {
        pthread_mutex_init(&mutex_, 0);
    }

    ~mutex()
    {
        pthread_mutex_destroy(&mutex_);
    }

    void lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

private:
    pthread_mutex_t     mutex_;

    mutex(mutex const&);
    mutex& operator = (mutex const&);
};

void full_memory_fence()
{
    __sync_synchronize();
}

#define THREAD_LOCAL __thread

#endif



class lock
{
public:
    lock(mutex& m)
        : m_(m)
    {
        m.lock();
    }

    ~lock()
    {
        m_.unlock();
    }

private:
    mutex&              m_;

    lock(lock const&);
    lock& operator = (lock const&);
};




/** simple single-threaded double-linked list
 *  nothing interesting
 */
class dlist
{
public:
    struct node
    {
        node*           prev_;
        node*           next_;

        node()
        {
            prev_ = 0;
            next_ = 0;
        }
    };

    dlist()
    {
        reset();
    }

    void push(node* n)
    {
        size_ += 1;
        n->next_ = head_.next_;
        n->prev_ = &head_;
        head_.next_->prev_ = n;
        head_.next_ = n;
    }

    node* pop()
    {
        if (size_ == 0)
            return 0;
        node* n = head_.next_;
        remove(n);
        return n;
    }

    void remove(node* n)
    {
        size_ -= 1;
        n->prev_->next_ = n->next_;
        n->next_->prev_ = n->prev_;
    }

    size_t size() const
    {
        return size_;
    }

    node* begin()
    {
        return head_.next_;
    }

    void flush_to(dlist& target)
    {
        if (size_)
        {
            target.size_ = size_;
            target.head_.next_ = head_.next_;
            target.head_.next_->prev_ = &target.head_;
            target.tail_.prev_ = tail_.prev_;
            target.tail_.prev_->next_ = &target.tail_;
        }
        else
        {
            target.reset();
        }
        reset();
    }

    static bool not_last(node* n)
    {
        return n->next_ != 0;
    }

    static node* get_next(node* n)
    {
        return n->next_;
    }

private:
    size_t volatile     size_;
    node                head_;
    node                tail_;

    void reset()
    {
        size_ = 0;
        head_.next_ = &tail_;
        head_.prev_ = 0;
        tail_.next_ = 0;
        tail_.prev_ = &head_;
    }

    dlist(dlist const&);
    dlist& operator = (dlist const&);
};
[/cpp]

Dmitry_Vyukov
Valued Contributor I

Implementation of eventcount:

[cpp]/** Fine-grained Eventcount
 *  Copyright (C) 2008  Dmitriy S. V'jukov
 */


/** per-thread descriptor for eventcount
 */
struct ec_thread
{
    dlist::node         node_;
    semaphore           sema_;
    unsigned            epoch_;
    bool volatile       in_waitset_;
    bool                spurious_;
    void*               ctx_;

    ec_thread()
    {
        epoch_ = 0;
        in_waitset_ = false;
        spurious_ = false;
        ctx_ = 0;
    }

    ~ec_thread()
    {
        if (spurious_)
            sema_.wait();
    }

    static ec_thread* current()
    {
        static THREAD_LOCAL ec_thread* ec_thread_instance = 0;
        ec_thread* instance = ec_thread_instance;
        if (instance == 0)
        {
            instance = new ec_thread;
            ec_thread_instance = instance;
        }
        return instance;
        // instance must be destroyed in DllMain() callback
        // or in pthread_key_create() callback
    }

private:
    ec_thread(ec_thread const&);
    ec_thread& operator = (ec_thread const&);
};



/** fine-grained eventcount implementation
 */
class eventcount
{
public:
    eventcount()
    {
        epoch_ = 0;
    }

    void prepare_wait(void* ctx = 0)
    {
        ec_thread* th = ec_thread::current();
        // this is good place to pump previous spurious wakeup
        if (th->spurious_)
        {
            th->spurious_ = false;
            th->sema_.wait();
        }
        th->in_waitset_ = true;
        th->ctx_ = ctx;
        {
            lock l (mtx_);
            th->epoch_ = epoch_;
            waitset_.push(&th->node_);
        }
        full_memory_fence();
    }

    void wait()
    {
        ec_thread* th = ec_thread::current();
        // this check is just an optimization
        if (th->epoch_ == epoch_)
            th->sema_.wait();
        else
            retire_wait();
    }

    void retire_wait()
    {
        ec_thread* th = ec_thread::current();
        // spurious wakeup will be pumped in following prepare_wait()
        th->spurious_  = true;
        // try to remove node from waitset
        if (th->in_waitset_)
        {
            lock l (mtx_);
            if (th->in_waitset_)
            {
                // successfully removed from waitset,
                // so there will be no spurious wakeup
                th->in_waitset_ = false;
                th->spurious_ = false;
                waitset_.remove(&th->node_);
            }
        }
    }

    void notify_one()
    {
        full_memory_fence();
        notify_one_relaxed();
    }

    template<typename predicate_t>
    void notify(predicate_t pred)
    {
        full_memory_fence();
        notify_relaxed(pred);
    }

    void notify_all()
    {
        full_memory_fence();
        notify_all_relaxed();
    }

    void notify_one_relaxed()
    {
        if (waitset_.size() == 0)
            return;
        dlist::node* n;
        {
            lock l (mtx_);
            epoch_ += 1;
            n = waitset_.pop();
            if (n)
                to_ec_thread(n)->in_waitset_ = false;
        }
        if (n)
        {
            to_ec_thread(n)->sema_.post();
        }
    }

    template<typename predicate_t>
    void notify_relaxed(predicate_t pred)
    {
        if (waitset_.size() == 0)
            return;
        dlist temp;
        {
            lock l (mtx_);
            epoch_ += 1;
            size_t size = waitset_.size();
            size_t idx = 0;
            dlist::node* n = waitset_.begin();
            while (dlist::not_last(n))
            {
                dlist::node* next = dlist::get_next(n);
                ec_thread* th = to_ec_thread(n);
                if (pred(th->ctx_, size, idx))
                {
                    waitset_.remove(n);
                    temp.push(n);
                    th->in_waitset_ = false;
                }
                n = next;
                idx += 1;
            }
        }
        dlist::node* n = temp.begin();
        while (dlist::not_last(n))
        {
            dlist::node* next = dlist::get_next(n);
            to_ec_thread(n)->sema_.post();
            n = next;
        }
    }

    void notify_all_relaxed()
    {
        if (waitset_.size() == 0)
            return;
        dlist temp;
        {
            lock l (mtx_);
            epoch_ += 1;
            waitset_.flush_to(temp);
            dlist::node* n = temp.begin();
            while (dlist::not_last(n))
            {
                to_ec_thread(n)->in_waitset_ = false;
                n = dlist::get_next(n);
            }
        }
        dlist::node* n = temp.begin();
        while (dlist::not_last(n))
        {
            dlist::node* next = dlist::get_next(n);
            to_ec_thread(n)->sema_.post();
            n = next;
        }
    }

private:
    mutex               mtx_;
    dlist               waitset_;
    volatile unsigned   epoch_;

    ec_thread* to_ec_thread(dlist::node* n)
    {
        return (ec_thread*)((char*)n - offsetof(ec_thread, node_));
    }

    eventcount(eventcount const&);
    eventcount& operator = (eventcount const&);
};

[/cpp]

Dmitry_Vyukov
Valued Contributor I

Here is how the eventcount can be used in the TBB scheduler:

[cpp]/** Fine-grained Eventcount
 *  Copyright (C) 2008  Dmitriy S. V'jukov
 */


struct scheduler
{
    struct tbb_thread {};

    eventcount          ec_;
    tbb_thread*         threads_;
    bool volatile       is_permanently_open_;

    void wait_while_pool_is_empty(tbb_thread* th)
    {
        if (is_permanently_open_)
            return;
        ec_.prepare_wait(th);
        if (pool_is_empty())
            ec_.wait();
        else
            ec_.retire_wait();
    }

    void notify_about_new_task_available()
    {
        ec_.notify_one_relaxed();
    }

    void notify_about_new_task_available_with_preference(tbb_thread* preference)
    {
        struct local
        {
            tbb_thread*     preference_;
            bool            fired_;

            bool operator () (void* ctx, size_t count, size_t idx)
            {
                tbb_thread* th = (tbb_thread*)ctx;
                if (th == preference_)
                {
                    fired_ = true;
                    return true;
                }
                else if (idx == count - 1 && fired_ == false)
                {
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
        pred = {preference};
        ec_.notify_relaxed(pred);
    }

    void notify_about_list_of_tasks_available(size_t total_count, size_t preference_count, tbb_thread** preferences)
    {
        struct local
        {
            size_t          remain_to_signal_;
            size_t          preference_count_;
            tbb_thread**    preferences_;

            bool operator () (void* ctx, size_t count, size_t idx)
            {
                tbb_thread* th = (tbb_thread*)ctx;
                size_t remain_in_waitset = count - idx;
                if (remain_in_waitset <= remain_to_signal_)
                {
                    return true;
                }
                else
                {
                    for (size_t i = 0; i != preference_count_; ++i)
                    {
                        if (preferences_[i] == th)
                        {
                            remain_to_signal_ -= 1;
                            return true;
                        }
                    }
                }
                return false;
            }
        }
        pred = {total_count, preference_count, preferences};
        ec_.notify_relaxed(pred);
    }

    bool pool_is_empty()
    {
        return true;
    }
};
[/cpp]

Dmitry_Vyukov
Valued Contributor I

Here is how the eventcount can be used in a concurrent_queue to eliminate the thundering-herd problem:

[cpp]struct queue
{
    int                 producer_idx_;
    int                 consumer_idx_;

    void**              buffer_;

    eventcount          ec_;

    void enqueue(void* data)
    {
        int idx = ++producer_idx_; // atomic
        buffer_[idx] = data;

        struct local
        {
            int         idx_;
            bool operator () (void* ctx, size_t count, size_t idx)
            {
                return idx_ == *(int*)ctx;
            }
        }
        pred = {idx};
        ec_.notify(pred); // not relaxed!!!
    }

    void* dequeue()
    {
        int idx = ++consumer_idx_; // atomic
        void* data = buffer_[idx];
        if (data)
            return data;
        for (;;)
        {
            ec_.prepare_wait(&idx);
            data = buffer_[idx];
            if (data)
            {
                ec_.retire_wait();
                return data;
            }
            ec_.wait();
            data = buffer_[idx];
            if (data)
            {
                return data;
            }
        }
    }
};
[/cpp]

Dmitry_Vyukov
Valued Contributor I

Here is how the eventcount can be used in a general setup to notify about arbitrary state changes (in conjunction with lock-based, lock-free or even wait-free containers; the eventcount itself is wait-free on the fast path):

[cpp]eventcount ec;

// non-blocking consume: try a plurality of non-blocking containers in turn
void* consume_impl()
{
    void* data;
    if ((data = high_prio_queue.dequeue()))     return data;
    if ((data = normal_prio_queue.dequeue()))   return data;
    if ((data = work_stealing_dequeue.pop()))   return data;
    if ((data = low_prio_queue.dequeue()))      return data;
    return global_root_task_queue.dequeue();
}

// blocking consume
void* consume()
{
    void* data = 0;
    if (data = consume_impl())
        return data;
    for (;;)
    {
        ec.prepare_wait();
        if (data = consume_impl())
        {
            ec.retire_wait();
            return data;
        }
        ec.wait();
        if (data = consume_impl())
            return data;
    }
}

void produce(void* data)
{
    some_queue_or_deque.enqueue(data);
    ec.notify_all();
}

[/cpp]

Dmitry_Vyukov
Valued Contributor I

And finally, here is how one can build a portable condition variable with a wait-free fast path for signalers on top of the eventcount:

[cpp]class condition_variable
{
    eventcount ec_;

public:
    void wait(mutex& mtx)
    {
        ec_.prepare_wait();
        mtx.unlock();
        ec_.wait();
        mtx.lock();
    }

    void signal()
    {
        ec_.notify_one();
    }

    void broadcast()
    {
        ec_.notify_all();
    }
}; 

[/cpp]

Dmitry_Vyukov
Valued Contributor I

A considerable part of the credit must go to Chris Thomasson:

http://software.intel.com/en-us/profile/6002

and Joseph Seigh:

http://software.intel.com/en-us/profile/390813

ARCH_R_Intel
Employee

Nice work and well presented! Yes, please contribute it as a TBB submission when you are happy with it.

For clients like the TBB scheduler, we would probably make the ec_thread object a parameter to the wait-related methods instead of using ec_thread::current(), and thus avoid using another TLS slot. For clients like concurrent_queue, your interface seems most sensible.

If the relaxed notification methods are going to be public, they need documentation explaining the fencing rules. The rule is "don't let a notification become visible before the action it is advertising", right?

The waiting logic is a two-phase commit protocol. I suspect two-phase commit will show up in the future, so it would be good to establish a regular naming scheme for two-phase commit now. I like "prepare_wait". I'd prefer the name "cancel_wait" over "retire_wait", because "retire" tends to imply success around here (as in "retiring instructions"). The name for the operation that completes a successful wait should have a longer name than "wait" to make it clear that it is the completion of a two-phase commit. If people have suggestions, I suggest starting them as a separate forum topic on "naming for two-phase commit protocols"

Thanks for taking the time for such a detailed posting.

- Arch

Dmitry_Vyukov
Valued Contributor I

Nice work and well presented! Yes, please contribute it as a TBB submission when you are happy with it.

Ok. The most notable thing is that I have to run it at least once before contribution :)

For clients like the TBB scheduler, we would probably make the ec_thread object a parameter to the wait-related methods instead of using ec_thread::current(), and thus avoid using another TLS slot. For clients like concurrent_queue, your interface seems most sensible.

Hmmm... What do you mean here? __declspec(thread) doesn't use an OS TLS slot. The MSVC runtime uses a single OS TLS slot for ALL __declspec(thread) variables; they are distinguished by offsets within that slot.

Dmitry_Vyukov
Valued Contributor I

If the relaxed notification methods are going to be public, they need documentation explaining the fencing rules. The rule is "don't let a notification become visible before the action it is advertising", right?

From a technical point of view it must be something like "don't let the load of the eventcount state become visible before the action it is advertising", because the point is that there must be a store-load fence between the modification of state and the call to notify_relaxed().

For example, if the modification of state is made with an atomic RMW with a full fence (which must contain a trailing store-load fence), then signalling can be done with the relaxed version.

Hmm... btw, another situation is when the user doesn't need a 100% guarantee wrt notification and can tolerate a small possibility of a race between producer and consumer. A good example is a scheduler like TBB's task scheduler. I also used such relaxed notifications in a distributed memory allocator, where I was able to tolerate a small amount of memory being delayed in the depths of the allocator.
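For illustration, the two producer variants look roughly like this (a sketch only; std::atomic notation is used here purely for clarity, 'state' and the producer functions are hypothetical):

[cpp]#include <atomic>

std::atomic<int> state;
eventcount ec;

void producer_with_plain_store()
{
    state.store(1, std::memory_order_relaxed);
    ec.notify_all();           // notify_all() issues the full (store-load) fence itself
}

void producer_with_full_fence_rmw()
{
    state.exchange(1);         // a full-fence RMW already contains the trailing store-load fence
    ec.notify_all_relaxed();   // so the relaxed notification is sufficient
}[/cpp]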

Dmitry_Vyukov
Valued Contributor I

The waiting logic is a two-phase commit protocol. I suspect two-phase commit will show up in the future, so it would be good to establish a regular naming scheme for two-phase commit now. I like "prepare_wait". I'd prefer the name "cancel_wait" over "retire_wait", because "retire" tends to imply success around here (as in "retiring instructions"). The name for the operation that completes a successful wait should have a longer name than "wait" to make it clear that it is the completion of a two-phase commit. If people have suggestions, I suggest starting them as a separate forum topic on "naming for two-phase commit protocols"

Agreed about cancel_wait().

About wait()... maybe... but I don't see better names... commit_wait() is definitely stupid.

And note that it's better used with the following wrapper, so the names prepare_wait() and cancel_wait() are not visible to the end user. Probably the wait-related methods of eventcount should be made private and the blocking class declared as a friend. In any case, it's currently prohibited to have overlapping prepare_wait()-wait()/cancel_wait() sequences.

[cpp]class eventcount
{
public:
    ...
    class blocking
    {
    public:
        blocking(eventcount& ec)
            : ec_(ec)
            , wait_(false)
        {
            ec_.prepare_wait();
        }

        void wait()
        {
            assert(false == wait_);
            wait_ = true;
            ec_.wait();
        }

        ~blocking()
        {
            if (false == wait_)
                ec_.cancel_wait();
        }

    private:
        eventcount& ec_;
        bool wait_;

        blocking(blocking const&);
        blocking& operator = (blocking const&);
    };
    ...
};

void* consume()
{
    void* data = 0;
    if (data = q.dequeue())
        return data;
    for (;;)
    {
        eventcount::blocking block (ec);
        if (data = q.dequeue())
            return data;
        block.wait();
        if (data = q.dequeue())
            return data;
    }
}

[/cpp]

ARCH_R_Intel
Employee

In my remark about TLS slots, I was thinking in general across OSes. It may not be an issue for Windows. However, __declspec(thread) is broken on pre-Vista OSes. It does not work correctly for explicitly loaded dynamic libraries (see here). We found out the hard way from a customer writing plug-ins with TBB 1.0, and switched to using TlsAlloc.
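A minimal sketch of the TlsAlloc() approach (hypothetical names, not TBB's actual code; ec_thread is the descriptor from the posted implementation):

[cpp]#include <windows.h>

// One process-wide slot allocated at library init
// (error handling for TLS_OUT_OF_INDEXES omitted).
static DWORD ec_tls_index = TlsAlloc();

ec_thread* current_ec_thread()
{
    ec_thread* th = (ec_thread*)TlsGetValue(ec_tls_index);
    if (th == 0)
    {
        th = new ec_thread;
        TlsSetValue(ec_tls_index, th);
        // th must still be destroyed on thread exit (DLL_THREAD_DETACH in
        // DllMain(), or an equivalent hook) -- see the discussion below.
    }
    return th;
}[/cpp]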

"commit_wait" is not totally crazy. The thread is committing to wait until woken up. It's like committing to hibernation in science fiction. It's a serious commitment :-)

Another way to wrap the two-phase commit protocol, which is particularly attractive with C++0x lambda expressions, would be to have wait take a functor argument that returns true if wait should commit, and false if it should cancel.
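Something like the following (a hypothetical helper, using the wait()/retire_wait() names of the posted implementation):

[cpp]// Sketch: the functor returns true if the wait should commit,
// false if it should cancel.
template<typename pred_t>
void wait_if(eventcount& ec, pred_t should_commit)
{
    ec.prepare_wait();
    if (should_commit())
        ec.wait();          // commit: block until notified
    else
        ec.retire_wait();   // cancel: no need to block after all
}

// With a C++0x lambda:
//   wait_if(ec, [&] { return queue_is_empty(); });[/cpp]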

Dmitry_Vyukov
Valued Contributor I

In my remark about TLS slots, I was thinking in general across OSes. It may not be an issue for Windows. However, __declspec(thread) is broken on pre-Vista OSes. It does not work correctly for explicitly loaded dynamic libraries (see here). We found out the hard way from a customer writing plug-ins with TBB 1.0, and switched to using TlsAlloc.

Oh, OK, I see. So, while the user of eventcount provides a thread descriptor manually, eventcount must not allocate TLS slots. But once the user calls prepare_wait() w/o a thread descriptor, eventcount will allocate a TLS slot.

There is another problem. On Windows, eventcount must hook into DllMain() to get thread-detach notifications.

The other possible solution is to implement the trick described here:

http://www.codeguru.com/Cpp/misc/misc/threadsprocesses/article.php/c6945__2/

It allows one to get thread attach/detach notifications w/o DllMain(), even from statically linked libraries. However, it's quite risky. It was working on my machine with statically/dynamically linked run-time, under debug/release, on several versions of MSVC; but I heard from some people that it doesn't work on their machines... though maybe they were testing with MinGW or Cygwin, I don't know.

Dmitry_Vyukov
Valued Contributor I

"commit_wait" is not totally crazy. The thread is committing to wait until woken up. It's like committing to hibernation in science fiction. It's a serious commitment :-)

Ok, it sounded stupid from the point of view of a non-native speaker. I will rename it to commit_wait() in eventcount. However, what to do in eventcount::blocking? It doesn't have prepare_wait() and cancel_wait(), only wait()...

Another way to wrap the two-phase commit protocol, which is particularly attractive with C++0x lambda expressions, would be to have wait take a functor argument that returns true if wait should commit, and false if it should cancel.

Yes, it's possible to make an interface like boost/std::condition_variable has, with:

template<typename pred_t> void wait(pred_t pred);

However, I am not sure whether I want so much sugar in low-level synchronization primitives. Note that it is also possible to add a similar signature for atomic<>::compare_exchange(), which accepts a functor that receives the old value and returns the new one; such a compare_exchange() would not return a bool or the old value.

ARCH_R_Intel
Employee
On the syntactic sugar issue, I'd be fine with you keeping your original proposed interface with prepare/commit/cancel methods, and have something completely separate for the sugared lambda form. I like the way C++0x did their mutexes and provided both a low-level unstructured interface and a high-level interface. At the low level, there are raw lock/unlock calls, and for exception safety and structure there is the lock_guard form. Maybe call your blocking class wait_guard?

Dmitry_Vyukov
Valued Contributor I
On the syntactic sugar issue, I'd be fine with you keeping your original proposed interface with prepare/commit/cancel methods, and have something completely separate for the sugared lambda form.

Ok. It's always possible to add those methods later if the need arises.

At the low level, there are raw lock/unlock calls, and for exception safety and structure there is the lock_guard form. Maybe call your blocking class wait_guard?

Ok. I will rename it.

Now I must get some time for validation of the thing...

ARCH_R_Intel
Employee
Quoting - Dmitriy Vyukov

Now I must get some time for validation of the thing...

Any progress on validating it? I keep running into situations where it would be useful to have your official contribution.

- Arch
Dmitry_Vyukov
Valued Contributor I
Quoting - Dmitriy Vyukov

Now I must get some time for validation of the thing...

Any progress on validating it? I keep running into situations where it would be useful to have your official contribution.

- Arch

Sorry for the delay. Now I've almost recovered from New Year's holidays :)

Ok, let's do it this way. I've made minor changes (renamed the methods, added the wait_guard class, added an explicit ec_thread parameter) and submitted it as an official contribution (and also attached a copy to this post).

I will still try to validate it, and if I find bugs I will resubmit or just report them here.
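For reference, the revised public interface is roughly the following (reconstructed from the discussion above; the attached archive is authoritative, exact parameter order and nesting may differ):

[cpp]class eventcount
{
public:
    // explicit per-thread descriptor; falls back to a TLS lookup when th == 0
    void prepare_wait(ec_thread* th = 0, void* ctx = 0);
    void commit_wait (ec_thread* th = 0);   // formerly wait()
    void cancel_wait (ec_thread* th = 0);   // formerly retire_wait()
    // notify_one()/notify_all()/notify(pred) and the *_relaxed() forms as before
};

class wait_guard    // formerly eventcount::blocking
{
public:
    wait_guard(eventcount& ec);   // calls prepare_wait()
    void wait();                  // calls commit_wait(), at most once
    ~wait_guard();                // calls cancel_wait() if wait() was not called
private:
    eventcount& ec_;
    bool        waited_;
};[/cpp]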



ARCH_R_Intel
Employee
Quoting - Dmitriy Vyukov
Ok, let's do it this way. I've made minor changes (rename methods, add wait_guard class, add explicit ec_thread parameter) and submitted it as official contribution (and also attached copy to this post).

Thanks!

- Arch
Dmitry_Vyukov
Valued Contributor I
I've made some unit tests with Relacy Race Detector (an mpmc-queue with fine-grained signaling, and a test which uses a condition variable implemented on top of the eventcount). The only error revealed so far is:

[cpp]    void commit_wait(ec_thread* th = 0)
    {
        if (th == 0)
            th = ec_thread::current();
        // this check is just an optimization
        if (th->epoch_ == epoch_)
            th->sema_.wait();
        else
            cancel_wait(th); // <--- add 'th' parameter here, it's missed in original version
    }
[/cpp]


Also, some variables must be replaced with atomics and relaxed operations on them (in order to shut down the 'data race' errors and provide better documentation). Here is the list of such variables:
dlist::size_
ec_thread::in_waitset_
eventcount::epoch_
Note that all operations on them must be relaxed, i.e. the increment of the epoch must look like:
unsigned ep = epoch_.load(std::memory_order_relaxed);
epoch_.store(ep + 1, std::memory_order_relaxed);
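In C++11-style notation (illustration only; the unit tests themselves use Relacy's atomics), the affected members would become:

[cpp]#include <atomic>

// Illustration: the three listed members declared as atomics. All accesses to
// them stay relaxed, because the mutex (or the user's fence) provides the ordering.
std::atomic<size_t>   size_;        // in dlist
std::atomic<bool>     in_waitset_;  // in ec_thread
std::atomic<unsigned> epoch_;       // in eventcount[/cpp]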



