Proper way to cancel all waiting pushes or pops on tbb::concurrent_bounded_queue?

Ender1618
Beginner
714 Views

I am using a tbb::concurrent_bounded_queue to communicate in a buffered fashion between N producer/consumer threads.

Like so:

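<code>#include <tbb/concurrent_queue.h>  // declares tbb::concurrent_bounded_queue

// The queue's capacity has been set to some maximum elsewhere.
tbb::concurrent_bounded_queue<MyItemType> g_queue;

void ThreadA()  // producer
{
  while (running_A)
  {
    MyItemType outItem;
    // do some stuff, resulting in filling outItem ...
    g_queue.push(outItem);  // blocks while the queue is full
  }
}

void ThreadB()  // consumer
{
  while (running_B)
  {
    MyItemType inItem;
    g_queue.pop(inItem);  // blocks while the queue is empty
    // do some stuff with the contents of inItem
  }
}
</code>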

The example I just showed has only one producer and one consumer; in reality there can be many of both. At a user's request, all processing should cease as soon as possible.

So if thread A has pushed a few items, that B had not consumed, I want the next call to B's pop to immediately return somehow indicating that the queue is closed so that I can break from the loop. Any items on the queue not serviced yet can be discarded.

If thread A is waiting on a push, I want that to return immediately, perhaps indicating that the queue is closed for business.

I read somewhere that one way of doing this is to push a special item (a stop item, an indicator to cease and desist) onto the queue; when that item is popped, I know things are stopping. The issue with this is that it's a FIFO queue, so any items ahead of the stop item need to be popped before a consumer gets to it, which I want to avoid.

I see there is a q.abort() method that would cause waiting pushes or pops to return by throwing an exception (this works only if I set the TBB preprocessor define TBB_USE_EXCEPTIONS). I didn't think that having to turn on exceptions for all of TBB would be the proper way of doing this.

I also thought I could clear the queue before adding my stop item, but theoretically another producer could come in between my consumer's q.clear() and q.push(stopItem) and push a non-stop item. That is not what I want, since I would have to wait for that item to be processed before a consumer could get to the pushed stopItem.

What is the proper way of handling this? I previously had my own queue implementation with a close signal that caused all pops and pushes to return (not just fill in the reference item parameter, but return a code from the function call) with a value indicating that the queue was closed. I'm not sure how to get a similar capability with TBB's concurrent bounded queue.

Any suggestions?

Thanks,

-Ryan
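
For readers who have not seen the kind of queue the question refers to, here is a minimal sketch of a bounded queue with a close signal. It is not a TBB API and not the poster's actual implementation; `ClosableBoundedQueue`, `QueueStatus`, and `close()` are hypothetical names, and the sketch assumes that discarding unserviced items on close is acceptable, as stated in the question.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical status code returned by push/pop (not part of TBB).
enum class QueueStatus { Ok, Closed };

// Hypothetical bounded queue whose blocking push/pop return once close() is called.
template <typename T>
class ClosableBoundedQueue {
public:
    explicit ClosableBoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the queue is full; returns Closed if close() has been called.
    QueueStatus push(const T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return closed_ || items_.size() < capacity_; });
        if (closed_) return QueueStatus::Closed;
        items_.push_back(item);
        not_empty_.notify_one();
        return QueueStatus::Ok;
    }

    // Blocks while the queue is empty; returns Closed if close() has been called.
    QueueStatus pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (closed_) return QueueStatus::Closed;
        out = items_.front();
        items_.pop_front();
        not_full_.notify_one();
        return QueueStatus::Ok;
    }

    // Discards pending items and wakes every thread blocked in push() or pop().
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
            items_.clear();
        }
        not_full_.notify_all();
        not_empty_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
    std::size_t capacity_;
    bool closed_ = false;
};
```

With such a queue, a worker loop would simply break as soon as `push` or `pop` returns `QueueStatus::Closed`.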

RafSchietekat
Valued Contributor III
714 Views

If you want to roll your own, use a combination of an in-band poison pill (a special value) and an out-of-band kill switch (e.g., an atomic<bool>).

Don't use clear(), which is marked "not thread-safe"... in the source code! The Reference Manual is obviously derelict in its duty of unambiguously documenting which operations are thread-safe, even listing clear() right after abort(), which is implicitly meant for concurrent use. If this were supposed to be obvious (which it isn't!), there wouldn't be a need to have a comment in the source code, either.

Ender1618
Beginner
714 Views

How do an in-band poison pill and an out-of-band kill switch solve the problem? Maybe I am not clear on what you mean by an out-of-band kill switch. If by kill switch you mean an atomic bool that is used to control (stop) the while loop, it will never be reached if the loop is stuck in a blocking push or pop. What do you mean by an out-of-band kill switch?

Thanks,

-Ryan

RafSchietekat
Valued Contributor III
714 Views
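
<code>// kill_switch is the shared atomic<bool>; is_poison_pill() and process()
// are placeholders for the application's own checks and work.
for (;;) {
  MyItemType item;
  g_queue.pop(item);                 // blocks until an item is available
  if (is_poison_pill(item)) break;   // in-band stop signal
  if (kill_switch) break;            // out-of-band stop signal
  process(item);
}
</code>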

(Added) It is not immediately clear whether any producers could remain blocked that way, but I wouldn't start reasoning about edge conditions and instead just start flushing the pipeline using repeated try_pop() (for lack of the above-mentioned hypothetical thread-safe clear()), freeing any blocked producers, which would then read the kill switch and stop instead of pushing their next item.
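
The flushing step described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `kill_switch` and `drain_after_stop` are hypothetical names, and the function is written as a template over any queue type that offers a `value_type` and a non-blocking `try_pop(value_type&)`, which `tbb::concurrent_bounded_queue` does.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Out-of-band kill switch shared by all producers and consumers (assumed name).
std::atomic<bool> kill_switch{false};

// Discards queued items with repeated try_pop() and returns how many were dropped.
// Each removed item frees a slot, so a producer blocked in push() can complete,
// re-read the kill switch, and stop instead of pushing its next item.
template <typename Queue>
std::size_t drain_after_stop(Queue& q) {
    // Draining before the switch is raised would only let producers refill the queue.
    assert(kill_switch.load() && "raise the kill switch before draining");
    typename Queue::value_type discarded;
    std::size_t dropped = 0;
    while (q.try_pop(discarded)) {
        ++dropped;
    }
    return dropped;
}
```

A stopping thread would set `kill_switch` to true and then call `drain_after_stop(g_queue)`, possibly more than once while producers are still winding down. Consumers blocked in `pop()` are not released by draining; they still need a poison pill each.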

jimdempseyatthecove
Honored Contributor III
714 Views
<code>//which has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue; 

void ThreadA()
{ 
  while(running_A)
  { 
    MyItemType outItem;
    //do some stuff, resulting in filling outItem ...
    if(poison_pill || kill_switch)
    {
        MyItemType discarded;
        while(g_queue.try_pop(discarded))
        {
           // drain until no more
        }
        break;
    }
    g_queue.push(outItem);
    if(poison_pill || kill_switch)
    {
        MyItemType discarded;
        while(g_queue.try_pop(discarded))
        {
           // drain until no more
        }
        break;
    }
  }
}

void ThreadB()
{
  while(running_B)
  { 
    MyItemType inItem;
    if(poison_pill || kill_switch)
    {
        MyItemType discarded;
        while(g_queue.try_pop(discarded))
        {
           // drain until no more
        }
        break;
    }
    g_queue.pop(inItem);
    //do some stuff with the contents of inItem
  }
}
</code>

I think this is what Raf was outlining

Jim Dempsey

Anton_M_Intel
Employee
714 Views

See also my answer on Stack Overflow: basically, it uses the same atomic flag, plus the try_ methods (try_push/try_pop) as an alternative to the blocking methods with an exception.
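
One possible shape of the "atomic flag plus try_ methods" approach is sketched below. It is not the code from the linked answer. `g_stop`, `push_or_stop`, and `pop_or_stop` are hypothetical names, and `StandInQueue` is a hypothetical stand-in that only mimics the `try_push`/`try_pop` shape of `tbb::concurrent_bounded_queue` so that the sketch compiles without TBB; the two helper templates should work with the TBB queue as well.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in with the same try_push/try_pop shape as
// tbb::concurrent_bounded_queue (not a TBB class).
template <typename T>
class StandInQueue {
public:
    explicit StandInQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    bool try_push(const T& value) {
        std::lock_guard<std::mutex> lock(m_);
        if (items_.size() >= capacity_) return false;  // full
        items_.push_back(value);
        return true;
    }
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(m_);
        if (items_.empty()) return false;  // empty
        out = items_.front();
        items_.pop_front();
        return true;
    }
private:
    std::mutex m_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Out-of-band stop flag shared by all threads (assumed name).
std::atomic<bool> g_stop{false};

// Retries try_push until it succeeds; returns false if the stop flag is raised first.
template <typename Queue, typename T>
bool push_or_stop(Queue& q, const T& item) {
    while (!g_stop.load()) {
        if (q.try_push(item)) return true;
        std::this_thread::yield();  // queue full: let other threads run
    }
    return false;
}

// Retries try_pop until it succeeds; returns false if the stop flag is raised first,
// even when items are still queued (they are simply left unserviced).
template <typename Queue, typename T>
bool pop_or_stop(Queue& q, T& out) {
    while (!g_stop.load()) {
        if (q.try_pop(out)) return true;
        std::this_thread::yield();  // queue empty: let other threads run
    }
    return false;
}
```

Producer and consumer loops would call these helpers in place of `push`/`pop` and break when they return false. The trade-off is busy-waiting: no thread ever blocks inside the queue, so no exception is needed to wake it, but idle threads keep polling.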
