Intel® oneAPI Threading Building Blocks

tbb flow sequencer_node

abellina

Hello
I am experimenting with the sequencer_node. The application is pretty simple: events come in from a data source, are passed to function_node(s) that run in parallel, and are then sequenced because they need to reach the output in order.

My issue is that the sequencer_node does not appear to pass events to my output node. If I remove the sequencer_node from the graph, the output node is triggered.

Here's a code sample:

#include "tbb/flow_graph.h"
#include <atomic>
#include <iostream>

std::atomic< long > counter;
using namespace tbb::flow;

struct my_struct{
   int x;
   int y;
};

void flow() {

   graph g;

   // this node acts as a producer to successors in the graph
   broadcast_node<my_struct> input(g);
   function_node<my_struct,my_struct> processor( g, unlimited, []( const my_struct& v ){ v.y = 2*v.x; return v; } );
   sequencer_node<my_struct> sequencer(g, []( const my_struct& t )->size_t{ return t.x; });
   function_node<my_struct> output( g, serial,
       []( const my_struct& t ){
           std::atomic_fetch_add( &counter, 1L );
           if( counter %1000000 == 0 )
           std::cout << counter << std::endl;
       });
 
   make_edge( input, processor );
   make_edge( processor, sequencer );
   make_edge( sequencer, output );
   int i =0;

   while ( i < 20000000 ){
       i++; 
       my_struct s;
       s.x = i;
       std::atomic_fetch_add( &counter, 1L );
       input.try_put(s);
   }
   g.wait_for_all();

}

abellina

Sorry, the lambda for processor should take its argument by value so it can modify it: []( my_struct v ){ v.y = 2*v.x; return v; }

I had a global function before and changed it before compiling, my bad...

Christophe_H_Intel

Hello,

The functor for the sequencer_node must return a dense set of ordinals starting at 0. If any value is missing, the node stops forwarding items at the first gap. So your definition of the sequencer_node should be:

sequencer_node<my_struct> sequencer(g, []( const my_struct& t )->size_t{ return t.x - 1; });

because the x field starts at 1.

I made some changes to the test case:

#include <tbb/flow_graph.h>
// #include <atomic>
// the machine I was using didn't have <atomic>, so I used our implementation
#include "tbb/atomic.h"
#include <iostream>

tbb::atomic< long > counter;
using namespace tbb::flow;

static const int nitems = 2000;

struct my_struct{
   int x;
   int y;
};

class MyFunctor {
    // keep track of the last value we've seen, to detect out-of-order reception of items.
    // I made this a reference because I wanted to examine the last value processed by the
    // output function_node, but we copy the bodies of the functors.  I could have used
    // copy_body<MyFunctor>(output) to fetch the last value instead.  (We have to specify the
    // type of the body in copy_body<> because it can't be inferred from the type of output.)
    long &last_cnt;
public:
    MyFunctor(long &_last_cnt) : last_cnt(_last_cnt) { last_cnt = 0; }
    void operator()( my_struct t ) {
       // std::cout << "mmmm";
       ++last_cnt;
       if ( t.x != last_cnt) std::cout << "Bad sequence; expected " << last_cnt << ", got " << t.x << std::endl;
       // (void)counter.fetch_and_add( 1L );

       // Not sure what this test is for.  If you'd like to observe each value of
       // counter without races, the increment should look like

       //     long old_counter = counter.fetch_and_add( 1L );

       // otherwise the reads below race with the increment in the main program.
       //if( counter %1 == 0 )
           // std::cout << t.x << " " << counter << std::endl;
    }
};

class MySequencer {
public:
    size_t operator()( const my_struct& t ) {
        //std::cout << "t.x = " << t.x << std::endl;
        // The sequence IDs must start at 0 and all values must appear.
        // Any "holes" in the sequence will stop the graph at the first hole.
        // the values of the x field start at 1
        return (size_t)t.x - 1;
    }
};

// I just split the lambda into a struct
struct my_parallel_functor {
    my_struct operator()(const my_struct &in) {
        my_struct out;
        out.y = 2*in.x;
        out.x = in.x;
        return out;
    }
};

int main() {

   graph g;

   // this node acts as a producer to successors in the graph
   broadcast_node<my_struct> input(g);
   function_node<my_struct,my_struct> processor( g, unlimited,  my_parallel_functor() );
   MySequencer seq;
   sequencer_node<my_struct> sequencer(g, seq);
   long my_last = 0;
   MyFunctor fc(my_last);
   function_node<my_struct> output( g, serial, fc );
 
   make_edge( input, processor );
   make_edge( processor, sequencer );
   make_edge( sequencer, output );
   int i = 0;

   while ( i < nitems ){
       i++; 
       my_struct s;
       s.x = i;
       (void) counter.fetch_and_add(  1L ); 
       input.try_put(s);
   }

   g.wait_for_all();

   std::cout << "Finished( ";
   if (my_last == nitems) std::cout << "succeeded";
   else std::cout << "failed";
   std::cout << " ) my_last == " << my_last << "\n";
   //getchar();

return 0;
}

Regards,

Chris
