Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

consuming TBB concurrent_queue

cinar_e_
Beginner
1,470 Views

Hello,

I work in a Linux environment.

In the application I am currently working on, I will have several consumers for some form of data. In my design, each consumer will have its own concurrent_queue, and the data generator will push data into the corresponding queue. When a consumer receives data, it will call parallel_for to dispatch its own consumers to perform some calculations on the data.

In order to have these separate queues serviced on their own threads/tasks, may I create TBB tasks, or should I go with pthreads? Should I treat queue consumption as an I/O-bound task, so that TBB tasks are not a good fit?

Thanks in advance.

14 Replies
Aleksei_F_Intel
Employee

Hello,

From my perspective, the application you described resembles several pipeline schemes applied simultaneously. TBB Flow Graph was introduced into the library specifically for such application designs. With Flow Graph there is no need to introduce concurrent_queues; flow graph nodes serve this purpose. Consider a multifunction_node that has several queue_nodes as successors. In its body, the multifunction_node decides into which successor(s) (queue_nodes) to put the next message. After every queue_node comes a function_node that performs some calculation on the received data. At the beginning of the graph there could be a source_node that fetches the data from some source (i.e. consumes it) and passes it to the multifunction_node.

Regards, Aleksei

cinar_e_
Beginner

Thank you for the prompt reply. It is true that a flow graph or pipeline might meet my requirements here. However, I am not familiar with them, so in order to have a fast solution I wanted to continue with the queue.

So, do you have any comments on the solution with the concurrent queue?

Thanks and regards.

Aleksei_F_Intel
Employee

Hi,

Could you please provide more information about your design, then? I want to understand why you really need concurrent queues. Why not simply call parallel_for whenever the data is received, instead of passing it through a concurrent queue first?

Regards, Aleksei

cinar_e_
Beginner

Hello,

This is an option-pricing engine. The application has several underlying instruments, and each instrument has several options defined under it. The application listens to market data. Each market-data change for a specific instrument triggers pricing calculations for the options defined under that instrument.

So the market-data listener will put market-data change events into the corresponding queue of the specific underlying, and when the underlying gets that market-data update from its queue, it will trigger price calculations for its options (through parallel_for).

In conclusion, parallel_for will be used for the calculations of a specific instrument's options. But in order to have concurrency across different instruments, I plan to have one queue per instrument. As soon as the market-data listener reads a market-data update, it will write it to the specific queue and continue parsing the next message (which might be an update for another instrument).

regards.

cinar.

 

Aleksei_F_Intel
Employee

Then, whenever the data is put into the queue, you can enqueue a TBB task that will retrieve that data from the queue and do the further processing. However, please keep in mind that TBB tasks are a low-level interface, and their direct use is discouraged.

The other variant that I see is to create additional threads that wait on the blocking "pop" operation of concurrent_bounded_queues. In this case, every thread is dedicated to servicing its own queue, and you would need to manage thread synchronization on your own. Whenever data is available, the thread could call, for example, the parallel_for algorithm for the price calculation.

I still encourage you to read TBB Flow Graph component's documentation (https://www.threadingbuildingblocks.org/docs/help/hh_goto.htm?index.htm#tbb_userguide/Basic_Flow_Graph_concepts.html) as it should be very useful in your case.

Regards, Aleksei

cinar_e_
Beginner

Hello Aleksei,

 

Regarding TBB flow graph usage for my use case: as soon as new market data is received, I would create a graph and fire it. But shouldn't I wait for that graph to finish (wait_for_all()) before processing the next market data? Then I can't have parallel processing of market data for different instruments. Could you help me here?

Aleksei_F_Intel
Employee

Hello cinar,

Glad to hear that you decided to give a flow graph-based design a chance in your app.

First of all, you do not need to create a graph each time you receive new market data. It is enough to create it once and then put the data into it (through the try_put() API) each time a new portion of data needs to be processed. Of course, in order to be sure that a particular try_put() has made it all the way through the graph, graph.wait_for_all() must be called. However, it is not required that every wait_for_all() call correspond to every start of the graph. One can start the graph many times by putting messages (perhaps concurrently) into its starting node(s) and then call wait_for_all() only once. This guarantees that all the messages that were put have been processed by the graph. In this sense, all the messages will be processed in parallel relative to each other by the graph.

As far as I understand, in your particular case with several instruments and several options defined for each instrument, you would like to separate the thread that listens to changes in market data from the thread that consumes these updates from the listener. I would try to have some sort of concurrent queue between them. The listener puts the data into the queue, while the consumer thread gets the data, try_put()s it into the graph, and calls wait_for_all() on it. Once the thread returns from this blocking call, it goes back to the concurrent queue to consume (and put into the graph again) all the updates that were put into the queue by the listener while the consumer was busy in the wait_for_all() call. In addition, you could have a sort of dispatching node (try multifunction_node) at the beginning of the graph that decides which specific instrument the changed data belongs to and puts the message into the successor node corresponding to the calculation of options for that instrument.

Regards,
Aleksei

 

cinar_e_
Beginner

Hello Aleksei,

I plan to create the solution as you suggested. I will have the multifunction_node, and it will send the market data to the related underlying's function_node, in which I will have the parallel_for for the option calculations.

I wanted to gain some experience with the multifunction_node and created the following solution as a proof of concept. As the function_nodes are created with the serial flag, I expected the numbers in the output to be in ascending order. But this isn't the case; the numbers are written in a random order. What am I missing here? Could you comment?

regards.

#include "tbb/flow_graph.h"
#include <cstdio>

using namespace tbb::flow;

typedef multifunction_node<int, tbb::flow::tuple<int, int> > multi_node;

struct MultiBody {
  void operator()(const int &i, multi_node::output_ports_type &op) {
    if (i % 2)
      std::get<1>(op).try_put(i); // put to the odd worker
    else
      std::get<0>(op).try_put(i); // put to the even worker
  }
};

int main() {
  graph g;

  function_node<int, continue_msg, queueing>
      first_worker(g, serial, [](const int &data) {
        printf("Process data with first worker: %d\n", data);
      });

  function_node<int, continue_msg, queueing>
      second_worker(g, serial, [](const int &data) {
        printf("Process data with second worker: %d\n", data);
      });

  multi_node node1(g, unlimited, MultiBody());

  make_edge(output_port<0>(node1), first_worker);
  make_edge(output_port<1>(node1), second_worker);

  for (int i = 0; i < 1000; ++i)
    node1.try_put(i);

  g.wait_for_all();
}

Aleksei_F_Intel
Employee

Hello cinar,

The multifunction_node was created with unlimited parallelism, which means that invocations of its body can be executed concurrently. And when things happen concurrently, all bets on ordering are off. One possible situation is that the second body invocation of the multifunction_node completes first and puts its message into the corresponding function_node. Now imagine a similar thing happening across 1000 invocations. So, depending on how the OS schedules threads, their number, core states, etc., it is perfectly legal for the thousandth message to arrive at the function_node first. In order to have ordering of messages, one would need to use sequencer_node (https://www.threadingbuildingblocks.org/docs/help/hh_goto.htm?index.htm#reference/flow_graph/sequencer_node_cls.html). This is the node that maintains a global ordering across messages.

Regards,
Aleksei

cinar_e_
Beginner

Hello Aleksei,

Right after I wrote the message, I understood the situation. Thanks for your comment.

If I don't use the multifunction_node, I can create one function_node (configured to run serially) per underlying; then, as soon as I get the market data, I can find the corresponding underlying and try_put the market data to that node. Inside the node I will call parallel_for over the options owned by that underlying.

So, in summary, N function_nodes for N underlyings, with parallel_for inside the function_nodes, will be the solution for my requirements. Do you agree?

regards.

Aleksei_F_Intel
Employee

Hi cinar,

Yes, that sounds reasonable to me, except that I do not understand why you need the function_nodes to have serial semantics. Do these nodes really process shared data? I would also try making them with unlimited concurrency (perhaps with some unavoidable data copying) and see which solution provides more throughput.

Regards, Aleksei.

 

cinar_e_
Beginner

Hello Aleksei,

 

Market data for a specific underlying instrument must be processed serially (i.e. the price that arrives first must be processed first).

Consider the following scenario: for a specific underlying, two prices arrive one after the other: 10.0 and then 10.1.

If I allow the function_node (of which there is a separate one for each underlying instrument) to have parallel semantics, 10.1 might be processed first and then 10.0. The last processed price would then be 10.0, but the last price for the underlying is 10.1.

So there is no shared data, but the order of the market data is important.

Am I missing something here?

regards.

Aleksei_F_Intel
Employee

Hi cinar,

You got it right, provided that function_node::try_put() is called from the same thread, first for the 10.0 price and then for 10.1.

Regards, Aleksei

cinar_e_
Beginner

Hello Aleksei,

Following your recommendations, I started to use the flow graph for my async needs. I have several classes that have function_nodes. All of the function_nodes are leaf nodes, i.e. they are not connected to any other nodes. You said that I can use just one graph for the entire application.

I have several questions but first one: 

I have code like this:

Underlying* ul = ulit->second;

if (depthData.side == Side::Buy) {
    bool b = (ul->getBidworker())->try_put(depthData);
} else if (depthData.side == Side::Sell) {
    bool b = (ul->getAskworker())->try_put(depthData);
}

Inside the worker code, I made a mistake and caused an infinite loop. I thought that only the ul objects whose workers had the infinite loop would have the problem (i.e. would stop processing data through those workers). But the application stopped completely, i.e. no other ul object received data through its workers. What am I missing here? Should I have separate graph objects? Isn't this design correct for async behavior?

 

regards.
