Intel® oneAPI Threading Building Blocks

RFC: pipeline vs. scheduler+tasks for streaming

janwassenberg
Beginner
Hello,

having hand-rolled the parallelization of previous HPC kernels (which did have the advantage of NUMA- and shared-cache-aware partitioning), I am faced with a more complex task and figure TBB is preferable to reinventing more wheels.

Since I am not experienced with the API beyond the tutorial and patterns cookbook, some advice to prevent walking too far down the wrong path would be appreciated.
In short: would you prefer the low-level task route for the following application (despite lots of apparent gotchas and details), or is there a simple way to cleanly express it with the higher-level parallel_do or pipeline APIs?

The application consists of a consumer in a separate, third-party DLL that continually asks for 100..500 KB packets (fixed-size, but I can influence it) out of a total of several GB. Those requests go to my IO thread, which issues asynchronous reads for the corresponding blocks in a file (whose format I control). When the IOs have completed, I want to farm out LZOP decompression to one or several worker threads (which implies each packet is stored on disk as several independent compressed streams).
The consumer will probably request multiple, equally important packets at once, but the latency between requesting and receiving decompressed packets is important, so the number of streams/workers per packet needs to be tunable. The worst-case latency caused by always assigning worker N to stream N of each packet should be avoided, since the consumer can derive some benefit from timely delivery of the first fully-decoded packet.

Now it would be nice to express this via higher-level constructs. It looks like parallel_do would permit parallel_do_feeder::add, but the iterator interface would require hackery (an infinite range). pipeline() avoids this problem, but would require blocking in the first stage until a request arrives (ugh).

Am I correct in assuming it's a bad idea to have a single worker thread run a 'dummy' parallel_do/pipeline until it gets work (and receives reinforcements from other TBB-created threads)?

The lack of control over prioritization/ordering would also seem to imply the need for tasks that pop a stream from the first packet from a FIFO (concurrent_queue). Is this correct, or is there a simpler/more efficient way?

Thanks for any advice and happy holidays!
Jan
RafSchietekat
Valued Contributor III
The problem is not completely transparent from your description, so I'll start with what you're saying/asking about TBB.

"Now it would be nice to express this via higher-level constructs. It looks like parallel_do would permit parallel_do_feeder::add, but the iterator interface would require hackery (an infinite range)."
Seems rather like standard C++ STL fare, even if it is fairly advanced: have a look at std::istream_iterator.

"pipeline() avoids this problem, but would require blocking in the first stage until a request arrives (ugh)."
It is a customary workaround to oversubscribe by one thread to compensate for a blocking input filter, assuming that this doesn't have a lot of work to do compared to the CPU-bound threads.
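
For instance, a minimal sketch of that workaround:

[cpp]#include "tbb/task_scheduler_init.h"

// One extra worker beyond the default, so the filter that blocks waiting for a
// request/completed IO does not starve the CPU-bound decompression threads.
tbb::task_scheduler_init init( tbb::task_scheduler_init::default_num_threads() + 1 );[/cpp]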

"Am I correct in assuming it's a bad idea to have a single worker thread run a 'dummy' parallel_do/pipeline until it gets work (and receives reinforcements from other TBB-created threads)?"
Does the workaround described above resemble what you mean?

"The lack of control over prioritization/ordering would also seem to imply the need for tasks that pop a stream from the first packet from a FIFO (concurrent_queue). Is this correct, or is there a simpler/more efficient way?"
Enqueued tasks (as opposed to spawned tasks) are scheduled roughly in FIFO order, but if my interpretation that you want to favour collaboration on the most recent packet before tackling the next packet is correct, you would have to enqueue tasks for the individual streams.
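
A minimal sketch of what I mean by enqueuing one task per stream (Stream and decompress_stream() are placeholders for your own types, not TBB names):

[cpp]#include "tbb/task.h"

struct Stream;                        // one compressed stream of a packet (yours)
void decompress_stream( Stream* s );  // your LZO(P) decode routine

// Enqueued tasks (new in TBB 3.0) are processed in roughly FIFO order,
// unlike spawned tasks, which tend towards LIFO.
class StreamTask : public tbb::task {
public:
    StreamTask( Stream* s ) : my_stream(s) {}
    virtual tbb::task* execute() {
        decompress_stream( my_stream );
        return NULL;
    }
private:
    Stream* my_stream;
};

// When a packet's IO completes, for each of its streams s:
//   tbb::task::enqueue( *new( tbb::task::allocate_root() ) StreamTask(s) );
[/cpp]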

janwassenberg
Beginner
"The problem is not completely transparent from your description"
I've tried to distill it down to the abstract essence, but am happy to expand upon the description :)

"Seems rather like standard C++ STL fare, even if it is fairly advanced: have a look at std::istream_iterator."
OK, yes, we can define an iterator that never compares equal to the 'end' iterator, thus giving an infinite range. However, this parallel_do approach would still suffer from a suboptimal ordering of the tasks (:= decompressing a stream from a packet), because parallel_do_feeder_impl 'spawns' tasks, thus tending towards LIFO order.
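
(For reference, roughly what such an iterator might look like — StreamJob, the blocking queue and the body are my own placeholders:)

[cpp]#include "tbb/parallel_do.h"
#include "tbb/concurrent_queue.h"
#include <iterator>

struct StreamJob { /* which packet, which stream, where to decompress to */ };

// An "infinite" input iterator over a blocking queue of completed reads:
// operator++ blocks until the IO thread pushes the next job, and an iterator
// holding a queue never compares equal to the default-constructed "end".
class job_iterator : public std::iterator<std::input_iterator_tag, StreamJob> {
public:
    explicit job_iterator( tbb::concurrent_bounded_queue<StreamJob>* q = NULL ) : q_(q) {
        if( q_ ) q_->pop( current_ );          // blocking pop of the first job
    }
    const StreamJob& operator*() const { return current_; }
    job_iterator& operator++() { q_->pop( current_ ); return *this; }
    bool operator==( const job_iterator& other ) const { return q_ == other.q_; }
    bool operator!=( const job_iterator& other ) const { return !(*this == other); }
private:
    tbb::concurrent_bounded_queue<StreamJob>* q_;
    StreamJob current_;
};

// tbb::parallel_do( job_iterator(&queue), job_iterator(), body );  // body decompresses one stream
[/cpp]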

"It is a customary workaround to oversubscribe by one thread to compensate for a blocking input filter"
Ah, thanks for confirming this is standard practice.

"Does the workaround described above resemble what you mean?"
Yes, but I hadn't thought of oversubscription. Nice!

"Enqueued tasks (as opposed to spawned tasks) are scheduled roughly in FIFO order, but if my interpretation that you want to favour collaboration on the most recent packet before tackling the next packet is correct you would have to enqueue tasks for the individual streams."
Right; either way (pipeline or direct use of tasks) would "enqueue" tasks covering 1/Nth of a packet.
If I understand correctly, pipeline has a slightly stronger guarantee, in that tasks begin in precise FIFO order (I don't mind if they retire out-of-order, since they can decompress directly into their portion of the output packet).
It seems that pipeline is easier to use, and the blocking-first-stage problem is solved; are there any other pitfalls lurking?
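
For concreteness, the structure I have in mind is roughly the following (just a sketch; StreamJob, the job queue and decompress_into() are placeholders):

[cpp]#include "tbb/pipeline.h"
#include "tbb/concurrent_queue.h"

struct StreamJob { /* one compressed stream plus its slice of the output packet */ };
void decompress_into( StreamJob* j );  // LZO(P) decode into the packet's buffer

// Serial input stage: blocks until the IO thread hands over a completed read,
// then emits one token per compressed stream. Returning NULL would end the pipeline.
class InputFilter : public tbb::filter {
public:
    InputFilter( tbb::concurrent_bounded_queue<StreamJob*>& jobs )
        : tbb::filter( tbb::filter::serial_in_order ), jobs_(jobs) {}
    void* operator()( void* ) {
        StreamJob* job = NULL;
        jobs_.pop( job );              // blocks (hence the oversubscription by one)
        return job;
    }
private:
    tbb::concurrent_bounded_queue<StreamJob*>& jobs_;
};

// Parallel stage: decompress one stream directly into its portion of the output packet.
class DecompressFilter : public tbb::filter {
public:
    DecompressFilter() : tbb::filter( tbb::filter::parallel ) {}
    void* operator()( void* item ) {
        StreamJob* job = static_cast<StreamJob*>(item);
        decompress_into( job );
        delete job;
        return NULL;
    }
};[/cpp]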

Best Regards
Jan

jimdempseyatthecove
Honored Contributor III
The problem as you have described it is that the 3rd party DLL will request (uncompressed) data from unknown position(s)/location(s) distributed within your compressed archive(s). IOW the 3rd party DLL will not run through the data in a serial manner (except for the 100...500 KB packets).

Should the 3rd party DLL read requests follow a serial pattern, then parallel_pipeline would be a good selection, as the parallel_pipeline is designed for throughput. However, since the next read location(s) is unknown until the 3rd party DLL makes a decision, your problem is more of a latency issue.

What is unknown is:

Is the consumer application in the 3rd party DLL single threaded or multi-threaded?
If multi-threaded which threading model?
Is the access a preponderance of random seek, followed by serial access?

Jim Dempsey
RafSchietekat
Valued Contributor III
"OK, yes, we can define an iterator that never compares equal to the 'end' iterator, thus giving an infinite range."
Or you can use parallel_while, which has a more approachable demeanour. Maybe somebody could come up with an adapter in case deprecation ever becomes effective termination.
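
For completeness, a bare-bones usage sketch of parallel_while (the job type, queue wrapper and body are placeholders, not TBB names):

[cpp]#include "tbb/parallel_while.h"
#include "tbb/concurrent_queue.h"

struct StreamJob { /* which packet, which stream, where to decompress to */ };
void decompress_into( StreamJob& j );  // placeholder decode routine

// parallel_while takes a "stream" with pop_if_present() instead of an iterator range.
class JobStream {
public:
    JobStream( tbb::concurrent_queue<StreamJob>& q ) : q_(q) {}
    bool pop_if_present( StreamJob& job ) { return q_.try_pop( job ); }
private:
    tbb::concurrent_queue<StreamJob>& q_;
};

struct DecompressBody {
    typedef StreamJob argument_type;   // required by parallel_while
    void operator()( StreamJob job ) const { decompress_into( job ); }
};

// JobStream stream( queue );
// tbb::parallel_while<DecompressBody> w;
// w.run( stream, DecompressBody() );  // returns once the stream runs dry
[/cpp]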

"However, this parallel_do approach would still suffer from a suboptimal ordering of the tasks (:= decompressing a stream from a packet), because parallel_do_feeder_impl 'spawns' tasks, thus tending towards LIFO order."
I wouldn't write it off just yet, although it would be nice to have some more information about its behaviour with a non-random iterator. But why would you use the feeder here for something that you wouldn't want to be computed ASAP?

"are there any other pitfalls lurking?"
"There are known knowns; there are things we know that we know. There are known unknowns; that is to say, there are things that we now know we dont know. But there are also unknown unknowns; there are things we do not know we dont know."
janwassenberg
Beginner
"The problem as you have described is that the 3rd party DLL will request (uncompressed) data from an unknown position(s)/location(s) distributed within your compressed archive(s). IOW the 3rd party DLL will not run through the data in a serial manner (excepting for the 100...500KB packets)."
Yes, that is accurate (there is only 1 archive).

"Should the 3rd party DLL read request follow a serial pattern, then parallel_pipeline would be good selection. As the parallel_pipeline is designed for throughput. However, since the next read location(s) is unknown until the 3rd party DLL makes a decision, your problem is more of a latency issue."
Agreed, though it could also turn into a throughput problem if packets are chopped into too many parts (thus causing compression ratio to suffer), or if multiple workers decompressing into the final buffer cause RFOs.
However, this seems to be affected more by the number of streams per packet than the scheduling, so parallel_pipeline wouldn't be at a disadvantage vs. direct use of the scheduler, right?

"Is the consumer application in the 3rd party DLL single threaded or multi-threaded?"
It's multi-threaded and under the control of a co-worker, who probably wouldn't begrudge some changes :)

"If multi-threaded which threading model?"
It's using Qt; the main thread owns the window and reacts to events. A separate OpenGL thread spun off via Qt is responsible for rendering and deciding which packets it wants next/soon. The communication is to be via Qt events and concurrent_queue.

"Is the access a preponderance of random seek, followed by serial access?"
All accesses are at packet granularity. We are currently going through gyrations to define a good locality-preserving mapping from the original 3 dimensions to the 1-D file, so that the next packet(s) to be requested might be near the last one. However, nothing is perfect, so these accesses are mostly random. This is where the Fusion-IO SSD will come in handy :D
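
(Just for illustration — not necessarily what we will settle on — one standard locality-preserving mapping is a Morton/Z-order curve, i.e. interleaving the bits of a packet's x/y/z coordinates:)

[cpp]#include <stdint.h>

// Spread the low 21 bits of v so that two zero bits follow each original bit.
static inline uint64_t spread_bits( uint64_t v )
{
    v &= 0x1FFFFF;                                  // 3 * 21 bits fit in 63
    v = (v | (v << 32)) & 0x001F00000000FFFFull;
    v = (v | (v << 16)) & 0x001F0000FF0000FFull;
    v = (v | (v <<  8)) & 0x100F00F00F00F00Full;
    v = (v | (v <<  4)) & 0x10C30C30C30C30C3ull;
    v = (v | (v <<  2)) & 0x1249249249249249ull;
    return v;
}

// 1-D Morton index of the packet at integer coordinates (x, y, z).
static inline uint64_t morton3( uint64_t x, uint64_t y, uint64_t z )
{
    return spread_bits(x) | (spread_bits(y) << 1) | (spread_bits(z) << 2);
}[/cpp]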

"Or you can use parallel_while, which has a more approachable demeanour."
Ah yes, that is easier. (I had assumed it disappeared in 3.0)

"I wouldn't write it off just yet, although it would be nice to have some more information about its behaviour with a non-random iterator. But why would you use the feeder here for something that you wouldn't want to be computed ASAP?"
I was digging through the source and saw calls to spawn(). The problem with that is the following:
- the consumer might request several packets within a short timeframe
- if the N workers each decompress a part of N different packets in turn, latency is increased vs. finishing up one packet immediately and passing it on.
Therefore, a near-FIFO ordering of the packets' parts would be good.

"There are known knowns; there are things we know that we know. There are known unknowns; that is to say, there are things that we now know we dont know. But there are also unknown unknowns; there are things we do not know we dont know."
hehe, true. The last category is unavoidable, but it's definitely good to look before leaping. Thanks for the feedback so far!

Best Regards
Jan
jimdempseyatthecove
Honored Contributor III

Jan,

>>>

"Is the consumer application in the 3rd party DLL single threaded or multi-threaded?"

It's multi-threaded and under the control of a co-worker, who probably wouldn't begrudge some changes :)

"If multi-threaded which threading model?"

It's using Qt; the main thread owns the window and reacts to events. A separate OpenGL thread spun off via Qt is responsible for rendering and deciding which packets it wants next/soon. The communication is to be via Qt events and concurrent_queue.

"Is the access a preponderance of random seek, followed by serial access?"

All accesses are at packet granularity. We are currently going through gyrations to define a good locality-preserving mapping from the original 3 dimensions to the 1-D file, so that the next packet(s) to be requested might be near the last one. However, nothing is perfect, so these accesses are mostly random. This is where the Fusion-IO SSD will come in handy :D

if N workers decompress each part of N packets in turn, latency is increased vs. finishing up one packet immediately and passing that on.

Therefore, a near-FIFO ordering of the packets' parts would be good.

<<<

It is generally counterproductive to mix thread management technologies (in this case Qt and TBB) unless both technologies share the same underlying thread pool. Therefore, consider choosing one technique or the other.

While I am not a Qt programmer, you should be able to do something like the following sketch code (OpenMP-ish style):

[cpp]packet_t* volatile packets[nUncompressionThreads];   // volatile: spin-waited on below
for(int i = 0; i < nUncompressionThreads; ++i)
    packets[i] = NULL;

#pragma omp parallel num_threads(nUncompressionThreads)
{
    int myThreadNum = omp_get_thread_num();
    if(myThreadNum == 0)
    {
        // thread 0 issues all the reads; fill its own slot (0) last
        for(int i = 1; i < nUncompressionThreads; ++i)
            packets[i] = getPacket(i);
        packets[0] = getPacket(0);
    }
    // every thread waits for its packet to arrive, then decompresses it
    while(packets[myThreadNum] == NULL)
        _mm_pause();
    unPack(packets[myThreadNum]);
} // omp parallel [/cpp]

The strategy here is to start your unpack thread pool prior to starting the read. In this manner you get some overlap of decompression during the 2nd and later reads _while_ eliminating the thread startup-to-begin-decompression latency after each read.

Jim Dempsey
janwassenberg
Beginner
"It is generally counter productive to mix thread management technologies
(in this case Qt and TBB) unless both technologies share the same underlying
thread pool."

I agree in general. However, in this case, we'd have an OpenGL thread
running at full throttle, and the high-priority IO thread and main thread sharing
a core. We're planning on requiring at least a quad-core processor, so that
leaves 2..N-2 cores dedicated to decompression. Assuming the OS isn't stupid
about scheduling them (if it is, there's always SetThreadIdealProcessor and
affinity), I don't see any problem with mixing threading packages in this way.
Since TBB claims to be interoperable and Qt makes no such guarantee (we also
need to post Qt events to the main thread), it is probably safer this way :)

"The strategy here is to start your unpack thread pool prior to starting the
read. In this manner you get some overlap of decompression during 2nd and later
reads _while_ eliminating thread startup-to-begin decompression latency after each read."

That sounds good. However, I was hoping that TBB pipeline would
also provide the avoid-startup-latency benefit while avoiding some of
the complexity that seems to creep into do-it-yourself OpenMP code.
For example, specifying nUncompressionThreads+1 tokens and a blocking serial
input stage automatically makes for non-spinning waits in the other workers
(http://software.intel.com/en-us/blogs/2008/06/09/the-user-driven-evolution-of-tbbpipeline/),
which would have to be done manually (possibly OS-specific, too) with the OpenMP approach.
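
In code, what I am picturing is roughly this (continuing the sketch from my earlier post):

[cpp]InputFilter input( jobs );            // serial stage: blocks until a read completes
DecompressFilter decompress;          // parallel stage: LZO(P) decode per stream

tbb::pipeline p;
p.add_filter( input );
p.add_filter( decompress );
p.run( nUncompressionThreads + 1 );   // max tokens in flight: workers + the blocked one
p.clear();
[/cpp]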

BTW, out of curiosity, what is the rationale for wanting to set
packets[0] last? I don't see any harm in having worker i>0 start
unpacking before worker #0.
jimdempseyatthecove
Honored Contributor III
>>That sounds good. However, I was hoping that TBB pipeline would
also provide the avoid-startup-latency benefit while avoiding some of
the complexity that seems to creep into do-it-yourself OpenMP code.

I did not mean to suggest you use OpenMP; I merely used it to present a better sketch. You should be able to do the same thing using PThreads, Qt, TBB, etc.

If you were to use TBB then consider

[cpp]#include "tbb/parallel_invoke.h"

#define nUncompressionThreads 2

packet_t* volatile packets[nUncompressionThreads];   // volatile: spin-waited on below
for(int i = 0; i < nUncompressionThreads; ++i)
    packets[i] = NULL;

tbb::parallel_invoke(
  // reader: fetch every packet, then unpack the last one itself
  [&]()
  {
    for(int i = 0; i < nUncompressionThreads; ++i)
      packets[i] = getPacket(i);
    unPack(packets[nUncompressionThreads-1]);
  }
#if (nUncompressionThreads > 1)
  ,
  // each remaining worker waits for "its" packet, then unpacks it
  [&]()
  {
    while(packets[0] == NULL)
      _mm_pause();
    unPack(packets[0]);
  }
#endif
#if (nUncompressionThreads > 2)
  ,
  [&]()
  {
    while(packets[1] == NULL)
      _mm_pause();
    unPack(packets[1]);
  }
#endif
#if (nUncompressionThreads > 3)
  ,
  [&]()
  {
    while(packets[2] == NULL)
      _mm_pause();
    unPack(packets[2]);
  }
#endif
#if (nUncompressionThreads > 4)
#error "Add more code here"
#endif
); // parallel_invoke [/cpp]
>>BTW, out of curiosity, what is the rationale for wanting to set
packets[0] last? I don't see any harm in having worker i>0 start
unpacking before worker #0

It made the reading of the pseudo (sketch) code easier.
The above code has the reader thread unpacking the last buffer...
... and the 2nd thread unpacking the 1st buffer, the 3rd thread the 2nd buffer, and so on.

Also note that parallel_invoke, when the number of uncompression threads is known in advance, will provide the least overhead (lowest latency).

You might want to experiment with which entry in the invoke list performs the reads. I am not sure which "task" in the invoke list is (attempted to be) picked first by the main thread. It would make sense for either the 1st or the last "task" to be run directly by the parallel_invoke-ing thread (thus avoiding one scheduling enqueue/dequeue). I do this with my QuickThread parallel_invoke; TBB should do it too (easy performance tweak).

For TBB, the

#define nUncompressionThreads 2

would probably be better expressed as

#define nUncompressionTasks 2

Then set the number of "tasks" to the number of compressed blobs you use for a data item. This task list can exceed the number of available threads (and/or expand to use more threads on higher core count processors).

Jim Dempsey
janwassenberg
Beginner
Delayed update: the pipeline approach turned out to be elegant (not too much boilerplate) and works well.
Thanks again for that suggestion!
