I am using a tbb::concurrent_bounded_queue to communicate in a buffered fashion between N producer/consumer threads.
Like so:
<code>
// g_queue has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue;

void ThreadA() // producer
{
    while (running_A) {
        MyItemType outItem;
        // do some stuff, resulting in filling outItem
        ...
        g_queue.push(outItem);
    }
}

void ThreadB() // consumer
{
    while (running_B) {
        MyItemType inItem;
        g_queue.pop(inItem);
        // do some stuff with the contents of inItem
    }
}
</code>
The example I just showed has only one producer and one consumer; in reality there can be many of both. At a user's request, all processing should cease as soon as possible.
So if thread A has pushed a few items that B has not consumed yet, I want B's next call to pop to return immediately, somehow indicating that the queue is closed, so that I can break from the loop. Any items on the queue not serviced yet can be discarded.
If thread A is waiting on a push, I want that to return immediately as well, perhaps indicating that the queue is closed for business.
I read somewhere that one way of potentially doing this is to push a special item (a stop item, an indicator to cease and desist) onto the queue; when that item is popped, I know things are stopping. The issue with this is that it's a FIFO queue, so every item ahead of the stop item has to be popped before a consumer gets to it, which I want to avoid.
I see there is an abort() method that would cause waiting pushes or pops to return by throwing an exception (it works only if I set the TBB preprocessor define TBB_USE_EXCEPTIONS). I didn't think that having to turn on exceptions for all of TBB was a proper way of doing this?
I also thought I could clear the queue before adding my stop item, but theoretically another producer can come in between my current consumer's q.clear() and q.push(stopItem) and push a non-stop item. That is not what I want, since I would have to wait for that non-stop item to be processed before a consumer gets to the pushed stop item.
What is the proper way of handling this? I previously had my own queue implementation with a close signal that would cause all pops and pushes to return (not just fill in the reference item parameter, but return a code from the function call) with a value indicating that the queue was closed. I'm not sure how to get a similar capability with TBB's concurrent bounded queue.
Any suggestions?
Thanks,
-Ryan
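For readers who have not seen the kind of queue described in the question, here is a minimal sketch of a bounded queue with a close signal, written against the C++ standard library only. It is not TBB code and not the poster's original implementation: the class name `ClosableQueue` and its members are made up for illustration, and the choice to discard pending items on `close()` simply follows the requirement stated above.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded FIFO queue whose push/pop report "closed" via their return value.
template <typename T>
class ClosableQueue {
public:
    explicit ClosableQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the queue is full. Returns false once the queue is closed.
    bool push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return closed_ || items_.size() < capacity_; });
        if (closed_) return false;
        items_.push_back(std::move(item));
        not_empty_.notify_one();
        return true;
    }

    // Blocks while the queue is empty. Returns false once the queue is closed.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (closed_) return false;
        out = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
        return true;
    }

    // Discards pending items and wakes every blocked producer and consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
            items_.clear();
        }
        not_full_.notify_all();
        not_empty_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
    const std::size_t capacity_;
    bool closed_ = false;
};
```

With such a queue, both loops from the question reduce to `while (queue.pop(item)) { ... }` and `if (!queue.push(item)) break;`. The replies below discuss how to approximate this behaviour on top of tbb::concurrent_bounded_queue.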
If you want to roll your own, use a combination of an in-band poison pill (a special value) and an out-of-band kill switch (e.g., an atomic<bool>).
Don't use clear(), which is marked "not thread-safe"... in the source code! The Reference Manual is obviously derelict in its duty of unambiguously documenting which operations are thread-safe, even listing clear() right after abort(), which is implicitly meant for concurrent use. If this were supposed to be obvious (which it isn't!), there wouldn't be a need to have a comment in the source code, either.
<code>
for (;;) {
    pop();
    if (poison pill) break;
    if (kill switch) break;
    process item;
}
</code>
(Added) It is not immediately clear whether any producers could remain blocked that way, but I wouldn't start reasoning about edge conditions. Instead, I would just start flushing the pipeline using repeated try_pop() (for lack of the above-mentioned hypothetical thread-safe clear()). That frees any blocked producers, which would then read the kill switch and stop instead of pushing their next item.
<code>
// g_queue has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue;

void ThreadA() // producer
{
    while (running_A) {
        MyItemType outItem;
        // do some stuff, resulting in filling outItem
        ...
        if (poison_pill || kill_switch) {
            MyItemType discarded;
            while (g_queue.try_pop(discarded)) {} // until no more
            break;
        }
        g_queue.push(outItem);
        if (poison_pill || kill_switch) {
            MyItemType discarded;
            while (g_queue.try_pop(discarded)) {} // until no more
            break;
        }
    }
}

void ThreadB() // consumer
{
    while (running_B) {
        MyItemType inItem;
        if (poison_pill || kill_switch) {
            MyItemType discarded;
            while (g_queue.try_pop(discarded)) {} // until no more
            break;
        }
        g_queue.pop(inItem);
        // do some stuff with the contents of inItem
    }
}
</code>
I think this is what Raf was outlining.
Jim Dempsey
See also my answer on SO: basically, the same atomic flag and usage of try_ methods as alternative to blocking methods + exception.
