Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

pipeline problem

inshi1
Beginner
Hi, I'm running a packet-processing program built on the Intel TBB pipeline. Each task reads from a socket using epoll, and the filter returns NULL when no more processing should be done.

I use the following (semi-pseudocode):

tbb::pipeline pipeline;
pipeline.add_filter( *packetProc );
pipeline.run(20);

packetProc runs in parallel mode (tbb::filter(parallel)).
Now, about the filter:

void* PacketProcessor::operator()(void* )
{
    int res = 1;
    int ppID = counter.fetch_and_add(1);
    counter++;
    res = process(ppID);
    counter--;
    if (res == 0)
        printf("process returned 0!\n");
    printf("the number of active tasks is %d\n", counter);
    return (void*)res;
}

process() returns 0 if it should stop scanning the packets, and 1 otherwise.
One last thing: I have a quad-core Pentium.
The questions:

1. When I run the program, the number of active tasks is around 10. How come? I have 4 processors. Shouldn't the maximum number of parallel tasks equal the number of processors I have?
2. Usually, when scanning should stop, I see the message "process returned 0!" four times and then pipeline.run() finishes. But sometimes I see the message only three times and run() doesn't finish. The worst part is that I don't know where the 4th task is stuck (if it is stuck at all). How can I find out what is wrong?

Thanks,
Lior.


2 Replies
jimdempseyatthecove
Honored Contributor III
Are these counter operations atomic (thread safe)?

counter.fetch_and_add(1);
counter++;
counter--;

Also, in the time interval between counter-- by your thread and the

printf("the number of active tasks is %d\n", counter);

counter may have changed.

Consider

int wasCounter = --counter; // where -- is an atomic operator--()

Then use wasCounter in your print statement.
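
A minimal sketch of that idea, assuming std::atomic<int> stands in for whatever atomic type counter really is (the original post does not show its declaration). The helper name leaveAndReport is hypothetical. The point is only that the value printed is the one this thread's decrement produced, not a later re-read of the shared counter:

```cpp
#include <atomic>
#include <cassert>
#include <cstdio>

// Assumption: counter is an atomic integer; std::atomic is used here
// so the sketch builds without TBB.
std::atomic<int> counter{0};

// Hypothetical helper: leave the "active" region and report the count.
int leaveAndReport() {
    // Atomic pre-decrement returns the new value as a plain int, so
    // wasCounter cannot change under us before the printf.
    int wasCounter = --counter;
    assert(wasCounter >= 0);  // an active-task count should never go negative
    std::printf("the number of active tasks is %d\n", wasCounter);
    return wasCounter;
}
```

Passing the local int to printf also avoids handing an atomic object to a %d format specifier.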

Jim Dempsey
Alexey-Kukanov
Employee

You increment the counter twice per invocation (once in fetch_and_add and once in counter++) but decrement it only once, so it's no wonder that it keeps growing. If you want to see how much real parallelism there is, you should use a separate atomic counter for that.
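
As a rough sketch of what separate counters could look like: one atomic hands out IDs and another tracks how many invocations are in flight. This uses std::atomic and std::thread in place of TBB so it is self-contained; the names nextId, active, peak, runOnce and measurePeak are all made up for illustration and are not part of the original program.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

std::atomic<int> nextId{0};  // hands out a unique ID per invocation
std::atomic<int> active{0};  // invocations currently inside the body
std::atomic<int> peak{0};    // highest value of 'active' observed so far

// Hypothetical stand-in for the filter body.
int runOnce() {
    int id = nextId.fetch_add(1);  // ID only; does not count as "active"
    int now = ++active;            // enter: atomic increment, new value
    // Raise 'peak' to 'now' unless another thread already recorded more.
    int prev = peak.load();
    while (prev < now && !peak.compare_exchange_weak(prev, now)) {}
    // ... the real work (process(id) in the original post) would go here ...
    int remaining = --active;      // leave: atomic decrement, new value
    assert(remaining >= 0);
    return id;
}

// Runs runOnce() from several threads and returns the observed peak.
int measurePeak(int threads, int callsPerThread) {
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([callsPerThread] {
            for (int i = 0; i < callsPerThread; ++i) runOnce();
        });
    for (auto& th : pool) th.join();
    return peak.load();
}
```

With this split, the ID counter only ever grows, while the active counter returns to zero once all invocations have finished, and the peak can never exceed the number of threads executing the body.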

For the second question, I would use a debugger to see where the threads are stuck. The things you need to remember: the function process() should be a) thread safe, i.e. safe to call concurrently from multiple threads, and b) tolerant of being called multiple times even after it has returned 0. The last requirement is because your input filter is parallel, so it's impossible to prevent calls to the filter even after it has returned NULL. The number of such calls cannot be predicted, so the function should not expect it will always be 4, or 3, or whatever else.
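
One possible way to meet requirement b) is a shared stop flag that every call checks before touching the input. The sketch below is only an illustration under assumptions: packetsLeft is a made-up stand-in for the socket/epoll input, stopped is a hypothetical flag, and the real process() from the original post is not shown anywhere in this thread.

```cpp
#include <atomic>
#include <cassert>

std::atomic<bool> stopped{false};  // set once the input is exhausted
std::atomic<int> packetsLeft{5};   // hypothetical stand-in for socket input

// Returns 1 if a packet was handled, 0 if scanning should stop.
// Safe to call concurrently, and any number of times after a 0.
int process(int ppID) {
    assert(ppID >= 0);                      // IDs come from a counter starting at 0
    if (stopped.load()) return 0;           // late calls exit without touching input
    int before = packetsLeft.fetch_sub(1);  // atomically claim one packet
    if (before <= 0) {                      // nothing was left to claim
        stopped.store(true);
        return 0;
    }
    // ... handle the claimed packet here ...
    return 1;
}
```

However many extra calls arrive after the first 0, each one returns 0 immediately, so the caller does not need to rely on seeing exactly 3 or 4 of them.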
