I think it makes sense to step back and start from your use case, which seems to be well described in the quote above.
You want to use a bounded (and therefore blocking) queue as the interface between the pipeline-based producer and the thread-based legacy consumer. The producer is expected to fill the queue faster than the consumer drains it; in that case, the pipeline should block and wait. On the other hand, several pipelines might be running at the same time, so blocking a TBB worker thread is undesirable. Ideally, the master thread that started the pipeline should be the one that blocks.
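To make the two queue operations concrete, here is a minimal bounded queue with a blocking push() and a non-blocking try_push(), built on std::mutex and std::condition_variable. It is only a stand-in for a real bounded queue such as TBB's concurrent_bounded_queue; the class name BoundedQueue is hypothetical.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical stand-in for a bounded queue: push() blocks while the
// queue is full, try_push() returns false instead of blocking.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocking push: waits until a consumer frees a slot.
    void push(const T& item) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(item);
        not_empty_.notify_one();
    }

    // Non-blocking push: fails immediately if the queue is full.
    bool try_push(const T& item) {
        std::lock_guard<std::mutex> lock(m_);
        if (items_.size() >= capacity_) return false;
        items_.push(item);
        not_empty_.notify_one();
        return true;
    }

    // Blocking pop, as the legacy consumer thread would use it.
    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T item = items_.front();
        items_.pop();
        not_full_.notify_one();
        return item;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_);
        return items_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::queue<T> items_;
};
```

The thread that calls push() on a full queue simply sleeps on the condition variable; that is the kind of blocking you would want to confine to the master thread.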
Unfortunately, the current TBB pipeline implementation suffers from idle spinning in several places, in particular with thread-bound filters. Essentially, the only way to avoid idle spinning when the pipeline can neither proceed nor finish is to have the master thread make a blocking call, and let the worker threads go to sleep for lack of available tasks.
Now, what I can suggest is to recognize which thread executes the last pipeline stage, the one that pushes items into the queue, and do different things depending on that. In a sense, it's like making a non-thread-bound filter act like a thread-bound one. To find out which thread runs the filter, you might use tbb_thread::id (see sections 12.2 and 12.3.1 in the TBB 2.2 Reference Manual). I hate to say this, as we always argue for "thread-agnostic" parallel programming, but after all it's TBB's own issues that require such a workaround. If the master thread executes the filter, it uses the blocking push() method of the queue, so that it blocks when the queue is full. If a worker thread executes the filter, it should use the non-blocking try_push() method. The question is what to do when try_push() fails. So far, there is no way to tell the pipeline that the filter failed to process the current token and that it should be re-attempted. So the solution I see is to use an intermediate queue in the filter for such items. As the last filter should be serial, std::queue would work there without any additional locking.
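A sketch of the thread check, assuming the filter object is created on the same thread that will later call pipeline.run(). It uses std::thread::id from the C++ standard library in place of tbb_thread::id; the class name MasterTracker is hypothetical.

```cpp
#include <thread>

// Hypothetical helper: remembers the thread that created it (the thread
// expected to call pipeline.run()), so a filter can ask whether it is
// currently running on that "master" thread.
class MasterTracker {
public:
    MasterTracker() : master_(std::this_thread::get_id()) {}

    // True only when called from the thread that constructed the tracker.
    bool on_master() const { return std::this_thread::get_id() == master_; }

private:
    const std::thread::id master_;
};
```

Inside the filter, on_master() then selects between the blocking push() and the non-blocking try_push() path.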
In pseudocode, the last filter I described looks like this:
[plain]
if the filter is executed by the master   // can block
    // push pending items first
    while intermediate queue is not empty
        pop an item from the front of intermediate queue
        push this item into the bounded output queue
    push the item received as the argument into the bounded output queue
else   // the filter is executed by a worker; should not block
    // push pending items first
    while intermediate queue is not empty
        read the item at the front of intermediate queue (but do not pop it)
        try_push this item into the bounded output queue
        if try_push succeeded
            pop from the front of intermediate queue
        else
            break the while loop
    // process the item received as the argument
    if intermediate queue is empty   // i.e. no more pending items
        try_push the received item into the bounded output queue
        if try_push succeeded
            return
    push the item into intermediate queue
    return
[/plain]
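Below is a minimal C++ rendering of that pseudocode, for illustration only. To stay self-contained it re-declares a small stand-in bounded queue and uses std::thread::id instead of tbb_thread::id; BoundedQueue and OutputFilter are hypothetical names, and operator()(int) stands in for the real filter body (a TBB 2.x filter receives and returns a void* item instead).

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Minimal stand-in for a bounded queue (hypothetical; not the TBB class).
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t cap) : cap_(cap) {}
    void push(const T& x) {                       // blocks while full
        std::unique_lock<std::mutex> lk(m_);
        not_full_.wait(lk, [this] { return q_.size() < cap_; });
        q_.push(x);
        not_empty_.notify_one();
    }
    bool try_push(const T& x) {                   // never blocks
        std::lock_guard<std::mutex> lk(m_);
        if (q_.size() >= cap_) return false;
        q_.push(x);
        not_empty_.notify_one();
        return true;
    }
    T pop() {                                     // blocks while empty
        std::unique_lock<std::mutex> lk(m_);
        not_empty_.wait(lk, [this] { return !q_.empty(); });
        T x = q_.front();
        q_.pop();
        not_full_.notify_one();
        return x;
    }
private:
    const std::size_t cap_;
    std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::queue<T> q_;
};

// Last (serial) pipeline stage. Serial execution means pending_ needs no lock.
class OutputFilter {
public:
    // Must be constructed on the thread that will run the pipeline ("master").
    explicit OutputFilter(BoundedQueue<int>& out)
        : out_(out), master_(std::this_thread::get_id()) {}

    void operator()(int item) {
        if (std::this_thread::get_id() == master_) {
            // Master: allowed to block. Flush pending items first, in order.
            while (!pending_.empty()) {
                out_.push(pending_.front());
                pending_.pop();
            }
            out_.push(item);
        } else {
            // Worker: must not block. Forward pending items while they fit.
            while (!pending_.empty()) {
                if (!out_.try_push(pending_.front())) break;
                pending_.pop();
            }
            // Preserve order: try the new item directly only if nothing is pending.
            if (pending_.empty() && out_.try_push(item)) return;
            pending_.push(item);
        }
    }

    std::size_t pending() const { return pending_.size(); }

private:
    BoundedQueue<int>& out_;
    const std::thread::id master_;
    std::queue<int> pending_;
};
```

One case the pseudocode leaves open is items still parked in the intermediate queue when no further tokens arrive; presumably the master would have to flush them with the blocking push() after pipeline.run() returns, which this sketch does not show.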
The non-blocking section could be simplified if the received item is first pushed unconditionally into the intermediate queue; then you can just process the queue in the loop. It's a little bit suboptimal execution-wise, but the code will be simpler.
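The simplified worker-side path could look like the following sketch. try_push is passed in as a callable so the example does not depend on any particular queue class; the function name worker_side is hypothetical.

```cpp
#include <queue>

// Simplified non-blocking path: always park the new item first, then
// drain the intermediate queue until the output queue refuses an item.
// 'try_push' is any callable returning false when the output queue is full.
template <typename T, typename TryPush>
void worker_side(std::queue<T>& pending, const T& item, TryPush try_push) {
    pending.push(item);                          // unconditionally enqueue
    while (!pending.empty() && try_push(pending.front()))
        pending.pop();                           // forwarded, so drop it
}
```

The extra cost mentioned above is visible here: every item makes a round trip through the intermediate queue, even when the output queue has room for it.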
"I was not aware that the input filter is run on a TBB task pool thread. I had the impression that the input filter was run on the same thread that called pipeline::run. If this is not the case, then yes, I might as well block in the output filter."
I consider the pipeline::run() thread to be part of the pool (assuming you even know which thread it's running on). It is one of the 1+(N-1) threads that participate on an approximately equal basis in processing and stealing on a system with N hardware threads, if automatic initialisation is not overridden (by default this creates N-1 additional threads). That's why I'm not sure there's even a meaningful distinction between master and worker. So if you block any filter, only N-1 threads remain available, and it doesn't really matter whether the original master is among them or not. The situation will probably change somewhat when tasks get segregated by master thread (I don't know yet how that's going to work out), but for now that's what you have.
"This is a problem for me since in my application I could end up blocking all of the TBB task pool threads... how would I solve that?"
Exactly: it would only work if TBB only has that pipeline to run, or if you require a certain level of parallelism, which is why I called this "far from ideal".
"Could I move the blocking (output) filter to execute on its own thread, and avoid blocking a TBB task pool thread? thread_bound_filter?"
That's not what a thread_bound_filter does (unless I'm mistaken), but you could perhaps assume that this filter is almost always in a blocked state and initialise TBB with an additional thread, if you want to go that way. But then you'll incur additional overhead related to thread scheduling, I'm afraid (right?).
But I don't know the overall setup...
on the same thread (the one that starts the pipelines)
Yes, a thread-bound filter is meant for a filter that should be executed on its own thread.
However, I am not sure why you wouldn't just use a regular pipeline with the maximum number of tokens set to the value you need. At a glance, it should do exactly what you want.
Edited: Oh, I see, you want to run several pipelines at the same time. Indeed, this is not well supported in TBB at the moment, because pipeline.run() is a blocking call, so you'd have to start each pipeline in its own thread. I will think about whether there is a better solution to suggest to you.
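For reference, the "one thread per pipeline" arrangement amounts to something like this sketch, in which each user thread plays the master role for one pipeline. The std::function objects stand in for code that builds a pipeline and calls its blocking run(); run_all is a hypothetical name.

```cpp
#include <functional>
#include <thread>
#include <vector>

// Each callable is assumed to build one pipeline and call its blocking
// run(); giving each its own user thread lets the pipelines run at once.
void run_all(const std::vector<std::function<void()>>& pipelines) {
    std::vector<std::thread> masters;
    masters.reserve(pipelines.size());
    for (const auto& run_pipeline : pipelines)
        masters.emplace_back(run_pipeline);  // this thread is the pipeline's master
    for (auto& t : masters) t.join();        // wait until every pipeline finishes
}
```

All of these master threads still share the same TBB worker pool, which is where the concern about blocking workers comes back in.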
Let me ask some questions (and maybe more will follow after your answers). What is the existing (sequential?) way to do the job? Does your code run several producer/consumer patterns at the same time? If so, does every producer run in its own thread, or do they all run in a single thread?
To me, it does make sense, but only as long as there is just one pipeline. With multiple such pipelines (which were mentioned at one point), it might not work.
Side note: sorry for giving your post above a "1 star" rating; it was an accidental click.
#12 "Does that make sense?"
That was basically my suggestion in #1 (use the existing flow control and a queue), but as things stand, to avoid taking worker threads away from TBB, I think you'll have to either poll with a thread_bound_filter or dedicate a new thread (with all the associated static and dynamic overhead), unless you have another idea that uses TBB as-is? (I still have a few ideas, but they would require some non-trivial changes, and it would still be unclear how they would work out in an uncontrolled environment with those user-thread consumers.)
It's a question with broader significance: how does TBB behave if it doesn't "own" the whole machine? Even two separate processes both using TBB don't make any attempt to coordinate their efforts to avoid oversubscription, so what does that do for performance?