Hey,
I'm new to Intel TBB and the Flow Graph. I’m sorry if the answer to my problem is obvious. I couldn’t find anything on the internet that could help me.
The problem is that I have to reorder messages that have been processed in parallel. The sequencer_node works without problems, but if a message goes missing, its buffer grows and grows. Since the processing I want to do never stops and new messages keep being generated, the buffer grows until the program dies. I'm looking for a way to cancel/restart the graph if the sequencer_node cannot restore the sequence (after a certain number of messages) because of a missing message, or at least to skip that message and start fresh with the sequence at the next message.
Thank you for the help!
Hey Martin,
You can cancel the work in several ways:
1) Call "task_group_context_instance.cancel_group_execution()". For this, you first need to create the graph with an explicit task_group_context instance. Please see the graph and task_group_context reference for more details.
2) Inside the body of a graph node, call "self().cancel_group_execution()". The description of the method is here.
3) Throw an exception from inside the body of a node. In this case, the exception is rethrown at the point of "graph_instance.wait_for_all()", where it needs to be caught. Please read about this here.
Regardless of how the graph is cancelled, it needs to be reset with "graph_instance.reset()" before it can be used again.
However, I wonder why you are losing messages. I would like to make sure you understand the forwarding, buffering and reception policies, and that the message loss is not caused by a missing buffering node between two nodes, in which case the rejected messages are simply discarded.
Regards, Aleksei
Aleksei Fedotov (Intel) wrote:
You can cancel the work in several ways:
1) Call "task_group_context_instance.cancel_group_execution()". For this, you first need to create the graph with an explicit task_group_context instance. Please see the graph and task_group_context reference for more details.
2) Inside the body of a graph node, call "self().cancel_group_execution()". The description of the method is here.
3) Throw an exception from inside the body of a node. In this case, the exception is rethrown at the point of "graph_instance.wait_for_all()", where it needs to be caught. Please read about this here.
Regardless of how the graph is cancelled, it needs to be reset with "graph_instance.reset()" before it can be used again.
Thank you! Hadn't thought of that.
Aleksei Fedotov (Intel) wrote:
However, I wonder why you are losing messages. I would like to make sure you understand the forwarding, buffering and reception policies, and that the message loss is not caused by a missing buffering node between two nodes, in which case the rejected messages are simply discarded.
I don't lose messages inside the Flow Graph. I use the message ID from my data structure as the sequence number, and messages may already be missing when they enter the Flow Graph.
Hey Martin,
If you are using the message ID, which I guess is used to obtain the sequence number for sequencer_node, then you might try to build the necessary behavior from other nodes. For example, priority_queue_node can order the messages by the same message ID; the only thing you need to provide is the comparison functor as priority_queue_node's template parameter. However, you would need to keep a number of messages in the priority_queue_node so that they can be reordered the way you want before they are allowed to come out. Retaining the messages could be achieved with, for example, limiter_node. Consider the following scheme:
- limiter_node1 limits the number of messages in the priority_queue_node, since the source_node will not stop producing unless its messages are rejected.
- priority_queue_node orders the messages in its buffer. The number of messages reordered depends on limiter_node1 and the back link from the filter.
- limiter_node2 keeps some of the messages in the priority_queue_node so that they can be reordered. If necessary, limiter_node2 can be prefilled at the start so that it rejects messages from the beginning.
- gatekeeper decides whether to allow message(s) from priority_queue_node to flow further through limiter_node2 into the filter. It could signal the decrement port of limiter_node2 every, say, 10 messages.
- filter is necessary because of the prefilling stage of limiter_node2 and, most importantly, to let new messages enter the priority_queue_node through its back link to the first limiter_node.
- processor does the actual data processing.
Note that this is only one possible topology for the weak ordering you wanted to achieve using sequencer_node. Other schemes are also possible and might even be more efficient.
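To make the "keep a few messages back, then release them in order" idea concrete, here is a rough sketch in plain C++ rather than flow graph nodes. It is not the topology above, only the windowing logic on its own: ReorderWindow, its window parameter and the choice to drop IDs that arrive after they have been skipped are all assumptions made for illustration, and it works on bare IDs instead of full messages.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical helper, not part of TBB. Releases IDs in ascending order and
// gives up on a missing ID once more than `window` later IDs are waiting.
class ReorderWindow {
public:
    explicit ReorderWindow(std::size_t window, unsigned first_id = 0)
        : window_(window), next_(first_id) {
        assert(window > 0);
    }

    // Accepts one ID and returns every ID that can be released now, in order.
    std::vector<unsigned> push(unsigned id) {
        std::vector<unsigned> out;
        if (id < next_) return out;        // arrived after being skipped: drop it
        pending_.push(id);
        drain(out);
        if (pending_.size() > window_) {   // the gap was not filled in time
            next_ = pending_.top();        // skip the missing ID(s)
            drain(out);
        }
        return out;
    }

private:
    // Releases the run of consecutive IDs starting at next_ and discards
    // duplicates of IDs that were already released.
    void drain(std::vector<unsigned>& out) {
        while (!pending_.empty() && pending_.top() <= next_) {
            if (pending_.top() == next_) {
                out.push_back(next_);
                ++next_;
            }
            pending_.pop();
        }
    }

    std::size_t window_;
    unsigned next_;
    // Min-heap: the smallest pending ID is always on top.
    std::priority_queue<unsigned, std::vector<unsigned>,
                        std::greater<unsigned>> pending_;
};
```

With a window of 2 and IDs arriving as 1, 0, 3, 4, 5, the sketch releases 0 and 1 once 0 arrives, holds 3 and 4 while waiting for 2, and then gives up on 2 and releases 3, 4, 5 when the third waiting ID shows up. The buffer therefore never holds more than window + 1 IDs, which is the property the unbounded sequencer_node buffer was missing.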
Thank you for the use case. I hope my answer helps.
Regards, Aleksei