Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

blocking try_put flow graph

Sergey_N_1
Beginner

Hello

How can I achieve a blocking try_put() on a node with limited concurrency (or one that is limited for other reasons)? For instance, if I have a function_node with a concurrency limit of 3 tasks, I would like try_put() to block (and do work-stealing if needed) until one of the tasks completes and a slot becomes available for another task. At the moment, with the default queueing policy, tasks get queued without bound regardless of the limit value, and with the rejecting policy try_put() simply returns false.

Thanks.

8 Replies
Mariya_P_Intel
Moderator

Hi Sergey,


Could you please clarify why a blocking `try_put()` is necessary in your use case? The engineering team points out that the desired blocking (with work-stealing enabled) happens inside the `tbb::flow::graph::wait_for_all()` method. For a node with a concurrency limit and the queueing policy, messages are put into the node's internal queue and are not retrieved from it until the number of messages in flight drops below the concurrency limit. With the rejecting policy, messages are not enqueued and are lost unless they are buffered. The suggestion is to put one of the buffering nodes (e.g. `tbb::flow::queue_node`) before the node with the concurrency limit, perform all of the necessary `try_put()`s on it, and then call the blocking `graph::wait_for_all()`.


Regards,

Mariya


Mariya_P_Intel
Moderator

Hi Sergey,

 

Please explore the following solutions and let me know if any of them is suitable for you.

The interface of these solutions differs depending on the TBB version you use (TBB 2020 or oneTBB).

  • Using limiter_node, which has a threshold that controls the number of messages in flight at a particular point of the graph.

Documentation: https://software.intel.com/content/www/us/en/develop/documentation/tbb-documentation/top/intel-threading-building-blocks-developer-guide/parallelizing-data-flow-and-dependence-graphs/flow-graph-tips-and-tricks/flow-graph-tips-for-limiting-resource-consumption/use-limiternode.html

If oneTBB is used, decrement interface can be retrieved using limiter_node::decrementer() method.

  • Using input_node connected to the processing node with the rejecting policy, instead of a bounding queue. Once the node has processed one of its messages, it pulls a new one from the input_node. In this case, memory usage comprises the messages in flight in the processing node, whose number is bounded by its concurrency limit, plus one buffered message in the input_node. An input_node does not produce a new message until its previously produced message is accepted by one of its successors.

Documentation:

In oneTBB, the input_node body has slightly different requirements. Instead of accepting a reference to the user type, it accepts a reference to tbb::flow_control. Instead of returning a boolean that indicates the end of the stream, it now returns the output value, and tbb::flow_control is used to signal the end of the stream.

  • Using a token-based scheme for controlling the number of messages in flight. The tokens represent work items: the more tokens in the graph, the more messages can be processed simultaneously. The idea is to use a buffer_node for tokens, an input_node for the messages to be processed, a join_node with the reserving policy, and the processing node. This scheme is flexible, as it can dynamically adjust the concurrency across the whole graph.

Documentation: https://software.intel.com/content/www/us/en/develop/documentation/tbb-documentation/top/intel-threading-building-blocks-developer-guide/parallelizing-data-flow-and-dependence-graphs/flow-graph-tips-and-tricks/flow-graph-tips-for-limiting-resource-consumption/create-a-tokenbased-system.html

In all of these cases, the user calls the blocking `graph::wait_for_all()` so that the main thread can also participate in processing.

 

Best regards, Mariya

Sergey_N_1
Beginner

Hi Mariya

If I understand you correctly, I need to rework my whole top-level program logic and put my simulation algorithm (which feeds the graph on each iteration) inside a tbb::flow::input_node whose body represents the top-level simulation "for" loop. This would be quite a change: I would need to decompose the existing, relatively straightforward simulation loop into its body, plus the state required for each iteration, plus probably a bunch of other classes to represent other logical blocks and transition states. Honestly, this is a bit too much and too intrusive for a relatively small processing step with memory constraints.

Thanks.

Sergey.

Mariya_P_Intel
Moderator

Hi Sergey,

If you would like to use the current TBB version, you need to make one of the changes suggested above.

Your feedback is being processed by the engineering team.

Thanks, Mariya

Mariya_P_Intel
Moderator

Sergey,

Please see more details from developers on your case.

There is no way to call a function that allows the thread to participate in TBB task execution and yet return as soon as a single message is processed. TBB is designed for processing tasks in bulk, so any blocking method waits until all the associated tasks (belonging to a graph or to a parallel algorithm) are processed. There are two options:

  1. Keep a variable that tracks how many messages are in flight and, if the threshold is reached, have the main thread process the newly acquired message itself (perhaps serially) instead of pushing it into the graph. If messages are delivered to the graph by a single thread, this limits memory consumption while keeping the main thread busy. The user can organize parallel processing of this single message by pushing it into a separate graph or task group (and immediately making a blocking wait() call) that repeats the processing steps of the first graph and guarantees that only one message is placed in it.
    1. However, if that newly acquired message is processed serially and takes relatively long, the threads that process messages in the graph might starve for work. If this is undesirable, see option 2.
  2. To make message processing efficient in terms of memory consumption and parallelism, while avoiding repeating steps in other entities, the user needs to make message acquisition part of the graph or pipeline algorithm.

Thanks, Mariya


Sergey_N_1
Beginner

@Mariya_P_Intel wrote

In order to have efficient message processing in terms of memory consumption, parallelism, and avoiding repeating steps in other entities the user will need to make message acquisition to be a part of the graph or pipeline algorithm.


Hi Mariya, thanks for your answer. What bothers me is that this (seemingly the only) approach is far too intrusive with respect to code that has nothing to do with TBB.

But it looks like that's how it is anyway.

Thank you for your time!

 

Regards,

Sergey.

Mariya_P_Intel
Moderator

Hi Sergey,

Thank you for your feedback on TBB usage.

Do you need any more assistance on this case, or does my post above contain a solution for you ("the user needs to make message acquisition part of the graph or pipeline algorithm")?

Thanks, Mariya


Sergey_N_1
Beginner

Hi Mariya

I now understand how my goal can be achieved; no further assistance is needed. Thank you for your help!

 

Regards,

Sergey.
