- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
[cpp]///////////////////////////////////////////// // Create pipeline stages as tbb::flow graph ///////////////////////////////////////////// tbb::flow::graph cbir; tbb::flow::source_nodeinput(cbir, Read( query_dir, &cnt_enqueue ) ); tbb::flow::function_node segmenter( cbir, tbb::flow::unlimited, SegmentImage() ); tbb::flow::function_node extracter( cbir, tbb::flow::unlimited, ExtractFeatures() ); tbb::flow::function_node querier( cbir, tbb::flow::unlimited, QueryIndex( ... ) ); tbb::flow::function_node ranker( cbir, tbb::flow::unlimited, RankCandidates( ... ) ); tbb::flow::function_node writer( cbir, 1, Write( ... ) ); /////////////////// // Chain up stages /////////////////// tbb::flow::make_edge(input, segmenter); tbb::flow::make_edge(segmenter, extracter); tbb::flow::make_edge(extracter, querier); tbb::flow::make_edge(querier, ranker); tbb::flow::make_edge(ranker, writer); //////////////// // Run pipeline //////////////// cbir.wait_for_all();[/cpp]
- Is function_node with queuing policy and flow::unlimited the right graph_node to mimic the behavior of a parallel tbb::filter?
- If I am using function_node correctly, then could someone suggest other possible sources of performance bottlenecks that I should investigate?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
A source_node will keep sending items as long as its successor accepts them. Since in this application, the Segment node has unlimited concurrency, it will always accept. So the source_node is free to pump an unlimited amount of items into the graph.
As Chris pointed out, one way to limit the amount of data in-flight in a flow graph is to introduce a limiter_node. In this case, a limiter_node could be inserted between the source_node and the "Segment" function_node. The threshold for the limiter_node could be set to the number of tokens used in the pipeline version. Lastly, the final node in the graph, "Output", could then be wired back to the "decrement" port of the limiter.This would allow a fixed number of items to be in flight in the graph at a time (equal to the same number allowed in the pipeline version). Whenever an item reaches the terminal node in the graph, "Output", the limiter_nodeis thensignaled to allow another item in to the graph.
Nick, if you have a chance to try this, please let us know if the limiter_node fixes the problem.
Link Copied
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
At first glance the code you wrote looks correct. You're passing pointers between the nodes of the graph, so copying data between nodes is not a problem.
The most-likely problem is too much parallelism. The pipeline limits the number of tasks by a token mechanism. flow::graph has an explicit node (the limiter node) to achieve the same result. If your source_node does not take much time, the parallelism in the intermediate stages will increase memory usage and slow down the graph.
Weare going to write a blog on this subject, and hope to get it posted this week. Thank you for using flow::graph, and please let us know your thoughts.
Regards,
Chris
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
A source_node will keep sending items as long as its successor accepts them. Since in this application, the Segment node has unlimited concurrency, it will always accept. So the source_node is free to pump an unlimited amount of items into the graph.
As Chris pointed out, one way to limit the amount of data in-flight in a flow graph is to introduce a limiter_node. In this case, a limiter_node could be inserted between the source_node and the "Segment" function_node. The threshold for the limiter_node could be set to the number of tokens used in the pipeline version. Lastly, the final node in the graph, "Output", could then be wired back to the "decrement" port of the limiter.This would allow a fixed number of items to be in flight in the graph at a time (equal to the same number allowed in the pipeline version). Whenever an item reaches the terminal node in the graph, "Output", the limiter_nodeis thensignaled to allow another item in to the graph.
Nick, if you have a chance to try this, please let us know if the limiter_node fixes the problem.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
- I re-ran the program on a different machine and discovered that it was not 4 times slower. That was a weird artifact from the way that I had checked it out from git. I checked a fresh copy of the code out again and the pthreads version and the tbb::flow::graph version perform within 10-20% difference. Sory for the false alarm.
- The limiter_node suggestion was useful. With it, I was able to more faithfully reproduced what the the tbb::pipeline version was doing. Using the limiter I was able to make the tbb::flow version perform comparably with the tbb::pipeline and pthreads version. I have only run it on my 4-core machine. I'll run it on a machine with 32 cores over the next few days to see if anything interesting comes up. Just to confirm if I'm doing this right (and for the benefit of others following this thread), this is the code that I used to wire it back to the "decrement" port of the limiter:
[cpp]// Declare a limiter_node tbb::flow::limiter_node
limiter(cbir, PIPELINE_MAX_TOKENS ); // Wire it between the input and segmenter graph_nodes tbb::flow::make_edge(input, limiter); tbb::flow::make_edge(limiter, segmenter); tbb::flow::make_edge(segmenter, extracter); // Inside the write node's operator() function ... limiter.decrement.try_put( tbb::flow::continue_msg() ); ...[/cpp] - The rejecting function_node suggestion was also useful. It did not make much difference in this case but it's something handy to know.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi Nick,
I've just posted a blog article that talks about the limiter_node and other subtle differences betweena pipeline anda flow graph, you can find it here.
While you can explicitly put to the limiter_node as you do in your code, the blog article shows that you can also have your last stage output a continue_msg and do a make_edge between that node and the decrement in the limiter_node. Either approach will work.
As for your last question... We are working on debugging/diagnostic tools for use with the flow graph, but currently there's nothing that will give you the level of detail that you're looking for. In your particular application, I think the limiter_node should be sufficient and the rest of your parallel nodes can just be set to unlimited concurrency, since that will mimic what the pipeline version is doing. The limiter_nodewill keep the number of tasks to atmost one per item in the graph.
And as you work with the flow graph, if you see the need for other performance-related metrics, please let us know.
Thanks!
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Also, what is the latest situation related to scheduling? Fairness sounds fabulous, but only to somebody uninitiated to the performance benefits of the opposite, and I'm still wondering about composability.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I am following the example in Michael's blog post using the limiter_node, but I am finding that the graph stops processing events. I've removed any unessencial nodes and narrowed it down to the limiter_node having an issue. It appears to dead lock when the concurrency parameter in the output node is set to anything other than unlimited. I am using 4.1 update 4 on MacOS mountain lion.
My setup:
message_limiter limiter(_graph,16);
source_node<msg*> source(_graph, SourceNodeBody(_source), false);
function_node<msg*,msg*> transform(_graph, unlimited, TransformNodeBody());
function_node<msg*,continue_msg> output(_graph, unlimited /*set this to serial, and deadlock*/, SinkNodeBody(_handler));
make_edge(source, limiter);
make_edge(limiter, transform);
make_edge(transform, output);
make_edge(output, limiter.decrement);
source.activate();
_graph.wait_for_all();
Any ideas?
Thanks,
Alessandro
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hello Alessandro,
I hope I understood your description. I tried making a toy program that just passes floats from stage to stage but otherwise is similar to the sketch you outlined. If you set DEBUG_OUTPUT to 1 you will see each of the nodes are executed for all the values. I am attaching the program. (I am using TBB 4.1 update 4)
Can you give me more details about the failure you are seeing?
Regards,
Chris
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page