Hello
I am experimenting with sequencer_node. The application is simple: events come in from a data source, are dispatched to function_node(s) that process them in parallel, and are then sequenced because they must reach the output in order.
My issue is that the sequencer_node does not appear to pass events to my output node. If I remove the sequencer_node from the graph, the output gets triggered.
Here's a code sample:
#include "tbb/flow_graph.h"
#include <atomic>
#include <iostream>

std::atomic< long > counter;
using namespace tbb::flow;

struct my_struct{
    int x;
    int y;
};

void flow() {
    graph g;
    // this node acts as a producer to successors in the graph
    broadcast_node<my_struct> input(g);
    // processes items in parallel
    function_node<my_struct,my_struct> processor( g, unlimited, []( const my_struct& v ){ v.y = 2*v.x; return v; } );
    // is meant to restore the original order, keyed on x
    sequencer_node<my_struct> sequencer(g, []( const my_struct& t )->size_t{ return t.x; });
    // serial consumer that prints progress
    function_node<my_struct> output( g, serial,
        []( const my_struct& t ){
            std::atomic_fetch_add( &counter, 1L );
            if( counter %1000000 == 0 )
                std::cout << counter << std::endl;
        });
    make_edge( input, processor );
    make_edge( processor, sequencer );
    make_edge( sequencer, output );
    int i =0;
    while ( i < 20000000 ){
        i++;
        my_struct s;
        s.x = i;
        std::atomic_fetch_add( &counter, 1L );
        input.try_put(s);
    }
    g.wait_for_all();
}
Sorry, the lambda for processor should take its argument by value so that it can modify it: []( my_struct v ){ v.y = 2*v.x; return v; }
I had a global function before and changed it before compiling, my bad...
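To illustrate the correction, here is a minimal standalone sketch (no TBB involved) of the by-value lambda. The name `process` is only for illustration. Because the parameter is a copy, the body may assign to `v.y`; with a `const my_struct&` parameter the same assignment would not compile.

```cpp
struct my_struct {
    int x;
    int y;
};

// Takes the item by value, so the lambda works on its own copy
// and is free to modify it before returning it.
const auto process = []( my_struct v ) { v.y = 2 * v.x; return v; };
```

Calling `process(s)` returns a copy with `y` set to `2*x` and leaves the caller's `s` unchanged.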
Hello,
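The functor passed to the sequencer_node must map the items to a dense set of ordinals starting at 0. The node releases items strictly in that order, so if any value is missing it stops forwarding at the gap and holds every later item. In your code the first ordinal is 1, so the node waits forever for item 0. Your definition of the sequencer_node should therefore be: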
sequencer_node<my_struct> sequencer(g, []( const my_struct& t )->size_t{ return t.x - 1; });
because the x field starts at 1.
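The stall can be pictured with a small standalone model. This is only a sketch of the in-order release rule described above, not TBB's implementation; `ToySequencer` and `released_count` are made-up names, and the model assumes sequence ids are never reused.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Toy model of in-order release (NOT the TBB implementation).
class ToySequencer {
    std::size_t next_ = 0;                // next ordinal allowed to leave
    std::map<std::size_t, int> pending_;  // items waiting for their turn
public:
    // Accept an item tagged with sequence id `id` and return every item
    // that can now be released, in order.
    std::vector<int> put(std::size_t id, int item) {
        assert(id >= next_ && "toy model assumption: ids are never reused");
        pending_[id] = item;
        std::vector<int> released;
        auto it = pending_.find(next_);
        while (it != pending_.end()) {
            released.push_back(it->second);
            pending_.erase(it);
            ++next_;
            it = pending_.find(next_);
        }
        return released;
    }
    // Number of items still held back.
    std::size_t buffered() const { return pending_.size(); }
};

// Feed items with x = 1..n, using id = x - offset, and count how many
// items the model releases in total.
std::size_t released_count(int n, std::size_t offset) {
    ToySequencer seq;
    std::size_t total = 0;
    for (int x = 1; x <= n; ++x)
        total += seq.put(static_cast<std::size_t>(x) - offset, x).size();
    return total;
}
```

In this model `released_count(5, 0)` is 0, because id 0 never arrives and everything stays buffered, while `released_count(5, 1)` is 5. That mirrors the difference between returning `t.x` and `t.x - 1`.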
I made some changes to the test case:
#include <tbb/flow_graph.h>
// #include <atomic>
// the machine I was using didn't have <atomic>, so I used our implementation
#include "tbb/atomic.h"
#include <iostream>

tbb::atomic< long > counter;
using namespace tbb::flow;

static const int nitems = 2000;

struct my_struct{
    int x;
    int y;
};

class MyFunctor {
    // keep track of the last value we've seen, to detect out-of-order reception of items.
    // I made this a reference because I wanted to examine the last value processed by the
    // output function_node, but we copy the bodies of the functors. I could have used
    // copy_body<MyFunctor>(output) to fetch the last value instead. (We have to specify the
    // type of the body in copy_body<> because it can't be inferred from the type of output.)
    long &last_cnt;
public:
    MyFunctor(long &_last_cnt) : last_cnt(_last_cnt) { last_cnt = 0; }
    void operator()( my_struct t ) {
        // std::cout << "mmmm";
        ++last_cnt;
        if ( t.x != last_cnt)
            std::cout << "Bad sequence; expected " << last_cnt << ", got " << t.x << std::endl;
        // (void)counter.fetch_and_add( 1L );
        // not sure what this test is for. If you'd like to have each value of
        // counter without races, the increment should look like
        //     long old_counter = counter.fetch_and_add();
        // otherwise the references below are in a race with the increment in the main program.
        //if( counter %1 == 0 )
        //    std::cout << t.x << " " << counter << std::endl;
    }
};

class MySequencer {
public:
    size_t operator()( const my_struct& t ) {
        //std::cout << "t.x = " << t.x << std::endl;
        // The sequence IDs must start at 0 and all values must appear.
        // Any "holes" in the sequence will stop the graph at the first hole.
        // the values of the x field start at 1
        return (size_t)t.x - 1;
    }
};

// I just split the lambda into a struct
struct my_parallel_functor {
    my_struct operator()(const my_struct &in) {
        my_struct out;
        out.y = 2*in.x;
        out.x = in.x;
        return out;
    }
};

int main() {
    graph g;
    // this node acts as a producer to successors in the graph
    broadcast_node<my_struct> input(g);
    function_node<my_struct,my_struct> processor( g, unlimited, my_parallel_functor() );
    MySequencer seq;
    sequencer_node<my_struct> sequencer(g, seq);
    long my_last = 0;
    MyFunctor fc(my_last);
    function_node<my_struct> output( g, serial, fc );
    make_edge( input, processor );
    make_edge( processor, sequencer );
    make_edge( sequencer, output );
    int i = 0;
    while ( i < nitems ){
        i++;
        my_struct s;
        s.x = i;
        (void) counter.fetch_and_add( 1L );
        input.try_put(s);
    }
    g.wait_for_all();
    std::cout << "Finished( ";
    if (my_last == nitems) std::cout << "succeeded";
    else std::cout << "failed";
    std::cout << " ) my_last == " << my_last << "\n";
    //getchar();
    return 0;
}
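The comment in the listing about reading counter without races can be sketched as follows. This version uses std::atomic rather than tbb::atomic, and the helper names `bump` and `bump_and_check` are hypothetical. The idea is to use the value returned by the atomic increment instead of reading the shared counter a second time.

```cpp
#include <atomic>

std::atomic<long> counter{0};

// Increment the shared counter and return the value this call produced.
// fetch_add returns the value held before the increment, so adding 1
// gives this caller's own count without a second read of `counter`.
long bump() {
    return counter.fetch_add(1L) + 1;
}

// True when this call's own count is a multiple of `every`.
bool bump_and_check(long every) {
    return bump() % every == 0;
}
```

With this pattern each caller tests its own count, so a concurrent increment from another thread cannot slip in between the update and the check.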
Regards,
Chris