tbbnovice
Beginner
56 Views

concurrent_queue freezes after 65535 pops

I have a bizarre problem with TBB concurrent queues and would appreciate any help.

I have two tasks running in parallel (triggered in a parallel_for). The first task ("producer") pushes an event pointer to an eventQueue and waits for (pops) an acknowledgement on an ackQueue. The second task ("consumer") pops the eventQueue and pushes an acknowledgement to the ackQueue once it is done. This way I can have multiple consumers subscribing to the same set of events.

The problem is that after the 65535th event, the pop freezes even though there is an event on the queue. I added some debug statements. Before I push to the eventQueue, its size is -1 (indicating that a pop is pending on the queue). After the producer pushes, the eventQueue shows a size() of 0, as expected. But the pop() never returns: the debug statement I put immediately after the pop() is never printed. CPU usage is 100% until I kill the process.

I looked at my code and am sure this is not a problem with overflows, because all my variables are unsigned long. When I hit ctrl-break I. I am pretty sure the code works until event 65535 - are there any consts within the concurrent_queue that assume a max push/pop count? Is this a thread stack overflow? I couldn't find anyone else reporting this problem on this forum before. What is so unique about 65535?

I am using tbb21_012oss. I am not using set_capacity or clear. The capacity (default I guess) shows 536870911. I am using Visual Studio 2005 and I see the problem in both Debug and "Start without Debugging" modes.

Thanks in advance for any suggestions.

13 Replies
Dmitry_Vyukov
Valued Contributor I

I think two bugs in concurrent_queue are combining here in an interesting way.

See bug III that I've described here:

http://software.intel.com/en-us/forums/showthread.php?t=60617

and bug I that I've described here:

http://software.intel.com/en-us/forums/showthread.php?t=60623

That is, n_waiting_consumers and n_waiting_producers are declared as uint16_t and are never decremented. So when n_waiting_consumers overflows and wraps to 0, producers think that there are no waiting consumers and don't signal. The consumers stay blocked indefinitely. Bang!
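The wrap-around described above can be reproduced in isolation. The following is a minimal sketch, not the TBB source: only the field name n_waiting_consumers comes from the post, while the struct and the helper functions are hypothetical and exist purely to model the arithmetic.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the queue's internal bookkeeping. Only the field
// name comes from the post; this is not the actual TBB data structure.
struct WaitCounters {
    std::uint16_t n_waiting_consumers = 0;
};

// Models a consumer that is about to block. As in the bug described above,
// the counter goes up and is never brought back down after wake-up.
inline void consumer_blocks(WaitCounters& c) {
    // Unsigned 16-bit arithmetic wraps modulo 2^16, so 65535 + 1 becomes 0.
    c.n_waiting_consumers = static_cast<std::uint16_t>(c.n_waiting_consumers + 1);
}

// Models the producer's check: it signals only if it believes someone waits.
inline bool producer_would_signal(const WaitCounters& c) {
    return c.n_waiting_consumers != 0;
}

// Counts how many blocking pops it takes before the producer stops signalling.
inline unsigned long blocking_pops_until_lost_wakeup() {
    WaitCounters c;
    unsigned long pops = 0;
    do {
        consumer_blocks(c);
        ++pops;
    } while (producer_would_signal(c));
    assert(c.n_waiting_consumers == 0);  // the counter has wrapped around
    return pops;
}
```

Under these assumptions the producer stops signalling on the 65536th blocking pop, which is the point where a 16-bit counter that only ever grows returns to zero.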

Dmitry_Vyukov
Valued Contributor I

It seems that both issues are fixed in the latest development release (tbb21_20081019): the variables are declared as uint32 and are decremented after the wait.

tbbnovice
Beginner

Quoting - Dmitriy V'jukov

It seems that both issues are fixed in the latest development release (tbb21_20081019): the variables are declared as uint32 and are decremented after the wait.

Dmitry, thanks a lot for the reply. I was really worried!

Can someone at Intel post a stable release DLL that fixes this problem - if that is not coming out soon, can someone give me a Visual C++ project so I can build the development release on my own?

Btw the following program reproduces this error. Incidentally, the event count goes to a value greater than 65535; on two runs I saw the deadlock at 65949 and 65890. Is there something else going on?

As an aside, is it a smart idea to use a concurrent_queue for acknowledgements? It seems to be overkill (because the queue size is always either -1 or 0) but since there is no monitor/wait mechanism in tbb I wasn't sure how else I can do this.

#include <iostream>
#include <vector>
#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"
#include "tbb/concurrent_queue.h"
#include "tbb/task_scheduler_init.h"
#include <boost/shared_ptr.hpp>

using namespace std;
using namespace tbb;

class Task;
typedef boost::shared_ptr<Task> TaskPtr;
typedef std::vector<TaskPtr> TaskVector;
typedef boost::shared_ptr<concurrent_queue<long> > QueuePtr;

// Base class: holds the shared event and acknowledgement queues.
class Task {
public:
    Task(QueuePtr eventQueue, QueuePtr ackQueue)
        : eventCount_(0), eventQueue_(eventQueue), ackQueue_(ackQueue) {}
    virtual ~Task() {}
    virtual void processEvent() { cout << "Hi" << endl; }
protected:
    long eventCount_;
    QueuePtr eventQueue_;
    QueuePtr ackQueue_;
};

class Producer : public Task {
public:
    Producer(QueuePtr eventQueue, QueuePtr ackQueue) : Task(eventQueue, ackQueue) {}
    // Pushes events 0..100000 and blocks on an acknowledgement after each one.
    void processEvent() {
        long ack;
        while (eventCount_ <= 100000) {
            cout << "(P) about to push event " << endl; flush(cout);
            eventQueue_->push(eventCount_);
            cout << "(P) pushed event " << eventCount_++ << " " << endl; flush(cout);
            ackQueue_->pop(ack);  // blocking pop
            cout << "(P) popped ack " << ack << " " << endl; flush(cout);
        }
        cout << "(P) completed" << endl; flush(cout);
    }
};

class Consumer : public Task {
public:
    Consumer(QueuePtr eventQueue, QueuePtr ackQueue) : Task(eventQueue, ackQueue) {}
    // Pops events and acknowledges each one until event 100000 arrives.
    void processEvent() {
        long inEvent;
        long ack = 0;
        bool done = false;
        while (!done) {
            cout << "(C) waiting for event " << endl; flush(cout);
            eventQueue_->pop(inEvent);  // blocking pop; this is the call that hangs
            cout << "(C) popped event " << inEvent << " " << endl; flush(cout);
            ackQueue_->push(ack);
            cout << "(C) pushed ack" << ack++ << " " << endl; flush(cout);
            if (inEvent == 100000)
                done = true;
        }
        cout << "(C) completed" << endl; flush(cout);
    }
};

// Body object for parallel_for: runs processEvent() on each task in its subrange.
class ParallelStarter {
public:
    ParallelStarter(const TaskVector& taskVector) : taskSubset_(taskVector) {}

    void operator()(const tbb::blocked_range<TaskVector::iterator>& r) const {
        for (TaskVector::iterator i = r.begin(); i != r.end(); ++i) {
            TaskPtr ip = (*i);
            ip->processEvent();
        }
    }

protected:
    TaskVector taskSubset_;
};

int main() {
    task_scheduler_init init;

    QueuePtr pEventQueue = QueuePtr(new concurrent_queue<long>);
    QueuePtr pAckQueue = QueuePtr(new concurrent_queue<long>);
    TaskVector taskVector;

    TaskPtr consumer = TaskPtr(new Consumer(pEventQueue, pAckQueue));
    TaskPtr producer = TaskPtr(new Producer(pEventQueue, pAckQueue));

    taskVector.push_back(consumer);
    taskVector.push_back(producer);

    // Relies on the two tasks actually running on two different threads.
    tbb::parallel_for(
        tbb::blocked_range<TaskVector::iterator>(taskVector.begin(), taskVector.end()),
        ParallelStarter(taskVector), tbb::auto_partitioner());

    cout << "DONE" << endl;

    return 0;
}

Dmitry_Vyukov
Valued Contributor I

Quoting - tbbnovice
Btw the following program reproduces this error. Incidentally, the event count goes to a value greater than 65535; on two runs I saw the deadlock at 65949 and 65890. Is there something else going on?

I don't think so. It is fine that the deadlock occurs only at iteration 65949 (or 65890), because n_waiting_consumers is incremented only when a consumer actually blocks. If the consumer fetches an item without blocking (the item is already in the queue when the consumer calls pop()), n_waiting_consumers is not incremented.

So if you deadlock at iteration 65949, this just means that (65949 - 65536) = 413 items were consumed without blocking.
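The arithmetic above can be checked with a small model. This is a sketch under assumptions: the two function names and the layout of the hypothetical run (all non-blocking pops first) are invented for illustration, and only the rule "count blocking pops in a 16-bit counter that is never decremented" comes from the thread.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns the 1-based index of the pop at which a never-decremented 16-bit
// waiter count wraps to zero, or 0 if it never wraps during the run.
// blocked[i] says whether pop number i had to block.
unsigned long first_lost_wakeup(const std::vector<bool>& blocked) {
    std::uint16_t n_waiting_consumers = 0;
    for (std::size_t i = 0; i < blocked.size(); ++i) {
        if (!blocked[i]) continue;  // item was already queued: counter untouched
        n_waiting_consumers = static_cast<std::uint16_t>(n_waiting_consumers + 1);
        if (n_waiting_consumers == 0) return static_cast<unsigned long>(i + 1);
    }
    return 0;
}

// Builds a hypothetical run in which the first `nonblocking` pops find an item
// ready and every later pop blocks. Real runs interleave the two kinds of pop,
// but only the totals matter for the wrap point.
std::vector<bool> make_run(std::size_t total_pops, std::size_t nonblocking) {
    assert(nonblocking <= total_pops);
    std::vector<bool> run(total_pops, true);
    for (std::size_t i = 0; i < nonblocking; ++i) run[i] = false;
    return run;
}
```

With 413 non-blocking pops the model wraps at pop 65949, and with 354 it wraps at pop 65890, matching the two iteration counts reported in the thread.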

Dmitry_Vyukov
Valued Contributor I

Quoting - tbbnovice
As an aside, is it a smart idea to use a concurrent_queue for acknowledgements? It seems to be overkill (because the queue size is always either -1 or 0) but since there is no monitor/wait mechanism in tbb I wasn't sure how else I can do this.

At first glance, you would be better off using plain threads (tbb::tbb_thread) for your producer and consumer. parallel_for is really not the best way to spawn long-running, constantly blocking tasks.

Regarding acknowledgements. There are many alternatives.

You can send all events to the consumer and then wait for all acknowledgements, which makes much better use of a queue. Or, probably better, you can send batches of events, trying to keep the number of unconsumed events between a low_watermark and a high_watermark.

Or you can create a third task (thread) to process acknowledgements, i.e. producer -> queue1 -> consumer -> queue2 -> consumer2. A kind of pipeline.

Or you can use TBB tasks, and use continuations for acknowledgement processing.
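The watermark idea mentioned above can be sketched as a small bookkeeping class. This is an illustrative assumption about how such flow control might look, not code from TBB or from the thread: the class name FlowWindow and its methods are hypothetical, and only the terms low_watermark and high_watermark come from the post.

```cpp
#include <cassert>
#include <cstddef>

// Tracks how many events are unacknowledged and tells the producer when to
// pause. Sending stops once the count reaches the high watermark and resumes
// only after acks have brought it down to the low watermark (hysteresis).
class FlowWindow {
public:
    FlowWindow(std::size_t low_watermark, std::size_t high_watermark)
        : low_(low_watermark), high_(high_watermark),
          outstanding_(0), draining_(false) {
        assert(low_ < high_);
    }

    // True if the producer may push another event without waiting for acks.
    bool may_send() const { return !draining_; }

    // Call after pushing one event to the event queue.
    void on_send() {
        assert(!draining_);
        if (++outstanding_ >= high_) draining_ = true;
    }

    // Call after popping one acknowledgement from the ack queue.
    void on_ack() {
        assert(outstanding_ > 0);
        if (--outstanding_ <= low_) draining_ = false;
    }

    std::size_t outstanding() const { return outstanding_; }

private:
    std::size_t low_;
    std::size_t high_;
    std::size_t outstanding_;
    bool draining_;
};
```

A producer loop would push and call on_send() while may_send() is true, and otherwise block on the ack queue and call on_ack(). That way it blocks once per batch rather than once per event.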

Wooyoung_K_Intel
Employee

Hi,
I have tested your code in the latest TBB source tree and it completed as expected.
As Dmitry noted, the fixes to the issues Dmitry uncovered seem to resolve your issue as well.
I used VS 2008.
BTW, do you need the DLL immediately? Could you wait for a couple of releases?
We plan to make the one following the next release a stable release... (though it is not set in stone)
-wooyoung
Alexey_K_Intel3
Employee

You would be better off using threads explicitly (e.g. tbb_thread) instead of this trick with parallel_for. The difference is that your producer-consumer pattern mandates concurrency (i.e. there must be at least two threads to run it correctly), but parallel_for (and the TBB task scheduler in general) was designed for so-called relaxed sequential semantics, where concurrency is optional, not mandatory. The problem will show up if you run the code on a single-socket, single-core machine: TBB will not create any worker threads by default, and your code will block waiting, e.g., for an acknowledgment from the consumer task, which was never even started because there is no worker.

Or you could look at the pipeline pattern, with the producer code in the first (input) pipeline stage and the consumer code in the second. You might not even need the acknowledgment (depending on what you use it for, though), because tbb::pipeline is designed so that a worker thread strives to execute all stages of the pipeline: after producing an item, the same thread goes on to consume it, while another thread enters the producer stage to get an item for itself. This also guarantees that the code will work even with a single thread, which in that case will simply pass each item through the whole pipeline before going for the next one.
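The explicit-thread alternative suggested above might look roughly like the following. To keep the sketch buildable without TBB it uses std::thread (C++11) in place of tbb_thread, and a hand-written BlockingQueue in place of tbb::concurrent_queue; both substitutions, and the name run_handshake, are assumptions made for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Minimal blocking queue, a hypothetical stand-in for tbb::concurrent_queue.
template <typename T>
class BlockingQueue {
public:
    void push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(value);
        }
        ready_.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        T value = items_.front();
        items_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
};

// Runs the push/ack handshake on two dedicated threads, so both sides are
// guaranteed to exist regardless of how many cores the machine has.
// Returns the number of acknowledgements the producer received.
long run_handshake(long event_count) {
    assert(event_count > 0);
    BlockingQueue<long> events;
    BlockingQueue<long> acks;
    long acked = 0;

    std::thread consumer([&] {
        for (long i = 0; i < event_count; ++i) {
            long e = events.pop();
            acks.push(e);  // acknowledge by echoing the event number
        }
    });
    std::thread producer([&] {
        for (long i = 0; i < event_count; ++i) {
            events.push(i);
            if (acks.pop() == i) ++acked;
        }
    });

    producer.join();
    consumer.join();
    return acked;
}
```

Because each side owns its thread, the handshake does not depend on the task scheduler happening to provide a second worker.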

tbbnovice
Beginner

First of all, let me thank you all for your prompt help - I really appreciate it! I am working on a prototype event processing framework and am championing TBB for this project. I would have been killed by this bug (because we do process millions of events) and am glad it is resolved.

I wasn't aware of tbb::thread until I read your posts - apparently the Reinders book needs an update! I will get rid of the parallel_for. However I have tens of short-running, constantly blocking producers/consumers (each consumer is also a producer for the next stage) but not all of them are active at the same time. From your documentation I understand that I shouldn't be creating too many tbb::threads. What would be really helpful is a threadpool class like http://threadpool.sourceforge.net. Then I can have all producers/consumers in a given stage in one threadpool and I will have three or four such pools (stages) in my app. Any thoughts?

I think the producer-consumer pattern I listed above has an interesting application: Assuming the producer would never send the next event until it got an ack; if the consumer posts an ack only after processing the event, it results in synchronous processing but if the consumer buffers the event and posts the ack immediately, we have asynchronous processing. So we can mix/match sync/async processing at run-time trivially (I am sure someone thought of this before). I can build a complete event processing framework easily using TBB's queues and threads. However I cannot send all events to the consumer in a batch; I can use a thread to process acks but because I have too many "tasks", wouldn't it degrade performance? I guess I cannot use pipelines since the stages are sequential. What is the "right" design for a problem like this?

What are your recommendations if I am using custom objects (instead of primitive types) - I think I can use a scalable allocator only for primitive types. Essentially, I want to do a new shared pointer to my custom object millions of times and I think I will have a performance hit if I use the standard new.

Lastly, I think it would really help if we can get some tbb design patterns/best practices guidance.

Wooyoung, I don't need a stable release dll immediately, but any dll that fixes this problem will be most useful. Can you upload a debug dll for the latest dev release? If possible, can you post the VC 2008 solution to build development releases so I can do my own build? Also why do commercial aligned releases have a later date than the most recent stable release? I thought only stable releases become commercial aligned.

Dmitry_Vyukov
Valued Contributor I

Quoting - tbbnovice

I think the producer-consumer pattern I listed above has an interesting application: Assuming the producer would never send the next event until it got an ack; if the consumer posts an ack only after processing the event, it results in synchronous processing but if the consumer buffers the event and posts the ack immediately, we have asynchronous processing. So we can mix/match sync/async processing at run-time trivially (I am sure someone thought of this before).

IMVHO, the design is busted. If you need asynchronous processing, it is better for the producer to send a "message which doesn't require an ack" directly. In the current design the producer unnecessarily blocks and unblocks and receives the ack, while the consumer sends a message and signals the producer. The overheads are huge, potentially tens of thousands of cycles.

If you need synchronous processing, then there is no need for several threads at all. The thread creates the message and then processes it directly. Simple: no need for threads, blocking, unblocking, waits, messages, etc. Once again, you are imposing huge, unnecessary overheads.

Quoting - tbbnovice

I can build a complete event processing framework easily using TBB's queues and threads. However I cannot send all events to the consumer in a batch; I can use a thread to process acks but because I have too many "tasks", wouldn't it degrade performance?

For sure! Oversubscription of threads will degrade performance.

You have to completely separate the "physical level" (threads) from the "logical level" (tasks, producers, consumers, etc.). The number of threads must be determined by the hardware, while the number of logical entities is determined by the end-user application. It is not good if the design of the end-user application influences the "physical level" (the number of threads).
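One way to picture the separation described above is a fixed set of worker threads, sized from the hardware, that drains an arbitrary number of logical tasks. This is a minimal sketch using standard C++11 threads rather than the TBB scheduler; the function name run_on_hardware_threads is hypothetical.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs every logical task on a fixed set of worker threads whose count is
// derived from the hardware, not from the number of tasks.
// Returns the number of worker threads that were used.
std::size_t run_on_hardware_threads(const std::vector<std::function<void()>>& tasks) {
    // hardware_concurrency() may return 0 when the value is unknown.
    const unsigned hw = std::thread::hardware_concurrency();
    std::size_t workers = std::max<std::size_t>(1, hw);
    workers = std::min(workers, std::max<std::size_t>(1, tasks.size()));
    assert(workers >= 1);

    std::atomic<std::size_t> next(0);  // index of the next unclaimed task
    std::vector<std::thread> pool;
    for (std::size_t w = 0; w < workers; ++w) {
        pool.emplace_back([&tasks, &next] {
            for (;;) {
                const std::size_t i = next.fetch_add(1);
                if (i >= tasks.size()) return;
                tasks[i]();  // a task must not block waiting on another task
            }
        });
    }
    for (std::thread& t : pool) t.join();
    return workers;
}
```

The caveat in the comment matters: with only as many threads as cores, a task that blocks waiting for another task to run can stall the whole pool, which is the same hazard raised earlier in the thread for parallel_for.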

Dmitry_Vyukov
Valued Contributor I

Quoting - tbbnovice

What are your recommendations if I am using custom objects (instead of primitive types) - I think I can use a scalable allocator only for primitive types. Essentially, I want to do a new shared pointer to my custom object millions of times and I think I will have a performance hit if I use the standard new.

You will definitely take a performance (and scalability) hit if you use standard new. At least for now; MSVC2010 will have a built-in scalable allocator.

It's perfectly OK to use TBB's scalable malloc for user-defined objects. The simplest thing you can do is to define the global ::operator new() and ::operator delete() in terms of scalable_malloc()/scalable_free().
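Replacing the global allocation operators as suggested above could be sketched like this. The SKETCH_USE_TBBMALLOC switch, the SKETCH_MALLOC/SKETCH_FREE macros, and the g_allocations counter are hypothetical additions: the switch lets the sketch fall back to std::malloc so it builds without TBB, and the counter only serves to show that the replacement is actually being called. The sketch assumes C++11.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <new>

#ifdef SKETCH_USE_TBBMALLOC              // hypothetical switch, not a TBB macro
#include "tbb/scalable_allocator.h"      // declares scalable_malloc / scalable_free
#define SKETCH_MALLOC scalable_malloc
#define SKETCH_FREE   scalable_free
#else                                    // fallback so the sketch builds without TBB
#define SKETCH_MALLOC std::malloc
#define SKETCH_FREE   std::free
#endif

// Illustration only: counts calls that reach the replacement operator new.
std::atomic<unsigned long> g_allocations(0);

// Replacement global allocation function; every plain `new` ends up here.
void* operator new(std::size_t size) {
    if (size == 0) size = 1;             // operator new must return a unique pointer
    void* p = SKETCH_MALLOC(size);
    if (!p) throw std::bad_alloc();
    ++g_allocations;
    return p;
}

// Replacement global deallocation function; must match the allocator above.
void operator delete(void* p) noexcept {
    if (p) SKETCH_FREE(p);
}

// Array forms forward to the scalar forms so both use the same allocator.
void* operator new[](std::size_t size) { return ::operator new(size); }
void operator delete[](void* p) noexcept { ::operator delete(p); }
```

Since the operators are global, user-defined objects and the control blocks allocated by shared pointers go through them without any change to the calling code. Building with the hypothetical switch defined also requires linking against the tbbmalloc library.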

tbbnovice
Beginner

Dmitriy, thanks again for your insights. Your point about the overheads is well taken. One of the reasons why I had an ack for async processor was for flow-control but from your message it seems there might be better alternatives. I will post a separate thread for design questions.

I spent a few hours trying to build the dev release using the instructions on http://www.devx.com/go-parallel/Article/37896 but I had no luck.

Can anyone PLEASE send me the 32-bit windows debug dll that includes the fix for this problem? You can email me prasadus AT hotmail or post an attachment. THANKS A LOT!!!

tbbnovice
Beginner

I was able to build this at last and it solved my problem! Thanks a lot Wooyoung!!

Dmitry_Vyukov
Valued Contributor I

Quoting - tbbnovice

I was able to build this at last and it solved my problem! Thanks a lot Wooyoung!!

Actually you can link TBB statically to your code.

You have to include all *.cpp files from src/tbb, MemoryAllocator.cpp from src/tbbmalloc, and *.asm files for your platform directly into your project, define __TBB_TASK_CPP_DIRECTLY_INCLUDED=1 and USE_WINTHREAD=1 (or USE_PTHREAD=1), turn off precompiled headers, and call mallocThreadShutdownNotification() at the end of every thread. IIRC, that is all.

If you don't create and destroy many threads dynamically, then you can skip calling mallocThreadShutdownNotification(); all memory will be returned to the OS when the process exits.