Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Multiple Threads calling task::wait_for_all

Shankar1
Beginner
2,562 Views


Is it valid to call task.wait_for_all() from multiple threads?

If yes, will all the threads that called wait_for_all participate in the execution of the tasks under that root?

I have written a sample where I call wait on a task group from multiple threads and it seems to work on the sample that I tested with.

______
Sankar
0 Kudos
1 Solution
Alexey-Kukanov
Employee
2,562 Views
Sankar,

I looked at your code. The problem you describe has little to do with concurrent waiting. It's root case is that task_groups are not completely isolated inside the task scheduler.

Let me explain how it all works. When task_group::run() method is called for a task_group, a task is created and placed into the task pool of a thread that called run(). When task_group::wait() method is called, the calling thread enters a task processing mode (we sometimes call it "task dispatch loop") where it takes tasks one by one out of its own pool,or steals if the pool is empty, and executes. The task pool is LIFO for its owning thread and FIFO for thief threads, i.e. the owning thread starts processing from most recent tasks while thief threads steal the oldest tasks. An important note to add is that a thread will recursively enter task dispatch loop when task_group::wait() is called from a task being executed.

Now let'scan see what happens with your example. All the work is created and shared by the main thread when it calls Post() a bunch of times. If no other thread exists (e.g. on a single core system), at the end the task pool of the main thread looks like this (starting from the oldest task): [A1|A2|A3|A4|A5|B1|B2|B3|B4|B5|C1|C2|C3|C4|C5|D1|D2|D3|D4|D5]. If there are worker threads available, sometasks from the group a might get stolen already. Note that A1 is associated with WorkItem(a1), not with a1 directly; this is why I use capital letters in the names.
Next, the main thread calls a.Drain(). Here an unobvious issue lies: you expect that it will start working on the tasks belonging to the task_group a, but it just enters the task processing mode and starts from the end of the pool, as task groups are not fully isolated. So the main thread processes D5 first, which runs WorkItem<>(d5) and finds that the dependency on the task_group a is not yet resolved, and calls a.Drain() again. The situation repeats, except that D5 became "blocked" on the call stack of the main thread (as well as the previous calls to Drain(), task_group::wait() and some internal TBB functions) and the next task taken for processing is D4. For illustration, the call stack of the main thread will look somewhat like this:
[plain]D4.operator()()
tbb::internal::function_task< ... >::execute()
... other internal functions of TBB
tbb::internal::task_group_base::wait()
a.Drain()
d.PostOnDependentsCompletion( ... )
D5.operator()()
tbb::internal::function_task< ... >::execute()
... other internal functions of TBB
tbb::internal::task_group_base::wait()
a.Drain()
main()
[/plain]

You probably see that for each next task taken out of the pool the situation will repeat again and again until a task with no unresolved dependencies is found; with few worker threads and enough work per task, this will likely be A5. And every task from B1 to D5 has started execution and for now is blocked on the stack of the main thread.

Now you can probablyguess what happens next, and why replacing task_group::run(f) with direct call to f() makes such a bad difference. When all tasks belonging to a complete and B1 resumes execution, there is no free tasks for TBB worker threads to steal, as everything spawned before is blocked. If B1 calls task_group::run(b1) and so does every subsequent WorkItem task, some parallelism will appear again; however if functions are called directly, they all will be executed serially by the main thread as its call stack unwinds back.

I must say that even the "supposedly good" implementation is suboptimal. The tasks from group d could (and should) proceed right after the group a has completed, but instead remain blocked on stack by tasksk from the group cwhich are still waiting for b to complete. And only when b completes, both c and d will start execution.

Unfortunately we cannot provide complete isolation of task_groups for now; it would require a significant redesign of the task scheduler with new and currently unclear trade-offs, a lot of testing including for performance, etc. And it might end up being much less efficient in general than the current design.

I will think of a better way for you to organize the computation, and maybe suggest something different. With direct use of TBB low-level tasks and task lists,itcan be solved efficiently, but this low-level stuff is admittedly harder to use than task_group, and requires more code that must follow somewhat sophisticated rules. Of course you can give it a try if you wish.

View solution in original post

0 Kudos
3 Replies
Andrey_Marochko
New Contributor III
2,562 Views
Yes, it is valid provided that the root task being waited on is associated with a task group context object constructed with "concurrent_wait" trait, e.g.

[bash]task_group_context ctx(task_group_context::bound, 
                       task_group_context::default_traits | task_group_context::concurrent_wait);
task r = new( task::allocate_root(ctx) ) MyRootTask;
[/bash]

The answer to your second question is more involved. When a thread of your application (master thread from TBB perspective) spawns some tasks, TBB worker threads are invited to help the master executing them.

So we have two basic scenarios here. If concurrent wait_for_all() calls are made from the master and workers that already execute some of its tasks, then yes, all the waiting threads will participate in executing r's offsprings. If all r's descendants were spawned by one master (and workers servicing it), but concurrent waits happen in other master threads, then the latter will not have access to the tasks from r's tree and will be dumbly spinning until the first master (and its workers) finishes all the work.

However there is a third possible variant. If you need to wait from multiple masters, you could spawn some of r's descendants in each of them. Then a concurrent wait will execute the part of the task tree stemming from its thread. But you need to understand that this approach may (and likely will) result in significant imbalance, and consequently underutilization (mind you, this is much worse than oversubscription :) ).

Overall, concurrent waits are a very dangerous feature that may easily result in fragile design, so I'd recommend you to reconsider the logic of your application and try to get by without it (usually it turns out to be quite possible).

0 Kudos
Shankar1
Beginner
2,562 Views
Thanks for the reply. I understand the variants you have explained, but I encounter a scenario in my sample application which I cannot reason out from your explanation.

I will probably start with explaining my problem statement that I'm trying to solve and then go to my implementation so that things are clear. Later I will go to the behaviour of execution which I'm not able to explain from your explanation.

Problem Statement

Lets say I have 4 different types of data that my application needs to process, say A, B, C and D.

The data arrives something like,

A1, A2, ... Am, B1, B2, ... Bn, C1, C2, ... Co, D1, D2, ... Dp

where m, n, o and p are known only at runtime as the data arrives( so no assumptions on m, n, o, p can be made by the implementation).

Datas of a particular type can be processed in parallel. However there can be dependencies (synchronization required) between the processing of different types of data. Lets say the dependency is as follows
B's processing can start only after A's processing is complete.
C's processing can start only after both A's and B's processing is complete.
D's processing can start only after both A's processing is complete.

Implementation

So what I have done is that I have designed a class called Node which handles the scheduling of Tasks pertaining to a given type of data. So I can create multiple instances of the Node class( each instance configured to handle scheduling of a given type of data). Then I connect these Nodes based on the dependencies. So for the given example, I would connect them as follows to express the dependencies

a -> b
a -> c
b -> c
a -> d


[cpp]#include 
#include
#include
#include
#include

struct Info
{
double Result;
std::string GroupName;
};

tbb::concurrent_queue< Info> Outputs;

struct MyTask
{
MyTask( const std::string groupName) : GroupName( groupName) {}

// Some work. Each task takes roughly 500 ms
double DoWork()
{
double result = 1234567.89;
for( int i = 0; i < 1500000; ++i)
result = floor( pow( sqrt( log( result)), 4.0));
return result;
}

void operator()()
{
Info info = { DoWork(), GroupName };
Outputs.push( info);
}

private:
std::string GroupName;
};

int main()
{
tbb::task_scheduler_init init;
Node a( "A"), b( "B"), c( "C"), d( "D");

Connect( a, b);
Connect( a, c);
Connect( a, d);
Connect( b, c);

MyTask a1( "A"), a2( "A"), a3( "A"), a4( "A"), a5( "A");
MyTask b1( "B"), b2( "B"), b3( "B"), b4( "B"), b5( "B");
MyTask c1( "C"), c2( "C"), c3( "C"), c4( "C"), c5( "C");
MyTask d1( "D"), d2( "D"), d3( "D"), d4( "D"), d5( "D");

tbb::tick_count t0 = tbb::tick_count::now();
a.Post( a1); a.Post( a2); a.Post( a3); a.Post( a4); a.Post( a5);
b.Post( b1); b.Post( b2); b.Post( b3); b.Post( b4); b.Post( b5);
c.Post( c1); c.Post( c2); c.Post( c3); c.Post( c4); c.Post( c5);
d.Post( d1); d.Post( d2); d.Post( d3); d.Post( d4); d.Post( d5);
a.Drain(); b.Drain(); c.Drain(); d.Drain();
tbb::tick_count t1 = tbb::tick_count::now();
std::cout << "Time taken : " << (t1 - t0).seconds() << std::endl;

Outputs.clear();
}[/cpp]

The above code is the sample that I have created using the Node class. As seen above, the Node class provides methods Post and Drain methods. The Post method allows to post functors for execution and the Drain method waits for completion of posted functors under the respective Node instance.

Given the fact that there are dependencies, the Post method before it can post the functor for execution, has to make sure that all all its predecessor Nodes have completed the execution of their functors( i.e, it has to make sure all the predecessor Nodes are drained). Also the requirement is that the Post method should return immediately and cannot afford to do any synchronization\work( because it might have to look for more data from the data source).

So the implementation of Node class is as below

[cpp]#include 
#include

class Node
{
template
class WorkItem
{
public:
WorkItem( const Func& f, Node& node) : F( f), ParentNode( node) {}

void operator()() {
ParentNode.PostOnDependentsCompletion( F);
}

private:
Func F;
Node& ParentNode;
};

template
friend class WorkItem;

public:
Node( std::string name) : GroupName(name), Predecessors(), Successors(), TaskGroup() {}

~Node() {
Drain();
}

// Get the Group associated with this Node.
const std::string GetGroupName() const { return GroupName; }

// Method to connect Nodes
void AddSuccessor( Node& successor) {
Successors.push_back( &successor);
}
void AddPredecessor( Node& predecessor) {
Predecessors.push_back( &predecessor);
}

template
void Post( const F& f) {
TaskGroup.run( WorkItem( f, *this));
}

void Drain() {
TaskGroup.wait();
}

private:

template
void PostOnDependentsCompletion( const F& f) {
for( std::vector::const_iterator iter = Predecessors.begin();
iter != Predecessors.end(); ++iter) {
(*iter)->Drain();
}
TaskGroup.run( f);
}

std::string GroupName;
tbb::task_group TaskGroup;
std::vector Predecessors, Successors;
};

void Connect( Node& predecessor, Node& successor)
{
predecessor.AddSuccessor( successor);
successor.AddPredecessor( predecessor);
}
[/cpp]

Given this implementation of Node class, the sample scales very well on my dual core machine( equally on a quad core machine as well). Now if we look at the implementation, each Node has an associated tbb::task_group which takes care of the scheduling of functors corresponding to the data that it is meant to schedule. To look for the completion of posted functors all that needs to be done is to wait on the taskgroup( which is what the Drian method does).

Now the Post method posts a WorkItem functor as a task to the task group and returns immediately. The WorkItem functor is meant to wait untill all the predecessor Nodes have been drained( if not help them to drain) and once it is drained it will post the functor associated again to the task group.

Though this implementation is still not efficient in some sense( for example B1 task should wait on draining Node A, so would call wait_for_all on A's task group. Once that operation completes, still the execution of B2, .. B5 tasks would again call A's Drain method again), it still scales pretty well.


Scenario

Now for the given sample there are 20 Post calls made by the main thread. This will result in 40 tasks being created and executed by the scheduler( 20 WorkItems postes as task and each workitem again posts the MyTask functor as a Task).

As I see it the extra task created by the WorkItem was unnecessary since the WorkItem fucntor itself is executed in the task group. So I modified the PostOnDependentsCompletion code as follows( now this method should rather be called RunOnDependentsCompletion)

[cpp]    template
void PostOnDependentsCompletion( const F& f) {
for( std::vector::const_iterator iter = Predecessors.begin();
iter != Predecessors.end(); ++iter) {
(*iter)->Drain();
}
// Dont Post again another task instead execute the functor
// TaskGroup.run( f);
const_cast( f)();
}
[/cpp]

With this change the total scalabilty of the application is lost. The performance of the application on both the dual and quad core is almost as bad as the serial implementation. When I profiled it using the VTune Amplifier what I see is that the TBB Worker Thread participates in execution of Tasks only for a very very small amount of time and it keeps waiting. When I queried the call stack to see what the worker thread is doing, it is waiting on the line "my_thread_monitor.commit_wait(c);" in private_server.cpp file.


Questions

1. I'm not able to reason out why the scalability is totally lost because of this change. In my case there is only one master thread involved and so I'm not able to use your explanation to justify this performance hit. In fact on this approach there are only 20 tasks created in the task groups unlike 40 tasks in the earlier case. So I would expect this version to be much faster.

2. I have use the concurrent wait_for_all feature in this case( I see task_group using the concurrent trait by default). Can you please suggest me some approach wherein I can avoid the concurrent wait_for_all for this given problem statement.


I have attached the complete code as well which can be readily built and tested. I have used tbb version
tbb30_20110419oss in my tests.

Thanks
Sankar
0 Kudos
Alexey-Kukanov
Employee
2,563 Views
Sankar,

I looked at your code. The problem you describe has little to do with concurrent waiting. It's root case is that task_groups are not completely isolated inside the task scheduler.

Let me explain how it all works. When task_group::run() method is called for a task_group, a task is created and placed into the task pool of a thread that called run(). When task_group::wait() method is called, the calling thread enters a task processing mode (we sometimes call it "task dispatch loop") where it takes tasks one by one out of its own pool,or steals if the pool is empty, and executes. The task pool is LIFO for its owning thread and FIFO for thief threads, i.e. the owning thread starts processing from most recent tasks while thief threads steal the oldest tasks. An important note to add is that a thread will recursively enter task dispatch loop when task_group::wait() is called from a task being executed.

Now let'scan see what happens with your example. All the work is created and shared by the main thread when it calls Post() a bunch of times. If no other thread exists (e.g. on a single core system), at the end the task pool of the main thread looks like this (starting from the oldest task): [A1|A2|A3|A4|A5|B1|B2|B3|B4|B5|C1|C2|C3|C4|C5|D1|D2|D3|D4|D5]. If there are worker threads available, sometasks from the group a might get stolen already. Note that A1 is associated with WorkItem(a1), not with a1 directly; this is why I use capital letters in the names.
Next, the main thread calls a.Drain(). Here an unobvious issue lies: you expect that it will start working on the tasks belonging to the task_group a, but it just enters the task processing mode and starts from the end of the pool, as task groups are not fully isolated. So the main thread processes D5 first, which runs WorkItem<>(d5) and finds that the dependency on the task_group a is not yet resolved, and calls a.Drain() again. The situation repeats, except that D5 became "blocked" on the call stack of the main thread (as well as the previous calls to Drain(), task_group::wait() and some internal TBB functions) and the next task taken for processing is D4. For illustration, the call stack of the main thread will look somewhat like this:
[plain]D4.operator()()
tbb::internal::function_task< ... >::execute()
... other internal functions of TBB
tbb::internal::task_group_base::wait()
a.Drain()
d.PostOnDependentsCompletion( ... )
D5.operator()()
tbb::internal::function_task< ... >::execute()
... other internal functions of TBB
tbb::internal::task_group_base::wait()
a.Drain()
main()
[/plain]

You probably see that for each next task taken out of the pool the situation will repeat again and again until a task with no unresolved dependencies is found; with few worker threads and enough work per task, this will likely be A5. And every task from B1 to D5 has started execution and for now is blocked on the stack of the main thread.

Now you can probablyguess what happens next, and why replacing task_group::run(f) with direct call to f() makes such a bad difference. When all tasks belonging to a complete and B1 resumes execution, there is no free tasks for TBB worker threads to steal, as everything spawned before is blocked. If B1 calls task_group::run(b1) and so does every subsequent WorkItem task, some parallelism will appear again; however if functions are called directly, they all will be executed serially by the main thread as its call stack unwinds back.

I must say that even the "supposedly good" implementation is suboptimal. The tasks from group d could (and should) proceed right after the group a has completed, but instead remain blocked on stack by tasksk from the group cwhich are still waiting for b to complete. And only when b completes, both c and d will start execution.

Unfortunately we cannot provide complete isolation of task_groups for now; it would require a significant redesign of the task scheduler with new and currently unclear trade-offs, a lot of testing including for performance, etc. And it might end up being much less efficient in general than the current design.

I will think of a better way for you to organize the computation, and maybe suggest something different. With direct use of TBB low-level tasks and task lists,itcan be solved efficiently, but this low-level stuff is admittedly harder to use than task_group, and requires more code that must follow somewhat sophisticated rules. Of course you can give it a try if you wish.
0 Kudos
Reply