Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

An always pulling multiple-in-multiple-out tbb::flow node

Florian_J_
Beginner
Hi,

I would like to use tbb::flow in the context of a Software Defined Radio (SDR)
application. This seems like a perfect fit, because I need to pipeline complex 
algorithms.

I have read that TBB flow graph edges use a push-pull protocol for 
communication: the sender pushes as long as the receiver can accept. If the 
receiver has to reject a message because its body is still running, the edge 
switches to pull mode, with the receiver pulling results from the sender once 
it is free.

The problem I have is that some algorithms have multiple inputs which are 
produced asynchronously. A join_node is therefore not feasible, because one 
input might be produced only rarely and might not be needed on every iteration. 
I would like to have something similar to a multifunction node but with 
multiple inputs which I pull in manually. An example use of such a 
multi_io_node could be:
#include <cstdlib>   // for rand()

#include <chrono>    // for std::chrono::milliseconds
#include <iostream>  // for std::cout
#include <thread>    // for std::this_thread::sleep_for

#include <tbb/flow_graph.h>

using namespace ::tbb::flow;

int main(int argc, char* argv[]) {
  graph g;
  
  // Create two source nodes
  auto source0 = source_node<int>{g, [] (int& out) {
    out = 0;
    return true;
  }, false};
  auto source1 = source_node<int>{g, [] (int& out) {
    out = 1;
    return true;
  }, false};
  
  // Create two sink nodes
  auto print_node0 = function_node<int, int>{g, 1, [] (int in) {
    std::cout << "print_node0 received input " << in << "\n";
    return 0;
  }};
  auto print_node1 = function_node<int, int>{g, 1, [] (int in) {
    std::cout << "print_node1 received input " << in << "\n";
    return 0;
  }};
  
  // TODO: This needs to be implemented
  auto selective_io = multi_io_node<
      tuple<int, int>,    // input ports 
      tuple<int, int>> {  // output ports
    g,                    // graph 
    1,                    // concurrency (should probably always be 1)
    // lambda that reads and writes selectively
    [] (multi_io_node::input_port_type& ip,     // input ports
        multi_io_node::output_port_type& op) {  // output ports
      auto input_idx = static_cast<size_t>(rand() % 2);
      int value{0};
      // read from one random input until a value is available
      // write to the corresponding output
      if (input_idx == 0) {
        while (!std::get<0>(ip).try_get(value)) {
          std::this_thread::sleep_for(std::chrono::milliseconds{1});
        }
        std::get<0>(op).try_put(value);
      }
      if (input_idx == 1) {
        while (!std::get<1>(ip).try_get(value)) {
          std::this_thread::sleep_for(std::chrono::milliseconds{1});
        }
        std::get<1>(op).try_put(value);
      }
    }
  };
  
  // connect ports
  make_edge(source0, input_port<0>(selective_io));
  make_edge(source1, input_port<1>(selective_io));
  make_edge(output_port<0>(selective_io), print_node0);
  make_edge(output_port<1>(selective_io), print_node1);
  
  // start the (infinite) processing
  source0.activate();
  source1.activate();
  g.wait_for_all();
}
What do you think? Does this make sense? How would you implement this?
1 Solution
Alexei_K_Intel
Employee

Hi Florian,

Thank you for the question. Unfortunately, I do not know the whole algorithm; however, based on your description I can suggest several options:

  • You can use a separate function_node for each type of input. If you need to share state, you can pass the same reference/pointer to each function_node's body to have access to the common data;
  • Another option is to use indexer_node, which tags the messages from each input port and immediately forwards them to the successor;

You can combine this set of nodes into your own composite_node that will look like a multi-input and multi-output node.

Why do you need explicit pulling inside the node? It might lead to inefficiency and deadlocks.

If I missed something or you have questions, feel free to contact us in this thread.

Regards, Alex

 

2 Replies
Florian_J_
Beginner

Thank you very much, Alex.

 

Both of your ideas sound really promising and I will try them as soon as I find the time. I hadn't thought about indexer_node yet, and I am sure it could work out quite well.

The reason I wanted a node that always pulls is that this is how our current (inefficient) SDR framework works. Every processing step runs in a separate thread and pulls its inputs from shared lock-free queues. I hoped to plug in TBB as a more efficient replacement, at least until I can rewrite the algorithms to work with regular TBB nodes. However, all my current implementations do indeed suffer from deadlocks.

I will try to port our current algorithms to your implementation and do some profiling. I am certain that TBB could improve our performance massively.
