Intel® oneAPI Threading Building Blocks

Request for a Runtime or Dynamic join_node

I am using flow_graph to build dependency trees for the evaluation of lots of nested functions.

To do this I use a templated framework that automatically creates and connects the following graph_nodes:

* continue_nodes for nullary functions
* function_nodes for unary functions
* join_nodes connected to function_nodes for functions with higher arity
For some of my functions, the arity is only known at runtime. While this can be done in theory (using runtime polymorphism and runtime selection of the corresponding join_node), it can be quite a hassle.
That's why I built my own runtime_join_node. It's a receiver and a sender: the received messages are packed into a vector, and once a threshold is reached, the vector is broadcast to all successors.
In order to save time I implemented this by inheriting from function_node and overriding try_put. try_put uses a spin_mutex and checks whether the threshold has been reached. In that case it returns false. If not, the counter is increased and the input is written to the output vector (there are several ways the position in the vector could be determined, e.g. similar to the sequence number for the sequencer_node). If the threshold has been reached by this last input, the vector is put to all successors.
While this implementation required only 20 LOC and seems easy to maintain, I am still not sure whether it really works the way it should.
Now I have three questions:
1. What are better ways to implement this runtime_join_node that do not require too much programming?
2. What do you think of such a runtime_join_node?
3. The function_node is, of course, more than needed. Would it be safe to use the internal::function_output<..> instead of the whole function_node?

You might use a multioutput_function_node to do something like this. A multioutput_function_node can have one or more outputs, and the body can conditionally send to 0 or more of the outputs during each invocation of the body. The multiple outputs aren't needed for this case, but the conditional puts are. So it would be possible to write to the vector and only put to the output when an incoming item completes the vector. I wrote a tiny example that does this, and I've included the code below.

Let me know if this is helpful, or if a custom node type would make more sense.

#include "tbb/flow_graph.h"
#include <cstdio>
#include <tuple>
#include <utility>
#include <vector>

using namespace tbb;
using namespace tbb::flow;

typedef std::vector< std::pair<int,int> > vector_t;
typedef multioutput_function_node< std::pair<int,int>, std::tuple< vector_t > > runtime_join_node_t;

const int threshold = 4;

struct runtime_join_body {

    vector_t my_vector;

    void operator() ( const std::pair<int,int> &input, runtime_join_node_t::output_ports_type &ports ) {
        // Add item to vector
        my_vector.push_back( input );

        // Put it to the output if complete
        if ( my_vector.size() == threshold ) {
            // Assumed call: try_put on the output port, as on multifunction_node
            // (the name this node type carries in later TBB releases)
            std::get<0>(ports).try_put( my_vector );
            my_vector.clear();
        }
    }
};

struct print_body {
    void operator() ( const vector_t &v ) {
        printf("Starting vector dump:\n");
        for ( vector_t::const_iterator i = v.begin(); i != v.end(); ++i ) {
            printf("%d:%d\n", i->first, i->second);
        }
    }
};

int main() {
    graph g;

    runtime_join_node_t my_runtime_join( g, serial, runtime_join_body() );
    function_node< vector_t, continue_msg > my_print_node( g, serial, print_body() );
    make_edge( output_port<0>(my_runtime_join), my_print_node );

    for (int i = 0; i < 3; ++i)
        for (int j = 0; j < threshold; ++j)
            my_runtime_join.try_put( std::make_pair(i,j) );

    // Wait until all messages have been processed
    g.wait_for_all();
    return 0;
}

Thank you so much for your detailed answer!
I don't know if you believe me, but that was exactly what I was doing before. If you had read my question earlier, you would have seen exactly that solution. But I edited the post to reflect my new solution.

What I did not like about the multioutput_function_node is the fact that for these three instructions a whole task has to be created every time a vector entry arrives!

My current solution simply overrides the function_node's "try_get" to achieve the same(?) more efficiently:
//! Overwriting
bool try_put( const input_type &t ) {

    for(uint i=0;i == t.first){

        //WRONG FUNCTIONALITY: Make sure, that vec->at(i) is only written once!
        // Otherwise the node is finished too early, as the "n" calls did not really fill every vector spot
        //!!!!! also make sure there are no duplicates in arg_map!"!!!!
        spin_mutex::scoped_lock lock(my_mutex);

        return false;

        output_type output(vec);

        vec->at(i) = t.second;
        return true;


    return false;
Sorry, but the forum's editor really sucks. I can't change my post!
Anyway, even if the code does not look as it should, do you get the idea?
What do you think of this design?
Or is the overhead implied by the use of the multioutput_function_node negligible?

I'm a little confused by your code because you talk about overriding try_get but then you provide code for try_put... However, you are correct that the multioutput_function_node will create a task as each item arrives, and this would incur extra overhead compared to overriding try_put. The trickiness in overriding these methods comes in making sure that you don't break the messaging protocol used by the flow graph. It's not impossible to override these methods correctly, but you need to understand how other nodes respond to a try_put or try_get. In the case of a function_node with a buffer at the input, it's pretty simple. The try_put should always return true and consume the incoming item. So as long as you are doing that, you should be OK.

What I'd prefer though is to better understand your problem and see if an extension to the API is warranted to handle this case. I've already seen someone else do something very similar to what you are trying, so it looks like this may be a general problem.

Sorry for the confusion, as I said I can't change the code in the post due to the f**cked up editor. (Toggle HTML does not work anymore on this post.)

I thought my question already contained some motivation, but if you need more:

My software computes the solution to differential variational inequalities with initial and boundary conditions. It's basically a finite difference method, but instead of solving an equation that involves the solution at a previous timestep, an inequality involving the previous solution and a third field is solved. These third fields can be a lot of things, e.g. other solutions, explicitly given functions, ... Also the boundary conditions may depend on these fields, which makes the whole thing quite interdependent. This dependency also exhibits very coarse-grained parallelization possibilities, which I thought are perfectly suited for the flow_graph building an acyclic dependency tree.

To get the most out of this framework I implement another abstraction layer on top of TBB's flow:

Mathematically the whole problem can be seen as the evaluation of a very extensively nested function:

The basic mathematical operation corresponding to each type of field can be thought of as functions f(x,y,z,...), g(x,y,z...), ...

where the x, y, z, ... represent the (numerically heavy) results from other functions.

The whole program simply does something like: result = f( g( h(), f( h(), h() ) ) )
So what I wanted was an abstraction of this basic numerical "function object" with an arbitrary number (including 0) of arguments.
Of course, every mathematical function "f_a(x,y,z)" will be a graph node body performing the required computations.
The abstraction is to provide templated node_factories. They need to know the number and types of x, y, z and provide the following factory function:
Given a matching body object (matching in the function's signature), they will create a custom graph_node object implementing sender.
Besides the body, a factory function needs to be passed the graph_nodes that will send the results for x, y, z. It uses them to automatically connect those nodes' senders to the corresponding receivers of the newly created node.
I implemented this using an auto_connecting_body template, which is templated on the number and types of function arguments. Bodies deriving from it will have a static node factory function that has the desired properties.
The custom graph_nodes that they will create are:
* continue_nodes (for nullary functions, with receivers connected to a root broadcasting_node)
* function_nodes (for unary functions; the sender of the required single argument will be connected to this node's receiver)
* a custom node, implementing function_node with a join_node member (for functions with more than one argument; the senders of the multiple arguments will be connected to the join_node, which will be connected to the function_node's receiver)

The result of this additional layer of factory functions: I can write expressions like the above ("result = f...") directly like this in source code without having to worry about edges, graphs, and stuff. To start the whole computation I call
Two lines of code! (plus the bodies)

Now my problem is this: while the arity is more or less known at compile time, there is one function which takes 1, 2 or 4 arguments depending on runtime values. Of course, theoretically it boils down to a runtime select.
case 1: return body<1>(...)
case 2: return body<2>(...)
case 3: return body<4>(...)

But I would have to write so much duplicated, redundant code, also at the side of the bodies!

One solution seemed to be the runtime_join_node, although it does not integrate too nicely into this scheme!

Thanks for your help,

OK, I just had the following idea: to get the scheme consistent I need a runtime_join_node, which should be to a join_node what a std::vector is to a std::tuple!
In detail:
  1. TBB: The runtime_join_node would (in strict analogy to the join_node) have a vector of input_ports, and the output would be a vector. The rest stays exactly the same.
  2. My framework: I would introduce some dummy wrapper template vector_arg to indicate that a function argument should be a vector of arguments (to be passed to the auto_connecting_body templates).
  3. My framework: The automatically created node factory function would take a vector of senders to create and connect a node. In a simple loop all senders are connected to the corresponding input_ports of the runtime_join_node. (This way the assignment of inputs to certain positions in the vector is solved as well: the i-th position of the vector accessible by the function body will contain the result of the i-th sender in the passed vector of senders. There is no need to pass this information around through bodies, or to totally get rid of the order in which they are passed, as your example does.)
The solution I found completes my desired layer of abstraction.
What does this imply for TBB? The fact that it worked out that well shows two things:
  1. The fact that the consistency of the newly required functionality carries over so transparently from my framework to the TBB layer speaks for itself: we definitely need a runtime_join_node (name to be changed) in TBB!
  2. My framework is a comprehensive and consistent abstraction layer to be put on top of the flow_graph API (just the same way as the flow_graph was the next logical step on top of the task / task_scheduler). All the variadic (workaround) and curiously recurring template pattern stuff I wrote to achieve this should not be written by the users of such a library, but instead be part of it. We need the functional_dependency_graph API in TBB.
Thanks for helping me work that out!

We've thought about specializing our join_nodes for identical types but have not yet done that. When thinking about this, the primary motivation was the convenience of having an output_type that is a vector, and the ease with which you could access a particular input port. The ability to grow the number of input ports dynamically was not the primary motivation. But yes, if we specialize the join_nodes for a single input type T, then it would be possible to grow the number of input ports dynamically. Since you have a use case for this, I'll raise this with the rest of the TBB team...

As for the functional_dependency_graph API, I'll also raise this with the rest of the team. We've talked about things similar to this internally in the past.

I do not see where I require dynamic growing of the input vector. If it's fixed from construction of the node, that will be fine, as it's still at runtime and thus a whole dimension more dynamic than a tuple!
Ok. It will be nice hearing from you

Yes, your application would be fine if the number of ports is set at construction time. If we specialize the join_nodes for a single type T, we'll have to decide whether to fix the number of ports at construction time or if we would also allow them to grow/shrink after that. A vector would allow us to support both use cases if we choose.

>>Mathematically the whole problem can be seen as the evaluation of a very extensively nested function
>>The whole program does simply something like: result = f( g( h(), f( h(), h() ) ) )

Have you considered using Cilk++?

Jim Dempsey