- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
[cpp]class active_object { public: active_object() { queue_.set_capacity(3); thread_ = tbb::tbb_thread([this]{run();}); // Another edit. } ~active_object() { queue_.clear(); // problem? queue_.push(nullptr); thread_.join(); // EDIT. Forgot this. } void push(const std::function& func) { queue_.push(func); } private: void run() { std::function func = []{}; while(func != nullptr) { queue_.pop(func); func(); } } tbb::tbb_thread thread_; tbb::concurrent_bounded_queue<:FUNCTION> > queue_; };[/cpp]
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I would implement it with separate out-of-band flag:
[cpp]class active_object { public: active_object() : thread_([this]{run();}) , stop_(0) { } ~active_object() { stop_ = 1; queue_.push(nullptr); thread_.join(); } void push(const std::function& func) { queue_.push(func); } private: void run() { for (;;) { std::function func; queue_.pop(func); if (stop_ == 1) break; func(); } } tbb::atomic stop_; tbb::tbb_thread thread_; tbb::concurrent_bounded_queue<:FUNCTION> > queue_; };[/cpp]
Link Copied
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I think that without further guarantees for tbb::concurrent_bounded_queue it is problematic to (drop out of run() and therefore destroy this and therefore) call clear() before the thread knows that the client that pushed nullptr from another thread has safely returned from push() (which can be remedied by locking a mutex in active_object::push(), but at some cost to or possibly defeating queue_'s scalability), silly to push nullptr at the end of the destructor (right?), and problematic to destroy queue_ as part of running ~active_object() for similar but even strongerreasons as/than for calling clear(), i.e., the queue is still being using by multiple threads.
This has been discussed in another thread ("atomicity" of push()). Maybe I'll add a reference later, unless somebody else does it first.
(Added)Maybe I'm mistaken about what the queue type guarantees, but a better workaround seems to lock a mutex to a special end-of-input operation that pushes nullptr, with active_object briefly locking that same mutex when it receives nullptr.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I would implement it with separate out-of-band flag:
[cpp]class active_object { public: active_object() : thread_([this]{run();}) , stop_(0) { } ~active_object() { stop_ = 1; queue_.push(nullptr); thread_.join(); } void push(const std::function& func) { queue_.push(func); } private: void run() { for (;;) { std::function func; queue_.pop(func); if (stop_ == 1) break; func(); } } tbb::atomic stop_; tbb::tbb_thread thread_; tbb::concurrent_bounded_queue<:FUNCTION> > queue_; };[/cpp]
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
(Added) No, wait, apparently the queue's capacity is set while the thread is already running, which seems like another problem. But I was wrong about when active_object is destroyed (some confusion with Java Threads, I thought that the consumer would destroy active_object), if the intention is that one thread creates the object, performs a number of push() operations, and then destroys it, because that creator/producer thread does not know when the consumer thread has finished.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
The thread is not yet joined/destroyed when clear() is called.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
"The thread is not yet joined/destroyed when clear() is called."
Right, there's a synchronisation missing, see my addition above.
(Added) Ah, I see a join() has magically appeared in the original question...
(Added) Sorry, I'm not helping (too much distraction); Dmitriy's answer seems to be right. Don't forget to set capacity before starting the thread, though.
(Added) Apparently it's even worse: the thread is possibly launched before the queue is even created...
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Agree. Implicit thread launch is not a good idea anyway.
> Also, use value-initialisation (sic?) for stop_, i.e., empty argument list, which makes the initial value zero, because an atomic has no constructor that takes an argument of 0.
I don't remember what initialization it provides. Use whatever works for you. I would prefer:
atomic_store_explicit(&stop_, 0, memory_order_relaxed);

- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page