Intel® oneAPI Threading Building Blocks

Waiting for Local Serializer

nagy
New Contributor I
I have implemented a task serializer based on the "Local Serializer" from the Design Patterns document.
However, I have problems implementing a "wait" function that waits for all the tasks to finish, and I'm hoping to get some help solving it. Currently I get either an exception or an assertion failure when calling wait() (more details in a moment).
EDIT: REMOVED INCORRECT IMPLEMENTATION
EDIT:
On second thought, does the following code even make sense? Am I trying to execute a task that is already executing?

[cpp] while(executing_ != nullptr && executing_->state() == tbb::task::executing)  
            executing_->wait_for_all(); // Can fail since ref_count could be decremented before calling [/cpp]
I'm guessing I should use an empty task as a root node and call wait_for_all on that task? I'm not sure how I would do that.
nagy
I tried using an empty task as the root, and it seems to work. I'm not sure how well task::enqueue and wait_for_all behave together, though.
[cpp]class function_task_serializer
{ 
	template<typename R>
	class packaged_task : public tbb::task
	{
	public:
		packaged_task(boost::packaged_task<R>&& task, function_task_serializer& s) : task_(boost::move(task)), s_(s){}	
	private:
		tbb::task* execute() 
		{
			task_();
			s_.note_completion();
			return nullptr;
		}

		boost::packaged_task<R> task_;
		function_task_serializer& s_;
	};
		
	template<typename Func>
	class simple_task : public tbb::task
	{
	public:
		simple_task(Func&& func, function_task_serializer& s) : func_(std::move(func)), s_(s){}	
	private:
		tbb::task* execute() 
		{
			func_();
			s_.note_completion();
			return nullptr;
		}

		Func func_;
		function_task_serializer& s_;
	};
public:
	function_task_serializer()
	{
		count_ = 0;		
		parent_ = new(tbb::task::allocate_root()) tbb::empty_task;
		parent_->set_ref_count(1);
	}
	
	void wait()
	{
		parent_->wait_for_all();
		parent_->destroy(*parent_);
	}

	template<typename Func>
	void enqueue(Func&& func)
	{
		parent_->increment_ref_count();
		queue_.push(new(parent_->allocate_child()) simple_task<Func>(std::forward<Func>(func), *this));
		if(++count_ == 1)
			execute_item();
	}
	
	template<typename Func>
	auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())>
	{			
		auto task = boost::packaged_task<decltype(func())>(std::forward<Func>(func));
		auto future = task.get_future();
		
		parent_->increment_ref_count();
		queue_.push(new(parent_->allocate_child()) packaged_task<decltype(func())>(boost::move(task), *this));
		if(++count_ == 1)
			execute_item();

		return std::move(future);		
	}
		
	template<typename Func>
	auto invoke(Func&& func) -> decltype(func())
	{	
		return begin_invoke(std::forward<Func>(func)).get();		
	}

private:

	void execute_item()
	{
		tbb::task* task;
		queue_.try_pop(task);
		tbb::task::enqueue(*task);
	}

	void note_completion()
	{
		if(--count_ > 0)
			execute_item();
	}

	tbb::empty_task* parent_;
	tbb::concurrent_queue<tbb::task*> queue_;
	tbb::atomic<int> count_;
};[/cpp]
Alexey-Kukanov
Employee

Unlike spawned tasks, you don't have to wait on enqueued tasks (this is why they are also called fire-and-forget tasks), but if you want to, you do it the same way as for spawned ones. That is, the task manipulation in your second implementation is correct.
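
For readers who want to see the counting protocol in isolation, here is a minimal sketch that uses only the C++ standard library, not TBB. The names `root_counter` and `run_children` are hypothetical and exist only for this illustration. It assumes the convention from the code above: the root starts at a count of 1 (the waiter's own reference), each queued task adds one before it starts, each finished task subtracts one, and the wait returns once the count is back to 1.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the empty root task: it only tracks a count.
class root_counter {
public:
    root_counter() : count_(1) {}   // 1 = the waiter itself, like set_ref_count(1)

    void add_child() {              // like parent_->increment_ref_count()
        std::lock_guard<std::mutex> lock(m_);
        ++count_;
    }

    void child_done() {             // stands in for what happens when a child completes
        std::lock_guard<std::mutex> lock(m_);
        if (--count_ == 1)
            cv_.notify_all();
    }

    void wait_for_all() {           // blocks until only the waiter's reference remains
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ == 1; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// Runs n children on plain threads and returns how many had finished
// by the time wait_for_all() returned.
int run_children(int n) {
    root_counter root;
    std::atomic<int> finished(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        root.add_child();           // count the child before it starts, so the wait cannot miss it
        workers.emplace_back([&] { ++finished; root.child_done(); });
    }
    root.wait_for_all();
    int seen = finished.load();
    for (auto& t : workers) t.join();
    return seen;
}
```

The important detail is the ordering: the count is incremented before the child can possibly run, which is why the serializer above calls increment_ref_count() before pushing the task.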

nagy
I need to wait for the tasks in my case, since some of them use objects that might otherwise be destroyed before the tasks execute, causing access violations.


There seems to be a problem with the second implementation, though. Sometimes the entire application blocks at parent_->wait_for_all(); it seems ref_count never reaches 1. Any ideas how that might occur? Maybe one of the executing tasks is waiting on its own task_serializer, and this nested case somehow becomes a problem? I can't really figure it out.
EDIT:
I should probably also mention that I'm calling "wait" in a tbb task.

EDIT:

It might also be a better idea to spawn instead of enqueue while waiting?
[cpp]	void execute_item()
	{
		tbb::task* task;
		queue_.try_pop(task);
		if(!isWaiting_)
			tbb::task::enqueue(*task);
		else
			parent_->spawn(*task); // Execute as fast as possible while waiting
	}[/cpp]
Alexey-Kukanov

Oh, if you call wait() in another task then you should only spawn(), not enqueue(). Due to their FIFO-like execution, enqueued tasks are bad for nested parallelism, and currently there is a restriction that only a thread with no work on its stack can "dequeue" tasks (to prevent the risk of unbounded stack growth and of tasks blocked forever on the stack). Basically, it means that waiting for enqueued tasks is only allowed on the main thread; otherwise deadlock can happen (and it seems that is what you are seeing).
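
To illustrate why it matters that the waiting thread can run the pending work itself, here is a rough sketch in plain standard C++, not TBB. `helping_serializer` and `run_in_order` are hypothetical names invented for this example, and the class is only an analogy for spawning: its wait() drains the queue on the calling thread rather than blocking until some other thread gets around to it. It assumes a single thread calls wait() at a time.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper, not a TBB class: tasks run one at a time in FIFO order,
// and wait() executes pending tasks on the calling thread instead of blocking.
class helping_serializer {
public:
    void enqueue(std::function<void()> f) {
        std::lock_guard<std::mutex> lock(m_);
        pending_.push_back(std::move(f));
    }

    // Drains the queue on the caller's thread. Because the waiter does the
    // work itself, it makes progress even when no other thread is free.
    void wait() {
        for (;;) {
            std::function<void()> next;
            {
                std::lock_guard<std::mutex> lock(m_);
                if (pending_.empty())
                    return;
                next = std::move(pending_.front());
                pending_.pop_front();
            }
            next();  // run outside the lock so a task may enqueue more work
        }
    }

private:
    std::mutex m_;
    std::deque<std::function<void()>> pending_;
};

// Queues two tasks, the first of which queues a third, then waits.
// Returns the order in which the tasks ran.
std::vector<int> run_in_order() {
    helping_serializer s;
    std::vector<int> order;
    s.enqueue([&] { order.push_back(1); s.enqueue([&] { order.push_back(3); }); });
    s.enqueue([&] { order.push_back(2); });
    s.wait();
    return order;
}
```

A waiter that only blocks depends on some other thread being allowed to pick the work up, which is the condition that fails in the nested case described above. A waiter that helps does not have that dependency.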
