We have a program that submits tasks to tbb from multiple threads. In one of our unit tests I used a parallel_for to test this functionality and found that occasionally tbb deletes a task without executing it. I have attached a small project that reproduces this behavior with both tbb30_20100406oss_win (vc10) and tbb22_20090809oss_win (vc9). Please let me know if I am doing something wrong or if I should submit a bug report.
Thanks!
[cpp]
#include "tbb/task.h"
#include <assert.h>
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h"
#include "tbb/atomic.h"
#include <iostream>

// A task that triggers an assert if it is deleted without being execute()ed
class TestTask : public tbb::task {
public:
    tbb::atomic<bool> _taskWasExecuted;

    TestTask() { _taskWasExecuted = false; }

    virtual task* execute() {
        _taskWasExecuted = true;
        return NULL;
    }

    ~TestTask() {
        if (!_taskWasExecuted) {
            _asm {
                int 3; // user breakpoint
            }
        }
        assert(_taskWasExecuted);
    }
};

// Submit many tasks from multiple threads
struct TbbMultiSubmit {
    void operator()(const tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); i++) {
            // Each task is its own root and is spawned without a matching wait
            tbb::task& tbbTask = *(new (tbb::task::allocate_root()) TestTask());
            tbbTask.self().spawn(tbbTask);
        }
    }
    TbbMultiSubmit() {}
};

int main(int argc, char* argv[]) {
    tbb::task_scheduler_init init;
    // Run the test 1000 times
    for (int i = 0; i < 1000; i++) {
        std::cout << i << std::endl;
        tbb::parallel_for(tbb::blocked_range<int>(0, 1000), TbbMultiSubmit());
    }
    return 0;
}
[/cpp]
5 Replies
You do not wait for completion of spawned tasks. Is this intended?
My original application unit test used condition_variables to synchronize the tasks. I had omitted the wait in the test code since the test reproduced the same error. I have updated the code below to use a counter with a condition_variable to wait for the tasks to complete, but some tasks are still deleted without being executed.
[cpp]
#include "tbb/task.h"
#include <assert.h>
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h"
#include "tbb/atomic.h"
#include <iostream>
#include "tbb/compat/condition_variable"
#include "tbb/mutex.h"

// Count the number of tasks that have finished
unsigned int gs_taskCounter;
// Mutex to guard gs_taskCounter
tbb::mutex gs_taskCounterMutex;
// Condition that is signaled when gs_taskCounter changes
std::condition_variable gs_taskCounterCond;

// A task that triggers an assert if it is deleted without being execute()ed
class TestTask : public tbb::task {
public:
    tbb::atomic<bool> _taskWasExecuted;

    TestTask() { _taskWasExecuted = false; }

    virtual task* execute() {
        _taskWasExecuted = true;
        {
            std::unique_lock<tbb::mutex> lk(gs_taskCounterMutex);
            gs_taskCounter++;
            gs_taskCounterCond.notify_all();
        }
        return NULL;
    }

    ~TestTask() {
        if (!_taskWasExecuted) {
            _asm {
                int 3; // user breakpoint
            }
        }
        assert(_taskWasExecuted);
    }
};

// Submit many tasks from multiple threads
struct TbbMultiSubmit {
    void operator()(const tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); i++) {
            tbb::task& tbbTask = *(new (tbb::task::allocate_root()) TestTask());
            tbb::task::spawn(tbbTask);
        }
    }
    TbbMultiSubmit() {}
};

int main(int argc, char* argv[]) {
    tbb::task_scheduler_init init;
    // Run the test 1000 times
    for (int i = 0; i < 1000; i++) {
        std::cout << i << std::endl;
        const unsigned int NUM_TASKS = 1000;
        // reset the task counter
        {
            std::unique_lock<tbb::mutex> lk(gs_taskCounterMutex);
            gs_taskCounter = 0;
        }
        // Spawn tasks from multiple threads
        tbb::parallel_for(tbb::blocked_range<int>(0, NUM_TASKS), TbbMultiSubmit());
        // wait for tasks to complete
        std::unique_lock<tbb::mutex> lk(gs_taskCounterMutex);
        while (gs_taskCounter < NUM_TASKS) {
            gs_taskCounterCond.wait(lk);
        }
    }
    return 0;
}
[/cpp]
I did not mean explicit synchronization via a condition variable or the like, but TBB's wait_for_all loops.
Thanks for the test case. I now understand what is happening, but to help you best I need to understand what you are trying to achieve.
So, why do you want to allocate every task as a root of its own, rather than wait for its completion in the same scope where you spawned it?
We call such tasks "fire-and-forget" (FAF). They are something of a corner case for TBB, so they need additional care.
What happens is that, by default, a task is associated at creation with the task_group_context object that is "active" in the current execution scope. When you create tasks inside the parallel_for() body, they become associated with the context object allocated on the stack of parallel_for(). That context is then destroyed, and the memory it occupied, which the FAF tasks still reference, can be overwritten. If that happens just as a spawned task is about to start executing, the scheduler can read a garbage value, interpret it as a command to cancel the task, and proceed directly to destroying it.
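To make the cancel check described above concrete, here is a toy, single-threaded model in plain C++. It is only a sketch under stated assumptions: `ToyContext`, `ToyTask`, `drain`, and `run_tasks` are hypothetical names invented for illustration, and none of this is TBB's actual scheduler code. It shows the one decision that matters here: the scheduler consults the task's context before running the task, and a set cancel flag sends the task straight to destruction.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for tbb::task_group_context: just a cancel flag.
struct ToyContext {
    bool cancelled;
};

// Hypothetical stand-in for a spawned task.
struct ToyTask {
    const ToyContext* context;   // non-owning: the context must outlive the task
    std::size_t* executedCount;  // incremented when execute() runs
    void execute() { ++*executedCount; }
};

// Toy scheduler loop: it consults the task's context before running it.
// A set cancel flag means the task is dropped without being executed.
// Returns the number of tasks that were dropped.
std::size_t drain(std::deque<ToyTask>& queue) {
    std::size_t dropped = 0;
    while (!queue.empty()) {
        ToyTask task = queue.front();
        queue.pop_front();
        if (task.context->cancelled) {
            ++dropped;  // destroyed unexecuted, like the failing TestTask
        } else {
            task.execute();
        }
    }
    return dropped;
}

// Queue `count` tasks bound to `context`, drain them,
// and return how many were actually executed.
std::size_t run_tasks(const ToyContext& context, std::size_t count) {
    std::size_t executed = 0;
    std::deque<ToyTask> queue;
    for (std::size_t i = 0; i < count; ++i) {
        ToyTask task = { &context, &executed };
        queue.push_back(task);
    }
    std::size_t dropped = drain(queue);
    assert(executed + dropped == count);
    return executed;
}
```

With a context whose flag is false, `run_tasks(ctx, 1000)` returns 1000; with the flag set, it returns 0. In the failure reported in this thread nobody sets the flag on purpose: the context has already been destroyed, so the value read is whatever now occupies that memory. The model keeps the context alive for the whole run precisely to avoid that.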
I have some code that works with an existing thread pool. Since I am incorporating tbb into the code base, I want to modify my thread pool class to wrap the existing tasks (e.g. MyTask) in a tbb task and run them under tbb, to avoid the oversubscription that comes from having two separate thread pools.
My first guess was to run each of our non-tbb tasks as a separate tbb root task and avoid modifying the existing synchronization logic, and that was how I ran into this issue.
It sounds like we need to associate the FAF tasks with a tbb task_group_context that has a longer lifetime than the FAF tasks. Maybe I can create a task_group_context associated with MyThreadPool and spawn the FAF tasks as children of that task_group_context...
I modified the test code to illustrate the MyTask and MyTaskExecutor case that I was describing. To ensure that the parent of my FAF tasks has a long enough lifetime, I am using an empty_task parent that is created by the thread that owns the MyTaskExecutor and creating the FAF tasks as children of the empty_task. This appears to work well.
It makes sense that the root tasks allocated in a thread would be associated with a task_group_context for that thread. I wonder if it would be possible to trigger a warning or assert if a task_group_context is destroyed before all of the 'root tasks' within it are joined. It might help catch some issues where people use task::allocate_root when they should have done something else. I can see how it might be tricky to catch this though.
Thank you for helping to resolve this issue!
[cpp]
#include "tbb/task.h"
#include <assert.h>
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h"
#include "tbb/atomic.h"
#include "tbb/concurrent_queue.h"
#include <iostream>

// MyTask Interface
class MyTask {
public:
    // execute the task
    virtual void execute() = 0;
    // release reference to task
    virtual void release() = 0;
    virtual ~MyTask() {}
};

// Executor that wraps MyTasks with TBB fire-and-forget (FAF) tasks
class MyTaskExecutor {
    // dummy parent to hold tbb tasks
    tbb::empty_task* _parent;

public:
    MyTaskExecutor() {
        // parent is allocated in the task_group_context
        // of the thread that owns the MyTaskExecutor
        _parent = new (tbb::task::allocate_root()) tbb::empty_task;
        _parent->set_ref_count(1);
    }

    ~MyTaskExecutor() {
        // Destructor waits for any remaining tasks to complete
        _parent->wait_for_all();
        _parent->destroy(*_parent);
    }

    void submit(MyTask* myTask) {
        // create a task wrapper and submit to tbb for execution
        tbb::task& tbbTask =
            *(new (_parent->allocate_child()) TbbTaskWrapper(*this, myTask));
        _parent->increment_ref_count();
        _parent->spawn(tbbTask);
    }

protected:
    // Wrap a MyTask in a tbb::task for execution
    class TbbTaskWrapper : public tbb::task {
    public:
        // Task to execute
        MyTask* _myTask;
        // executor that owns this task
        MyTaskExecutor& _executor;
        // Test to verify tbb executes task before deleting it
        tbb::atomic<bool> _taskWasExecuted;

        // Initializers are listed in member declaration order
        TbbTaskWrapper(MyTaskExecutor& executor, MyTask* myTask)
            : _myTask(myTask), _executor(executor) {
            _taskWasExecuted = false;
        }

        virtual task* execute() {
            _taskWasExecuted = true;
            _myTask->execute();
            return NULL;
        }

        ~TbbTaskWrapper() {
            if (!_taskWasExecuted) {
                _asm {
                    int 3; // user breakpoint
                }
            }
            assert(_taskWasExecuted);
            // release MyTask
            _myTask->release();
        }
    };
};

class TestTask : public MyTask {
    // execute the task
    virtual void execute() {}
    // release reference to task
    virtual void release() { delete this; }
};

// Submit many tasks from multiple threads
struct TbbMultiSubmit {
    MyTaskExecutor& _executor;

    void operator()(const tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); i++) {
            _executor.submit(new TestTask());
        }
    }

    TbbMultiSubmit(MyTaskExecutor& executor) : _executor(executor) {}
};

int main(int argc, char* argv[]) {
    tbb::task_scheduler_init init;
    // Run the test 1000 times
    for (int i = 0; i < 1000; i++) {
        std::cout << i << std::endl;
        MyTaskExecutor executor;
        tbb::parallel_for(tbb::blocked_range<int>(0, 1000), TbbMultiSubmit(executor));
        // ~MyTaskExecutor() waits for FAF tasks to complete
    }
    return 0;
}
[/cpp]
