Intel® oneAPI Threading Building Blocks

flow::buffer_node to multiple input of single join

Marc_R_5

Hello, 

I know it looks strange, but in a resource-management use case (the tbb::flow::buffer_node holds pre-allocated resources), it would be useful to grab several of them at once in a join.

Below is an example of the strange thing I tried, which does not work.  Any idea how to implement something like this without having to create multiple buffers?

#include <iostream>
#include "tbb/flow_graph.h"

using namespace std;
using namespace tbb;
using namespace tbb::flow;

int main(int argc, char *argv[])
{
    graph g;

    buffer_node<int> buff1(g);
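    // pre-load the buffer with a few pre-allocated "resources" (plain ints here)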
    for (int i = 0; i < 3; ++i)
    {
        buff1.try_put(i);
    }

    using input_t = tuple<int, int, int>;
    using joinNode_t = join_node< input_t >;
    using funcNode_t = function_node< input_t, int>;
    joinNode_t j(g);

    funcNode_t f1(g, 0, [](input_t i)->int
    {
        cout << "f1[" << get<0>(i) << "," << get<1>(i) << "," << get<2>(i) << "]\n";
        return get<0>(i)+get<1>(i)+get<2>(i);
    });

    // this is not working
    make_edge(buff1, input_port<0>(j));
    make_edge(buff1, input_port<1>(j));
    make_edge(buff1, input_port<2>(j));
    make_edge(j, f1);

    g.wait_for_all();
}

 

Christophe_H_Intel

Hi, Marc,

There are two things I notice about your program:

  1. As soon as you attach two nodes together, a task is spawned to execute the first node.  So when you attach a buffer_node to the input of a queueing join_node, a task will forward all messages in the buffer to the port you attached to.  The buffer may be empty by the time you attach the second port of the join_node.
  2. There are two different forwarding policies in nodes: broadcast-push and single-push.  Most nodes broadcast their messages, but buffering nodes will forward each message to only one successor.  So even if the timing of attaching the buffer_node to the join_node weren't a problem, you would not get the result you expect.  (A small sketch of the single-push behavior follows this list.)
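
Here is a minimal, self-contained sketch (not your resource case, just the forwarding policy) that makes the single-push behavior visible: a buffer_node with two function_node successors.  Each value put into the buffer reaches exactly one of the two successors (and, since queueing function_nodes never reject, quite possibly always the same one); it is never broadcast to both.

#include <iostream>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

int main()
{
    graph g;
    buffer_node<int> buf(g);

    // two competing successors of the buffering node
    function_node<int, int> f_a(g, serial, [](int v) -> int {
        std::cout << "A got " << v << "\n";
        return v;
    });
    function_node<int, int> f_b(g, serial, [](int v) -> int {
        std::cout << "B got " << v << "\n";
        return v;
    });

    make_edge(buf, f_a);
    make_edge(buf, f_b);

    // each value is forwarded to only one successor (single-push);
    // with queueing function_nodes it may well be the same successor every time
    for (int i = 0; i < 6; ++i)
        buf.try_put(i);

    g.wait_for_all();
    return 0;
}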

Remember that a queueing join_node buffers the messages arriving at its input ports (until a complete tuple can be forwarded to a successor), so, depending on your needs, you can replace the buffer_node with a broadcast_node:

 

#include <iostream>
#include "tbb/flow_graph.h"

using namespace std;
using namespace tbb;
using namespace tbb::flow;

int main(int argc, char *argv[])
{
    graph g;

    broadcast_node<int> broad1(g);   // change to broadcast here

    using input_t = tuple<int, int, int>;
    using joinNode_t = join_node< input_t >;
    using funcNode_t = function_node< input_t, int>;
    joinNode_t j(g);

    funcNode_t f1(g, 0, [](input_t i)->int
    {
        cout << "f1[" << get<0>(i) << "," << get<1>(i) << "," << get<2>(i) << "]\n";
        return get<0>(i)+get<1>(i)+get<2>(i);
    });

    // build the graph before putting any messages to it
    make_edge(broad1, input_port<0>(j));
    make_edge(broad1, input_port<1>(j));
    make_edge(broad1, input_port<2>(j));
    make_edge(j, f1);

    // now put the messages, which will be forwarded to all inputs of the join_node
    for (int i = 0; i < 3; ++i)
    {
        broad1.try_put(i);
    }

    g.wait_for_all();
}

 

Remember that the queueing join_node does not try to match any aspect of its inputs; it just puts the first message it received at each port together into a tuple and forwards it (if possible) to its successor.  If you are managing resources in a flow::graph, you might look at Mike Voss's feature recognition blog.  (The example uses a tag-matching join_node to manage the use of a finite set of buffers.)
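
For reference, here is a rough, self-contained sketch of that general idea (it is not the code from that blog, and the pooled_buffer and work_item types and the id-based tags are just illustrative assumptions): a tag_matching join_node pairs each work item with the pre-allocated buffer carrying the same tag, and each buffer is recycled back into the pool once the work is done.

#include <iostream>
#include <string>
#include <tuple>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

// illustrative types, not from the blog post
struct pooled_buffer { int id; };
struct work_item     { int buffer_id; std::string payload; };

int main()
{
    graph g;

    // the pool of pre-allocated buffers
    buffer_node<pooled_buffer> pool(g);

    // pair each work item with the buffer that has the matching id
    using pair_t = std::tuple<pooled_buffer, work_item>;
    join_node<pair_t, tag_matching> match(g,
        [](const pooled_buffer &b) -> tag_value { return b.id; },
        [](const work_item &w)     -> tag_value { return w.buffer_id; });

    // "use" the buffer, then emit it so it can be recycled
    function_node<pair_t, pooled_buffer> use(g, unlimited,
        [](const pair_t &p) -> pooled_buffer {
            std::cout << std::get<1>(p).payload
                      << " ran in buffer " << std::get<0>(p).id << "\n";
            return std::get<0>(p);
        });

    // build the graph before putting any messages to it
    make_edge(pool, input_port<0>(match));
    make_edge(match, use);
    make_edge(use, pool);            // recycle the buffer into the pool

    for (int i = 0; i < 3; ++i)
        pool.try_put(pooled_buffer{i});

    // work items tagged with the buffer they should run in
    input_port<1>(match).try_put(work_item{1, "job A"});
    input_port<1>(match).try_put(work_item{0, "job B"});
    input_port<1>(match).try_put(work_item{2, "job C"});

    g.wait_for_all();
    return 0;
}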

Christophe_H_Intel

Sorry, the "(round-robin)" part of the post above was a placeholder until I found the "official" name we have for the message-forwarding policy of buffering nodes.  You can look at this section of the TBB User Guide for a bit of an explanation of it.
