Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Blocking pipeline output

nagy
New Contributor I
I have a pipeline that I'm using to fill a result buffer with processed items. However, I don't want the result buffer to hold more than x items. This means that the pipeline has to "sleep" somewhere until someone pulls (takes) an item from the buffer.

Could I do this by using a blocking concurrent_queue::push in the serial output filter? How does the pipeline behave when the maximum number of tokens is active? Is it OK to block the output filter (in the same way you can block the input filter)? If the output filter is run as a task, wouldn't this be a problem, since it would block a thread in the TBB thread pool for an unknown amount of time?

My rather inelegant solution right now is that my input filter blocks until it gets a "ticket" from the output filter. The problem with this is that I get a rather ugly dependency between the input and output filters. Also, not every processed item is put into the result buffer, which means the "ticket" system needs to take this into account.
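The "ticket" scheme described above could look roughly like the following. This is only a sketch built on the standard library; TicketPool and its member names are hypothetical and are not part of TBB. The input filter would call acquire() before producing an item, the consumer would call release() after taking an item from the result buffer, and the output filter would call release() itself for every item it decides not to buffer.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper (not a TBB class): a counting pool of "tickets"
// that bounds how many items may sit between input filter and consumer.
class TicketPool {
public:
    explicit TicketPool(std::size_t capacity) : available_(capacity) {
        assert(capacity > 0);
    }

    // Input filter: take a ticket, blocking while none is available.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        has_ticket_.wait(lock, [this] { return available_ > 0; });
        --available_;
    }

    // Non-blocking variant: returns false instead of waiting.
    bool try_acquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_ == 0) return false;
        --available_;
        return true;
    }

    // Consumer (after taking an item) or output filter (when it drops
    // an item instead of buffering it): give the ticket back.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++available_;
        }
        has_ticket_.notify_one();
    }

    std::size_t available() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return available_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable has_ticket_;
    std::size_t available_;
};
```

Note that acquire() still blocks whichever thread runs the input filter, so this sketch shows the dependency between the two filters rather than removing it.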

1 Solution
Alexey-Kukanov
Employee
Quoting nagy
This is a rather big problem for me.
Basically, what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order not to get too many active tokens.
The big problem is that I can have several of these producer/consumer setups running at the same time, which means that it is very probable that I will end up blocking the entire TBB thread pool, and since I don't know how many of these will run, I cannot start TBB with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using TBB tasks.
What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks, then maybe there would be no problem, since the blocking thread is not part of the TBB thread pool.
EDIT:
The input filter also reads from a file; would it also be a good idea to run it on its own thread?


I think it makes sense to step back and start from your use case, which seems to be well described in the above quote.
You want to use a bounded (and so blocking) queue as the interface between the pipeline-based producer and the thread-based legacy consumer. It is expected that the producer fills the queue faster than the consumer drains it; in this case, the pipeline should block and wait. On the other hand, there might be several pipelines working at the same time, so blocking a TBB worker thread is undesirable. Ideally, the master thread that started the pipeline should be the one that blocks.
Unfortunately, the current TBB pipeline implementation has problems with idle spinning here and there, in particular with thread-bound filters. Basically, the only way to avoid idle spinning in situations when the pipeline can neither proceed nor finish is to make the master thread do a blocking call, and let the worker threads go to sleep due to the absence of available tasks.
Now, what I can suggest is to recognize which thread executes the last pipeline stage, the one that should push an item into the queue, and do different things depending on that. In a sense, it's like converting a non-thread-bound filter to act like a thread-bound one. To find out which thread runs the filter, you might use tbb_thread::id (see sections 12.2 and 12.3.1 in TBB 2.2 Reference Manual) - I hate to say this as we always argue for "thread-agnostic" parallel programming, but after all it's TBB issues that require such a workaround. If the master thread executes the filter, it uses the blocking push() method of the queue, so that it blocks when the queue is full. If a worker thread executes the filter, it should use the non-blocking try_push() method. The question is what to do when try_push() fails. So far, there is no way to tell the pipeline that the filter failed to process the current token and that it should be re-attempted. So the solution I see is to use an intermediate queue in the filter for such items. As the last filter should be serial, std::queue would work there, without any additional locking.
In pseudocode, the last filter I described looks like this:

[plain]if the filter is executed by the master // can block
    // push pending items first
    while intermediate queue is not empty
        pop an item from the front of the intermediate queue
        push this item into the bounded output queue
    push the item received as the argument into the bounded output queue
else // the filter is executed by a worker; should not block
    // push pending items first
    while intermediate queue is not empty
        read an item from the front of the intermediate queue (but do not pop it)
        try_push this item into the bounded output queue
        if try_push succeeded
            pop from the front of the intermediate queue
        else
            break the while loop
    // process the item received as the argument
    if intermediate queue is empty // i.e. no more pending items
        try_push the received item into the bounded output queue
        if try_push succeeded
            return
    push the item into intermediate queue
return
[/plain]


The non-blocking section might be simplified if the received item is first unconditionally pushed to the intermediate queue; then you can just process the queue in the loop. It's a little bit suboptimal execution-wise, but the code will be simpler.
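For illustration, here is a minimal C++ sketch of that filter logic in the simplified variant, where the received item always goes into the intermediate queue first. It deliberately uses only the standard library so that it stands alone: BoundedQueue is a hypothetical stand-in for the bounded TBB queue, OutputStage is a hypothetical name for the body of the last filter, and std::thread::id plays the role of tbb_thread::id. Treat it as a sketch under those assumptions rather than as tested TBB code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <queue>
#include <thread>

// Stand-in (assumption) for the bounded, blocking output queue.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    void push(const T& item) {  // blocks while the queue is full
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(item);
    }
    bool try_push(const T& item) {  // never blocks; false if full
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }
    bool try_pop(T& out) {  // consumer side; false if empty
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (items_.empty()) return false;
            out = items_.front();
            items_.pop_front();
        }
        not_full_.notify_one();
        return true;
    }
private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::deque<T> items_;
    std::size_t capacity_;
};

// Body of the serial last filter (hypothetical name).
template <typename T>
class OutputStage {
public:
    OutputStage(BoundedQueue<T>& output, std::thread::id master)
        : output_(output), master_(master) {}

    void operator()(const T& item) {
        pending_.push(item);  // simplified variant: always enqueue first
        const bool may_block = std::this_thread::get_id() == master_;
        while (!pending_.empty()) {
            if (may_block) {
                output_.push(pending_.front());  // master: wait for space
            } else if (!output_.try_push(pending_.front())) {
                break;  // worker: output is full, retry on a later call
            }
            pending_.pop();
        }
    }
    std::size_t pending() const { return pending_.size(); }
private:
    BoundedQueue<T>& output_;
    std::thread::id master_;
    std::queue<T> pending_;  // unlocked: relies on the stage being serial
};
```

Items that are still in the intermediate queue when the input ends are not delivered by this sketch; a real filter would need a final blocking flush once the pipeline has finished.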

34 Replies
RafSchietekat
Valued Contributor III
If the alternative is to block the input filter, you might as well block the output filter: either deprives TBB of one worker thread (that's not to say there isn't room for improvement!). You might be surprised to learn that this maximum number of tokens in flight is based on a ticket system itself (you say ticket, TBB says token, let's...), and that a pipeline has internal queues before non-input serial filters, but it's not trivial to plug into that, although you might also decide to set the pipeline maximum number of tokens in flight (quite the mouthful!) to x and do away with your own concurrent_queue, if that is appropriate for the situation. It's far from ideal, but this should get you... unblocked, so to say.

nagy
New Contributor I
I was not aware that the input filter is run on a TBB thread pool thread. I had the impression that the input filter was run on the same thread on which pipeline::run was called. If this is not the case then yes, I might as well block in the output filter.
This is a problem for me since in my application I could end up blocking all of the tbb taskpool threads... how would I solve that?
EDIT:
Could I move the blocking filter (output) to be executed on its own thread, to avoid blocking a TBB thread pool thread? thread_bound_filter?

RafSchietekat
Valued Contributor III

"I was not aware that the input filter is run on a TBB thread pool thread. I had the impression that the input filter was run on the same thread on which pipeline::run was called. If this is not the case then yes, I might as well block in the output filter."
I consider the pipeline::run() thread to be part of the pool, assuming that you even know on which thread it's running, because it is one of the 1+(N-1) threads that participate approximately on an equal basis in processing and stealing on a system with N hardware threads if automatic initialisation is not overridden (by default this creates N-1 additional threads), which is why I'm not sure there's even a meaningful distinction between master and worker. So if you block any filter, only N-1 threads are available, and it's not really relevant whether the original master is among them or not. The situation will probably change somewhat when tasks get segregated by master thread (I don't know yet how that's going to work out), but for now that's what you have.

"This is a problem for me since in my application I could end up blocking all of the tbb taskpool threads... how would I solve that?"
Exactly: it would only work if TBB only has that pipeline to run, or if you require a certain level of parallelism, which is why I called this "far from ideal".

"EDIT:
Could I move the blocking filter (output) to be executed on its own thread, to avoid blocking a TBB thread pool thread? thread_bound_filter?"
That's not what a thread_bound_filter does (unless I'm mistaken), but you could perhaps assume that this filter is almost always in a blocked state and initialise TBB with an additional thread, if you want to go that way. But then you'll incur additional overhead related to thread scheduling, I'm afraid (right?).

But I don't know the overall setup...

nagy
New Contributor I
This is a rather big problem for me.
Basically, what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order not to get too many active tokens.
The big problem is that I can have several of these producer/consumer setups running at the same time, which means that it is very probable that I will end up blocking the entire TBB thread pool, and since I don't know how many of these will run, I cannot start TBB with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using TBB tasks.
What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks, then maybe there would be no problem, since the blocking thread is not part of the TBB thread pool.
EDIT:
The input filter also reads from a file; would it also be a good idea to run it on its own thread?

RafSchietekat
Valued Contributor III
I've had another look at thread_bound_filter, and it does seem like you could run each pipeline in a separate thread and run all the output filters on the same thread (the one that starts the pipelines). But running an unbounded number of pipelines would lead to oversubscription, and avoiding that is one of the reasons to turn to TBB...

nagy
New Contributor I
"on the same thread (the one that starts the pipelines)"
The documentation says "The thread that services a thread_bound_filter must not be the thread that calls pipeline::run()". I remember seeing something like pipeline.start() and pipeline.end() somewhere, but I can't find anything like that in the documentation or the source files. According to the reference, I would need an additional thread for servicing a thread_bound_filter.
Could I have a single thread that all output filters run on and that does all the blocking? Using something like "WaitForMultipleObjects", is this possible with TBB?

RafSchietekat
Valued Contributor III
Sorry, I meant "the one that starts the threads that run the pipelines" (really, I did), but even without the oversubscription it would have to be a polling solution (for both pipelines and concurrent_queues, with sort of a poor man's rendez-vous for each pair), which doesn't seem very appealing.

Also, I'm wondering how to even tune TBB to coexist with user threads.

I think I'll bench myself on this one, for now anyway.

Alexey-Kukanov
Employee
Quoting nagy
Could I move the blocking filter (output) to be executed on its own thread, to avoid blocking a TBB thread pool thread? thread_bound_filter?


Yes, a thread-bound filter is meant for a filter that has to be executed on its own thread.

However, I am not sure why you would not just use a regular pipeline, with the limit on the maximum number of tokens set to the value you need. At first glance, it should do exactly what you want.

Edited: Oh, I see, you want to run several pipelines at the same time. Indeed, this is not well supported in TBB at the moment, because pipeline.run() is a blocking call, so you'd have to start each pipeline in its own thread. I will think about whether there is a better solution to suggest to you.

Anton_Pegushin
New Contributor II
Hi, when I first read the post I thought that putting a token into a bounded queue with a preset capacity during the last pipeline stage was some sort of a design feature. But then you say that you just want to limit the number of tokens being passed through the pipeline to avoid memory overuse. This is exactly what the argument of pipeline::run(num_of_tokens) is for, and it seems like the ideal solution to the problem you're trying to solve would be to pull the "consumer" functionality right into the pipeline and make it the last stage. Or else, if this is some untouchable legacy code we're talking about, you could wrap it in a "consumer" proxy that talks directly to the consumer and provides it with tokens synchronously, one by one, and add the consumer proxy as the last stage of the pipeline.

Alexey-Kukanov
Employee
Quoting nagy
This is a rather big problem for me.
Basically, what I've got is a producer/consumer setup, where the producer reads in a file and processes several "packets" in different steps. These packets are then consumed by the consumer, which runs on its own thread (legacy code) and is NOT part of the pipeline, and thus does not take part in the "token system". This means that I have to block the last filter (or the first, as I did previously) in the pipeline when the consumer is slower, in order not to get too many active tokens.
The big problem is that I can have several of these producer/consumer setups running at the same time, which means that it is very probable that I will end up blocking the entire TBB thread pool, and since I don't know how many of these will run, I cannot start TBB with N + X threads. Not to mention that it could deadlock the entire application if the consumer starts using TBB tasks.
What I'd like is to be able to call pipeline::run on a thread I explicitly create and then somehow have the output filter run on this thread (or maybe another thread I explicitly create). If the output filter blocks, then maybe there would be no problem, since the blocking thread is not part of the TBB thread pool.
EDIT:
The input filter also reads from a file; would it also be a good idea to run it on its own thread?


Let me ask some questions (and maybe more will follow after your answers). What is the existing (sequential?) way to do the job? Does your code run several producer/consumer patterns at the same time? If so, does every producer run in its own thread, or do they all run in a single thread?

nagy
New Contributor I
That got me thinking.
I'm unsure whether it would work to make the consumer a pipeline stage, since the consumer in my case could sleep between packets. The feature I'm writing for our application is a video player, and the "consumer"/"display" thread has to sleep to sync the frame rate, which is why I have a bounded queue "in front" of the consumer.
Sorry to bring new details into play, but I try to keep it as simple as possible most of the time.

However, instead of blocking the output filter, I believe I should simply stop the input filter tasks from being spawned while the buffer to the consumer is full. I would need some way to externally control the pipeline's active token count (one option to control this is, as we said earlier, a blocking thread_bound_filter, at the cost of oversubscription). Maybe some kind of special last filter stage, though I'm unsure how that would work? Or maybe I could manually control how the input filter tasks are spawned?
So basically I would need a way to control how the pipeline spawns input filter tasks.

Anton_Pegushin
New Contributor II
Hi, in my understanding if the allowed number of tokens does not change during the application runtime, then you don't need to be able to control it externally during runtime - just set it once, when you start up the pipeline.
Unless I'm missing something here, it now makes complete sense to pull the consumer into the pipeline as the last stage. First, you get rid of the over-subscription by removing the additional "consumer" thread that you have now. Second, you implement this last consumer stage as a serial_in_order stage and put all of the frame rate sync code inside operator(). Then, when you start the pipeline, you just specify "x" as the input argument to the pipeline.run() function. This gives the pipeline the behavior you want, plus the queue that you want right in front of the last "consumer" stage. Does that make sense?
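As a rough illustration of "frame rate sync code inside operator()", here is a standard-library sketch of what the body of such a last stage might do. DisplayStage and its members are hypothetical names, the int frame is a placeholder for a real frame type, and the actual display call is omitted; in a real pipeline this logic would sit inside the serial_in_order filter.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical body of the last (consumer) stage: lets at most one
// frame through per frame period by sleeping until the next deadline.
class DisplayStage {
public:
    using Clock = std::chrono::steady_clock;

    explicit DisplayStage(Clock::duration frame_period)
        : period_(frame_period) {
        assert(frame_period > Clock::duration::zero());
    }

    // Would be called from the filter's operator() for each frame.
    void operator()(int frame) {
        if (shown_ == 0) {
            next_deadline_ = Clock::now();  // first frame: show at once
        } else {
            std::this_thread::sleep_until(next_deadline_);  // frame sync
        }
        last_frame_ = frame;  // placeholder for the real display call
        ++shown_;
        next_deadline_ += period_;
    }

    std::size_t frames_shown() const { return shown_; }
    int last_frame() const { return last_frame_; }

private:
    Clock::duration period_;
    Clock::time_point next_deadline_;
    std::size_t shown_ = 0;
    int last_frame_ = -1;
};
```

The sleep occupies whichever thread happens to run the stage, so with this approach a thread is unavailable for other work while the stage waits for the next frame deadline.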

Alexey-Kukanov
Employee

To me, it does make sense - but only as long as there is just one pipeline. With multiple such things (that were mentioned once), it might not work.

Side note: I am sorry for setting "1 star" rating to your above post; it was an accidental click.

nagy
New Contributor I
This is starting to sound pretty good.
However there is one last issue. If I put the consumer into the pipeline then the last stage in the pipeline will occasionally sleep/block to sync the framerate, thus still taking up an entire tbb taskpool thread. If I run several pipelines this would be a problem? Only way I can see avoiding this is with external active token control, or have I missed something?

RafSchietekat
Valued Contributor III

#12 "Does that make sense?"
That was basically my suggestion in #1 (use existing flow control and queue), but as things are, to avoid taking worker threads away from TBB, I think you'll have to poll with a thread_bound_filter or dedicate a new thread (with all associated static and dynamic overhead), unless you have another idea that uses TBB as-is (I still have a few ideas, but they would require some non-trivial changes, and it would still be unclear how that would work out in the uncontrolled environment that has those user-thread consumers)?

It's a question with broader significance: how does TBB behave if it doesn't "own" the whole machine? Even two separate processes both using TBB don't make any attempt to coordinate their efforts to avoid oversubscription, so what does that do for performance?

nagy
New Contributor I
I'm unsure if it's allowed to modify any of the TBB files. Also, I can only read the header files (are the source files available somewhere so I can see how the pipeline works?).
But what I could do is to modify the pipeline.h and expose two methods in the pipeline and filter classes called "inc_token_count" and "dec_token_count" which modify the pipeline "token_counter". When the output filter pushes an item into the buffer it would call "inc_token_count" (to negate the dec that the pipeline does internally) and when the consumer pulls an item from the buffer a callback would be called which calls "dec_token_count". The "token_counter" seems to be atomic, would such an operation be supported by the pipeline?
EDIT:
swapped inc and dec in a few places

RafSchietekat
Valued Contributor III

The token_counter does something else, and it's not as simple as modifying the other atomic.

nagy
New Contributor I
Indeed, I found the correct download with the source code.
EDIT:
I've been reading through the source and am unsure whether I will be able to modify it to suit my needs. I will at least try, unless there are better solutions? Any hints or tips are appreciated.

Anton_Pegushin
New Contributor II
I second what Raf is suggesting. If you're OK with pulling the consumer into the pipeline, but don't want to take one of the TBB threads out for frame rate sync, just make that last stage an in-order thread_bound_filter and have it run on an external thread. Setting "x" as the argument to pipeline.run() will set your memory limit, and if the consumer filter thread sleeps and is too slow, the TBB threads will build up an internal queue of exactly x tokens before TBB decides to pause the input filter.

nagy
New Contributor I
Ok, this is probably the best solution for now. Thank you all for the help.

I'd like to request a reserve/unreserve token feature, with the properties I've suggested in this thread, in a future version of the pipeline.