Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Oversubscribe - Correct Implementation?

ronag89
Beginner
310 Views

Would this be a correct implementation for oversubscribing the TBB thread pool?

What are the disadvantages of using this over moving IO to non tbb-threads? The latter of cource being much more complicated.

void oversubscribe(bool value)
{
	class worker
	{
		tbb::concurrent_bounded_queue<std::function<void()>> queue_;
		std::thread thread_;
	public:
		worker()
			: thread_([this]
			{
				while (true)
				{
					std::function<void()> func;
					queue_.pop(func);
					if (!func)
						break;
					func();
				}
			})
		{
		}
		~worker()
		{
			queue_.push(nullptr);
			thread_.join();	
		}

		template<typename F>
		void run(F&& func)
		{
			queue_.push(std::forward<F>(func));
		}
	};

	static tbb::concurrent_bounded_queue<std::shared_ptr<worker>> workers;
	static tbb::concurrent_bounded_queue<std::shared_ptr<tbb::task>> task_refs;

	if (value)
	{
		auto task = new (tbb::task::allocate_root()) tbb::empty_task;
		task->increment_ref_count(); // Inc ref for oversubscription.

		std::shared_ptr<worker> w;
		if (!workers.try_pop(w))
			w = std::make_shared<worker>();
		
		std::shared_ptr<tbb::task> task_ref(task, [](tbb::task* task)
		{
			task->decrement_ref_count(); // Dec ref for oversubscription.
		});

		w.run([=]
		{
			task->increment_ref_count(); // Inc ref for spawn.
			tbb::task::spawn_root_and_wait(task); // Start stealing tasks.
			worker.push(w);
		});

		task_refs.push(task_ref);
	}
	else
	{
		std::shared_ptr<tbb::task> task_ref;
		task_refs.try_pop(task_ref);
	}
}

struct scoped_oversubscription
{
	scoped_oversubscription()
	{
		oversubscribe(true);
	}

	~falsescoped_oversubscription()
	{
		oversubscribe(true);
	}
};

 

0 Kudos
0 Replies
Reply