I have been using the TBB flow graph for my asynchronous processing needs. I receive real-time market data for a number of instruments, and as soon as data arrives for a specific instrument I want to trigger some calculations for that instrument. Calculations for different instruments should run independently. I defined one graph object in the application, and every instrument has two workers, one for buy-side data and one for sell-side data (each worker is a function_node).
I have code like this:
Underlying* ul = ulit->second;  // ul is the instrument
if (depthData.side == Side::Buy) {
    bool b = ul->getBidworker()->try_put(depthData);   // push the buy-side update to its worker
} else if (depthData.side == Side::Sell) {
    bool b = ul->getAskworker()->try_put(depthData);   // push the sell-side update to its worker
}
Inside the worker code I made a mistake and caused an infinite loop. I expected that only the ul objects whose workers hit the infinite loop would be affected (i.e., they would stop processing any further data), but the application stopped completely: no other ul object received data through its workers. What am I missing here? Should I use separate graph objects instead? Isn't this design the right way to get asynchronous behavior with the TBB flow graph?
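For context, here is roughly what the full setup looks like (DepthData, Side, onDepthData and the instrument name are simplified placeholders for the real types; only the graph/function_node usage mirrors the snippet above):

#include "tbb/flow_graph.h"
#include <string>

enum class Side { Buy, Sell };

// Placeholder for the real market-data update.
struct DepthData {
    std::string instrument;
    Side side;
    double price;
};

using Worker = tbb::flow::function_node<DepthData, tbb::flow::continue_msg>;

// One instrument: two independent workers, one per side.
class Underlying {
public:
    explicit Underlying(tbb::flow::graph& g)
        : bidWorker(g, tbb::flow::serial,
              [](const DepthData& d) {
                  // ... buy-side calculation for d.instrument ...
                  return tbb::flow::continue_msg();
              }),
          askWorker(g, tbb::flow::serial,
              [](const DepthData& d) {
                  // ... sell-side calculation for d.instrument ...
                  return tbb::flow::continue_msg();
              }) {}

    Worker* getBidworker() { return &bidWorker; }
    Worker* getAskworker() { return &askWorker; }

private:
    Worker bidWorker;
    Worker askWorker;
};

// Called for every real-time market-data update.
void onDepthData(Underlying* ul, const DepthData& depthData) {
    if (depthData.side == Side::Buy)
        ul->getBidworker()->try_put(depthData);
    else if (depthData.side == Side::Sell)
        ul->getAskworker()->try_put(depthData);
}

int main() {
    tbb::flow::graph g;               // one graph object for the whole application
    Underlying instrument(g);
    onDepthData(&instrument, DepthData{"EURUSD", Side::Buy, 1.0});
    g.wait_for_all();
    return 0;
}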
regards
Hi cinar,
If you have an infinite loop inside the function_node body (which is executed as a TBB task with the data that was passed to the function_node), then you will be stuck inside that one task. The g.wait_for_all method only returns once all tasks in the graph have completed. Also, do I understand correctly that the function_node is supposed to be asynchronous? In that case you can use async_node, for example.
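A minimal sketch of that point (the int payload and the single node are just for illustration):

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;

    // Each message sent to this node is processed as a TBB task.
    tbb::flow::function_node<int, tbb::flow::continue_msg> bad_worker(
        g, tbb::flow::unlimited,
        [](const int&) -> tbb::flow::continue_msg {
            for (;;) { }                       // the body never returns, so its task never completes
            return tbb::flow::continue_msg();  // unreachable
        });

    bad_worker.try_put(1);
    g.wait_for_all();   // never returns, because one task in the graph never finishes
    return 0;
}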
Hi Nikita, I had always assumed asynchronous behavior from function_node, since it was recommended here:
https://software.intel.com/en-us/forums/intel-threading-building-blocks/topic/759155#comment-1932950
It is now obvious that my assumption was wrong, so I will switch all function_nodes to async_node. Since all of my nodes will be leaf nodes, what should I define as the output type? Simply replacing function_node with async_node didn't work.
regards.
In your case, if all nodes are leaves and there is no reason to send data back into the graph, then the output type of the async_node can be defined as tbb::flow::continue_msg. Have a look at the example provided with async_node: you submit the data for processing to the async activity (an external thread with an infinite loop) through a tbb::concurrent_queue. The important parts are the gateway->reserve_wait() and gateway->release_wait() calls, which prevent the graph from finishing (i.e., from returning out of g.wait_for_all) while the asynchronous work is still in flight.
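Here is a rough sketch of that pattern (assuming the TBB 2018 interfaces; the payload type, the work-item layout, and the thread handling are simplified placeholders, not code from your application):

#include "tbb/flow_graph.h"
#include "tbb/concurrent_queue.h"
#include <thread>

// Placeholder for the real market-data message.
struct DepthData { double price; double qty; };

using Worker = tbb::flow::async_node<DepthData, tbb::flow::continue_msg>;

// Work item handed to the external thread: the data plus the gateway used to
// signal completion back to the graph.
struct WorkItem {
    DepthData data;
    Worker::gateway_type* gateway;
};

tbb::concurrent_bounded_queue<WorkItem> work_queue;  // bounded variant, for its blocking pop()

// The async activity: an external thread with an infinite loop.
void async_activity() {
    for (;;) {
        WorkItem item;
        work_queue.pop(item);                               // blocks until something is submitted
        // ... run the per-instrument calculation on item.data ...
        item.gateway->try_put(tbb::flow::continue_msg());   // leaf node, so nothing consumes this
        item.gateway->release_wait();                       // now the graph is allowed to finish
    }
}

int main() {
    tbb::flow::graph g;
    std::thread t(async_activity);

    Worker worker(g, tbb::flow::unlimited,
        [](const DepthData& msg, Worker::gateway_type& gw) {
            gw.reserve_wait();                   // keep g.wait_for_all() from returning too early
            work_queue.push(WorkItem{msg, &gw});
        });

    worker.try_put(DepthData{100.0, 5.0});
    g.wait_for_all();                            // returns only after release_wait() is called
    t.detach();                                  // sketch only; a real application shuts the thread down cleanly
    return 0;
}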
Unfortunately it doesn't work. I simply changed function_node into async_node and the compilation failed:
In file included from /home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/tbb.h:53:0,
from QM.cpp:31:
/home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/flow_graph.h: In instantiation of ‘void tbb::flow::interface10::internal::async_body<Input, Ports, Gateway, Body>::operator()(const Input&, Ports&) [with Input = std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >; Ports = std::tuple<tbb::flow::interface10::internal::multifunction_output<tbb::flow::interface10::continue_msg> >; Gateway = tbb::flow::interface10::receiver_gateway<tbb::flow::interface10::continue_msg>; Body = FillInfoBody]’:
/home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/internal/_flow_graph_body_impl.h:171:9: required from ‘void tbb::flow::interface10::internal::multifunction_body_leaf<Input, OutputSet, B>::operator()(const Input&, OutputSet&) [with Input = std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >; OutputSet = std::tuple<tbb::flow::interface10::internal::multifunction_output<tbb::flow::interface10::continue_msg> >; B = tbb::flow::interface10::internal::async_body<std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::tuple<tbb::flow::interface10::internal::multifunction_output<tbb::flow::interface10::continue_msg> >, tbb::flow::interface10::receiver_gateway<tbb::flow::interface10::continue_msg>, FillInfoBody>]’
QM.cpp:1498:1: required from here
/home/vadmin/kurpro/tbb-2018_U3-dts7-20180728/include/tbb/flow_graph.h:3755:9: error: no match for call to ‘(FillInfoBody) (const std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >&, tbb::flow::interface10::internal::async_body_base<tbb::flow::interface10::receiver_gateway<tbb::flow::interface10::continue_msg> >::gateway_type&)’
my_body(v, *this->my_gateway);
^~~~~~~
In file included from OrderManager.hpp:17:0,
from nomxfixClient.hpp:8,
from QM.cpp:35:
BodyDefinitions.hpp:223:8: note: candidate: void FillInfoBody::operator()(std::tuple<FillInfo*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >)
void operator()(tuple<FillInfo*, string> data);
^~~~~~~~
Also, could you please have a look at the following:
https://software.intel.com/en-us/forums/intel-threading-building-blocks/topic/804117
Although I completely removed the TBB flow graph usage from my application code as a trial, the high CPU usage is still there (only parallel_for and concurrent_queue remain in the app). Is this behavior normal?
It looks like you defined the wrong input parameter for the FillInfoBody functor. Make it a const reference to the tuple:
void operator()(const tuple<FillInfo*, string>& data)
As for the high CPU usage: it is actually not very high per core, but the reason all CPU cores are utilized is that after a parallel_for finishes, the worker threads are not destroyed immediately and may spin-wait on an internal lock.
Making the tuple a const reference didn't work; I get the same compilation error. By the way, exactly the same functor works with function_node.
And regarding the high CPU usage: even if I use task_scheduler_init(8) in my code, all CPUs are still utilized by TBB. Can't I prevent this?