Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

tbb flow sequencer_node

abellina
Beginner
413 Views

Hello
I am experimenting with the sequencer_node. The application is pretty simple: events come in from a data source, they are shipped to function(s) nodes in parallel, then sequenced as they need to reach the output in order.

My issue is that the sequencer_node does not appear to pass events to the output node I have. If I remove the node from the graph, the output gets triggered.

Here's a code sample:

#include "tbb/flow_graph.h"
#include <atomic>

std::atomic< long > counter;
using namespace tbb::flow;

struct my_struct{
   int x;
   int y;
};

void flow() {

   graph g;

   // this node acts as a producer to successors in the graph
   broadcast_node<my_struct> input(g);
   function_node<my_struct,my_struct> processor( g, unlimited, []( const my_struct& v ){ v.y = 2*v.x; return v; } );
   sequencer_node<my_struct> sequencer(g, []( const my_struct& t )->size_t{ return t.x; });
   function_node<my_struct> output( g, serial,
       []( const my_struct& t ){
           std::atomic_fetch_add( &counter, 1L );
           if( counter %1000000 == 0 )
           std::cout << counter << std::endl;
       });
 
   make_edge( input, processor );
   make_edge( processor, sequencer );
   make_edge( sequencer, output );
   int i =0;

   while ( i < 20000000 ){
       i++; 
       my_struct s;
       s.x = i;
       std::atomic_fetch_add( &counter, 1L );
       input.try_put(s);
   }
   g.wait_for_all();

return 0;
}

0 Kudos
2 Replies
abellina
Beginner
413 Views

I am sorry, the lambda for processor should be:  []( my_struct v ){ v.y = 2*v.x; return v; }

I had a global function before and changed it before compiling, my bad...

0 Kudos
Christophe_H_Intel
413 Views

Hello,

The functor for the sequence_node must return a dense set of ordinals starting at 0.  If any values are missing the node will not forward any more items.  So your definition of the sequencer_node should be:

sequencer_node<my_struct> sequencer(g, []( const my_struct& t )->size_t{ return t.x - 1; });

because the x field starts at 1.

I made some changes to the test case:

#include <tbb/flow_graph.h>
// #include <atomic>
// the machine I was using didn't have <atomic>, so I used our implementation
#include "tbb/atomic.h"
#include <iostream>

tbb::atomic< long > counter;
using namespace tbb::flow;

static const int nitems = 2000;

struct my_struct{
   int x;
   int y;
};

class MyFunctor {
    // keep track of the last value we've seen, to detect out-of-order reception of items.
    // I made this a reference because I wanted to examine the last value processed by the
    // output function_node, but we copy the bodies of the functors.  I could have used
    // copy_body<MyFunctor>(output) to fetch the last value instead.  (We have to specify the
    // type of the body in copy_body<> because it can't be inferred from the type of output.)
    long &last_cnt;
public:
    MyFunctor(long &_last_cnt) : last_cnt(_last_cnt) { last_cnt = 0; }
    void operator()( my_struct t ) {
       // std::cout << "mmmm";
       ++last_cnt;
       if ( t.x != last_cnt) std::cout << "Bad sequence; expected " << last_cnt << ", got " << t.x << std::endl;
       // (void)counter.fetch_and_add( 1L );

       // not sure what this test is for.  If you'd like to have each value of
       // counter without races, the increment should look like

       //     long old_counter = counter.fetch_and_add();

       // otherwise the references below are in a race with the increment in the main program.
       //if( counter %1 == 0 )
           // std::cout << t.x << " " << counter << std::endl;
    }
};

class MySequencer {
public:
    size_t operator()( const my_struct& t ) {
        //std::cout << "t.x = " << t.x << std::endl;
        // The sequence IDs must start at 0 and all values must appear.
        // Any "holes" in the sequence will stop the graph at the first hole.
        // the values of the x field start at 1
        return (size_t)t.x - 1;
    }
};

// I just split the lambda into a struct
struct my_parallel_functor {
    my_struct operator()(const my_struct &in) {
        my_struct out;
        out.y = 2*in.x;
        out.x = in.x;
        return out;
    }
};

int main() {

   graph g;

   // this node acts as a producer to successors in the graph
   broadcast_node<my_struct> input(g);
   function_node<my_struct,my_struct> processor( g, unlimited,  my_parallel_functor() );
   MySequencer seq;
   sequencer_node<my_struct> sequencer(g, seq);
   long my_last = 0;
   MyFunctor fc(my_last);
   function_node<my_struct> output( g, serial, fc );
 
   make_edge( input, processor );
   make_edge( processor, sequencer );
   make_edge( sequencer, output );
   int i = 0;

   while ( i < nitems ){
       i++; 
       my_struct s;
       s.x = i;
       (void) counter.fetch_and_add(  1L ); 
       input.try_put(s);
   }

   g.wait_for_all();

   std::cout << "Finished( ";
   if (my_last == nitems) std::cout << "succeeded";
   else std::cout << "failed";
   std::cout << " ) my_last == " << my_last << "\n";
   //getchar();

return 0;
}

Regards,

Chris

0 Kudos
Reply