Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

processing of infinite loop

Gospodarencu__Ruslan

Is there a pattern (or algorithm) for processing a container indefinitely (say, a concurrent_queue that is filled dynamically with data)?

It's more like a manager for data that arrives continuously from the network. The data is read from the queue (reading), processed and analyzed (processing), and then some details are posted to a database (posting). Most of the work is in the "processing" part, and most of it is serial (it cannot be divided).
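The reading, processing, and posting stages described above can be sketched in standard C++ without any busy-waiting. This is only an illustration under assumptions: `Channel`, `Frame`, `Record`, and `run_pipeline` are hypothetical names, `process_data` is a placeholder, and the reader pushes a fixed number of synthetic frames so that the sketch terminates. In the real application the reader loop would simply never call `close()`. TBB's `parallel_pipeline` has the same overall shape: the first (serial) filter is where the waiting loop lives, and it calls `flow_control::stop()` only when the stream should end.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Minimal blocking queue: pop() sleeps until an item arrives or the queue is closed.
template <typename T>
class Channel {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        ready_.notify_one();
    }
    // Signals that no more items will arrive; wakes every waiting consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }
    // Returns false only when the queue is closed and fully drained.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop();
        return true;
    }
private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
    bool closed_ = false;
};

// Hypothetical stand-ins for the network frame and the database record.
struct Frame { int id; };
struct Record { int id; int score; };

// Placeholder for the expensive per-item analysis.
Record process_data(const Frame& f) { return Record{f.id, f.id * 2}; }

// Runs the three stages over `frame_count` synthetic frames and returns the
// records "posted" by the final stage.
std::vector<Record> run_pipeline(int frame_count, unsigned worker_count) {
    assert(worker_count > 0);
    Channel<Frame> frames;
    Channel<Record> records;
    std::vector<Record> posted;

    // Stage 1 (reading): in the real application this loop would never end.
    std::thread reader([&] {
        for (int i = 0; i < frame_count; ++i) frames.push(Frame{i});
        frames.close();
    });

    // Stage 2 (processing): each worker owns its own `while` loop.
    std::vector<std::thread> workers;
    for (unsigned w = 0; w < worker_count; ++w) {
        workers.emplace_back([&] {
            Frame f;
            while (frames.pop(f)) records.push(process_data(f));
        });
    }

    // Stage 3 (posting): a single thread, so database writes stay serialized.
    std::thread poster([&] {
        Record r;
        while (records.pop(r)) posted.push_back(r);
    });

    reader.join();
    for (auto& w : workers) w.join();
    records.close();  // all producers are done, so the poster may drain and exit
    poster.join();
    return posted;
}
```

The point of the design is that the "infinite" loop is a blocking `while (pop(...))` owned by each stage, so idle workers sleep instead of spinning on `try_pop`. Records may reach the posting stage in a different order than the frames arrived.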

Currently I do this with standard C++ threads, but the problem is in the multiprocessing. After reading a bit about TBB, I found out that tasks could help a lot, and that handing data to tasks is faster than handing it to threads.

I have tried using parallel_pipeline and tasks, but I don't really understand how to make it run indefinitely.

Any help or advice?

Alexei_K_Intel
Employee

Can you clarify what you currently do with C++ threads and what multiprocessing problems you face? Are you trying to process multiple data units from the network simultaneously? Do you use multiple threads to read from the data queue?

Gospodarencu__Ruslan

On start, the application creates a thread that reads data from the network and puts it in a concurrent_queue.

After that I use the CTPL library to create a thread pool (with the max_threads I specify).

In an infinite loop I read data from the concurrent_queue and push it to the pool, where it is processed in threads. The output is just a record in a database. The application runs forever.

Here is a small example of the main code:

int main() {
  ctpl::thread_pool p(numthreads);
  DataStreamer data;
  while(true){
    qItem item;
    if(data.frameQueue->try_pop(item)){
      p.push(process_data, item); // process_data is the processing function 
    }
  }
}

One thing I tried is getting the data from the queue inside the "process_data" function, but it doesn't change much.

As I read, in TBB tasks work better than threads, and one thread can run more than one task; that's why I decided to use TBB.

The problem for me now is choosing the right algorithm or pattern from TBB.

Here is my latest attempt. The question for me is where I should put my infinite loop "while(true) ...".

DataStreamer data;

class TTask : public tbb::task {
public:
	task* execute( ) {
		qItem item;
		while(true){ // THAT IS WRONG, SO WHERE TO PUT IT?????
			if(data.frameQueue->try_pop(item)){
				process_data(item);
			}
		}
		return NULL;
	}
};

static void TaskProcess(int nThreads){
	TTask& f = *new(tbb::task::allocate_root()) TTask();
	tbb::task::spawn_root_and_wait(f); // alternatively: enqueue(f)
}

int main(){
	try {
		utility::thread_number_range threads(tbb::task_scheduler_init::default_num_threads,1);
		if ( threads.first ) {
			for (int p = threads.first; p <= threads.last; p = threads.step(p)) {
				// Construct task scheduler with p threads
				tbb::task_scheduler_init init(p);
				// Execute parallel algorithm using task or template algorithm here
				TaskProcess(p);
				// Implicitly destroy task scheduler
			}
		}else{
			tbb::task_scheduler_init init;
			TaskProcess(0);
		}
		return 0;
	} catch(std::exception& e) {
		std::cerr<<"error occurred. error text is :\"" <<e.what()<<"\"\n";
	}
}

Can parallel_do be used for my case?

Vladimir_P_1234567890

I suggest looking at the TBB router example: https://software.intel.com/en-us/articles/using-intel-tbb-in-network-applications-network-router-emulator
It shows where to put the right 'while' statement.

Vladimir

Gospodarencu__Ruslan

I used the example provided in the link above and it looks very good.

But I still have a problem. I use dlib to process some data on the GPU, so I use a global variable to define a neural network and send the data to that network for processing. The problem is too many accesses to that variable from tasks. In the first approach, with the pool, I created a network for each thread. In the previous model with tasks, I created a network in each task and reused it. In the pipeline model I can't figure out how to manage that. Creating and using the network inside a parallel filter block is not a solution, because it costs too much time and too many resources (I guess because each task creates it).

So the question is: what is the correct way to define global variables (or maybe vectors of them) and use them from within parallel filter blocks?
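One common way to avoid both a single contended global and a fresh network per task is to keep one instance per worker thread and build it lazily on first use. The sketch below uses standard C++ `thread_local` and assumes nothing about dlib: `Network`, `local_network`, and `count_networks_built` are hypothetical names, and the construction counter exists only to make the behavior visible. TBB also provides `tbb::enumerable_thread_specific`, whose `local()` method serves a similar purpose and may be a better fit inside a parallel filter.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for an expensive-to-build neural network.
struct Network {
    Network() { ++constructed; }
    int infer(int x) const { return x + 1; }
    static std::atomic<int> constructed;  // counts constructions, for the demo only
};
std::atomic<int> Network::constructed{0};

// Returns the calling thread's private network, building it on first use.
Network& local_network() {
    thread_local Network net;
    return net;
}

// Placeholder processing step: it touches no shared network, so no locking is needed.
int process_data(int x) { return local_network().infer(x); }

// Runs `items_per_thread` calls on each of `thread_count` threads and returns
// how many Network objects were constructed along the way.
int count_networks_built(unsigned thread_count, int items_per_thread) {
    assert(thread_count > 0);
    const int before = Network::constructed.load();
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < thread_count; ++t) {
        threads.emplace_back([items_per_thread] {
            for (int i = 0; i < items_per_thread; ++i) process_data(i);
        });
    }
    for (auto& th : threads) th.join();
    return Network::constructed.load() - before;
}
```

With this layout the number of networks is bounded by the number of worker threads rather than by the number of tasks or items, which matters when each instance holds GPU memory.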
