nagy
New Contributor I

Waiting for Local Serializer

I have implemented a task serializer based on the "Local Serializer" from the Design Patterns document.
However, I am having trouble implementing a "wait" function that waits for all the tasks to finish, and I'm hoping to get some help solving it. Currently I get either an exception or an assert when calling wait() (more details in a moment).
EDIT: REMOVED INCORRECT IMPLEMENTATION
EDIT:
On second thought, does the following code even make sense? Am I trying to execute a task that is already executing?

[cpp] while(executing_ != nullptr && executing_->state() == tbb::task::executing)  
            executing_->wait_for_all(); // Can fail since ref_count could be decremented before calling [/cpp]
I'm guessing I should use an empty task as a root node and call wait_for_all on that task? I'm not sure how I would do that.
nagy
New Contributor I

I tried using an empty task as the root, and it seems to work. I'm not sure how well task::enqueue and wait_for_all behave together.
[cpp]class function_task_serializer
{ 
	template<typename R>
	class packaged_task : public tbb::task
	{
	public:
		packaged_task(boost::packaged_task<R>&& task, function_task_serializer& s) : task_(boost::move(task)), s_(s){}	
	private:
		tbb::task* execute() 
		{
			task_();
			s_.note_completion();
			return nullptr;
		}

		boost::packaged_task<R> task_;
		function_task_serializer& s_;
	};
		
	template<typename Func>
	class simple_task : public tbb::task
	{
	public:
		simple_task(Func&& func, function_task_serializer& s) : func_(std::move(func)), s_(s){}	
	private:
		tbb::task* execute() 
		{
			func_();
			s_.note_completion();
			return nullptr;
		}

		Func func_;
		function_task_serializer& s_;
	};
public:
	function_task_serializer()
	{
		count_ = 0;		
		parent_ = new(tbb::task::allocate_root()) tbb::empty_task;
		parent_->set_ref_count(1);
	}
	
	void wait()
	{
		parent_->wait_for_all();
		parent_->destroy(*parent_);
	}

	template<typename Func>
	void enqueue(Func&& func)
	{
		parent_->increment_ref_count();
		queue_.push(new(parent_->allocate_child()) simple_task<Func>(std::forward<Func>(func), *this));
		if(++count_ == 1)
			execute_item();
	}
	
	template<typename Func>
	auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())>
	{			
		auto task = boost::packaged_task<decltype(func())>(std::forward<Func>(func));
		auto future = task.get_future();
		
		parent_->increment_ref_count();
		queue_.push(new(parent_->allocate_child()) packaged_task<decltype(func())>(boost::move(task), *this));
		if(++count_ == 1)
			execute_item();

		return std::move(future);		
	}
		
	template<typename Func>
	auto invoke(Func&& func) -> decltype(func())
	{	
		return begin_invoke(std::forward<Func>(func)).get();		
	}

private:

	void execute_item()
	{
		tbb::task* task;
		queue_.try_pop(task);
		tbb::task::enqueue(*task);
	}

	void note_completion()
	{
		if(--count_ > 0)
			execute_item();
	}

	tbb::empty_task* parent_;
	tbb::concurrent_queue<tbb::task*> queue_;
	tbb::atomic<int> count_;
};[/cpp]
Alexey_K_Intel3
Employee

Unlike spawned tasks, you don't have to wait on enqueued tasks (this is why they are also called fire-and-forget tasks), but if you want to, you do it the same way as for spawned ones. That is, the task manipulation in your second implementation is correct.
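To make the counting behind that concrete, here is a minimal sketch in plain standard C++ (not TBB). The class `ref_counted_root` and the function `demo` are hypothetical names made up for illustration. It only models the bookkeeping: the root starts at 1, each child adds 1 before it is queued, each finished child subtracts 1, and the wait is satisfied once the count is back at 1. The real wait_for_all also keeps the calling thread busy running other tasks in the meantime, which this sketch does not attempt to show.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the root task's reference count; not a TBB class.
class ref_counted_root
{
public:
	ref_counted_root() : ref_count_(1) {}   // 1 stands for the waiter itself

	void add_child()  { ++ref_count_; }     // call before queuing a child
	void child_done() { --ref_count_; }     // call when a child has finished

	// The condition a wait on the root is looking for.
	bool all_children_done() const { return ref_count_.load() == 1; }

private:
	std::atomic<int> ref_count_;
};

// Walks through two children and reports whether the predicate behaved
// as described: false while children are outstanding, true afterwards.
bool demo()
{
	ref_counted_root root;
	root.add_child();
	root.add_child();
	bool before = root.all_children_done();  // two children outstanding
	root.child_done();
	bool middle = root.all_children_done();  // one child outstanding
	root.child_done();
	bool after = root.all_children_done();   // none outstanding
	return !before && !middle && after;
}
```

In the serializer above, `parent_->set_ref_count(1)` plays the role of the constructor here and `parent_->increment_ref_count()` plays the role of `add_child()`.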

nagy
New Contributor I

I need to wait for the tasks in my case, since some of them use objects that might otherwise be destroyed before the tasks execute, which would cause access violations.


There seems to be a problem with the second implementation, though. Sometimes the entire application blocks at parent_->wait_for_all(); it seems that ref_count never reaches 1. Any ideas as to how that might occur? Maybe one of the executing tasks is waiting on its own task_serializer, and this nested case somehow becomes a problem? I can't really figure it out.
EDIT:
I should probably also mention that I'm calling "wait" in a tbb task.

EDIT:

It might also be a better idea to spawn instead of enqueue while waiting?
[cpp]	void execute_item()
	{
		tbb::task* task;
		queue_.try_pop(task);
		if(!isWaiting_)
			tbb::task::enqueue(*task);
		else
			parent_->spawn(*task); // Execute as fast as possible while waiting
	}[/cpp]
Alexey_K_Intel3
Employee

Oh, if you call wait() from inside another task, then you should only spawn(), not enqueue(). Because of their FIFO-like execution, enqueued tasks are a poor fit for nested parallelism, and there is currently a restriction that only a thread with no work on its stack can "dequeue" tasks (to prevent the risk of unbounded stack growth and of tasks being blocked forever on the stack). Basically, this means that waiting for enqueued tasks is only allowed on the main thread; otherwise deadlock can happen (and it seems you are indeed seeing it).
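The underlying idea is that a waiter which can pick up the pending work itself cannot be starved by it. Below is a rough sketch of that idea in plain standard C++, not TBB; `helping_serializer` and `run_in_order` are hypothetical names used only for illustration. It makes two simplifying assumptions: items only run when some thread calls wait(), and an item must not call wait() on its own serializer (that nested call would block forever, because the item is still marked as running).

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical illustration class; not part of TBB.
class helping_serializer
{
public:
	void enqueue(std::function<void()> f)
	{
		std::lock_guard<std::mutex> lock(m_);
		queue_.push_back(std::move(f));
	}

	// Returns once the queue is empty and no item is running.
	// The caller executes pending items itself instead of only blocking.
	void wait()
	{
		std::unique_lock<std::mutex> lock(m_);
		for (;;)
		{
			if (running_)
			{
				// Another thread is running an item; keep items serialized.
				idle_.wait(lock, [this] { return !running_; });
				continue;
			}
			if (queue_.empty())
				return;
			std::function<void()> next = std::move(queue_.front());
			queue_.pop_front();
			running_ = true;
			lock.unlock();
			next(); // run outside the lock so the item may enqueue more work
			lock.lock();
			running_ = false;
			idle_.notify_all();
		}
	}

private:
	std::mutex m_;
	std::condition_variable idle_;
	std::deque<std::function<void()>> queue_;
	bool running_ = false;
};

// Usage: queue three items, wait, and return the order they ran in.
std::vector<int> run_in_order()
{
	helping_serializer s;
	std::vector<int> out;
	for (int i = 1; i <= 3; ++i)
		s.enqueue([&out, i] { out.push_back(i); });
	s.wait();
	return out;
}
```

This is only an analogy for why spawn() is the safer choice when waiting inside a task; it does not reproduce how the TBB scheduler actually works.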
