Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

concurrent_bounded_queue clear safety

nagy
New Contributor I
I just noticed that concurrent_bounded_queue::clear is not thread safe. Would someone mind explaining in what context this could be a problem?
e.g.
Does the following class have defined behavior?
[cpp]class active_object
{
public:
     active_object()
     {
          queue_.set_capacity(3);
          thread_ = tbb::tbb_thread([this]{run();}); // Another edit.
     }

     ~active_object()
     {
         queue_.clear(); // problem?
         queue_.push(nullptr);
         thread_.join(); // EDIT. Forgot this.
     }

     void push(const std::function<void()>& func)
     {
         queue_.push(func);
     }

private:

     void run()
     {
          std::function<void()> func = []{};
          while(func != nullptr)
          {
               queue_.pop(func);
               func();
          }
     }

     tbb::tbb_thread thread_;
     tbb::concurrent_bounded_queue<std::function<void()>> queue_;

};[/cpp]
1 Solution
Dmitry_Vyukov
Valued Contributor I
As far as I can see, yes, this is undefined behavior. The non-thread-safe clear() is executed concurrently with pop().
I would implement it with a separate out-of-band flag:


[cpp]class active_object
{
public:
     active_object()
        : thread_([this]{run();})
        , stop_(0)
    {
    }

     ~active_object()
     {
         stop_ = 1;
         queue_.push(nullptr);
         thread_.join();
     }

     void push(const std::function<void()>& func)
     {
         queue_.push(func);
     }

private:

     void run()
     {
          for (;;)
          {
               std::function<void()> func;
               queue_.pop(func);
               if (stop_ == 1)
                   break;
               func();
          }
     }

     tbb::atomic<int> stop_;
     tbb::tbb_thread thread_;
     tbb::concurrent_bounded_queue<std::function<void()>> queue_;
};[/cpp]

7 Replies
RafSchietekat
Valued Contributor III

I think that, without further guarantees for tbb::concurrent_bounded_queue, there are three issues. First, it is problematic to drop out of run(), therefore destroy this, and therefore call clear() before the thread knows that the client that pushed nullptr from another thread has safely returned from push(). This can be remedied by locking a mutex in active_object::push(), but at some cost to, or possibly defeating, queue_'s scalability. Second, it seems silly to push nullptr at the end of the destructor (right?). Third, it is problematic to destroy queue_ as part of running ~active_object(), for similar but even stronger reasons than for calling clear(): the queue is still being used by multiple threads.

This has been discussed in another thread ("atomicity" of push()). Maybe I'll add a reference later, unless somebody else does it first.

(Added) Maybe I'm mistaken about what the queue type guarantees, but a better workaround seems to be to lock a mutex around a special end-of-input operation that pushes nullptr, with active_object briefly locking that same mutex when it receives nullptr.

RafSchietekat
Valued Contributor III
I think clear() is only called after the last pop(), the one that returned nullptr, because the thread first has to finish executing before it is destroyed, right?

(Added) No, wait, apparently the queue's capacity is set while the thread is already running, which seems like another problem. But I was wrong about when active_object is destroyed (some confusion with Java Threads, I thought that the consumer would destroy active_object), if the intention is that one thread creates the object, performs a number of push() operations, and then destroys it, because that creator/producer thread does not know when the consumer thread has finished.
Dmitry_Vyukov
Valued Contributor I
> I think clear() is only called after the last pop(), the one that returned nullptr, because the thread first has to finish executing before it is destroyed, right?

The thread is not yet joined/destroyed when clear() is called.

RafSchietekat
Valued Contributor III

"The thread is not yet joined/destroyed when clear() is called."
Right, there's a synchronisation missing, see my addition above.

(Added) Ah, I see a join() has magically appeared in the original question...

(Added) Sorry, I'm not helping (too much distraction); Dmitriy's answer seems to be right. Don't forget to set capacity before starting the thread, though.

(Added) Apparently it's even worse: the thread is possibly launched before the queue is even created...

RafSchietekat
Valued Contributor III
I think that Dmitriy's solution should be corrected by either launching the thread inside the body of the constructor, or otherwise making thread_ the last instance variable, so that initialisation of the others will have completed first, and then the body must remain empty of anything that the thread might use. Also, use value-initialisation (sic?) for stop_, i.e., empty argument list, which makes the initial value zero, because an atomic has no constructor that takes an argument of 0. Right?
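
The initialisation-order point can be checked in isolation. The following is only a minimal sketch: `Tracer`, `ThreadDeclaredFirst`, `ThreadDeclaredLast`, and the two helper functions are hypothetical names made up for illustration, and no TBB types are involved. It relies on the C++ rule that non-static data members are initialised in declaration order, so a thread member declared before the queue is started before the queue exists.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: appends its name to a log when it is constructed,
// so the order in which members are initialised becomes visible.
struct Tracer {
    Tracer(std::vector<std::string>& log, const char* name) { log.push_back(name); }
};

// Same layout as the class under discussion: the "thread" member is declared
// before the "queue" member that the thread would use.
struct ThreadDeclaredFirst {
    Tracer thread_;
    Tracer queue_;
    explicit ThreadDeclaredFirst(std::vector<std::string>& log)
        : thread_(log, "thread"), queue_(log, "queue") {}
};

// Suggested layout: the "thread" member is declared last, so everything it
// might touch has already been initialised when it starts.
struct ThreadDeclaredLast {
    Tracer queue_;
    Tracer thread_;
    explicit ThreadDeclaredLast(std::vector<std::string>& log)
        : queue_(log, "queue"), thread_(log, "thread") {}
};

// Returns the observed construction order for the first layout.
std::vector<std::string> construction_order_thread_first() {
    std::vector<std::string> log;
    ThreadDeclaredFirst obj(log);
    (void)obj;
    return log;
}

// Returns the observed construction order for the second layout.
std::vector<std::string> construction_order_thread_last() {
    std::vector<std::string> log;
    ThreadDeclaredLast obj(log);
    (void)obj;
    return log;
}
```

With a real thread in place of `Tracer`, the first layout would let the thread run against a queue whose constructor has not yet executed.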
Dmitry_Vyukov
Valued Contributor I
> I think that Dmitriy's solution should be corrected by either launching the thread inside the body of the constructor

Agree. Implicit thread launch is not a good idea anyway.

> Also, use value-initialisation (sic?) for stop_, i.e., empty argument list, which makes the initial value zero, because an atomic has no constructor that takes an argument of 0.

I don't remember what initialization it provides. Use whatever works for you. I would prefer:
atomic_store_explicit(&stop_, 0, memory_order_relaxed);
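
Putting the points from this thread together, one possible shape for the class is sketched below. It is not the code from any post above and makes several assumptions: `std::thread` and `std::atomic<bool>` replace `tbb::tbb_thread` and `tbb::atomic`, and `BoundedQueue` is a hypothetical mutex-based stand-in for `tbb::concurrent_bounded_queue` so that the example compiles without TBB. `ActiveObject` is likewise an illustrative name. The sketch also assumes that no other thread calls `push()` once destruction has begun.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for tbb::concurrent_bounded_queue: blocking push()
// and pop() guarded by a single mutex. This is NOT how TBB implements it.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
        not_empty_.notify_one();
    }

    void pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        out = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
    }

private:
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
};

class ActiveObject {
public:
    ActiveObject() : queue_(3) {
        // Launch explicitly in the body, after every member is initialised
        // and the capacity is already fixed.
        thread_ = std::thread([this] { run(); });
    }

    ~ActiveObject() {
        stop_ = true;          // out-of-band stop flag, no clear() needed
        queue_.push(nullptr);  // wakes the worker if it is blocked in pop()
        thread_.join();        // members are destroyed only after the worker exits
    }

    void push(std::function<void()> func) { queue_.push(std::move(func)); }

private:
    void run() {
        for (;;) {
            std::function<void()> func;
            queue_.pop(func);
            if (stop_) break;   // tasks still queued at this point are dropped
            if (func) func();   // guard against calling an empty function
        }
    }

    std::atomic<bool> stop_{false};
    BoundedQueue<std::function<void()>> queue_;
    std::thread thread_;  // declared last, so it is initialised last
};
```

As in the flag-based answer, any task still in the queue when the destructor sets the flag is discarded rather than run. A caller that needs every task executed has to wait for completion before destroying the object.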
