Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

task_groups on single core machines

nagy
New Contributor I
464 Views
As far as I understand, a task_group will make no progress until wait() is called when running on a single-core machine, since no worker thread is created in that case?
I find this a bit bug-prone, since applications that seem to run fine on a machine with two or more cores will suddenly stop working properly when run on a single-core machine. In most cases I have seen, task_group::run is used as a fire-and-forget method and task_group::wait() is called in the destructor, similarly to thread::join().
From reading these forums I have found two solutions to this: either use task::enqueue, or start the task_scheduler with max(1, N) threads. However, I'm unsure which solution is recommended?
Using task::enqueue, some kind of wait has to be implemented manually.
Using task_scheduler with max(1, N) gives some over-subscription, but on the other hand guarantees progress in such cases?
I would like the automatic task_scheduler init to always create at least one worker thread, at the cost of over-subscription on single-core machines (which should be unusual).
EDIT: replaced min with max
0 Kudos
7 Replies
RafSchietekat
Valued Contributor III
464 Views
It's called required concurrency (sic?) vs. optional parallelism. I haven't seen any indication that enqueue provides additional concurrency, I doubt it does, and I don't think it should, although quite probably there's a case to be made for well-defined and isolated additional concurrency for tasks. It would be interesting to know what you were thinking of, here, because one would need to be careful about what to provide and how to educate the user about best practices (without implying that I'm in a position to do so, myself).
0 Kudos
nagy
New Contributor I
464 Views
Well the scenario I am thinking of goes something like this.
class MyWorker
{
public:
    ~MyWorker()
    {
        taskGroup_.wait(); // Wait for all tasks to finish before destroying
    }
    std::future<Result> AsyncWork()
    {
        // std::promise is not copyable, so share it with the task
        auto promise = std::make_shared<std::promise<Result>>();
        taskGroup_.run([=] { /*Do work*/ promise->set_value(result); });
        return promise->get_future();
    }
private:
    /* Some shared state variables */
    tbb::task_group taskGroup_;
};
From my understanding such a solution would work just fine on a machine with two or more cores, but will make no progress when run on a single core until it enters the destructor. How would one solve this?
0 Kudos
Alexey-Kukanov
Employee
464 Views
> I haven't seen any indication that enqueue provides additional concurrency

It does (in the sense that one worker will be allocated if there is none), and that's documented as far as I remember.
In the current implementation, only worker threads can "dequeue" the tasks that were enqueued. This method was created for fire-and-forget scenarios that require concurrency.

The task_group-like way to wait for completion of enqueued tasks is to allocate them as "children" of some special task whose ref_count is set to n_children + 1, just as for "normal" tasks. Then you can call wait_for_all on that task when ready. This causes the main thread to enter the TBB scheduler and process "normal" tasks, if any, while waiting for the enqueued tasks to complete.
0 Kudos
ARCH_R_Intel
Employee
464 Views
The scenario suggests that you wish to implement general futures. A hazard to be aware of is that general futures (specifically ones that express arbitrary dependence graphs) are not implementable on top of the TBB task scheduler, even with the new task::enqueue.

See "Leapfrogging: a portable technique for implementing efficient futures" for a description of the problem and one proposed solution based on restricted stealing. Currently, the only available solution in TBB is to use threads instead of tasks (expensive!). It seems that no matter what, general futures are an expensive feature given the constraints of calling conventions based on stacks and tightly coupling stacks and threads, an unfortunate inheritance from serial processor days.
0 Kudos
nagy
New Contributor I
464 Views
That was interesting regarding the futures. However, I am still interested in this scenario with the futures removed.
class MyWorker
{
public:
    ~MyWorker()
    {
        taskGroup_.wait(); // Wait for all tasks to finish before destroying
    }
    void AsyncWork()
    {
        taskGroup_.run([=] { /*Do work*/ });
    }
private:
    /* Some shared state variables */
    tbb::task_group taskGroup_;
};
I guess tbb::task::enqueue with some sort of semaphore to wait for completion of all fired tasks is the recommended solution?
0 Kudos
nagy
New Contributor I
464 Views
Here is a somewhat flawed try to solve what I want.
namespace internal
{
    class function_task : public tbb::task
    {
    public:
        function_task(std::function<void()>&& f) : my_func(std::move(f))
        {
            assert(my_func != nullptr);
        }
        template<typename T>
        static void enqueue(T&& f)
        {
            tbb::task::enqueue(*new(tbb::task::allocate_root()) function_task(std::forward<T>(f)));
        }
    private:
        std::function<void()> my_func;
        tbb::task* execute()
        {
            my_func();
            return NULL;
        }
    };
}
class safe_task_group
{
public:
    safe_task_group()
    {
        count_ = 0;
        isRunning_ = new tbb::atomic<bool>();
        *isRunning_ = true;
        isCanceled_ = false;
    }
    ~safe_task_group()
    {
        if(!isCanceled_)
            cancel();
    }
    template<typename F>
    void run(F&& f)
    {
        assert(*isRunning_);
        ++count_;
        // Note: [=] also captures this, so the members used below must
        // outlive the task unless cancel() has already been called.
        internal::function_task::enqueue([=]() mutable
        {
            if(*isRunning_)
            {
                f();
                if(count_.fetch_and_decrement() == 1) // fetch_and_decrement returns the old value
                    cond_.notify_all();
            }
            else if(count_.fetch_and_decrement() == 1) // last task, safe to delete
                delete isRunning_;
        });
    }
    void cancel()
    {
        assert(!isCanceled_);
        if(isCanceled_.fetch_and_store(true))
            return;
        if(count_ == 0)
        {
            delete isRunning_;
            isRunning_ = nullptr;
        }
        else
        {
            *isRunning_ = false;
            cond_.notify_all();
        }
    }
    void wait()
    {
        tbb::interface5::unique_lock<tbb::mutex> lock(mutex_);
        cond_.wait(lock, [&] { return count_ == 0 || !*isRunning_; });
    }
private:
    tbb::interface5::condition_variable cond_;
    mutable tbb::mutex mutex_;
    tbb::atomic<int> count_;
    tbb::atomic<bool>* isRunning_;
    tbb::atomic<bool> isCanceled_;
};
EDIT: updated code
0 Kudos
jimdempseyatthecove
Honored Contributor III
464 Views
Don't you mean MAX(2,N) where N=number of hardware threads available to the program?

IOW you init to at least 2 threads.

Also, when running on a single-hardware-thread system (one core, no HT) you might experiment with inserting a bubble task that reduces its priority, spin-waits looking for a completion condition, restores its priority, and then exits (cleanup then happens in the dtor). This task is not launched when the system has more than one hardware thread.

Jim
0 Kudos