nagy
New Contributor I

concurrent_bounded_queue clear safety

I just noticed that concurrent_bounded_queue::clear is not thread safe. Would someone mind explaining in what context this could be a problem?
For example, does the following class have defined behavior?
[cpp]class active_object
{
public:
     active_object()
     {
          queue_.set_capacity(3);
          thread_ = tbb::tbb_thread([this]{run();}); // Another edit.
     }

     ~active_object()
     {
         queue_.clear(); // problem?
         queue_.push(nullptr);
         thread_.join(); // EDIT. Forgot this.
     }

     void push(const std::function<void()>& func)
     {
         queue_.push(func);
     }   

private:

     void run()
     {
          std::function<void()> func = []{};
          while(func != nullptr)
          {
               queue_.pop(func); 
               func();
          }
     }

     tbb::tbb_thread thread_;
     tbb::concurrent_bounded_queue<std::function<void()>> queue_;

};[/cpp]
Dmitry_Vyukov
Valued Contributor I
As far as I can see, yes, this is undefined behavior: the non-thread-safe clear() is executed concurrently with pop().
I would implement it with a separate out-of-band flag:


[cpp]class active_object
{
public:
     active_object()
        : thread_([this]{run();})
        , stop_(0)
    {
    }

     ~active_object()
     {
         stop_ = 1;
         queue_.push(nullptr);
         thread_.join();
     }

     void push(const std::function<void()>& func)
     {
         queue_.push(func);
     }   

private:

     void run()
     {
          for (;;)
          {
               std::function<void()> func;
               queue_.pop(func);
               if (stop_ == 1)
                   break;
               func();
          }
     }

     tbb::atomic<int> stop_;
     tbb::tbb_thread thread_;
     tbb::concurrent_bounded_queue<std::function<void()>> queue_;
};[/cpp]

RafSchietekat
Black Belt

I think that, without further guarantees from tbb::concurrent_bounded_queue, there are three issues. First, it is problematic to drop out of run(), and therefore destroy this and call clear(), before the thread knows that the client that pushed nullptr from another thread has safely returned from push(). That can be remedied by locking a mutex in active_object::push(), but at some cost to queue_'s scalability, possibly defeating it. Second, it is silly to push nullptr at the end of the destructor (right?). Third, it is problematic to destroy queue_ as part of running ~active_object(), for similar but even stronger reasons than for calling clear(): the queue is still being used by multiple threads.

This has been discussed in another thread ("atomicity" of push()). Maybe I'll add a reference later, unless somebody else does it first.

(Added) Maybe I'm mistaken about what the queue type guarantees, but a better workaround seems to be a special end-of-input operation that locks a mutex while it pushes nullptr, with active_object briefly locking that same mutex when it receives nullptr.
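
A minimal sketch of that end-of-input handshake, under loud assumptions: task_queue is a hypothetical mutex-based stand-in written only for this example (it is not tbb::concurrent_bounded_queue and says nothing about what TBB guarantees), and channel, close(), drain() and demo() are illustrative names that do not appear in the original code. The only point is the ordering: the consumer cannot get past end_mutex_ until the producer has left the push() that delivered nullptr.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

using task = std::function<void()>;

// Hypothetical unbounded stand-in queue (NOT the TBB class).
class task_queue {
public:
    void push(task t) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(t));
        }
        // Still touches the queue after the item is already visible.
        ready_.notify_one();
    }
    void pop(task& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
    }
private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<task> items_;
};

class channel {
public:
    void push(task t) { queue_.push(std::move(t)); }

    // End-of-input: the nullptr sentinel is pushed while holding end_mutex_.
    void close() {
        std::lock_guard<std::mutex> lock(end_mutex_);
        queue_.push(nullptr);
    }

    // Consumer loop; returns how many tasks were executed.
    int drain() {
        int executed = 0;
        for (;;) {
            task t;
            queue_.pop(t);
            if (!t) {
                // Blocks until close() has returned from queue_.push().
                std::lock_guard<std::mutex> lock(end_mutex_);
                return executed;
            }
            t();
            ++executed;
        }
    }
private:
    task_queue queue_;
    std::mutex end_mutex_;
};

// Illustrative driver: one producer thread, consumer on the calling thread.
int demo() {
    channel ch;
    int sum = 0;
    std::thread producer([&] {
        ch.push([&] { sum += 1; });
        ch.push([&] { sum += 2; });
        ch.close();
    });
    int executed = ch.drain();
    producer.join();
    assert(sum == 3);
    return executed;
}
```

Only the final push is protected here, so ordinary push() calls keep whatever scalability the queue has; the sketch assumes that every other producer has finished pushing before close() is called.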

RafSchietekat
Black Belt
I think clear() is only called after the last pop(), the one that returned nullptr, because the thread first has to finish executing before it is destroyed, right?

(Added) No, wait, apparently the queue's capacity is set while the thread is already running, which seems like another problem. But I was wrong about when active_object is destroyed (some confusion with Java Threads: I thought that the consumer would destroy active_object). If the intention is that one thread creates the object, performs a number of push() operations, and then destroys it, there is a problem, because that creator/producer thread does not know when the consumer thread has finished.
Dmitry_Vyukov
Valued Contributor I
> I think clear() is only called after the last pop(), the one that returned nullptr, because the thread first has to finish executing before it is destroyed, right?

The thread is not yet joined/destroyed when clear() is called.

RafSchietekat
Black Belt

"The thread is not yet joined/destroyed when clear() is called."
Right, there's a synchronisation missing, see my addition above.

(Added) Ah, I see a join() has magically appeared in the original question...

(Added) Sorry, I'm not helping (too much distraction); Dmitriy's answer seems to be right. Don't forget to set capacity before starting the thread, though.

(Added) Apparently it's even worse: the thread is possibly launched before the queue is even created...

RafSchietekat
Black Belt
I think that Dmitriy's solution should be corrected by either launching the thread inside the body of the constructor, or otherwise making thread_ the last instance variable, so that initialisation of the others will have completed first, and then the body must remain empty of anything that the thread might use. Also, use value-initialisation (sic?) for stop_, i.e., empty argument list, which makes the initial value zero, because an atomic has no constructor that takes an argument of 0. Right?
Dmitry_Vyukov
Valued Contributor I
> I think that Dmitriy's solution should be corrected by either launching the thread inside the body of the constructor

Agree. Implicit thread launch is not a good idea anyway.

> Also, use value-initialisation (sic?) for stop_, i.e., empty argument list, which makes the initial value zero, because an atomic has no constructor that takes an argument of 0.

I don't remember what initialization it provides. Use whatever works for you. I would prefer:
atomic_store_explicit(&stop_, 0, memory_order_relaxed);
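
Putting the corrections from this thread together, a hedged sketch might look like the following. It is not the original poster's or Dmitry's code: std::thread and std::atomic are swapped in for tbb::tbb_thread and tbb::atomic, and blocking_bounded_queue is a hypothetical mutex-based stand-in for tbb::concurrent_bounded_queue so that the example builds without TBB. The capacity is fixed and the stop flag is initialized before the worker starts, and thread_ is declared last and launched in the constructor body.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for tbb::concurrent_bounded_queue (blocking push/pop only).
template <typename T>
class blocking_bounded_queue {
public:
    explicit blocking_bounded_queue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
        not_empty_.notify_one();
    }
    void pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
    }
private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
    std::size_t capacity_;
};

class active_object {
public:
    active_object() : queue_(3), stop_(0) {
        // Everything the worker uses is initialized by now; launch it last.
        thread_ = std::thread([this] { run(); });
    }
    ~active_object() {
        // Out-of-band stop flag, then a dummy item to wake the worker.
        std::atomic_store_explicit(&stop_, 1, std::memory_order_relaxed);
        queue_.push(nullptr);
        thread_.join();  // queue_ is destroyed only after the worker has exited
    }
    void push(const std::function<void()>& func) { queue_.push(func); }
private:
    void run() {
        for (;;) {
            std::function<void()> func;
            queue_.pop(func);
            if (stop_.load() == 1) break;
            func();
        }
    }
    // Declaration order matters: members are initialized top to bottom.
    blocking_bounded_queue<std::function<void()>> queue_;
    std::atomic<int> stop_;
    std::thread thread_;
};
```

As in the accepted solution, tasks still queued when the destructor runs are dropped rather than executed, so a caller that needs them to run has to wait for them before destroying the object.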
