[cpp]///////////////////////////////////////////// // Create pipeline stages as tbb::flow graph ///////////////////////////////////////////// tbb::flow::graph cbir; tbb::flow::source_node
input(cbir, Read( query_dir, &cnt_enqueue ) ); tbb::flow::function_node segmenter( cbir, tbb::flow::unlimited, SegmentImage() ); tbb::flow::function_node extracter( cbir, tbb::flow::unlimited, ExtractFeatures() ); tbb::flow::function_node querier( cbir, tbb::flow::unlimited, QueryIndex( ... ) ); tbb::flow::function_node ranker( cbir, tbb::flow::unlimited, RankCandidates( ... ) ); tbb::flow::function_node writer( cbir, 1, Write( ... ) ); /////////////////// // Chain up stages /////////////////// tbb::flow::make_edge(input, segmenter); tbb::flow::make_edge(segmenter, extracter); tbb::flow::make_edge(extracter, querier); tbb::flow::make_edge(querier, ranker); tbb::flow::make_edge(ranker, writer); //////////////// // Run pipeline //////////////// cbir.wait_for_all();[/cpp]
[cpp]// Declare a limiter_node tbb::flow::limiter_node
limiter(cbir, PIPELINE_MAX_TOKENS ); // Wire it between the input and segmenter graph_nodes tbb::flow::make_edge(input, limiter); tbb::flow::make_edge(limiter, segmenter); tbb::flow::make_edge(segmenter, extracter); // Inside the write node's operator() function ... limiter.decrement.try_put( tbb::flow::continue_msg() ); ...[/cpp]
I've just posted a blog article that talks about the limiter_node and other subtle differences betweena pipeline anda flow graph, you can find it here.
While you can explicitly put to the limiter_node as you do in your code, the blog article shows that you can also have your last stage output a continue_msg and do a make_edge between that node and the decrement in the limiter_node. Either approach will work.
As for your last question... We are working on debugging/diagnostic tools for use with the flow graph, but currently there's nothing that will give you the level of detail that you're looking for. In your particular application, I think the limiter_node should be sufficient and the rest of your parallel nodes can just be set to unlimited concurrency, since that will mimic what the pipeline version is doing. The limiter_nodewill keep the number of tasks to atmost one per item in the graph.
And as you work with the flow graph, if you see the need for other performance-related metrics, please let us know.
I am following the example in Michael's blog post using the limiter_node, but I am finding that the graph stops processing events. I've removed any unessencial nodes and narrowed it down to the limiter_node having an issue. It appears to dead lock when the concurrency parameter in the output node is set to anything other than unlimited. I am using 4.1 update 4 on MacOS mountain lion.
source_node<msg*> source(_graph, SourceNodeBody(_source), false);
function_node<msg*,msg*> transform(_graph, unlimited, TransformNodeBody());
function_node<msg*,continue_msg> output(_graph, unlimited /*set this to serial, and deadlock*/, SinkNodeBody(_handler));
I hope I understood your description. I tried making a toy program that just passes floats from stage to stage but otherwise is similar to the sketch you outlined. If you set DEBUG_OUTPUT to 1 you will see each of the nodes are executed for all the values. I am attaching the program. (I am using TBB 4.1 update 4)
Can you give me more details about the failure you are seeing?