Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Is there a way to reject messages in a flow graph node?

Xin_H_1
Beginner
356 Views

I'm currently working on a legacy system which has a computation model of dependency graph. I tried tbb::flow::graph, which works good.

However, there is a special case in our system: A node could start to reject all future messages after it "sees a certain kind of messages"*. (Its children will also reject messages if one of its parents start to reject) Is there a way to reject messages?

I can't find a way for a node to reject messages on the Internet. For now the implementation is when a node enters the rejection state, instead of processing the message and passing out the output, it will send a special message which tells its children to stop processing.

Because this message rejection happens quite often in parts of the graph, (also it's not predictable) the overhead processing these special stop-processing message is quite huge.

The node behavior is not predictable, and changing how the system runs requires a lot of work. Although I tend to remove such message rejection, it won't happen any time soon. So I'm looking for a solution that can bear all these constraints.

I'm wondering if there is a way for a node to tell its parent that it will not receive any more message?

 

One way I can think of is to remove the edge when a node start to reject message. But not sure if tbb::flow::graph will expect `remove_edge` during its computation. There is nothing about the constraint of `remove_edge` function. (Can I assume it's safe to call at any time? What will be the possible result? I tried in a test program and it seems working.) However one big disadvantage is this will make the runtime behavior quite complicated and difficult to debug.

Many thanks for your help!

By "sees a certain kind of messages" it means the node enters a state that will no longer process any message.

0 Kudos
1 Reply
Christophe_H_Intel
356 Views

Hello, Xin,

I hope I understand your problem.  Is it essential that the node actually reject the message, or can it just ignore it?  If ignoring is okay, then the following solution will work.

The multifunction_node passes the output ports as a parameter to the functor call.  So the body of the method can choose whether to forward a message to another node or not.

struct mf_functor {

    tbb::atomic<bool> stop_processing;
    mf_functor( ) { stop_processing = false; }
    mf_functor( const mf_functor &f ) { stop_processing = f.stop_processing; }

    template<typename MsgType, typename output_ports_type>
    void operator()( MsgType &m, output_ports_type &p ) {
        if(msg_is_stop(m)) stop_processing = true;
        if(!stop_processing) {
            // do computation, creating result
            (void)tbb::flow::get<0>(p).try_put(result);
            // the try_put returns whether the put was successful.  You can also
            // react to that if you like
        }
    }
};

// ...
    tbb::flow::graph g;
    tbb::flow::concurrency concurrency_level = tbb::flow::unlimited;
    tbb::flow::multifunction_node< InputType, tbb::flow::tuple<OutputType> > mf_node( g, concurrency_level, mf_functor() );
    // ...
    // to make an edge from this node to a successor
    tbb::flow::make_edge(tbb::flow::output_port<0>(mf_node), successor_node);
    // the multifunction_node has only one input, so making an edge to it is easy.

This is the solution I would suggest, as long as ignoring is acceptable.

Regards,
Chris

 

0 Kudos
Reply