Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

async callbacks

zturner
Beginner
1,113 Views
Suppose I have N arbitrary functions I wish to execute in parallel. Normally I would use parallel_invoke.


[cpp]tbb::parallel_invoke(func_1, func_2, ..., func_n);[/cpp]
However, in this case I want to return immediately and specify a callback to be invoked in arbitrary thread context when all work is complete. e.g.
[cpp]tbb::begin_parallel_invoke(completion_handler, func_1, func_2, ..., func_n);[/cpp]
Is there a way to achieve this? I feel like it may involve manipulating tasks directly, but I'm curious if anyone has yet solved this and has a simple generic solution.
Thanks
0 Kudos
19 Replies
RafSchietekat
Valued Contributor III
1,113 Views
If you require concurrency, bring your own thread, or a very good reason. Normally, a TBB program should work with any degree of available parallelism, including none (serial execution on a single hardware thread), and that implies a blocking parallel_invoke.
0 Kudos
zturner
Beginner
1,113 Views
Maybe I misunderstand, but I don't think that conflicts with what I was trying to achieve.
In the case of a single hardware thread, sure it can block, and then invoke my callback on the same thread. I think it's a hugely common pattern however to execute work in the background and then be notified via callback when said work is complete.
For example, suppose I'm writing a game and I want to read 10 large binary files from disk, process each one of them and build some complicated structure in each case. Once all 10 have been loaded, I can display some new object in the world. But I certainly don't want to delay my game loop just to wait for these files to finish loading. So I need to load them in the background, and be notified when work is complete. There are plenty of ways to do this, it would just be convenient if I could leverage tbb's already powerful framework for parallel algorithms and not have them block, but instead just notify me when they're complete.
Obviously the functions don't support this behavior out of the box, I'm just wondering how difficult it would be to achieve.
0 Kudos
RafSchietekat
Valued Contributor III
1,113 Views

I'm not sure that automatically blocking if only a single thread is available would be a valid solution.

TBB's scheduling algorithm assumes that tasks do not block for any significant fraction of their execution time, with direct use of threads being the recommended workaround.

(Added) Question (the reference isn't entirely clear): are enqueued tasks executed after a finite delay even if all the worker threads are kept busy with local work?

0 Kudos
Alexey-Kukanov
Employee
1,113 Views
I think you need to use tasks directly, and use a continuation task as the completion callback. The function tasks should be enqueued (which guarantees asynchronous execution), but the continuation task should not. Its reference count should be equal to the number of function tasks that are its children, so that it is invoked automatically after all children complete.

(Added) By "guarantees asynchronous execution", I mean that the enqueued tasks are executed by TBB worker thread(s) and so do not require the main thread to call a waiting function. However, I do not mean any guarantee of progress on enqueued tasks: as I noted below in answering Raf, if worker threads are busy with other work (e.g. TBB algorithms), then enqueued tasks are left unattended. Also, when an enqueued background task is executed, the worker thread won't switch to other tasks until it is done, as the task scheduler is still non-preemptive.
If you want stricter guarantees of simultaneous progress on both background and foreground work, use a separate thread for the background work (e.g. std::thread in TBB 3.0); the OS scheduler is preemptive and more fair, so to speak.
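
Editorial sketch of the reference-count idea described above, using only the C++ standard library rather than TBB tasks. The class name `countdown_continuation` and its member `child_finished` are hypothetical and are not part of TBB. The count is fixed up front to the number of children, each child reports completion once, and whichever child finishes last runs the completion handler on its own thread.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for a continuation task; this is NOT a TBB class.
class countdown_continuation {
public:
    // The count equals the number of children and is set before any child runs.
    countdown_continuation(int children, std::function<void()> on_done)
        : remaining_(children), on_done_(std::move(on_done)) {
        assert(children > 0);
    }

    // Each child calls this exactly once when it finishes.
    // Returns true for the single caller that ran the completion handler.
    bool child_finished() {
        // fetch_sub returns the previous value; 1 means this was the last child.
        if (remaining_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            on_done_();
            return true;
        }
        return false;
    }

private:
    std::atomic<int> remaining_;
    std::function<void()> on_done_;
};
```

In a real program each child would call `child_finished()` from whatever thread executed it, so the handler runs in an arbitrary thread context, as requested in the original question.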
0 Kudos
Alexey-Kukanov
Employee
1,113 Views
> (Added) Question (the reference isn't entirely clear): are enqueued tasks executed after a finite delay even if all the worker threads are kept busy with local work?

No, tasks in local pools take priority over enqueued tasks.
0 Kudos
zturner
Beginner
1,113 Views
Thanks for your response, I'm going to give this a try.
Our solution currently is to manually distribute work to different threads, but this is cumbersome for obvious reasons because once you have a sufficiently complicated situation, you end up almost reproducing a lot of the work that tbb already does for you.
0 Kudos
jimdempseyatthecove
Honored Contributor III
1,113 Views
You may have a couple of architectural problems with this in TBB. If the parallel_invoke functor list contains lambda functions, the lambda closure objects are (or can be) on the stack. Therefore, after calling your new "begin_parallel_invoke(...)" and continuing, you could not exit the enclosing scope before the functions complete. This limits how you can use your new function. Locating it in the scope of your main loop may be the only option. A second (workable) problem is that you would have to pass scope-visible variables by value instead of by reference (if you were to exit the scope of the begin_parallel_invoke).

Different threading tools do handle this situation. In QuickThread

qtControl Boogaloo;

void YourFunction(...
{
... // somewhere in your code
parallel_task(&Boogaloo, func_1);
...
parallel_task(&Boogaloo, func_n);
parallel_task(OnDone$, &Boogaloo, completion_handler);
... // continue executing here
return; // permitted before completion of Boogaloo
}

Although you can use lambda functions in QuickThread, doing so in this situation could lead to the problem of the lambda closure objects being created on the stack and then disappearing during function execution.

You could get away with using the Lambdas provided you did not exit the scope of the invocation prior to completion (same thing with TBB)

void YourFunction(...
{
...
{ // scope
qtControl Boogaloo; // place control inside scope
// run a non-blocking task to perform the blocking invoke
parallel_task(&Boogaloo,
[&]{parallel_invoke(func_1, func_2, ..., func_n);});
// while above is running, post a completion routine
parallel_task(OnDone$, &Boogaloo, completion_handler);
// inside scope
while(YourMainLoopNotDone())
{
... code
}
// cleanup
} // end scope for qtControl Boogaloo; (blocks until all tasks complete)

The above would preserve the lambda closure objects for the duration of the invocation
(Note, the func_1, ... can be lambda as well).

Jim Dempsey

0 Kudos
zturner
Beginner
1,113 Views
The lambda closures exist on the stack, but couldn't the parallel_invoke function accept the lambda functor by value?
0 Kudos
Andrey_Marochko
New Contributor III
1,113 Views
TBB's parallel algorithms block until completion, and thus their stack frames remain intact during the parallel processing. Therefore they can safely accept lambdas by const ref.

But your suggestion is absolutely correct for cases where a lambda is passed to a parallel entity that is not (or may not be) waited for in the current function. For example, tbb::task_group copies the lambda arguments of its run() methods.

The only other precaution is to capture local variables by value, not by reference, but this is definitely the programmer's responsibility :)
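
Editorial sketch of the two precautions above, written with std::thread instead of TBB so that it stands alone. The helper names `run_async` and `start_work` are hypothetical. The functor is accepted by value, so the launched work owns its own copy of the closure, and the local variable is captured by value so nothing dangles after the launching function returns.

```cpp
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical helper: takes the functor BY VALUE, so the caller's closure
// object may be destroyed as soon as this function returns.
template <typename F>
std::thread run_async(F f) {
    std::thread t(std::move(f));   // the thread now owns its own copy
    assert(t.joinable());
    return t;
}

// Hypothetical caller: 'factor' is a local that dies when start_work returns,
// so it is captured by value. 'out' refers to storage owned by the caller,
// which must outlive the thread (the caller joins before reading it).
std::thread start_work(int& out) {
    int factor = 21;
    auto closure = [factor, &out] { out = factor * 2; };
    return run_async(closure);     // closure is copied here
}
```

Capturing `factor` with `[&]` instead would leave the background work reading a destroyed stack variable, which is the hazard discussed in this thread.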
0 Kudos
zturner
Beginner
1,113 Views
Last question: Is there a non-zero chance of something like this ever being available out-of-the-box in a TBB distribution?
It certainly seems like a more common use case to want to do this with parallel_invoke, but I would imagine that someone somewhere might want to achieve these kind of semantics with parallel_scan, reduce, for, or whatever. I can reproduce the parallel_invoke semantics without too much difficulty with the suggestion of 2 enqueued tasks and a continuation task, but the other functions will be more difficult to accurately reproduce. And it doesn't seem like a terribly uncommon parallel programming pattern.
0 Kudos
Alexey-Kukanov
Employee
1,113 Views
Quoting zturner
Last question: Is there a non-zero chance of something like this ever being available out-of-the-box in a TBB distribution?
It certainly seems like a more common use case to want to do this with parallel_invoke, but I would imagine that someone somewhere might want to achieve these kind of semantics with parallel_scan, reduce, for, or whatever. I can reproduce the parallel_invoke semantics without too much difficulty with the suggestion of 2 enqueued tasks and a continuation task, but the other functions will be more difficult to accurately reproduce. And it doesn't seem like a terribly uncommon parallel programming pattern.

I think the thing you ask for is called std::thread :)

I.e. you start a thread that starts the blocking algorithm - invoke, for, reduce, scan, you name it. It blocks, but the main thread remains responsive. After the parallel algorithm completes, the thread calls any notification method you wish, and exits.

(added) And actually, you might do the same with task::enqueue. I.e. instead of creating a thread you create a task, and enqueue it. The task does the same thing I described above.
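
Editorial sketch of the std::thread approach described above, under stated assumptions: `begin_invoke` is a hypothetical name, and `blocking_invoke` is a plain stand-in for a blocking algorithm such as tbb::parallel_invoke (it simply runs each job on its own thread and joins). The background thread blocks on the work, then calls the completion handler, while the caller returns immediately.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for a blocking parallel algorithm (e.g. tbb::parallel_invoke):
// runs every job concurrently and returns only when all have finished.
inline void blocking_invoke(const std::vector<std::function<void()>>& jobs) {
    std::vector<std::thread> workers;
    workers.reserve(jobs.size());
    for (const auto& job : jobs) workers.emplace_back(job);
    for (auto& w : workers) w.join();
}

// Hypothetical non-blocking wrapper: returns at once; the handler runs on the
// background thread after all jobs complete. Jobs and handler are taken by
// value so they outlive the caller's scope.
inline std::thread begin_invoke(std::function<void()> on_done,
                                std::vector<std::function<void()>> jobs) {
    assert(on_done && "completion handler must be callable");
    return std::thread([on_done = std::move(on_done), jobs = std::move(jobs)] {
        blocking_invoke(jobs);   // blocks this background thread only
        on_done();               // notification in the background thread's context
    });
}
```

The returned std::thread must still be joined (or detached) by the caller before it is destroyed. That is the per-call thread cost raised later in this thread.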

0 Kudos
zturner
Beginner
1,113 Views
Well yes, it's just unfortunate to have to keep stopping & starting new O/S threads every time I want to do this, but it's certainly possible :)
0 Kudos
Alexey-Kukanov
Employee
1,113 Views
Right, that's why I suggested using task::enqueue the same way you'd use a thread. Should we create an std::thread-like interface over task::enqueue?
0 Kudos
zturner
Beginner
1,113 Views
Well, I could just be doing it wrong :)
I have come up with this code:
[cpp]template <typename Callback, typename Function>
class async_invoker : public tbb::task
{
public:
    async_invoker(Callback callback)
        : callback_(*new(allocate_continuation()) tbb::internal::function_invoker<Callback>(callback))
    {
    }

    tbb::task* execute()
    {
        return NULL;
    }

    void add_child(const Function& func)
    {
        children_.push_back(new(allocate_child()) tbb::internal::function_invoker<Function>(func));

        this->set_ref_count(children_.size());

        std::list<tbb::internal::function_invoker<Function>*>::const_iterator it = children_.begin();
        std::list<tbb::internal::function_invoker<Function>*>::const_iterator itEnd = children_.end();
        for (; it != itEnd; ++it)
        {
            tbb::internal::function_invoker<Function>* child = *it;
            this->enqueue(*child);
        }
    }

private:
    tbb::internal::function_invoker<Callback>& callback_;
    std::list<tbb::internal::function_invoker<Function>*> children_;
};

template <typename Callback, typename Function>
void async_invoke(const Callback& callback, const Function& f1, const Function& f2, const Function& f3)
{
    async_invoker<Callback, Function>& invoker = *new(tbb::task::allocate_root()) async_invoker<Callback, Function>(callback);
    invoker.add_child(f1);
    invoker.add_child(f2);
    invoker.add_child(f3);

    tbb::task::spawn(invoker);
}
[/cpp]
When I run this, I get an assert by the TBB framework.
Assertion !(prefix().extra_state & es_ref_count_active) failed on line 211 of file ../../src/tbb/task.cpp
Detailed description: ref_count race detected
Am I misusing it somehow?
0 Kudos
zturner
Beginner
1,113 Views
Somehow that didn't come out right. Move this:

[cpp]        this->set_ref_count(children_.size());

        std::list<tbb::internal::function_invoker<Function>*>::const_iterator it = children_.begin();
        std::list<tbb::internal::function_invoker<Function>*>::const_iterator itEnd = children_.end();
        for (; it != itEnd; ++it)
        {
            tbb::internal::function_invoker<Function>* child = *it;
            this->enqueue(*child);
        }[/cpp]


into the execute() function.
0 Kudos
RafSchietekat
Valued Contributor III
1,113 Views
Your code calls set_ref_count() after a child is... launched (normally by being spawned, here by being enqueued). A specific workaround for that would be to use allocate_additional_child_of() instead of allocate_child(), and to not use set_ref_count() at all (the reference count starts at zero and is implicitly and safely incremented by allocate_additional_child_of()). The reference count is preferably set, via set_ref_count(), to the total number of child tasks before any of them are launched, so you might also go that way by changing the program slightly.

Furthermore, invoker should not be spawned, because that is supposed to happen implicitly after the last child finishes executing.

I have not studied the code beyond spotting these obvious problems, but let's see what happens with them first.

Documentation maintenance (for TBB team): the reference currently only mentions "should not spawn" in the "Important" note.
0 Kudos
zturner
Beginner
1,113 Views
It could just be that it's 2 AM, but can you explain again? The async_invoker task is the only one that is spawned, so that happens first and set_ref_count gets executed once from async_invoker::execute() before any of the children are enqueued at all, right? Also, I cheated and for the children I actually used a TBB-internal class, tbb::internal::function_invoker, not the async_invoker, so no set_ref_count should happen after any child has been launched.
Due to formatting issues the code ended up a little screwy, but the entire sample should have been this:
[cpp]template <typename Callback, typename Function>
class async_invoker : public tbb::task
{
public:
    async_invoker(Callback callback)
        : callback_(*new(allocate_continuation()) tbb::internal::function_invoker<Callback>(callback))
    {
    }

    tbb::task* execute()
    {
        this->set_ref_count(children_.size());

        std::list<tbb::internal::function_invoker<Function>*>::const_iterator it = children_.begin();
        std::list<tbb::internal::function_invoker<Function>*>::const_iterator itEnd = children_.end();
        for (; it != itEnd; ++it)
        {
            tbb::internal::function_invoker<Function>* child = *it;
            this->enqueue(*child);
        }
        return NULL;
    }

    void add_child(const Function& func)
    {
        children_.push_back(new(allocate_child()) tbb::internal::function_invoker<Function>(func));
    }

private:
    tbb::internal::function_invoker<Callback>& callback_;
    std::list<tbb::internal::function_invoker<Function>*> children_;
};

template <typename Callback, typename Function>
void async_invoke(const Callback& callback, const Function& f1, const Function& f2, const Function& f3)
{
    async_invoker<Callback, Function>& invoker = *new(tbb::task::allocate_root()) async_invoker<Callback, Function>(callback);
    invoker.add_child(f1);
    invoker.add_child(f2);
    invoker.add_child(f3);

    tbb::task::spawn(invoker);
}[/cpp]
0 Kudos
RafSchietekat
Valued Contributor III
1,113 Views
Make the child tasks have the callback task as their parent. Having async_invoker be the parent won't work, and you might as well get rid of it because it does nothing useful.

(Also, prefer std::vector to std::list unless you have a valid reason to use the latter.)
0 Kudos
jimdempseyatthecove
Honored Contributor III
1,113 Views
The OP asked about the possibility of a non-blocking parallel_invoke.

While you can construct a lambda using [=] (capture by value), and thus use the value at the time of the call as opposed to the time of execution, the closure object (the thing containing the values) is a stack object. The current parallel_invoke implementations (both TBB and QuickThread) are blocking, and thus are safe from destruction of the closure object prior to the end of the invoked function.

The OP wanted a suggestion for how to have a non-blocking function similar to parallel_invoke. I merely laid out a possible solution and listed the caveats about the use of lambdas. In QuickThread you can use a string of parallel_tasks (in addition to or in place of parallel_invoke). parallel_task is non-blocking (although it has slightly higher overhead than parallel_invoke).

An alternate means to reduce overhead while remaining non-blocking: when the task list is large, an allocated (or static) vector of functors can be walked using a non-blocking parallel_for, parallel_for_each, parallel_distribute, or parallel_list.

An additional implementation issue for the OP: if these desired functions and the completion routine are I/O-type functions, then they are not suitable candidates for TBB threads. Oversubscription of threads would help, up until these threads completed their tasks (at which point TBB would run less efficiently).

In QuickThread, you have two classes of threads. The user can select the class of thread in the parallel task. When the task has I/O statements, simply add a flag

parallel_task( IO$, func_1, arg, arg2, ..., argn);
parallel_task( IO$, func_2, arg, arg2, ..., argn);
...
parallel_task( IO$, func_n, arg, arg2, ..., argn);
parallel_task( IO$+OnDone$, CompletionRoutine);

or use lambdas if you want. But be careful of destruction of the lambda closure object prior to task completion.

You generally would add a control object to the parallel_task if/when the enqueued tasks are to have a lifetime longer than the task performing the enqueue operations (you can also monitor the progress using the control object).

Jim Dempsey
0 Kudos