Intel® oneAPI Threading Building Blocks

atomic issue

pmp_ppeters1
context: attempting to create a multiple reader/writer ring buffer
issue: I am not sure I am using atomics correctly
notes: the code below is the array used as the internal data store for the ring buffer. The assert below fails (sometimes) and I am not sure why or how.
All wisdom is welcome, thank you
[bash]/*
 This class implements a locking read/write array w/
 multiple writers and readers.  There are three stipulations
 1) known number of readers, 2) after a write all readers 
 must read the node before another write to it, 3) no reader
 may read the same node twice before a write
 */
template <typename T, int N, int M>
class locking_array
{
    static const int cardinality = N;
    static const int numthreads = M;
    
    struct element{
        T val;
        tbb::atomic<int> count;
        element(): count()
        {} 
    };
    
    element array[cardinality];
    
public:
    
    locking_array()
    {}
    
    inline T read(int i)
    {
        while (!array[i].count);
        assert(array[i].count >= 0);
        array[i].count--;
        return array[i].val;
    }
    
    inline void write(int i, T* val)
    {
        while (array[i].count);
        array[i].val = *val;
        array[i].count.fetch_and_store(numthreads);
    }
};[/bash]
RafSchietekat
The writer only needs assignment (and its implied release semantics) of "count", but the code is still correct.

In the readers, you must read "val" before decrementing "count", so the code is wrong, but not in a way that makes the assert fail, except perhaps indirectly.

But you should be able to make the assert test for count>0, which implies count>=0, so I don't immediately see the cause of the problem.

(Added 2011-10-14) Perhaps showing a bit more of how the code is used might help, if you didn't solve it already by correcting the reader code in case my assumption of indirect causation was correct.
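To make the two points above concrete, here is a minimal sketch of a single slot. It is written with std::atomic rather than tbb::atomic, which is an assumption on my part, and the names `slot` and `write_then_read_all` are hypothetical. The writer publishes with a plain release store, and the reader copies "val" before it decrements "count".

```cpp
#include <atomic>
#include <cassert>

// Hypothetical single slot, using std::atomic instead of tbb::atomic.
template <typename T>
struct slot {
    T val{};
    std::atomic<int> count{0};

    // Writer: wait until every reader has consumed the previous value,
    // store the payload, then publish it with a release store.
    void write(const T& v, int readers) {
        while (count.load(std::memory_order_acquire) != 0) { /* spin */ }
        val = v;
        count.store(readers, std::memory_order_release);
    }

    // Reader: wait for a published value, copy it out first, and only
    // then decrement, so the writer cannot overwrite val mid-read.
    T read() {
        while (count.load(std::memory_order_acquire) == 0) { /* spin */ }
        T copy = val;
        count.fetch_sub(1, std::memory_order_release);
        return copy;
    }
};

// Single-threaded smoke test helper: one write followed by `readers` reads.
inline int write_then_read_all(int value, int readers) {
    slot<int> s;
    s.write(value, readers);
    int last = 0;
    for (int r = 0; r < readers; ++r) last = s.read();
    assert(s.count.load() == 0);  // all reads accounted for
    return last;
}
```

Calling `write_then_read_all(42, 3)` exercises one write and three reads on one thread. The sketch only covers the ordering inside a single slot; it does nothing about several writers targeting the same slot.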
pmp_ppeters1
I followed your suggestions, but I'm still having issues. I mostly wanted to make sure I was using the atomics correctly, and it sounds like I was for the most part. It appears there is perhaps a bug somewhere else in the implementation; I am not sure where.
As per your request I have included the rest of the code.
[cpp]/*
 This class implements a locking read/write array w/
 multiple writers and readers.  There are three stipulations
 1) known number of readers, 2) after a write all readers 
 must read the node before another write to it, 3) no reader
 may read the same node twice before a write
 */
template <typename T, int N, int M>
class blocking_array
{
    static const int cardinality = N;
    static const int numthreads = M;
    
    struct element{
        T val;
        tbb::atomic<int> count;
        element(): count()
        {} 
    };
    
    element array[cardinality];
    
public:
    
    blocking_array()
    {}
    
    inline T read(int i)
    {
        while (!array[i].count);
        assert(array[i].count > 0);
        T val = array[i].val;
        array[i].count--;
        return val;
    }
    
    inline void write(int i, T* val)
    {
        while (array[i].count);
        array[i].val = *val;
        array[i].count = numthreads;
    }
};

/*
 N must be greater than M, otherwise we get deadlock
 */
template <typename T, int N, int M>
class circle_buffer{
    
        
    static const int cardinality = N;
    static const int numthreads = M;
    
public:
    circle_buffer() : consumer_head(), exited(), entered(), produced_head()
    {}
    
    ~circle_buffer()
    {}
    
    /*
     All threads must be attached before we begin producing
     */
    int attach()
    {
        assert(produced_head == 0);
        tbb::atomic<uint32_t> c_head;
        c_head = 0;
        consumer_head.push_back(c_head);
        return consumer_head.size()-1;
    }
    
    inline void push(T* element)
    {
        uint32_t enteredindex = entered++;
        elements.write((enteredindex) & (cardinality - 1), element);
        uint32_t exitedindex = exited++;
        
        if(exitedindex == entered)
            produced_head.fetch_and_store(exitedindex);
    }
    
    inline T pop(int idx)
    {
        return elements.read(consumer_head[idx]++ & (cardinality - 1));
    }
    
    
    inline uint32_t head_index()
    {
        return produced_head;
    }
    
    
    private:
    
    blocking_array<T, N, M> elements;
    
    tbb::atomic<uint32_t> produced_head;
    tbb::atomic<uint32_t> entered;
    tbb::atomic<uint32_t> exited;
    
    
    std::vector< tbb::atomic<uint32_t> > consumer_head;
    
    
};[/cpp]
RafSchietekat
I don't know what you intend with "entered" and "exited", but it seems to me that the conditional code in push() is never executed, so produced_head stays 0, so head_index() always returns 0. Does that explain anything of what you are seeing (it's not really a self-contained program)?
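The observation about the conditional in push() can be checked with a small single-threaded sketch. It models only the two counters, uses std::atomic in place of tbb::atomic (an assumption), and the function name `count_condition_hits` is hypothetical. Post-increment returns the old value, so after a thread has bumped both counters the comparison is between p and p + 1.

```cpp
#include <atomic>
#include <cstdint>

// Counts how often the condition from push() holds over `pushes` calls
// made by a single thread. Only the two counters are modeled here.
inline int count_condition_hits(int pushes) {
    std::atomic<uint32_t> entered{0};
    std::atomic<uint32_t> exited{0};
    int hits = 0;
    for (int p = 0; p < pushes; ++p) {
        uint32_t enteredindex = entered++;   // old value; entered is now p + 1
        (void)enteredindex;                  // the slot write would happen here
        uint32_t exitedindex = exited++;     // old value, i.e. p
        if (exitedindex == entered) ++hits;  // compares p with p + 1
    }
    return hits;
}
```

With one thread, `count_condition_hits(1000)` returns 0, so the branch that stores to produced_head is not taken. This says nothing definitive about runs with several concurrent writers.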

(Added) If you have multiple writers (which I had not considered before seeing "entered" and "exited"; I hereby retract what I wrote about produced_head above, without reexamining what may still be problematic code), there is a possible race if a new writer catches up with an older one and both consider the same position in the circular buffer. Both detect that the value has been consumed. One signals a new value by writing "numthreads" to "count" (I'm ignoring the overwritten "val" for now), readers start consuming, and k of them succeed. The other writer then signals a new value ("count" is at "numthreads" again), the other numthreads-k readers consume "val", and "count" is now numthreads-(numthreads-k)=k. Then the readers make a round of the buffer, k of them read "val", and the next reader sees "count" at 0, triggering the assert. I've got to go now; others may chime in to suggest alternatives.
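The arithmetic of that interleaving can be replayed step by step. The sketch below uses a plain int and no threads, so it is deterministic; it illustrates the described schedule and is not a reproduction of the actual failure. The function name `leftover_after_double_write` and the parameter `first_wave` (the number of readers that consume between the two writers' stores) are hypothetical.

```cpp
#include <cassert>

// Replays the described schedule on a plain int. Returns the value of
// "count" after both writers have published and every reader has
// consumed exactly once.
inline int leftover_after_double_write(int numthreads, int first_wave) {
    assert(0 < first_wave && first_wave < numthreads);
    int count = 0;
    bool w1_sees_free = (count == 0);  // writer 1 passes its spin loop
    bool w2_sees_free = (count == 0);  // writer 2 passes it as well
    assert(w1_sees_free && w2_sees_free);
    count = numthreads;                // writer 1 publishes
    count -= first_wave;               // first readers consume
    count = numthreads;                // writer 2 publishes over it
    count -= numthreads - first_wave;  // remaining readers consume
    return count;                      // equals first_wave, not 0
}
```

For example, `leftover_after_double_write(4, 1)` returns 1: the slot still looks unread to one reader on the next lap, and the writer waiting for "count" to reach 0 is held up until that stale read happens.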