Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Concurrent_queue push when full behavior

Ender1618
Beginner
604 Views

I'm completely new to TBB, so please forgive my ignorance.

Is there any way to have concurrent_queue drop the oldest item in the queue (if it has reached its max size) on a push instead of blocking? And, for a different usage, is there a way for the queue to simply not add the new item on a push if it has reached its max size?

The idea is to have a producer-consumer throttling method. My data is a stream, and I can afford drops if the consumer can't keep up with the producer, since the producer can produce at a variable rate (typically steady, but it can spike).

I use this method in my own concurrent queue, with native threads running the producer and consumer. I just wondered if this would be possible using the TBB concurrent_queue, as I am considering replacing most of my native threading code with TBB, and I need these behaviors.

Thanks

7 Replies
Ender1618
Beginner

Forgive me, I meant concurrent_bounded_queue, not concurrent_queue.

jimdempseyatthecove
Honored Contributor III

Have you considered writing your own specialized ring buffer? (IOW - push when full implies pre-pop and discard)

Jim Dempsey

robert-reed
Valued Contributor II

Does it have to be the oldest entry? You could switch the push() for a try_push() and discard the newest on failure. Or if not, how about a wrapper that does something like a "while (!try_push(item)) { pop(discard); }"?
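A minimal sketch of the first suggestion (discard the newest item on failure) might look like the following. The `DropNewestProducer` name and the drop counter are illustrative assumptions, not part of TBB. The wrapper is templated on the queue type and only assumes a non-blocking `try_push(const T&)` returning `bool`, which is the shape `tbb::concurrent_bounded_queue<T>` provides.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical wrapper: never blocks the producer; a failed push means the
// new item is simply dropped and counted.
// Queue is assumed to expose bool try_push(const T&), as
// tbb::concurrent_bounded_queue<T> does.
template <typename Queue>
class DropNewestProducer {
public:
    explicit DropNewestProducer(Queue& q) : queue_(q) {}

    // Returns true if the item was stored, false if the queue was full
    // and the item was discarded.
    template <typename T>
    bool push(const T& item) {
        if (queue_.try_push(item)) return true;
        dropped_.fetch_add(1, std::memory_order_relaxed);
        return false;
    }

    // Number of items discarded so far through this wrapper.
    std::size_t dropped() const {
        return dropped_.load(std::memory_order_relaxed);
    }

private:
    Queue& queue_;
    std::atomic<std::size_t> dropped_{0};
};
```

With TBB this would presumably wrap a `tbb::concurrent_bounded_queue` whose limit was set beforehand with `set_capacity()`; without a limit, `try_push` has no reason to fail and nothing is ever dropped.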

Ender1618
Beginner

jimdempseyatthecove wrote:

Have you considered writing your own specialized ring buffer? (IOW - push when full implies pre-pop and discard)

Jim Dempsey

How would you create a specialized ring buffer that has the same lockless concurrency properties as TBB's concurrent_bounded_queue? Is there a way to specialize concurrent_bounded_queue?

Ender1618
Beginner

robert-reed (Intel) wrote:

Does it have to be the oldest entry? You could switch the push() for a try_push() and discard the newest on failure. Or if not, how about a wrapper that does something like a "while (!try_push(item)) { pop(discard); }"?

I need the ability to select discard-oldest or discard-newest mode. Discard-oldest is the most important mode right now: since it's the oldest, it's the least relevant in my circumstance (live streaming, where newer items are most relevant). Would I be able to iterate and erase the oldest item (essentially the item at the front of the queue, next to be processed) in a safe, quick manner?

RafSchietekat
Valued Contributor III

Ender1618 wrote:

Would I be able to iterate and erase the oldest item (essentially the item at the front of the queue, next to be processed) in a safe, quick manner?

Robert provided suggestions for both back and front, respectively (the "or if not" apparently applies to the second sentence, not to the question).

That means that there is probably no need for new functionality, although the second suggestion is vulnerable to race conditions, so you should probably replace the pop() with a (void)try_pop() to avoid getting stuck or possibly deadlocked, however unlikely.
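As a rough sketch of the wrapper with that correction applied, the loop below uses a non-blocking pop so the producer cannot get stuck if a consumer drains the queue between the failed push and the pop. The function name `push_drop_oldest` is made up for illustration. The queue type is a template parameter and is only assumed to offer non-blocking `try_push(const T&)` and `try_pop(T&)` returning `bool`, as `tbb::concurrent_bounded_queue<T>` does.

```cpp
#include <cstddef>

// Hypothetical helper: pushes `item`, discarding the oldest queued elements
// for as long as the queue reports itself full.
// Assumptions: Queue has bool try_push(const T&) and bool try_pop(T&), both
// non-blocking; T is default-constructible; the queue capacity is at least 1
// (with capacity 0 neither call can succeed and the loop would spin forever).
// Returns how many elements this call discarded.
template <typename Queue, typename T>
std::size_t push_drop_oldest(Queue& q, const T& item) {
    std::size_t dropped = 0;
    T discard;
    while (!q.try_push(item)) {
        // try_pop instead of a blocking pop: if a consumer has emptied the
        // queue in the meantime, this just fails and the push is retried.
        if (q.try_pop(discard)) ++dropped;
    }
    return dropped;
}
```

A call site might look like `push_drop_oldest(frames, frame)`, where `frames` is a bounded queue and `frame` the newest stream element (both names hypothetical). Note that under multiple producers the loop has no upper bound on iterations.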

Alexey-Kukanov
Employee

The solution suggested by Robert (with Raf's correction) seems to be as good as we could do in a special method for concurrent_bounded_queue, if not even better.

For a special method to replace the oldest item we would need to solve how that method interoperates with the usual blocking push(), i.e. if there are waiting push() calls, what should the new method do - keep them waiting, let them proceed first (in order to keep FIFO), or maybe abort them? And I am not sure if any of these solutions can be implemented with low cost while keeping consistent state and guaranteed properties of the queue.

In your app, you might just not bother about such subtleties and require that blocking push() is never used directly.

One more correction you might consider (in the case of multiple producers pushing to the queue) is to not retry pushing a new item indefinitely, but only as many times as the queue's size limit. The reasoning is that if a thread has popped as many items as the queue size and still cannot push its own item, there must have been just as many "newer" items pushed by concurrent producers, which makes the thread's item "outdated".
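The bounded variant described above could be sketched roughly as follows. The name `push_drop_oldest_bounded` is hypothetical, and the queue type is assumed to provide non-blocking `try_push(const T&)` and `try_pop(T&)` plus a `capacity()` accessor, which matches the interface of `tbb::concurrent_bounded_queue<T>`. The sketch also assumes the capacity was set to a small positive value and is not changed while producers are running.

```cpp
#include <cstddef>

// Hypothetical helper: like a drop-oldest push, but gives up after as many
// pop-and-retry rounds as the queue's capacity.
// Returns true if `item` was stored, false if it was abandoned as outdated.
template <typename Queue, typename T>
bool push_drop_oldest_bounded(Queue& q, const T& item) {
    // Read the limit once; assumed stable for the duration of the call.
    const std::ptrdiff_t limit = static_cast<std::ptrdiff_t>(q.capacity());
    T discard;  // requires T to be default-constructible
    for (std::ptrdiff_t attempt = 0; attempt < limit; ++attempt) {
        if (q.try_push(item)) return true;
        // Make room by discarding the oldest element, if there still is one.
        q.try_pop(discard);
    }
    // One last try after the final pop. If this fails too, other producers
    // have kept refilling the queue with newer items, so this one is dropped.
    return q.try_push(item);
}
```

The return value lets the caller count abandoned items. With a single producer the first retry normally succeeds, so the bound only matters when several producers compete for the same queue.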
