Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

two ways of dealing with message rejection in Flow graph

Hawkes__Rycharde
New Contributor I
277 Views

When a message is rejected by a following node (B), does the current task (A) wait or just dropes the message and quits? Can we control that? For example, let A be a multifunction_node (with T=3) and B -- a function_node (T=6), in a cycle:

 -> multifunction_node -> function_node -
-------------------------------------------------------

The multifunction_node splits data and pushes two messages to its only successor with
get<0>(out).try_put(part1);
get<0>(out).try_put(part2);
If the successor can't accept the message at the moment I would like it to stop and wait rather than having the try_put return 'false' and exit the body function, i.e. can I just write:
while(!get<0>(out).try_put(part1));
while(!get<0>(out).try_put(part2));

Looping back, that is, when the function_node (running with concurrency 6) passes a message back to the multifunction_node (of concurrency 3) I'd like the messages to be dropped if not accepted and the body function of the function_node exit. Is this the actual behaviour? (A possibly relevant quote from the documentation "A function_node does not contain buffering of its output" and therefore the try_get on it returns false always)

Thanks,
Igor

0 Kudos
2 Replies
Christophe_H_Intel
277 Views

Hello, Igor,

You are correct: the function_node (and its relatives the continue_node and multifunction_node) have no output buffering.  If they attempt to forward a message and there is no accepting successor, the message is dropped.  If this behavior is not desired, you should put a buffering node (queue_node, buffer_node, priority_queue_node or sequencer_node) as the node's successor.

Another difference is if a non-buffering node has multiple successors, the message being emitted is forwarded to all nodes which accept it.  If a buffering node successfully forwards a message it doesn't try to forward to any other nodes.  (The idea is if a message is buffered, it is unique and we don't want to create multiple copies of it.)  Remember also that a source_node is buffered (it has a slot to hold a message which does not get successfully-forwarded), even though it is a function-based node.

This interacts with the specified concurrency levels in the following way:

  • If the function-based node is a rejecting node, and if a predecessor of the node attempts to forward a message while the full concurrency of the node is being used, the message is rejected and the attempt is made to switch the direction of the arc between the nodes from "forward" to "reverse."  Some nodes allow this (queueing nodes for example); others do not (broadcast_node is an example of this.)  This switching, if successful, will allow the node to check for available messages after its last message is successfully-processed.
  • When a function-based node finishes executing its body and handling the resulting message, and provided the concurrency limit is not exceeded, an attempt is made to fetch from any available predecessors to the node.  If successful, the concurrency level is increased and the message is passed to the node's function-body.  Any failure will cause the failing arc to reverse again to the forward direction.
  • If a function-based node has a built-in queue (this is the default), then if the concurrency level would be exceeded the message is just enqueued.

If a node is fully-parallel, then any message forwarded to it results in a task being spawned to execute the body, so unlimited nodes do not need a queue (though the queueing vs. rejecting is part of the definition, while the concurrency level is passed to the constructor, so if you don't want a function-based node to have a queue built in, you have to define it that way explicitly.)

So if you want the multifunction_node to reject messages if its concurrency level is exceeded, you should define it as "rejecting".

If you want the function_node to always receive the output of the multifunction_node, then leaving it as "queueing" is the simplest way to go.  That way all the messages passed by the multifunction_node will be accepted by the successor, and you're done.

Though the solution you proposed (spinning while the successor rejects) would work, there are two problems with it.  You will be occupying a thread with spinning, and you will be repeatedly accessing the successor list (which requires a lock in the current implementation.)  If queuing the results would be acceptable that would be preferred.

Best Regards,
Chris

0 Kudos
Hawkes__Rycharde
New Contributor I
277 Views

Thanks, Chris. Indeed spinning is a bad idea for the two reasons. And the queueing function_node is a solution that keeps the graph well saturated. Actually, too saturated: the input queue of the function_queue may grow very large. Imagine there to be always a task that completes fast in function_node, goes back to the multifunction_node, splits the next piece of data, and pushes not one but two new sub-tasks onto the  function_node's queue.

Besides, my multi_function node is actually getting its data from an external priority queue with a constantly updating threshold priority and the older objects on that inner function_node inner queue could "expire", that is, fall under the threshold; initiating a task just to touch and check those is a waste. I wish I could use the priority_queue_node but it lask the ability to be purged of the low priority tasks. I'm going to ask about it in another post.

Thanks
Igor

0 Kudos
Reply