- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Given the following example which requires a busy loop:
tbb::flow::graph g; tbb::flow::function<int, int, tbb::flow::rejecting> n0(g, body()); tbb::flow::function<int, int, tbb::flow::rejecting> n1(g, body()); tbb::flow::make_edge(n0, n1); // Thread 1 - Read from file while (!eof) n0.try_put(read_int()); // Thread 2 - Write to file while (true) { int value; while(n1.try_get(value)) { if (value == -1) return; write_int(value); } sleep(20); }
I haven't been able figure out any pattern which allows me to get around the busy loop.
The only thing I can think of is a "target_node" with similar semantics to "concurrent_bounded_queue".
i.e.
target_node<int> t; tbb::flow::make_edge(n1, t); // ... // Thread 2 - Write to file while (true) { int value; t.pop(value); if (value == -1) return; write_int(value); }
I've tried to implement "target_node" myself. Although I am not 100% sure it is correct.
template<typename T> struct target_node : public tbb::flow::receiver<T> , boost::noncopyable { tbb::spin_mutex predecessors_mutex_; std::list<predecessor_type*> predecessors_; boost::condition_variable cond_; boost::mutex mutex_; std::queue<input_type> queue_; std::size_t capacity_; target_node(std::size_t capacity = std::numeric_limits<std::size_t>::max()) : capacity_(capacity) { } tbb::task* try_put_task(const input_type& v) override { { boost::lock_guard<boost::mutex> lock(mutex_); if (queue_.size() >= capacity_) return nullptr; queue_.push(v); } cond_.notify_one(); return tbb::flow::interface7::SUCCESSFULLY_ENQUEUED; } bool register_predecessor(predecessor_type& p) override { { tbb::spin_mutex::scoped_lock lock(predecessors_mutex_); predecessors_.push_back(&p); } cond_.notify_one(); return true; } bool remove_predecessor(predecessor_type& p) override { { tbb::spin_mutex::scoped_lock lock(predecessors_mutex_); predecessors_.erase(std::find(predecessors_.begin(), predecessors_.end(), &p)); } return true; } void pop(input_type& value) { boost::unique_lock<boost::mutex> lock(mutex_); while (true) { if (!queue_.empty()) { value = std::move(queue_.front()); queue_.pop(); return; } { tbb::spin_mutex::scoped_lock lock(predecessors_mutex_); for (auto p : predecessors_) { if (p->try_get(value)) return; if (p->register_successor(*this)) remove_predecessor(*p); } } cond_.wait(lock); } } void reset_receiver() { boost::lock_guard<boost::mutex> lock(mutex_); value_.reset(); } };
Does this make sense? Am I missing something? Is there a better way?
Link Copied
0 Replies

Reply
Topic Options
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page