Intel® oneAPI Threading Building Blocks

Graph flow control with multifunction nodes

Greg__Trost

Hi, all,

I've been working on a paper design for a flow graph and am having difficulties getting something clean, but have an idea of how a hopefully simple TBB addition would make my problem easier.

Basically, I have a graph where a serial multifunction node M is feeding a function node F (which may be serial as well). Node M may generate between 1 and 3 inputs to F for every input it receives.

  ... -> M -> F -> ...

Now, I don't want a lot of buffering in this network -- I'd rather not have any at all -- but clearly, if I generate 3 things from one thing, there has to be a buffer somewhere. Most likely, this is a queue node between the multifunction node and the function node. I also need a limiter node L in front of M to keep it from being overfed.

  ... -> L -> M -> Q -> F -> ...

Since M's body can't control whether or not the node accepts input, I can't put a limiter node in front of Q. What I'd like to be able to do is decrement the limiter node L once the function node F has accepted everything that M put into the queue Q for a given input. There isn't any obvious, clean way to do that, though. The only way I've been able to come up with is to clutter up the output from M with some sort of "end of sequence" marker that F (or a new node between Q and F) can use as a signal to call L's decrementer.

So, the question: Is there any better way to do this?

The hopefully simple addition: could queue_node be extended with an additional output port on which it sends a continue message when it becomes empty? With that extension in place, the queue node's continue-message output could be wired to the limiter node's decrement member, and the graph above could regulate itself without any additional management on the part of M and F.

Actually, just to make the addition more complicated, you'd like to be able to apply this functionality to any node that buffers its input (priority queue nodes, sequencer nodes, etc.). Maybe this addition could be written as a decorator rather than cluttering up all those nodes' APIs. Also, It Would Be Nice If the threshold at which continue messages get generated were configurable -- that is, let the queue send continue messages when it has fewer than n elements rather than waiting until it gets down to zero.

Thanks,

Bill

Christophe_H_Intel

Hello, Bill,

Thanks for your question about flow::graph.

You brought up two points regarding flow::graph: the problem you describe, and the addition of signaling to buffering nodes (such as queue_node or priority_queue_node.)

If I understand the description of your problem, you'd like both the multifunction_node and the function_node to be serial, which would mean you need a buffer between them. However, function_node and multifunction_node have an integrated input buffer by default (the queueing policy), so an explicit queue_node is not necessary unless you wish to do something else with the data before it is enqueued.

It sounds like you don't want the multifunction_node to send more data until the function_node has processed its inputs, which means the multifunction_node and the function_node's executions will not overlap, unless the generation of the messages between the two nodes takes some time.

You could roll both activities into the body of one multifunction_node.  If you need to limit input to the node, you could add a limiter_node with a threshold of 1, and use one of the outputs of the multifunction_node as the input to the decrementer.

  Q -> L -> M -> ...
       ^----|

The other possibility is to use some special message to signal the end of the string of messages between the nodes.  You did suggest that alternative, and it seems best if the two nodes shouldn’t be merged.

Several things to remember:

  • The actions of the body of the multifunction_node are not atomic with respect to the rest of the flow::graph. For instance, if it emits a message and then sends a continue_msg to a limiter_node's decrement port, an arbitrary amount of time may pass between those two actions, and again between the multifunction_node’s last operation and the completion of its function body.
  • A limiter_node behaves as a broadcast_node as long as its threshold hasn't been reached. So if the limiter_node above receives a continue_msg, frees a slot, and tries to forward a message to the multifunction_node while the multifunction_node is still executing (and has no internal buffering), the forwarded message will be dropped. That is why, in the example above (and in your example), M should have a buffer.
  • This asynchronous aspect of flow::graph also means a “signaling queue_node” will not solve the problem. For example, if M is going to send three messages to F in your second example graph, M might emit one message to the signaling queue_node and then be swapped out. F runs and takes the message from the signaling queue_node, which becomes empty. The queue_node signals the limiter that it is empty, and the limiter forwards another message to M, even though M has not finished emitting the other two messages to F.
Greg__Trost

Hi, Christopher,

Thanks for the detailed response.

I do want the multifunction node and the function node to be able to execute at the same time -- that's why converting the existing code to TBB is of interest. What I don't want is for there to be buffers in the graph that could potentially fill without bound, causing the program to run out of memory and crash, as the ultimate source of this data is an uncontrolled network source. In a naive graph (... -> Q -> M -> F -> ...), the function node could easily fall behind and make the whole thing choke (if you'll pardon the technical language!).

Your final point about a signaling queue node not solving the problem is a good one. I hadn't considered that particular class of race conditions.

On the other hand, maybe I need to move the problem out of the graph entirely. Is the following pseudocode valid (if not necessarily a great idea)?

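[cpp]

#include <tbb/concurrent_queue.h>
#include <tbb/flow_graph.h>
#include <cstddef>
#include <cstdio>

// Hypothetical wrapper so the fragment compiles; T is the message type.
template <typename T>
void pump(tbb::concurrent_bounded_queue<T>& bq, std::size_t n) {
    tbb::flow::graph g;
    tbb::flow::limiter_node<T> lim(g, n);   // at most n items admitted
    // ... make_edge(lim, ...) and the rest of the graph ...

    for (;;) {
        T t;
        bq.pop(t);                          // block until at least one item arrives
        do {
            // limiter_node has no try_send; try_put returns false when the
            // limiter is full or no successor accepts the item
            if (!lim.try_put(t))
                std::fprintf(stderr, "limiter rejected an item\n");  // complain mightily
        } while (bq.try_pop(t));            // drain whatever else is already queued
        g.wait_for_all();                   // let the graph finish this batch
        // process g's output...
        // Note: unless something in the graph feeds lim.decrementer(), the
        // limiter's count carries over between batches, so only n items are
        // ever admitted in total.
    }
}

[/cpp]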

Greg__Trost

Related question:  Can limiter_node::decrement drop messages? I can see situations (where it takes one input to a multifunction node to generate a single output) where a node might send multiple continue_msg()s back-to-back to the same decrement instance, and things would go very wrong if those could get lost.

Greg__Trost

Drat, I think the forum software dropped a message.

Thanks for your detailed response, Christopher. One thing I want to clarify is that I really do want the multifunction node and the function node to operate in parallel; that's the incentive to convert to TBB. What I don't want is unbounded buffers in the middle of the graph. The ultimate input source is the network, and I don't want too much incoming traffic to crash the program by running it out of memory. But I don't think dropping data in the middle of the graph would be appropriate either: if I'm going to drop something, I'd like to drop it before I've spent any cycles processing it.

Your point about possible race conditions from using a signaling queue node is a good one. I hadn't considered those sorts of races.

Ultimately, I'm convincing myself that the right thing to do is some sort of end-to-end flow control with a sufficient number of tokens in the limiter node to make sure everything can stay busy, and some way for the graph to signal an external thread that it can accept more input.

(Apropos of nothing, "decrement" strikes me as backwards: a limiter_node is initialized with a fixed number of tokens, which get consumed as data flows through it, so letting more data flow through must be incrementing, not decrementing. But that's just trivia.)

Bill

Christophe_H_Intel

Hello, Bill,

Thanks for the quick response.  The decrementer in the limiter_node operates under a lock, so no decrement messages are lost.  On the other hand, because a limiter_node behaves like a broadcast_node, if it tries to forward something and no one accepts, the message is dropped.  There is one way a limiter_node differs from broadcast_node.  If a decrement (yes, really increment) message is received and the current count is zero, an attempt is made to fetch an item from a predecessor.  If that fetch succeeds, but no successor will accept, the message is dropped.  (A broadcast_node never tries to fetch from a predecessor.)

I've seen tokens used to limit the parallelism in a graph; one interesting way to do it is to use a reserving join to match tokens with inputs. I am attaching a picture of a graph I made for a game solver with incomplete knowledge. The salient point is the first join (a reserving join). Its inputs are two queues: the upper one holds generated game buffers, and the lower one holds the indices of solvers. (The graph is started by feeding the addresses of game buffers to the "Generate Game" function_node and filling the "idle solvers" queue with the indices of the available solvers.)

As game buffers are filled, they are placed in the game queue. If both a game buffer and a solver index are available, the reserving join pulls the first message from each queue, wraps them in a tuple, and forwards it to the multifunction_node ("Forward"). If no solvers are available, the "idle solvers" queue is empty, so no buffers are forwarded. If generating games takes a long time and the "game queue" becomes empty, the "idle solvers" queue fills up as solvers finish their work, waiting for a game buffer to become available.

The point is one queue gets filled with the number of tokens corresponding to the level of parallelism you desire, and the reserving join makes sure no more games are forwarded than solvers are available.

If you have any questions, please let me know.

Regards,
Chris
