Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Blocking pipeline output

nagy
New Contributor I
2,600 Views
I have a pipeline that I'm using to fill a result buffer with processed items. However, I don't want the result buffer to hold more than x items. This means that the pipeline has to "sleep" somewhere until someone pulls (takes) an item from the buffer.

Could I do this by using a blocking concurrent_queue::push in the serial output filter? How does the pipeline behave when the maximum number of tokens is active? Is it OK to block the output filter (in the same way you can block the input filter)? If the output filter is run as a task, wouldn't this be a problem, since it would block a thread in the TBB thread pool for an unknown amount of time?

My rather inelegant solution right now is that my input filter blocks until it gets a "ticket" from the output filter. The problem with this is that I get a rather ugly dependency between the input and output filters. Also, not every processed item gets put into the result buffer, which means the "ticket" system needs to take this into account.
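
For readers unfamiliar with the "ticket" approach described above, it can be sketched roughly as a counting gate built on the standard library. This is only an illustration: the class name ticket_gate and its methods are hypothetical and are not part of TBB or of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical "ticket" gate: the input filter takes a ticket before reading
// an item; a ticket is handed back when the consumer pulls a result, or when
// an item is dropped before it reaches the result buffer.
class ticket_gate {
public:
    explicit ticket_gate(std::size_t tickets)
        : available_(tickets), capacity_(tickets) {}

    // Blocks the caller until a ticket is available.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return available_ > 0; });
        --available_;
    }

    // Non-blocking variant; returns false when no ticket is available.
    bool try_acquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_ == 0) return false;
        --available_;
        return true;
    }

    // Hands a ticket back and wakes one waiting thread, if any.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(available_ < capacity_);  // more releases than acquires is a bug
            ++available_;
        }
        ready_.notify_one();
    }

    std::size_t available() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return available_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable ready_;
    std::size_t available_;
    std::size_t capacity_;
};
```

With a gate of size x, the input filter would call acquire() before reading, and release() would have to be called both when the consumer takes an item and when an item is discarded, which is the extra bookkeeping mentioned above.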
0 Kudos
1 Solution
Alexey-Kukanov
Employee
2,600 Views
Quoting nagy
This is a rather big problem for me.
Basically, what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order to not get too many active tokens.
The big problem is that I can have several of these producer/consumer setups running at the same time. This means that it is very probable that I will end up blocking the entire TBB thread pool, and since I don't know how many of these will run, I cannot start TBB with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using TBB tasks.
What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks, then maybe there would be no problem, since the blocking thread is not part of the TBB thread pool.
EDIT:
The input filter is also reading from a file; would it also be a good idea to have that on its own thread?


I think it makes sense to step back and start from your use case, which seems to be well described in the above quote.
You want to use a bounded (and so blocking) queue as the interface between the pipeline-based producer and the thread-based legacy consumer. It is expected that the producer fills the queue faster than the consumer drains it; in this case, the pipeline should block and wait. On the other hand, there might be several pipelines working at the same time, so blocking a TBB worker thread is undesirable. Ideally, the master thread that started the pipeline should be the one that blocks.
Unfortunately, the current TBB pipeline implementation suffers from idle spinning here and there, in particular with thread-bound filters. Basically, the only way to avoid idle spinning in situations when the pipeline can neither proceed nor finish is to make the master thread do a blocking call, and let worker threads go to sleep due to the absence of available tasks.
Now what I can suggest is to recognize which thread executes the last pipeline stage, the one that should push an item into the queue, and do different things depending on that. In a sense, it's like converting a non-thread-bound filter to act like a thread-bound one. To understand which thread runs the filter, you might use tbb_thread::id (see sections 12.2 and 12.3.1 in the TBB 2.2 Reference Manual) - I hate to say this as we always argue for "thread-agnostic" parallel programming, but after all it's TBB issues that require such a workaround. If the master thread executes the filter, it uses the blocking push() method of the queue, so that it blocks when the queue is full. If a worker thread executes the filter, it should use the non-blocking try_push() method. The question is what to do when try_push fails. So far, there is no way to tell the pipeline that the filter failed to process the current token and that it should be re-attempted. So the solution I see is to use an intermediate queue in the filter for such items. As the last filter should be serial, std::queue would work there, without any additional locking.
In the pseudocode, the last filter I described looks like this:

[plain]if the filter is executed by the master // can block
    // push pending items first
    while intermediate queue is not empty
        pop an item from the front of the intermediate queue
        push this item into the bounded output queue
    push the item received as the argument into the bounded output queue
else // the filter executed by a worker; should not block
    // push pending items first
    while intermediate queue is not empty
        read an item from the front of the intermediate queue (but do not pop it)
        try_push this item into the bounded output queue
        if try_push succeeded
            pop from the front of the intermediate queue
        else
            break the while loop
    // process the item received as the argument
    if intermediate queue is empty // i.e. no more pending items
        try_push the received item into the bounded output queue
        if try_push succeeded
            return
    push the item into intermediate queue
return
[/plain]


The non-blocking section might be simplified if the received item is first unconditionally pushed to the intermediate queue; then you can just process the queue in the loop. It's a little bit suboptimal execution-wise, but the code will be simpler.
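
The pseudocode above can be sketched in C++ roughly as follows. This is a standard-library-only illustration under stated assumptions, not the actual filter: bounded_queue is a minimal stand-in for tbb::concurrent_bounded_queue, output_stage is a hypothetical name for the body of the serial last filter, and std::this_thread::get_id() is used in place of tbb_thread::id to recognize the master thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Minimal bounded queue; a stand-in for tbb::concurrent_bounded_queue.
template <typename T>
class bounded_queue {
public:
    explicit bounded_queue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    // Blocks while the queue is full.
    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
    }
    // Never blocks; returns false if the queue is full.
    bool try_push(const T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(value);
        return true;
    }
    // Consumer side; returns false if the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        not_full_.notify_one();
        return true;
    }
private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Body of the serial last filter. Because the filter is serial, the
// intermediate std::queue needs no locking of its own.
template <typename T>
class output_stage {
public:
    output_stage(bounded_queue<T>& out, std::thread::id master)
        : out_(out), master_(master) {}

    void operator()(const T& item) {
        if (std::this_thread::get_id() == master_) {
            // Master thread: blocking is acceptable. Pending items go first.
            while (!pending_.empty()) {
                out_.push(pending_.front());
                pending_.pop();
            }
            out_.push(item);
            return;
        }
        // Worker thread: must not block. Move as many pending items as fit.
        while (!pending_.empty() && out_.try_push(pending_.front()))
            pending_.pop();
        // Push the new item directly only if nothing is still waiting ahead of it.
        if (pending_.empty() && out_.try_push(item))
            return;
        pending_.push(item);
    }

    std::size_t pending() const { return pending_.size(); }

private:
    bounded_queue<T>& out_;
    std::thread::id master_;
    std::queue<T> pending_;
};
```

Items keep their order because a new item is only pushed directly when the intermediate queue is empty. Anything left in the intermediate queue when the stream ends still has to be flushed separately.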

0 Kudos
34 Replies
nagy
New Contributor I
692 Views
"Let me ask some questions (and maybe more will follow after your answers). What is the existing (sequential?) way to do the job? Does your code run several producer/consumer patterns at the same time? If so, does every producer run in its own thread, or all in a single thread?"

I missed this post, sorry for the late answer.

Before I started rewriting the code to use TBB, it worked the following way.

A producer runs in its own thread, and runs a loop which reads a packet from a file, processes it sequentially in different steps (aka pipeline filters), and then puts it into a blocking bounded queue (this needs to block since I do not want the entire file in memory while the consumer pulls packets). This is repeated until all packets are read from the file.

The consumer runs in its own thread. It is connected to one or several producers by receiving shared pointers to their queues. It then pulls items from the queues and sleeps in between to control the rate at which packets are consumed.

Every consumer and producer runs in its own thread, even if there are several producers or consumers.

This design suffers from heavy over-subscription and limited scalability, which is why I'm currently rewriting it.

EDIT:

How do I quote in these forums? At first I thought the post I'm replying to is automatically quoted, but that doesn't seem to be the case. And the "blockquote" button doesn't seem to do anything.
0 Kudos
nagy
New Contributor I
692 Views
I have solved the problem, without the need to create an additional thread for blocking.
I created a ConsumerProxy (as suggested) implementing a thread_bound_filter (as suggested).
This way I can let the consumer pull from the proxy, and the legacy consumer thread blocks if there are no items available. This avoids both the additional blocking thread and blocking the TBB task pool.
It looks something like this.
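class ConsumerAdapter : public tbb::thread_bound_filter
{
public:
    VideoFramePtr get_frame() // buffering is not included for simplicity
    {
        // Pull items through the pipeline on the caller's thread
        // until a frame arrives or the stream ends.
        while(frameQueue_.empty() && !eof_)
            eof_ = process_item() == tbb::thread_bound_filter::end_of_stream;
        if(frameQueue_.empty())
            return nullptr;
        VideoFramePtr result = frameQueue_.front();
        frameQueue_.pop();
        return result;
    }
    void* operator()(void* item)
    {
        // VideoFrame* is assumed to be the raw pointer type behind VideoFramePtr.
        if(item != nullptr)
            frameQueue_.push(VideoFramePtr(static_cast<VideoFrame*>(item)));
        return nullptr;
    }
    // constructor and members (frameQueue_, eof_) omitted
};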
What happens is that as soon as the consumer needs a frame it calls get_frame. If there are no frames available, it calls the thread_bound_filter function process_item, which potentially pulls an item from the pipeline. Seems to work.
Thanks for all the help.
EDIT: updated the code
EDIT2: Did some more testing. It seems that a blocking thread_bound_filter has very high overhead, even when used in the way demonstrated in the tutorial document. My original solution with blocking tasks had about 30% CPU utilization, while if I use a thread_bound_filter I get up to 80% CPU utilization. Something is wrong.
0 Kudos
RafSchietekat
Valued Contributor III
692 Views
It seems a good idea to lend thread_bound_filter the use of a thread only when needed (but then of course you know more about that legacy code than you've told us).

Unfortunately for performance, process_item() busy-waits.
0 Kudos
Alexey-Kukanov
Employee
692 Views
Quoting nagy
EDIT2: Did some more testing. It seems that a blocking thread_bound_filter has very high overhead, even when used in the way demonstrated in the tutorial document. My original solution with blocking tasks had about 30% CPU utilization, while if I use a thread_bound_filter I get up to 80% CPU utilization. Something is wrong.


Yes, as Raf said, process_item() busy-waits, and we actually need to fix it (I am sorry, the problem just dropped off our radar, so to say).

Meanwhile, the recommendation is to use try_process_item() (which returns one more type of value to indicate absence of work) and some external blocking mechanism, such as a Windows event, a condition_variable, or a semaphore.
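
A rough sketch of that polling-plus-blocking pattern, using only the standard library. The result enum mirrors the three outcomes of tbb::thread_bound_filter::try_process_item() (success, item_not_available, end_of_stream); mock_stage, feed(), close(), wait_for_work() and serve() are hypothetical stand-ins. In a real pipeline the wake-up signal would have to be raised by whatever stage hands items to the thread-bound filter, which is an assumption of this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Mirrors the outcomes of tbb::thread_bound_filter::try_process_item().
enum class result { success, item_not_available, end_of_stream };

// Hypothetical stand-in for a thread-bound filter plus its wake-up signal.
class mock_stage {
public:
    // Upstream side: hand over an item and wake the serving thread.
    void feed(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!closed_);  // feeding after close() is a bug
            items_.push_back(value);
        }
        wake_.notify_one();
    }
    // Upstream side: no more items will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        wake_.notify_all();
    }
    // Non-blocking: processes one item if there is one.
    result try_process_item() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!items_.empty()) {
            sum_ += items_.front();
            items_.pop_front();
            return result::success;
        }
        return closed_ ? result::end_of_stream : result::item_not_available;
    }
    // External blocking mechanism: sleeps instead of spinning.
    void wait_for_work() {
        std::unique_lock<std::mutex> lock(mutex_);
        wake_.wait(lock, [this] { return !items_.empty() || closed_; });
    }
    long sum() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return sum_;
    }
private:
    mutable std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<int> items_;
    bool closed_ = false;
    long sum_ = 0;
};

// Serving loop: try first, and block only when there is nothing to do.
inline std::size_t serve(mock_stage& stage) {
    std::size_t processed = 0;
    for (;;) {
        const result r = stage.try_process_item();
        if (r == result::end_of_stream) break;
        if (r == result::success) ++processed;
        else stage.wait_for_work();
    }
    return processed;
}
```

Because the wait predicate is checked under the same mutex that feed() and close() take, a notification that arrives between the failed try and the wait is not lost.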

0 Kudos
nagy
New Contributor I
692 Views
Yes, on further investigation I noticed it does a __TBB_Yield() loop. I hope you will be able to fix this as soon as possible. Meanwhile I will use tbb::interface5::condition_variable.
0 Kudos
Alexey-Kukanov
Employee
692 Views
Quoting nagy
Meanwhile I will use tbb::interface5::condition_variable.


The recommendation is to use it as std::condition_variable.

The tbb::interface5 namespace is an implementation detail that can change over time (e.g. if the implementation is reworked in an incompatible way it will go into tbb::interfaceX), while the interface represented by std::condition_variable is expected to be backward-compatible.

0 Kudos
nagy
New Contributor I
692 Views
I didn't want to create another thread, but there seems to be an error in the reference document:
"If there is other work to do while the pipeline is running, the call to method pipeline::run can be replaced by a pair of calls pipeline::start_run and pipeline::finish_run, and the calling thread can do other work between the calls. The example in Section 3.9.7 has an example."
This does not seem to be accurate?
0 Kudos
Alexey-Kukanov
Employee
692 Views

Yes, it's inaccurate. I apologize for that. In prototypes of the thread-bound filter, we tried to provide the ability to bind it to the same thread that starts the pipeline; this note in the Reference is an artifact of that effort. The methods were never provided as a supported feature.

If you do not want to start another thread for the TBF but instead use the current one, I have a suggestion. The common idea is to fire a special task to start the pipeline, which will be taken and executed by a TBB worker thread. This way, the worker becomes blocked by pipeline::run(), while your thread can serve the TBF or do something else. I can suggest two ways of implementing that:
- use class task_group. It is pretty convenient to fire a task for asynchronous execution, then do some other stuff, then wait for completion of the task. Its disadvantage is that if no worker thread is available (e.g. the program runs on a single core), the task may not get executed, so the pipeline won't start at all.
- use the new method task::enqueue(), available in the most recent stable releases. It is easy enough to start a task, though not as convenient as task_group, and much less convenient to wait for task completion (there is no direct support for waiting for enqueued tasks; if you need it you have to do it manually, either with an event/semaphore/condition variable, or with an otherwise unnecessary parent task). Its advantage is that it guarantees execution by a worker thread, i.e. it does not have the problem of the first approach.

The common shortcoming of both approaches, however, is that the worker thread that takes the task and starts the pipeline will busy-wait when the pipeline is empty but not finished.

0 Kudos
nagy
New Contributor I
692 Views
I've been experimenting a bit and I still can't get rid of the CPU overhead of thread_bound_filter. It's better than when I had the busy wait, but I'm still at 70% CPU compared to 30% with a blocking TBB task.

As suggested, I'm using try_process_item with my own blocking.
My current setup is something like this. This is not my intended final solution, but I need to be able to solve the overhead problem first.
input_filter -> process_filter -> output_filter -> dummy_filter
input_filter: reads from a file
process_filter: different processing filters
output_filter: implements a blocking "ticket system"
tbb::concurrent_bounded_queue<FramePtr> queue_;

FramePtr output_filter::GetFrame()
{
    FramePtr pFrame;
    queue_.pop(pFrame); // blocks until a frame is available
    return pFrame;
}

void* output_filter::operator()(void* item)
{
    /* Does what's necessary... */
    queue_.push(pFrame); // should never block
    return nullptr;
}
dummy_filter: a thread_bound_filter that does nothing and is used to control the pipeline token count.
Then I start the pipeline like this.
tbb::tbb_thread t([&]{ pipeline.run(n_tokens); });
do
{
    FramePtr pFrame = output_.GetFrame(); // Blocking pop, wait until there is a finished frame
    if(pFrame)
        frameBuffer_.push_back(pFrame); // Blocking push, wait until there is room for another frame
} while(dummy_.try_process_item() != tbb::thread_bound_filter::end_of_stream); // Release token count
t.join();
Any ideas as to what I might be doing wrong?
EDIT: Does the pipeline also busy-wait when waiting for the thread_bound_filter to release a token? As far as I can see from the pipeline source, internal_process_item doesn't result in any new tasks being spawned, or in any notification other than low_token being incremented.
0 Kudos
nagy
New Contributor I
692 Views
It seems this discussion died out. I will make one last try.
I've looked a bit at pipeline_root_task, which looks somewhat like this. (I have removed the segment-scanning part since no non-thread-bound filter follows my thread_bound_filter. I am not 100% sure I can do this, but from what I understood from the code comments it shouldn't do anything in my case.)
/*override*/ task* pipeline_root_task::execute() {
    if( !my_pipeline.end_of_input )
        if( !my_pipeline.filter_list->is_bound() )
            if( my_pipeline.input_tokens > 0 ) {
                recycle_as_continuation();
                set_ref_count(1);
                return new( allocate_child() ) stage_task( my_pipeline );
            }
    if( !my_pipeline.end_of_input ) {
        recycle_as_continuation();
        return this;
    }
    return NULL;
}
From my understanding, this task will keep recycling and re-executing itself while a thread_bound_filter holds tokens (blocking), causing input_tokens == 0. I believe this is a busy-wait? Wouldn't it be a better solution if the pipeline_root_task wasn't recycled when it is starved by a thread_bound_filter? The thread_bound_filter would then respawn the pipeline_root_task once it releases one (or several?) tokens, if the pipeline_root_task has died out.
0 Kudos
Alexey-Kukanov
Employee
692 Views
Your analysis is basically correct. Yes it's a busy wait. Indeed the TBF implementation suffers from these, and should be improved somehow, by sleeping internally.

I will think some more about what can be done to solve your case, and get back with ideas later today or tomorrow.
0 Kudos
nagy
New Contributor I
692 Views
Sounds like a solution. I was not aware of the concept of master threads though. Previously I thought that the task scheduler only uses its own worker threads, to avoid oversubscription. Where can I find more information explaining this? What if the last item is not executed on the master thread and previous items failed to push into the destination buffer?
EDIT: I guess the thread calling wait() (the master?) is also counted among the task scheduler's worker threads?
0 Kudos
Alexey-Kukanov
Employee
692 Views
Any user thread that calls wait_for_all(), either directly or via parallel algorithms including pipeline::run(), becomes the "master" thread that supplies at least the initial set of tasks for TBB workers, and it participates in processing the tasks.

The task scheduler by default allocates one fewer software thread than there are hardware threads available, to account for the user thread that starts the work.

Yes, there should be special handling for the last items that can accumulate in the intermediate queue; probably it can be done right after returning from pipeline::run(), or alternatively in the destructor of the filter.
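
That final flush could look roughly like the sketch below. It assumes the intermediate queue is a std::queue and that the caller supplies a blocking push into the bounded output queue; the name drain_pending and the callable parameter are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>

// Flushes whatever is left in the intermediate queue, in order.
// `blocking_push` is any callable that delivers one item to the bounded
// output queue and may block until there is room.
// Returns the number of items flushed.
template <typename T, typename BlockingPush>
std::size_t drain_pending(std::queue<T>& pending, BlockingPush blocking_push) {
    std::size_t flushed = 0;
    while (!pending.empty()) {
        blocking_push(std::move(pending.front()));
        pending.pop();
        ++flushed;
    }
    assert(pending.empty());  // postcondition: nothing is left behind
    return flushed;
}
```

Calling this on the master thread right after pipeline::run() returns keeps the blocking on a thread that is allowed to block. Doing the same work in the filter's destructor would block whichever thread destroys the filter.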
0 Kudos