I have a problem concerning proper termination of pipelines and I'm running out of ideas on where the problem might be.
The setup is as follows. I have one class that handles loading and decompressing of video streams. This process is realised via the TBB pipeline mechanism. Once the start() method of my class is called, it begins loading and decompressing frames and delivers them to a follow-up component.
Calling start on the class spawns a thread which initializes the scheduler and calls pipeline.run() (which blocks until the pipeline is done). Calling stop on the class tells the first tbb::filter to stop loading frames and to return NULL (to terminate the pipeline). There are multiple instances of this class running (with different input streams).
The problem I have is that once in a while, when stop is called, the first filter returns NULL but the pipeline does not stop, so the thread method (the one that called pipeline::run()) never returns.
Inspecting the threads, there are a couple of tasks waiting:
but none of the threads hangs inside the instance that blocks.
Any help is appreciated,
You've said that no other threads hang, but what do they do when you inspect them with the debugger? In particular, the thread that called pipeline::run()?
You've also said that there are multiple instances of your class running. Do you mean that there are multiple simultaneously running pipelines? If so, is each started from its own thread?
There are multiple simultaneously running pipelines, each started from its own thread.
After doing a lot of debugging, I found the problem but I don't quite understand the cause.
Debugging the deadlock situation showed that one pipeline is not running in the thread it should be running in. The following stack trace might make this a bit clearer (I omitted everything not relevant, like code references and memory locations). Basically, the pipeline I started in frame #7 is not the one I'm in at frame #3 (TBBSinkFilter is the last filter in the pipeline).
[cpp]#0  nanosleep () from /lib64/libc.so.6
#1  usleep () from /lib64/libc.so.6
#2  VideoMixerFilter::deliverPacket
#3  MovieFileSourceFilter::TBBSinkFilter::operator()
#4  tbb::internal::stage_task::execute ()
#5  tbb::internal::CustomScheduler<tbb::internal::IntelSchedulerTraits>::wait_for_all () from /opt/intel/tbb/current/lib/libtbb.so.2
#6  tbb::pipeline::run () from /opt/intel/tbb/current/lib/libtbb.so.2
#7  MovieFileSourceFilter::run
#8  non-virtual thunk to MovieFileSourceFilter::run()
#9  Thread::run
#10 Thread::StartThreadST
#11 start_thread () from /lib64/libpthread.so.0
#12 clone () from /lib64/libc.so.6[/cpp]
Any idea how this can happen? Any help is appreciated :-)
This just means that the thread has stolen a 'pipeline task' from another thread. Note the tbb::internal::CustomScheduler<tbb::internal::IntelSchedulerTraits>::wait_for_all() entry in the call stack; wait_for_all() can steal and execute tasks from other threads.
I think that's Ok in itself.
Are you using Win32 or POSIX?
Where exactly are the threads blocked? Are they blocked or spinning?
What is the value of GenericScheduler::arena->prefix().gate.state variable?
I'm using POSIX threads under Redhat Enterprise Linux 5 (64bit).
The problem I have is with thread termination. The stack trace above is from the thread that is supposed to terminate but doesn't, because the pipeline is still working. But the pipeline in that stack trace belongs to a different MovieFileSourceFilter instance, and I don't understand why it's showing up in the stack trace of a totally different thread...
How do I query this variable you speak of, and when should I query it?
Again, this is the situation:
T1 Source1, Pipeline1
T2 Source2, Pipeline2
T3 Source3, Pipeline3
after a while it looks like this
T1 Source1, Pipeline1
T2 Source2, Pipeline2
T3 Source3, Pipeline2 !!!
and all I'm doing is stopping and starting the threads, which in turn run the pipelines...
If the situation is really the way I think, then it's funny.
You start a thread to process the pipeline, and then, when the pipeline terminates, you expect pipeline::run() to return. Right?
It seems that some other thread has stolen a task from the pipeline. Then the current thread starts stealing while waiting for the stolen task to complete. It successfully steals some big 'self-respawning' task (another pipeline's task) and processes it to the end.
This way pipeline::run() cannot return while there is any work left to do in the whole system.
Yes, exactly, and I really don't understand how something like this can happen. The pipelines should be autonomous and, as far as I can see, I'm using them as described in the Reference and Tutorial manuals. Is there a way to debug what happens in the scheduler? It is the only component in my system that knows about all the pipelines.
I'm not totally sure how the scheduler works here. Despite the fact that I have a task_scheduler_init object in each of my "pipeline run threads", there is a common (static) pool of worker threads, right? So there is no way to make the pipelines totally autonomous?
Let me explain what Dmitry tried to say in other words.
When you start a pipeline it produces a bunch of tasks, some of which may be stolen and executed by TBB worker threads.
When you start several pipelines, each in a thread of its own, they produce a bunch of tasks each, some of which are stolen and executed by TBB worker threads.
So far so good.
Now you stop one of the pipelines. Its input filter returns zero, so no new tasks are produced. But there are still a few previously created tasks in flight somewhere in the TBB thread pool, and the pipeline::run method cannot return until all the tasks it engendered are finished.
Instead of idly waiting until all its old tasks finish, method run (or actually wait_for_all, which is called inside it) goes and steals some work from other threads. It may steal back its own task (if you are lucky), but it may also steal a task of another pipeline (more probable when you have several pipelines).
Since that other pipeline was not cancelled, that task generates new ones, and your original call to, say, Pipeline3's run() method gets trapped under a pile of work generated by the other pipeline. Until this other pipeline is stopped, our Pipeline3 will not return from its run() method.
Alex, am I right to understand that your pipelines are endless by themselves and require an external signal to stop them? And that until the first pipeline stops, your program does not stop the other ones?
I don't see how this can be accomplished with current pipeline API.
To support such usage, the pipeline API could be extended with asynchronous execution, along these lines (async_run is the proposed extension; the event wiring is only sketched):
[cpp]... // fill pipeline
HANDLE event = CreateEvent(...);
struct completion_task : public tbb::task {
    tbb::task* execute() { SetEvent(event); return NULL; } // signal completion
};
completion_task done;
pipeline.async_run(done);        // proposed: returns immediately
... // do other work
WaitForSingleObject(event, ...); // wait until the pipeline is done[/cpp]
What do you think? Remember: continuation-based style is always more efficient ;)
Such usage also eliminates oversubscription of threads: if the user creates 3 pipelines on a 2-core system, only 2 threads will be working instead of 5.
Let me see... The program requires parallelism (with the current API, anyway). The scheduler supports (optional) parallelism. Twice the word "parallelism": we have a go! :-)
I don't see what this async_run is supposed to do: completion_task cannot be executed when the first filter is exhausted (that would not make a difference), it cannot be executed when the pipeline has been fully drained (that would not make a difference), so what did I miss?
But I think I've advocated a possible solution enough recently, repeating it would only make me sound like a broken record.
Do you think that FIFO processing will help here?
I think the problem is not the order of processing. I believe that all tasks from the "deadlocked" pipeline have been processed, but the run() method just doesn't return, because wait_for_all()'s loop spins processing other tasks and doesn't check the parent task's reference counter.
A possible solution is to move the check of the parent task's reference counter into the innermost loop of wait_for_all(). Probably this should be done only for non-worker threads... and probably some other problems will emerge.
"Do you think that FIFO processing will help here?" Well, at least global progress, but it probably depends on how you want to balance latency vs. throughput.
"probably some other problems will emerge" That only seems likely if the root cause of the problem is not attacked.
If it won't help here, then how can it be a solution to the problem?..
The root cause of this problem is attacked (hypothetically), but some completely independent problems can arise. And the root causes of those problems are, of course, *not* attacked, because nobody is aware of them yet.
"If it won't help here, then how it can be a solution to the problem?" Ah, a band-aid... maybe when a pipeline is stopped an alarm can be set, and, if run() doesn't return soon enough, everything is stopped and restarted. Not very elegant, but it may do the trick.
"The root cause of this problem is attacked" No, I meant the real root cause. :-)
I think there are schemes that (1) are acceptable and (2) solve the problem at the same time. Don't you think so?
Work-stealing? I don't think it's possible to create something viable without work-stealing. Yeah, there are things like work-distribution and work-requesting... but try to get them to work in real life... Oh, and there is a scheduler based on just a single FIFO queue...
"I think there are schemes that (1) are acceptable and (2) solve the problem at the same time. Don't you think so?" The latter generally implies the former in my book. :-)
"Work-stealing?" I never said that.
At the moment you've proposed (1) something secret (probably FIFO processing, which IMVHO won't solve the problem) and (2) restarting the process, which is not acceptable... Probably you have 2 books...
Please say explicitly what you mean. I would not consider you a broken record.