- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi,
I would like to use tbb::flow in the context of a Software Defined Radio (SDR)
application. This seems like a perfect fit, because I need to pipeline complex
algorithms.
I have read that TBB nodes use a push-pull process for communication---the
sender will push as long as the receiver is able to accept. If this is not
possible because the node is still running, the edge goes in a pull mode with
the receiver pulling results from the sender.
The problem I have is that some algorithms have multiple inputs which are
produced asynchronously. Thus, a join node is not feasible, because one input
might only be produced very rarely and I might not need it on every iteration.
I would like to have something similar to a multifunction node but with
multiple inputs which I pull in manually. An example use of such a
multi_io_node could be:
#include <cstdlib> // for rand() #include <chrono> // for sleep in ms #include <thread> // for sleep #include <tbb/flow_graph.h> using namespace ::tbb::flow; int main(int argc, char* argv[]) { graph g; // Create two source nodes auto source0 = source_node<int>{g, [] (int& out) { out = 0; return true; }, false}; auto source1 = source_node<int>{g, [] (int& out) { out = 1; return true; }, false}; // Create two sink nodes auto print_node0 = function_node<int, int>{g, 1, [] (int in) { std::cout << "print_node0 received input " << in << "\n"; return 0; }}; auto print_node1 = function_node<int, int>{g, 1, [] (int in) { std::cout << "print_node1 received input " << in << "\n"; return 0; }}; // TODO: This needs to be implemented auto selective_io = multi_io_node< tuple<int, int>, // input ports tuple<int, int>> { // output ports g, // graph 1, // concurrency (should probably be always 1) // lambda that reads and writes selectively [] (multi_io_node::input_port_type& ip, // input ports multi_io_node::output_port_type& op) { // output ports auto input_idx = static_cast<size_t>(rand() % 2); int value{0}; // read from one random input until its available // write to the corresponding output if (input_idx == 0) { while (!std::get<0>(ip).try_get(value)) { std::this_thread::sleep_for(std::chrono::milliseconds{1}); } std::get<0>(op).try_put(value); } if (input_idx == 1) { while (!std::get<1>(ip).try_get(value)) { std::this_thread::sleep_for(std::chrono::milliseconds{1}); } std::get<1>(op).try_put(value); } } }; // connect ports make_edge(source0, input_port<0>(selective_io)); make_edge(source1, input_port<1>(selective_io)); make_edge(output_port<0>(selective_io), print_node0); make_edge(output_port<1>(selective_io), print_node1); // start the (infinite) processing source0.activate(); source1.activate(); g.wait_for_all(); }
What do you think? Does this make sense? How would you implement this?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi Florian,
Thank you for the question. Unfortunally, I do not know the whole algorithm, however, according to your description I can suggest several options:
- You can use a separate function_node for each type of input. If you need to share the state you can pass the same reference/pointer to each function_node's body to have an access to the common data;
- Another option is to use indexer_node that adds tags for each input port and immediately forwards it to the successor;
You can combine this set of nodes to your own composite_node that will look like a multiinput and multioutput node.
Why do you need explicit pulling inside the node? It might lead to inefficiency and deadlocks.
If I missed something or you have questions, feel free to contact us in this thread.
Regards, Alex
Link Copied
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi Florian,
Thank you for the question. Unfortunally, I do not know the whole algorithm, however, according to your description I can suggest several options:
- You can use a separate function_node for each type of input. If you need to share the state you can pass the same reference/pointer to each function_node's body to have an access to the common data;
- Another option is to use indexer_node that adds tags for each input port and immediately forwards it to the successor;
You can combine this set of nodes to your own composite_node that will look like a multiinput and multioutput node.
Why do you need explicit pulling inside the node? It might lead to inefficiency and deadlocks.
If I missed something or you have questions, feel free to contact us in this thread.
Regards, Alex
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thank you very much, Alex.
Both of your ideas sound really nice and I will try them as soon as I find the time. I haven't thought about indexer_node, yet, and I am sure it could work out quite well.
The reason I wanted a node that always pulls is because thats how our current inefficient SDR framework works. Every processing step runs in a separate thread and pulls its input in from shared, lock-free queues. I hoped to plug in TBB as a more efficient replacement, at least until I could rewrite the algorithms to work with regular TBB nodes. However, all my current implementations suffer from deadlocks, indeed.
I will try to port our current algorithms to your implementation and do some profiling. I am certain that TBB could improve our performance massively.
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page