Intel® oneAPI Threading Building Blocks

tbb flow graph - parallel run problem

cinar_e_
Beginner

I have been using the TBB flow graph for asynchronous execution. I receive real-time market data for a number of instruments, and as soon as data arrives for a specific instrument, I want to trigger calculations for that instrument. Calculations for different instruments should run independently. I defined one graph object in the application, and each instrument has two workers, one for buy-side data and one for sell-side data. (Each worker is a function_node.)

My code looks like this:

Underlying* ul = ulit->second;  // ul is the instrument

if (depthData.side == Side::Buy) {
    bool b = (ul->getBidworker())->try_put(depthData);
} else if (depthData.side == Side::Sell) {
    bool b = (ul->getAskworker())->try_put(depthData);
}

Inside the worker code I made a mistake that caused an infinite loop. I expected only the ul objects whose workers hit the infinite loop to be affected (i.e., to stop processing further data through those workers). Instead, the application stopped completely: no other ul object received data through its workers. What am I missing here? Should I have separate graph objects? Isn't this design correct for getting asynchronous behavior with the TBB flow graph?

regards

 

8 Replies
Nikita_P_Intel
Employee

Hi cinar,

If you run an infinite loop inside the function_node body (which is executed as a TBB task with the data passed to the function_node), you will be stuck inside that one task. The g.wait_for_all() method returns only when all tasks in the graph have completed. Also, do I understand correctly that the function_node is supposed to be asynchronous? If so, you can use async_node, for example.

cinar_e_
Beginner

Hi Nikita, I had always assumed that function_node behaves asynchronously, as was recommended here:

https://software.intel.com/en-us/forums/intel-threading-building-blocks/topic/759155#comment-1932950

Clearly my assumption was wrong. I will now switch all function_nodes to async_node. Since all of my nodes will be leaf nodes, what should I define the output type as? Simply replacing function_node with async_node didn't work.

regards.

Nikita_P_Intel
Employee

In your case, if all nodes are leaf nodes and there is no reason to send data back to the graph, the output of the async_node can be defined as tbb::flow::continue_msg. Look at the example provided with async_node. In your case, you would submit the data for processing to the async activity (an external thread with an infinite loop) through a tbb::concurrent_queue. The important parts are the gateway->reserve_wait() and gateway->release_wait() calls, which prevent the graph from finishing (that is, they keep g.wait_for_all() from returning) while the external work is pending.

cinar_e_
Beginner

Unfortunately it doesn't work. I simply changed function_node to async_node and compilation failed:

In file included from /home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/tbb.h:53:0,
                 from QM.cpp:31:
/home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/flow_graph.h: In instantiation of ‘void tbb::flow::interface10::internal::async_body<Input, Ports, Gateway, Body>::operator()(const Input&, Ports&) [with Input = std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >; Ports = std::tuple<tbb::flow::interface10::internal::multifunction_output<tbb::flow::interface10::continue_msg> >; Gateway = tbb::flow::interface10::receiver_gateway<tbb::flow::interface10::continue_msg>; Body = FillInfoBody]’:
/home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/internal/_flow_graph_body_impl.h:171:9:   required from ‘void tbb::flow::interface10::internal::multifunction_body_leaf<Input, OutputSet, B>::operator()(const Input&, OutputSet&) [with Input = std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >; OutputSet = std::tuple<tbb::flow::interface10::internal::multifunction_output<tbb::flow::interface10::continue_msg> >; B = tbb::flow::interface10::internal::async_body<std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::tuple<tbb::flow::interface10::internal::multifunction_output<tbb::flow::interface10::continue_msg> >, tbb::flow::interface10::receiver_gateway<tbb::flow::interface10::continue_msg>, FillInfoBody>]’
QM.cpp:1498:1:   required from here
/home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/flow_graph.h:3755:9: error: no match for call to ‘(FillInfoBody) (const std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >&, tbb::flow::interface10::internal::async_body_base<tbb::flow::interface10::receiver_gateway<tbb::flow::interface10::continue_msg> >::gateway_type&)’
         my_body(v, *this->my_gateway);
         ^~~~~~~
In file included from OrderManager.hpp:17:0,
                 from nomxfixClient.hpp:8,
                 from QM.cpp:35:
BodyDefinitions.hpp:223:8: note: candidate: void FillInfoBody::operator()(std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >)
   void operator()(tuple<FillInfo*, string> data);
        ^~~~~~~~
 

cinar_e_
Beginner

Also could you please have a look at the following:

https://software.intel.com/en-us/forums/intel-threading-building-blocks/topic/804117

Although I completely removed TBB graph usage from my app code as a trial, the high CPU usage is still there. (Only parallel_for and concurrent_queue remain in the app.) Is this behavior normal?

 

Nikita_P_Intel
Employee

It looks like you defined the wrong input parameter for the FillInfoBody functor. Make it a const reference to the tuple:

void operator()(const tuple<FillInfo*, string>& data)

 

As for the high CPU usage: it is actually not very high per core. The reason all CPU cores are utilized is that after parallel_for finishes, the threads are not destroyed immediately and may spin-wait on an internal lock.

cinar_e_
Beginner

Making the tuple a const reference didn't work; I get the same compilation error. By the way, exactly the same functor works with function_node.

 

cinar_e_
Beginner

And regarding the high CPU usage: even if I use "task_scheduler_init (8)" in my code, all CPUs are still utilized by TBB. Can't I prevent this?

 
