Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Flow Graph - target_node?

ronag89
Beginner
218 Views

Given the following example which requires a busy loop:

tbb::flow::graph g;
tbb::flow::function<int, int, tbb::flow::rejecting> n0(g, body());
tbb::flow::function<int, int, tbb::flow::rejecting> n1(g, body());
tbb::flow::make_edge(n0, n1);

// Thread 1 - Read from file

while (!eof)
   n0.try_put(read_int());

// Thread 2 - Write to file

while (true)
{
    int value;
    while(n1.try_get(value))
    {
        if (value == -1)
          return;
        write_int(value);
    }
    sleep(20);
}

I haven't been able figure out any pattern which allows me to get around the busy loop.

The only thing I can think of is a "target_node" with similar semantics to "concurrent_bounded_queue".

i.e.

target_node<int> t;
tbb::flow::make_edge(n1, t);

// ...

// Thread 2 - Write to file

while (true)
{
   int value;
    t.pop(value);
    if (value == -1)
       return;
    write_int(value);
}

I've tried to implement "target_node" myself. Although I am not 100% sure it is correct.

template<typename T>
struct target_node 
	: public tbb::flow::receiver<T>
	, boost::noncopyable
{		
	tbb::spin_mutex					predecessors_mutex_;
	std::list<predecessor_type*>	predecessors_;

	boost::condition_variable		cond_;
	boost::mutex					mutex_;
	std::queue<input_type>			queue_;
	std::size_t						capacity_;
		
	target_node(std::size_t capacity = std::numeric_limits<std::size_t>::max())
		: capacity_(capacity)
	{
	}

	tbb::task* try_put_task(const input_type& v) override
	{
		{
			boost::lock_guard<boost::mutex> lock(mutex_);

			if (queue_.size() >= capacity_)
				return nullptr;

			queue_.push(v);
		}

		cond_.notify_one();

		return tbb::flow::interface7::SUCCESSFULLY_ENQUEUED;
	}

	bool register_predecessor(predecessor_type& p)  override
	{
		{
			tbb::spin_mutex::scoped_lock lock(predecessors_mutex_);
			predecessors_.push_back(&p);
		}

		cond_.notify_one();

		return true; 
	}

	bool remove_predecessor(predecessor_type& p) override
	{
		{
			tbb::spin_mutex::scoped_lock lock(predecessors_mutex_);
			predecessors_.erase(std::find(predecessors_.begin(), predecessors_.end(), &p));
		}

		return true; 
	}

	void pop(input_type& value)
	{					
		boost::unique_lock<boost::mutex> lock(mutex_);
					
		while (true)
		{
			if (!queue_.empty())
			{
				value = std::move(queue_.front());
				queue_.pop();
				return;
			}

			{
				tbb::spin_mutex::scoped_lock lock(predecessors_mutex_);

				for (auto p : predecessors_)
				{
					if (p->try_get(value))
						return;

					if (p->register_successor(*this))
						remove_predecessor(*p);								
				}
			}
												
			cond_.wait(lock);
		}
	}

	void reset_receiver()
	{
		boost::lock_guard<boost::mutex> lock(mutex_);
		value_.reset();
	}
};

Does this make sense? Am I missing something? Is there a better way?

0 Kudos
0 Replies
Reply