Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Advice: Producer-Consumer problem

Sensei_S_
Beginner
1,028 Views

Dear all,

I have a simple problem that I need to solve, and I'd like an advice about it. The problem is quite simple, actually, but being a TBB newbie, nothing can be set aside.

With a file containing strings, I need each string to be transformed. I also need to create the output file of transformed strings, regardless of the order of strings. It's a classical producer-consumer problem.

My initial thought is this: is it possible using TBB to create a sort of "pipeline" in which I read the file and each thread (in a thread pool), if available, takes ownership of the string, process it, and write to the output file.

I've read the parallel_pipeline documentation, but it's not what I need, as far as I understand. I'd need some "parallel consumer queue" concept, but I didn't see it at first sight. There is a thread (from 2007) in the forum with a link, but the link it provided is no longer active.

Thanks for any hint you can possibly give me!

0 Kudos
1 Solution
RafSchietekat
Valued Contributor III
1,028 Views

Why would pipeline/parallel_pipeline() not suit your needs? You can have a serial stage to read a string, a parallel stage or stages to transform it, and then a serial stage to write it (your choice whether to preserve the original order).

It is a very I/O-intensive situation, though, so you might want to delegate file access to non-TBB threads that communicate using queues with the TBB code that does the CPU-intensive transformation work. But you can probably do it all inside TBB first and find out from there whether you could use oversubscription to quickly solve that issue.

View solution in original post

0 Kudos
7 Replies
RafSchietekat
Valued Contributor III
1,029 Views

Why would pipeline/parallel_pipeline() not suit your needs? You can have a serial stage to read a string, a parallel stage or stages to transform it, and then a serial stage to write it (your choice whether to preserve the original order).

It is a very I/O-intensive situation, though, so you might want to delegate file access to non-TBB threads that communicate using queues with the TBB code that does the CPU-intensive transformation work. But you can probably do it all inside TBB first and find out from there whether you could use oversubscription to quickly solve that issue.

0 Kudos
Sensei_S_
Beginner
1,028 Views

Thanks, Raf, I sincerely thought a parallel pipeline couldn't help. It does, actually, and it works like a charm. I have some questions, though, if you have spare time :)

The first is simple, as you see the code below, do you see any drawback in using a concurrent_bounded_queue that the last stage will get pop()'ed?

The second is just aesthetic: I had to use a fake bool in the return type of my parallel filters, do you think I could avoid this? I've tried void, but it won't compile.

The third and last is about the number of tokens in the pipeline. Is there a sort of rule of thumb to select one?

Thank you for clarifying my doubts!

 

    tbb::concurrent_bounded_queue<std::string> io_queue;
    
    io_queue.set_capacity(100);
    
    tbb::parallel_pipeline(
                           20,
                           tbb::make_filter<void, std::string>(tbb::filter::serial,
                                        [&](tbb::flow_control& fc)-> std::string {
                                            if (i++ < 100000)
                                                return "something";
                                            else
                                            {
                                                fc.stop();
                                                return "";
                                            }
                                        })
                           &
                           tbb::make_filter<std::string, bool>(tbb::filter::parallel,
                                        [&](std::string s){
                                            for(auto &p : s) p = std::toupper(p);
                                            io_queue.push(s);
                                            return true;
                                        })
                           &
                           tbb::make_filter<bool, void>(tbb::filter::serial,
                                        [&](bool b){
                                            std::string s;
                                            io_queue.pop(s);
                                            std::cout << s << std::endl;
                                        })
                           );
0 Kudos
RafSchietekat
Valued Contributor III
1,028 Views

Don't substitute your own queue between stages of a pipeline!

Usually it's Jim Dempsey who has some advice about the right number of tokens; I don't. ;-)

Just capitalising seems to be too little CPU-bound work for this pipeline to matter.

Hmm, strange that an ampersand operator was chosen to string those stages/filters together...

0 Kudos
ronag89
Beginner
1,028 Views

Here is an idea using the flow api.

Basically what you need are "thread bound" nodes in the flow graph. Would be nice if tbb had those, but here is a start:

tbb::flow::graph g;
thread_bound_source<std::string> s(g, [](std::string& v) 
{
  if (i++ >= 100000)
    return false;

  v = "something";

  return true;
})

tbb::limiter_node<std::string> l(g, 200);

tbb::flow::function<std::string, std::string> f(g, tbb::flow::unlimited, [](std::string s)
{
    for (auto& p : s) 
      p = std::toupper(p);
    return s;
});

thread_bound_target<std::string> t(g, [](const std::string& v)
{
  std::cout << s << std::endl;
}); 
tbb::flow::make_edge(s, l);
tbb::flow::make_edge(l, f);
tbb::flow::make_edge(f, t);
tbb::flow::make_edge(t, l.decrement);

g.wait_for_all();
template<typename T>
class thread_bound_source : public graph_node, public sender<T>
{
  std::function<bool(output_type&)>  body_;

  std::list<successor_type*>         successors_;
  tbb::spin_mutex                    successors_mutex_;

  output_type                        cached_value_;
  bool                               has_cached_value_;
  std::mutex                         cached_value_mutex_;
  std::condition_variable            cached_value_cond_;

  std::thread                        thread_;
public:

  template<typename Body>
  thread_bound_source(tbb::flow::graph& g, Body&& body)
    : graph_node(g)
    , body_(std::forward<Body>(body))
    , has_cached_value_(false)
    , thread_([this]{ run(); })
  {
     my_graph.increment_wait_count();
  }

  ~thread_bound_source()
  {
    thread_.join();
  }

  virtual bool register_successor( successor_type &r )
  {
    bool msg = false;
    {
      std::lock_guard<std::mutex> cached_value_lock(cached_value_mutex_);

      if (has_cached_value_)
      {
        if(r.try_put(cached_value_))
        {
          has_cached_value_ = false;
          msg = true;
        }
        else
        {
          if (r.register_predecessor(*this))
            return false;
          else
            assert(false); // This is some wierdness in the flow api. Not sure how to handle.
        }
      }
    }

    if (msg)
      cached_value_cond_.notify_one();

    tbb::spin_mutex::scoped_lock lock(successors_mutex_);

    successors_.push_back(&r);
  }

  virtual bool remove_successor( successor_type &r )
  {
    std::lock_guard<s<tbb::spin_mutex> successors_lock(successors_mutex_);
    auto it = std::find(successors_.begin(), successors_.end(), &r);
    successors_.erase(it);
  }

  virtual bool try_get( output_type &v )
  {
    bool msg = false;
    {
      std::lock_guard<std::mutex> cached_value_lock(cached_value_mutex_);

      if (has_cached_value_)
      {
        v = cached_value_;
        has_cached_value_ = false;
        msg = true;
      }
    }
   
    if (!msg)
       return false;

    cached_value_cond_.notify_one();

    return true;
  }
private:

  void run()
  {
    std::unique_lock<std::mutex> cached_value_lock(cached_value_mutex_);

    while (true)
    {
      while (has_cached_value_ && !my_graph.is_cancelled())
        cached_value_cond_.wait(cached_value_lock);

      if (my_graph.is_cancelled() || !body_(cached_value_))
        break;

      has_cached_value_ = true;

      std::lock_guard<s<tbb::spin_mutex> successors_lock(successors_mutex_);

      auto it = successors_.begin();
      while (it != successors_.end())
      {
        if ((*it)->try_put(cached_value_))
        {
          has_cached_value_ = false;
          break;
        }
        else if ((*it)->register_predecessor(*this))
        {
          it = successors_.erase(it);
        }
        else
        {
          assert(false); // This is some wierdness in the flow api. Not sure how to handle.
        }
      }
    }
    my_graph.decrement_wait_count();
  })
};

 

0 Kudos
Sensei_S_
Beginner
1,028 Views

Thanks Raf!

Raf Schietekat wrote:

Don't substitute your own queue between stages of a pipeline!

I don't understand, I thought I just pushed and popped items, I thought it was safe.

Raf Schietekat wrote:

Usually it's Jim Dempsey who has some advice about the right number of tokens; I don't. ;-)

Just capitalising seems to be too little CPU-bound work for this pipeline to matter.

Hmm, strange that an ampersand operator was chosen to string those stages/filters together...

Well, the capitalization is just an example, what I need to do in reality is an optimization algorithm that takes O(n^3) time :)

Yes, I found that ampersand weird too.

Thank you!

0 Kudos
jimdempseyatthecove
Honored Contributor III
1,028 Views

For the optimal number of tokens you will have to run tests. A good starting point is to provide a number of tokens that keep all compute threads busy. If I/O is performed, then the number of extra tokens will depend on latencies and I/O interference by the input stage against the output stage (plus O/S if it is trying to do you favors with read ahead and buffered writes).

Start with Number of Computing threads +2 for input +2 for output.

RE: non-TBB thread verses TBB thread

In the situation where the entire application is the parallel_pipeline, then the simple solution is to oversubscribe by 2 threads.

In the situation where the parallel_pipeline runs part of the time then either use 2 non-TBB threads for I/O or:

Oversubscribe by 2 threads. Prior to parallel_pipeline (just after initializing TBB) launch a parallel construct (task) of your choice, where 2 threads wait on condition variable (Linux) or Event (Windows). Modify your pipeline input stage such that there is a one-time signal to the condition variable or event. And at the end of file condition the input stage can schedule the parallel construct (task) of your choice, where 2 threads wait on condition variable (Linux) or Event (Windows).

Jim Dempsey

0 Kudos
RafSchietekat
Valued Contributor III
1,028 Views

Sensei S. wrote:

I don't understand, I thought I just pushed and popped items, I thought it was safe.

Maybe you can get away with this now, because the queue capacity is higher than the number of tokens and because the extra overhead is dwarfed by other things going on, but it makes no sense whatsoever to second-guess the existing mechanism for no apparent reason.

0 Kudos
Reply