Intel® oneAPI Threading Building Blocks

Expressing parallel tbb::filter using tbb::flow::function_node?

Nick_Chen
Beginner
I'm new to the tbb::flow graph and am trying to learn how to use the different nodes. As part of the learning process, I'm trying to work out the tbb::flow equivalent of the following program, which I wrote using tbb::pipeline.


The program is FERRET, a content-based image retrieval system from the PARSEC benchmark. The image below illustrates the different stages. The first and last stages are serial; the remaining stages are parallel. I've made each stage extend tbb::filter and marked it as serial/parallel as appropriate.





I then tried to implement it using tbb::flow and the different graph_nodes available. The first stage is a source_node (serial by default); the middle stages are function_nodes with queueing policy and flow::unlimited concurrency; the last stage is a function_node with a concurrency limit of 1 (serial). Here's the pseudo-code:


[cpp]/////////////////////////////////////////////
// Create pipeline stages as tbb::flow graph
/////////////////////////////////////////////
tbb::flow::graph cbir;

// Note: template arguments (message types) are omitted in this pseudo-code.

// Serial input stage
tbb::flow::source_node input(cbir, Read( query_dir, &cnt_enqueue ) );
// Parallel middle stages (unlimited concurrency)
tbb::flow::function_node segmenter( cbir, tbb::flow::unlimited, SegmentImage() );
tbb::flow::function_node extracter( cbir, tbb::flow::unlimited, ExtractFeatures() );
tbb::flow::function_node querier( cbir, tbb::flow::unlimited, QueryIndex( ... ) );
tbb::flow::function_node ranker( cbir, tbb::flow::unlimited, RankCandidates( ... ) );
// Serial output stage (concurrency limit of 1)
tbb::flow::function_node writer( cbir, 1, Write( ... ) );


///////////////////
// Chain up stages 
///////////////////

tbb::flow::make_edge(input, segmenter);
tbb::flow::make_edge(segmenter, extracter);
tbb::flow::make_edge(extracter, querier);
tbb::flow::make_edge(querier, ranker);
tbb::flow::make_edge(ranker, writer);

////////////////
// Run pipeline
////////////////

cbir.wait_for_all();[/cpp]


When I run the program, it behaves correctly. However, it is much slower (about 4 times slower on a 4-core machine) than the pthreads and tbb::pipeline versions. I suspect that I am doing something wrong, i.e., that I am not configuring the middle stages as parallel stages correctly. I would appreciate advice on the following:

  1. Is function_node with queuing policy and flow::unlimited the right graph_node to mimic the behavior of a parallel tbb::filter?
  2. If I am using function_node correctly, then could someone suggest other possible sources of performance bottlenecks that I should investigate?
As mentioned, I'm new to tbb::flow so it's quite possible that I'm missing something obvious.


Thanks!
Christophe_H_Intel
Hello, Nick,

At first glance the code you wrote looks correct. You're passing pointers between the nodes of the graph, so copying data between nodes is not a problem.

The most likely problem is too much parallelism. The pipeline limits the number of tasks by a token mechanism. flow::graph has an explicit node (the limiter_node) to achieve the same result. If your source_node does not take much time, the parallelism in the intermediate stages will increase memory usage and slow down the graph.

We are going to write a blog post on this subject, and hope to get it posted this week. Thank you for using flow::graph, and please let us know your thoughts.

Regards,
Chris
Terry_W_Intel
Employee
Another thing you might want to try is to use a "rejecting" function_node instead of a "queueing" one for the nodes with unlimited concurrency. Unlimited concurrency means that the function_node will always accept messages and handle them, so no queue should be needed, and the queue may be adding extra overhead.
Michael_V_Intel
Employee
I agree that using a "rejecting" function_node might have slightly lower overhead, but I think the issue raised by Chris is likely the most dominant one.

A source_node will keep sending items as long as its successor accepts them. Since the Segment node in this application has unlimited concurrency, it will always accept. So the source_node is free to pump an unlimited number of items into the graph.

As Chris pointed out, one way to limit the amount of data in flight in a flow graph is to introduce a limiter_node. In this case, a limiter_node could be inserted between the source_node and the "Segment" function_node. The threshold for the limiter_node could be set to the number of tokens used in the pipeline version. Lastly, the final node in the graph, "Output", could then be wired back to the "decrement" port of the limiter. This would allow a fixed number of items to be in flight in the graph at a time (equal to the number allowed in the pipeline version). Whenever an item reaches the terminal node in the graph, "Output", the limiter_node is then signaled to allow another item into the graph.

Nick, if you have a chance to try this, please let us know if the limiter_node fixes the problem.
Nick_Chen
Beginner
Thanks Christopher, Terry and Michael for the suggestions.


  1. I re-ran the program on a different machine and discovered that it was not 4 times slower. That was a weird artifact of the way that I had checked it out from git. I checked out a fresh copy of the code, and the pthreads version and the tbb::flow::graph version now perform within 10-20% of each other. Sorry for the false alarm.
  2. The limiter_node suggestion was useful. With it, I was able to more faithfully reproduce what the tbb::pipeline version was doing. Using the limiter, I was able to make the tbb::flow version perform comparably with the tbb::pipeline and pthreads versions. I have only run it on my 4-core machine. I'll run it on a machine with 32 cores over the next few days to see if anything interesting comes up. Just to confirm that I'm doing this right (and for the benefit of others following this thread), this is the code that I used to wire it back to the "decrement" port of the limiter:
    [cpp]// Declare a limiter_node
    tbb::flow::limiter_node limiter(cbir, PIPELINE_MAX_TOKENS );
    
    // Wire it between the input and segmenter graph_nodes
    tbb::flow::make_edge(input, limiter);                                                                                                                
    tbb::flow::make_edge(limiter, segmenter);                                                                                                            
    tbb::flow::make_edge(segmenter, extracter);  
    
    // Inside the write node's operator() function
    ...
    limiter.decrement.try_put( tbb::flow::continue_msg() ); 
    ...[/cpp]
  3. The rejecting function_node suggestion was also useful. It did not make much difference in this case but it's something handy to know.


I have another related question: Given that it's hard to tune the values for the concurrency limit of each graph_node, are there any facilities for outputting more debugging/diagnostic messages about what the tbb::flow graph implementation is doing? For instance, when it spawns a new task, or when it switches between push/pull modes? Also, is any of this diagnostic information integrated into other Intel tools like Parallel Amplifier or Parallel Advisor?


Thanks!
Michael_V_Intel
Employee

Hi Nick,

I've just posted a blog article that talks about the limiter_node and other subtle differences between a pipeline and a flow graph; you can find it here.

While you can explicitly put to the limiter_node as you do in your code, the blog article shows that you can also have your last stage output a continue_msg and do a make_edge between that node and the decrement in the limiter_node. Either approach will work.

As for your last question... We are working on debugging/diagnostic tools for use with the flow graph, but currently there's nothing that will give you the level of detail that you're looking for. In your particular application, I think the limiter_node should be sufficient and the rest of your parallel nodes can just be set to unlimited concurrency, since that will mimic what the pipeline version is doing. The limiter_node will keep the number of tasks to at most one per item in the graph.

And as you work with the flow graph, if you see the need for other performance-related metrics, please let us know.
Thanks!

RafSchietekat
Valued Contributor III
The example problem has a long succession of parallel filters. In a tbb::pipeline, these would execute all in the same task, with only a check for cancellation in-between (last time I looked, and unless I missed something). How efficient, compared to a pipeline, is a flow graph, and should users care to choose pipeline over flow graph for such a situation?

Also, what is the latest situation related to scheduling? Fairness sounds fabulous, but only to somebody uninitiated to the performance benefits of the opposite, and I'm still wondering about composability.
abellina
Beginner

I am following the example in Michael's blog post using the limiter_node, but I am finding that the graph stops processing events. I've removed any inessential nodes and narrowed the issue down to the limiter_node. It appears to deadlock when the concurrency parameter of the output node is set to anything other than unlimited. I am using TBB 4.1 update 4 on Mac OS X Mountain Lion.

My setup:

[cpp]message_limiter limiter(_graph, 16);
source_node<msg*> source(_graph, SourceNodeBody(_source), false);
function_node<msg*,msg*> transform(_graph, unlimited, TransformNodeBody());
function_node<msg*,continue_msg> output(_graph, unlimited /*set this to serial, and deadlock*/, SinkNodeBody(_handler));

make_edge(source, limiter);
make_edge(limiter, transform);
make_edge(transform, output);
make_edge(output, limiter.decrement);

source.activate();
_graph.wait_for_all();[/cpp]

Any ideas?

Thanks,

Alessandro

Christophe_H_Intel

Hello Alessandro,

I hope I understood your description. I made a toy program that just passes floats from stage to stage but is otherwise similar to the sketch you outlined. If you set DEBUG_OUTPUT to 1, you will see that each of the nodes is executed for all the values. I am attaching the program. (I am using TBB 4.1 update 4.)

Can you give me more details about the failure you are seeing?

Regards,
Chris
