disti_bbaker
Beginner
57 Views

tbb deletes task without executing it (Bug?)

We have a program that submits tasks to tbb from multiple threads. In one of our unit tests I used a parallel_for to test this functionality and found that occasionally tbb deletes a task without executing it. I have attached a small project that reproduces this behavior with both tbb30_20100406oss_win (vc10) and tbb22_20090809oss_win (vc9). Please let me know if I am doing something wrong or if I should submit a bug report.

Thanks!

[cpp]#include "tbb/task.h"
#include "assert.h"
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h"
#include "tbb/atomic.h"

#include <iostream>

// A task that triggers an assert if it is deleted without being execute()ed
class TestTask : public tbb::task
{
public:
    tbb::atomic<bool> _taskWasExecuted;

    TestTask( )
    {
        _taskWasExecuted = false;
    }

    virtual task* execute()
    {
        _taskWasExecuted = true;
        return NULL;
    }

    ~TestTask()
    {
        if(!_taskWasExecuted)
        {
            _asm
            {
                int 3; // user breakpoint
            }
        }
        assert(_taskWasExecuted);
    }
};

// Submit many tasks from multiple threads
struct TbbMultiSubmit {
    void operator()( const tbb::blocked_range<int>& range ) const
    {
        for( int i = range.begin(); i < range.end(); i++)
        {
            tbb::task& tbbTask = *( new( tbb::task::allocate_root() ) TestTask() );
            tbbTask.self().spawn( tbbTask );
        }
    }
    TbbMultiSubmit()
    {
    }
};


int main(int argc, char* argv[])
{
    tbb::task_scheduler_init init;

    // Run the test 1000 times
    for (int i = 0; i < 1000; i++)
    {
        std::cout << i << std::endl;
        tbb::parallel_for( tbb::blocked_range<int>( 0, 1000 ), TbbMultiSubmit() );
    }

    return 0;
}
[/cpp]
5 Replies
Alexey_K_Intel3
Employee
57 Views

You do not wait for completion of spawned tasks. Is this intended?

disti_bbaker
Beginner
57 Views

My original application unit test used condition_variables to synchronize the tasks. I had omitted the wait in the test code since the test reproduced the same error. I have updated the code below to use a counter with a condition_variable to wait for the tasks to complete, but some tasks are still deleted without being executed.

[cpp]#include "tbb/task.h"
#include "assert.h"
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h"
#include "tbb/atomic.h"

#include <iostream>

#include "tbb/compat/condition_variable"
#include "tbb/mutex.h"

// Count the number of tasks that have finished
unsigned int gs_taskCounter;
// Mutex to guard gs_taskCounter
tbb::mutex gs_taskCounterMutex;
// Condition that is signaled when gs_taskCounter changes
std::condition_variable gs_taskCounterCond;

// A task that triggers an assert if it is deleted without being execute()ed
class TestTask : public tbb::task
{
public:
    tbb::atomic<bool> _taskWasExecuted;

    TestTask( )
    {
        _taskWasExecuted = false;
    }

    virtual task* execute()
    {
        _taskWasExecuted = true;
        {
            std::unique_lock<tbb::mutex> lk(gs_taskCounterMutex);
            gs_taskCounter++;
            gs_taskCounterCond.notify_all();
        }
        return NULL;
    }

    ~TestTask()
    {
        if(!_taskWasExecuted)
        {
            _asm
            {
                int 3; // user breakpoint
            }
        }
        assert(_taskWasExecuted);
    }
};

// Submit many tasks from multiple threads
struct TbbMultiSubmit {
    void operator()( const tbb::blocked_range<int>& range ) const
    {
        for( int i = range.begin(); i < range.end(); i++)
        {
            tbb::task& tbbTask = *( new( tbb::task::allocate_root() ) TestTask() );
            tbb::task::spawn( tbbTask );
        }
    }
    TbbMultiSubmit()
    {
    }
};

int main(int argc, char* argv[])
{
    tbb::task_scheduler_init init;

    // Run the test 1000 times
    for (int i = 0; i < 1000; i++)
    {
        std::cout << i << std::endl;

        const unsigned int NUM_TASKS = 1000;

        // reset the task counter
        {
            std::unique_lock<tbb::mutex> lk(gs_taskCounterMutex);
            gs_taskCounter = 0;
        }

        // Spawn tasks from multiple threads
        tbb::parallel_for( tbb::blocked_range<int>( 0, NUM_TASKS ), TbbMultiSubmit());

        // wait for tasks to complete
        std::unique_lock<tbb::mutex> lk(gs_taskCounterMutex);
        while(gs_taskCounter < NUM_TASKS)
        {
            gs_taskCounterCond.wait(lk);
        }
    }

    return 0;
}
[/cpp]

Alexey_K_Intel3
Employee
57 Views

I did not mean explicit synchronization via a condition variable or the like, but TBB's wait_for_all loops.

Thanks for the case. I understand now what is happening there. However, in order to help you best, I need to understand what you are trying to achieve.
So, why do you want to allocate every task as a root on its own, and not wait for task completion in the same scope where you spawned it?

We call such tasks "fire-and-forget" (FAF), and those are rather a corner case for TBB, therefore additional care should be taken.

What happens is that, by default, a task is associated at creation with the task_group_context object that is "active" in the current execution scope. When you create tasks inside the parallel_for() body, they become associated with the context object allocated on the stack of parallel_for(). That context then gets destroyed, and its memory, still referenced by the FAF tasks, can be overwritten. If that happens at the moment when a spawned task is about to start executing, the scheduler can read a garbage value, interpret it as a command to cancel execution of the task, and proceed directly to task destruction.
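
The lifetime rule described above can be illustrated with a toy model that uses only the C++ standard library. This is a sketch under stated assumptions, not TBB's implementation: `ToyContext` and `run_deferred_tasks` are hypothetical names, and the "tasks" are plain callables that run after the scope that created them has ended. Each task holds shared ownership of its context, so the cancellation flag it reads is still valid when the task finally runs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a scheduler context; not a TBB type.
struct ToyContext {
    bool cancelled = false;  // a task is skipped when this reads true
};

// Queue `count` deferred tasks in a short-lived scope, run them after that
// scope has ended, and return how many actually executed.
int run_deferred_tasks(int count) {
    assert(count >= 0);
    int executed = 0;
    std::vector<std::function<void()>> pending;  // tasks that run later
    {
        // The context is created in the spawning scope, like the context
        // on the stack of parallel_for() in the explanation above.
        std::shared_ptr<ToyContext> ctx = std::make_shared<ToyContext>();
        for (int i = 0; i < count; ++i) {
            // Each task copies the shared_ptr, extending the context's life.
            pending.push_back([ctx, &executed] {
                if (!ctx->cancelled) {
                    ++executed;
                }
            });
        }
    }  // the local handle is gone; the copies held by the tasks remain
    for (std::function<void()>& task : pending) {
        task();  // reads a context that is still alive
    }
    return executed;
}
```

Calling `run_deferred_tasks(1000)` runs all 1000 tasks, because no task ever reads freed memory. Had the tasks stored a raw pointer to a stack-allocated context instead, the read inside each task would be undefined behavior, which is the analogue of the garbage "cancel" value described above.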
disti_bbaker
Beginner
57 Views

I have some code that works with an existing thread pool, and since I am incorporating tbb into the code base, I want to modify my thread pool class to wrap the existing tasks (e.g. MyTask) with a tbb task and run them under tbb, to avoid oversubscription from having two separate thread pools.

My first guess was to run each of our non-tbb tasks as a separate tbb root task and avoid modifying the existing synchronization logic, and that was how I ran into this issue.

It sounds like we need to associate the FAF tasks with a tbb task_group_context that has a longer lifetime than the FAF tasks. Maybe I can create a task_group_context associated with MyThreadPool and spawn the FAF tasks as children of that task_group_context...
disti_bbaker
Beginner
57 Views

I modified the test code to illustrate the MyTask and MyTaskExecutor case that I was describing. To ensure that the parent of my FAF tasks has a long enough lifetime, I am using an empty_task parent that is created by the thread that owns the MyTaskExecutor and creating the FAF tasks as children of the empty_task. This appears to work well.

It makes sense that the root tasks allocated in a thread would be associated with a task_group_context for that thread. I wonder if it would be possible to trigger a warning or assert if a task_group_context is destroyed before all of the 'root tasks' within it are joined. It might help catch some issues where people use task::allocate_root when they should have done something else. I can see how it might be tricky to catch this though.
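
To make the suggested check concrete, here is a minimal sketch of the idea using only the C++ standard library. It is an assumption-laden illustration, not something TBB provides: `CheckedContext` and its member functions are hypothetical names, and the caller is assumed to report each task's attachment and completion by hand.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical context wrapper that tracks unfinished tasks; not a TBB type.
class CheckedContext {
    std::atomic<int> outstanding_{0};  // tasks attached but not yet finished
public:
    // Call when a task is created against this context.
    void task_attached() { outstanding_.fetch_add(1); }

    // Call when a task has finished executing.
    void task_finished() { outstanding_.fetch_sub(1); }

    // True while at least one attached task has not finished.
    bool has_outstanding_tasks() const { return outstanding_.load() != 0; }

    ~CheckedContext() {
        // Fires in debug builds if the context dies before its tasks finish.
        assert(!has_outstanding_tasks() && "context destroyed with live tasks");
    }
};
```

The counter adds an atomic increment and decrement per task, which may be part of why such a check would be tricky to enable by default.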

Thank you for helping resolve this issue!

[cpp]#include "tbb/task.h"
#include "assert.h"
#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h"
#include "tbb/atomic.h"
#include "tbb/concurrent_queue.h"

#include <iostream>

// MyTask Interface
class MyTask
{
public:
    // execute the task
    virtual void execute() = 0;
    // release reference to task
    virtual void release() = 0;
    
    virtual ~MyTask() {}
};

// Executor that wraps MyTasks with TBB fire-and-forget (FAF) tasks
class MyTaskExecutor
{
    // dummy parent to hold tbb tasks
    tbb::empty_task* _parent;
public:
    MyTaskExecutor()
    {
        // parent is allocated in the task_group_context
        // of the thread that owns the MyTaskExecutor
        _parent = new( tbb::task::allocate_root() ) tbb::empty_task;
        _parent->set_ref_count( 1 );    
    }
    
    ~MyTaskExecutor()
    {
        // Destructor waits for any remaining tasks to complete
        _parent->wait_for_all();
        _parent->destroy(*_parent);
    }
    
    void submit( MyTask* myTask )
    {
        // create a task wrapper and submit to tbb for execution
        tbb::task& tbbTask = *( new( _parent->allocate_child() ) TbbTaskWrapper(*this, myTask) );
        _parent->increment_ref_count();
        _parent->spawn( tbbTask );
    }
    
protected:
    
    // Wrap a MyTask in a tbb::task for execution
    class TbbTaskWrapper : public tbb::task
    {
    public:
        // Task to execute
        MyTask* _myTask;
        // executor that owns this task
        MyTaskExecutor& _executor;

        // Test to verify tbb executes task before deleting it
        tbb::atomic<bool> _taskWasExecuted;

        TbbTaskWrapper( MyTaskExecutor& executor, MyTask* myTask ):
            _executor( executor),
            _myTask( myTask )
        {
            _taskWasExecuted = false;
        }

        virtual task* execute()
        {
            _taskWasExecuted = true;
            _myTask->execute();
            return NULL;
        }

        ~TbbTaskWrapper()
        {
            if(!_taskWasExecuted)
            {
                _asm
                {
                    int 3; // user breakpoint
                }
            }
            assert(_taskWasExecuted);

            // release MyTask
            _myTask->release();
        }
    };

};

class TestTask : public MyTask
{
    // execute the task
    virtual void execute()
    {
    }
    
    // release reference to task
    virtual void release()
    {
        delete this;
    }
};

// Submit many tasks from multiple threads
struct TbbMultiSubmit {

    MyTaskExecutor& _executor;
    
    void operator()( const tbb::blocked_range<int>& range ) const
    {
        for( int i = range.begin(); i < range.end(); i++)
        {
            _executor.submit( new TestTask() );
        }
    }

    TbbMultiSubmit( MyTaskExecutor& executor ):
        _executor( executor )
    {}
};


int main(int argc, char* argv[])
{
    tbb::task_scheduler_init init;

    // Run the test 1000 times
    for (int i = 0; i < 1000; i++)
    {
        std::cout << i << std::endl;

        MyTaskExecutor executor;

        tbb::parallel_for( tbb::blocked_range<int>( 0, 1000 ), TbbMultiSubmit( executor ) );

        // ~MyTaskExecutor() waits for FAF tasks to complete
    }

    return 0;
}
[/cpp]