Intel® oneAPI Threading Building Blocks

possible pipeline use case

tettema
Beginner
I'm considering tbb::pipeline for a particular problem, though I'm not sure it's the right way to go (just a parallel_do might be better?)

I will receive buffers of input serially. Process A then "filters" each one into a buffer of data type A, process B "filters" that data into a buffer of data type B, and so on, until the result is finally consumed. I'm not sure this matches the pipeline model, since the examples I've read have the filters passing around pointers to objects of the same type. In my case, each filter would convert its input buffer into an output buffer of entirely new data and data types. I've already found parallelism within each of these filters, but now I'm looking to tie the whole process into something like a pipeline to kick off parallelization at the highest level. Any ideas? Thanks!
RafSchietekat
Valued Contributor III

TBB filters are not aware of types; they only know about void* pointers, and a filter's execution may return a different value than its input value. You could rely on the scalable allocator's good thread-local behaviour to allocate new objects as return values (but serial filters may sabotage that to some degree), or allocate all of them at once next to each other in a struct or so (less work for the allocator, but perhaps less "pure" because a filter has to know about both input and output fields), and see which performs best (then tell us about your findings).
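To make the point about void* concrete, here is a minimal sketch in plain C++ with no TBB calls. Each stage takes a void* and returns a void* to an object of a different type. The names `RawBuffer`, `Decoded`, `stage_a`, `stage_b` and `run_chain` are hypothetical, and the first option above (allocating a new object per return value) is assumed; in a real pipeline the two stage functions would be the bodies of the filters' `operator()(void*)`.

```cpp
#include <cassert>
#include <vector>

// Hypothetical input type: raw integer samples as received.
struct RawBuffer {
    std::vector<int> samples;
};

// Hypothetical output type of the first stage: scaled floating-point values.
struct Decoded {
    std::vector<double> values;
};

// Stage A: consumes a RawBuffer*, returns a newly allocated Decoded*.
// The input is freed here because nothing downstream needs it.
void* stage_a(void* item) {
    RawBuffer* in = static_cast<RawBuffer*>(item);
    assert(in != nullptr);
    Decoded* out = new Decoded;
    out->values.reserve(in->samples.size());
    for (int s : in->samples) out->values.push_back(s * 0.5);
    delete in;
    return out;
}

// Stage B: consumes a Decoded*, returns a heap-allocated double (the sum).
void* stage_b(void* item) {
    Decoded* in = static_cast<Decoded*>(item);
    assert(in != nullptr);
    double* sum = new double(0.0);
    for (double v : in->values) *sum += v;
    delete in;
    return sum;
}

// Chains the stages by hand, handing each return value to the next stage
// the way a pipeline passes a token from filter to filter.
double run_chain(const std::vector<int>& samples) {
    void* token = new RawBuffer{samples};
    token = stage_a(token);
    token = stage_b(token);
    double* result = static_cast<double*>(token);
    double value = *result;
    delete result;
    return value;
}
```

Each stage has to know the concrete type behind the pointer it receives; the compiler cannot check the casts, so a mismatch between neighbouring stages only shows up at run time.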

tettema
Beginner

Your second suggestion was my instinct, since it would seem to be faster, and I'm avoiding dynamic allocation for this project. So then is the idea to create a buffer of a giant data structure (large enough to support the in-flight number for pipeline) that has as its fields the buffers of the various types for each filter to store its results in?

Part of the problem is that filter A on each item in buffer A might yield 2 items in buffer B for filter B to process, and I want the two items in buffer B to be processed concurrently by B filter. How can pipeline support this if each filter can only pass a single void*? It seems each filter would have to have the parallelism contained within it to support this, and the pipeline itself wouldn't process the previous filter's output items in parallel using simply the "parallel" argument to the filter's constructor.

Also, I want to make sure I'm still realizing some benefit to pipelines. If I do the above, what benefit do I get over doing a parallel_do around a method that receives the next input and does all its processing until completion, putting the results in a concurrent_queue?
ARCH_R_Intel
Employee

Yes, creating a structure large enough to support the in-flight number will work. If the first pipeline stage allocates the items, the last stage deallocates them, and both stages are serial, then simple round-robin allocation works, because by the time the round-robin has to recycle an item, the last stage will have freed it from its previous use.
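The round-robin argument can be sketched in plain C++, without TBB. This is a single-threaded model, not the pipeline itself: `RoundRobinPool`, `Item` and `simulate` are hypothetical names, the `in_use` flag exists only so the sketch can check the claim, and the last stage is assumed to retire items in the order the first stage issued them.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical per-item record; a real one would hold the stage buffers.
struct Item {
    std::size_t sequence;
    bool in_use;
};

// Fixed pool handed out in round-robin order. It is meant to be called only
// from the serial first and last stages, so it does no locking.
class RoundRobinPool {
public:
    explicit RoundRobinPool(std::size_t max_in_flight)
        : slots_(max_in_flight), next_(0) {
        assert(max_in_flight > 0);
    }

    // First stage: take the next slot in round-robin order.
    Item* acquire() {
        Item* item = &slots_[next_ % slots_.size()];
        assert(!item->in_use);  // holds if the previous user was retired
        item->in_use = true;
        item->sequence = next_++;
        return item;
    }

    // Last stage: mark the slot as reusable.
    void release(Item* item) {
        assert(item != nullptr && item->in_use);
        item->in_use = false;
    }

private:
    std::vector<Item> slots_;  // value-initialized: in_use starts false
    std::size_t next_;
};

// Models a pipeline that holds at most max_in_flight items and retires them
// in order. Returns how many items were retired.
std::size_t simulate(std::size_t max_in_flight, std::size_t total_items) {
    RoundRobinPool pool(max_in_flight);
    std::deque<Item*> in_flight;  // oldest item at the front
    std::size_t retired = 0;
    for (std::size_t i = 0; i < total_items; ++i) {
        if (in_flight.size() == max_in_flight) {
            // No free token: the oldest item must leave before a new one enters.
            pool.release(in_flight.front());
            in_flight.pop_front();
            ++retired;
        }
        in_flight.push_back(pool.acquire());
    }
    while (!in_flight.empty()) {
        pool.release(in_flight.front());
        in_flight.pop_front();
        ++retired;
    }
    return retired;
}
```

The pool size plays the role of the in-flight limit (the number of live tokens passed when the pipeline is run). Item i reuses the slot of item i minus the pool size, which by then has already left the last stage.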

Do the items have to be processed in order? The pipeline template scales only if there is at least one parallel filter in the pipeline. If all filters are serial, then a parallel_do will likely work better.

The current pipeline template is limited to one-to-one mappings in each filter. So yes, you would need to put parallelism within a stage to process two items in it. E.g., filter A could return a pointer to an array with the two items, and filter B could use a parallel_do or parallel_for to process those items.
tettema
Beginner

I believe that all stages after the first one can be parallel, so it sounds like pipeline works.

Implementation question -- what do you think is better:

A) A buffer of large objects, where each object supports all the data in/out of each filter for a single pass through the pipeline

B) Separate buffers for each type of object, so each filter writes a range of objects to its respective buffer. If this option, any ideas on best implementation? Can't be a queue obviously, since pipeline stages will advance independently. Seems to require a specific pointer to memory with an object count.

I just want to make sure I assist the pipeline as much as possible to "move the workers through the data," among its other behind-the-scenes optimizations.

Thanks!
ARCH_R_Intel
Employee

I don't have all the specifics, but I'd generally be inclined towards A. It makes bookkeeping simple and would seem to get the cache affinity right.

Note that if two or more consecutive stages are parallel, the pipeline template effectively merges the stages. That is, when each stage in the sequence completes operating on an item, the return value is immediately fed into the subsequent stage. So there's no performance gain to be had by breaking up a parallel stage into consecutive parallel stages, albeit there may be significant advantages in terms of logical code structure. So one option to consider (if performance is the only issue) is a three-stage pipeline of the form serial-parallel-serial. The first stage would read an item and allocate a buffer from a pool. The second stage would do the work. The last stage would recycle the buffer.

If the first stage is doing round-robin allocation, the last stage's operator() can even be a no-op. It just exists to synchronize recycling. I've attached an example to show this somewhat surprising result.
RafSchietekat
Valued Contributor III
#2 "the idea to create a buffer of a giant data structure"
As data unit size increases, relative allocation overhead decreases and idle-memory overhead increases (got RAM?), so the idea of one "giant" long-lived buffer quickly becomes far less appealing.

#2 "filter A on each item in buffer A might yield 2 items in buffer B for filter B to process"
You can easily simulate multiplication inside the input filter by internally storing one of the results: if the store is non-empty, its contents are returned, otherwise, the input is read, one of the results is stored and the other is returned. Simulating multiplication later on in the pipeline would be more cumbersome, perhaps based on inserted ghost work in earlier filters, perhaps on passing an array to a later filter (but see below). Maybe it is suboptimal if one of the results might be created earlier than the other one, because the input filter won't return before the second result has been created, but at least this should get you started.
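A minimal sketch of that "store one of the results" idea, in plain C++ rather than as an actual TBB filter. `SplittingInput`, `Result` and `drain` are hypothetical names, and the two results per input are simply tagged part 0 and part 1; in a real input filter, `next` would be the body of the serial stage and would return a pointer, or NULL at end of input.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical result type: which input it came from and which half it is.
struct Result {
    int source;
    int part;
};

// Input stage that yields two results per input item, one per call.
class SplittingInput {
public:
    explicit SplittingInput(std::vector<int> inputs)
        : inputs_(std::move(inputs)), pos_(0), has_pending_(false), pending_() {}

    // Writes the next result to `out` and returns true,
    // or returns false once the input is exhausted.
    bool next(Result& out) {
        if (has_pending_) {
            // The store is non-empty: hand out the saved second result.
            out = pending_;
            has_pending_ = false;
            return true;
        }
        if (pos_ == inputs_.size()) return false;
        // Read one input, return the first result and store the second.
        int value = inputs_[pos_++];
        out = Result{value, 0};
        pending_ = Result{value, 1};
        has_pending_ = true;
        return true;
    }

private:
    std::vector<int> inputs_;
    std::size_t pos_;
    bool has_pending_;
    Result pending_;
};

// Pulls every result out of the stage, as the pipeline would by calling it repeatedly.
std::vector<Result> drain(SplittingInput& input) {
    std::vector<Result> all;
    Result r;
    while (input.next(r)) all.push_back(r);
    assert(all.size() % 2 == 0);  // every input contributes exactly two results
    return all;
}
```

Because the stage keeps state between calls, it only works as a serial stage. Each result then travels down the rest of the pipeline as its own token, so a parallel stage B can pick up both halves concurrently.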

#2 "It seems each filter would have to have the parallelism contained within it to support this, and the pipeline itself wouldn't process the previous filter's output items in parallel using simply the "parallel" argument to the filter's constructor."
You can nest potential parallelism, but doing that instead of my suggestion above would create many more gates/barriers, which may adversely affect performance, although I don't know to what degree.

#2 "If I do the above, what benefit do I get over doing a parallel_do around a method that receives the next input and does all its processing until completion, putting the results in a concurrent_queue?"
How exactly do you mean that?

#3 "Yes, creating a structure large enough to support the in-flight number will work."
I was only thinking of allocating in the input filter and deallocating in the output filter, which would decrease allocation overhead by a factor proportionate to pipeline length. Managing pointers in a ring buffer for further reuse may be beneficial, but requires a decision about the size of the ring buffer. If all memory is allocated at once instead of on demand, that decision becomes crucial, which makes me doubt such a solution. But I'm always happy to revise my assumptions if it means learning something.

#3 "Do the items have to be processed in order? The pipeline template scales only if there is at least one parallel filter in the pipeline. If all filters are serial, then a parallel_do will likely work better."
If all filters are serial, how would a parallel_do, which always attempts parallel operation, be applicable?

#4 "what do you think is better"
With option B you may incur padding overhead to avoid false sharing. Otherwise I don't see much of a difference relevant for performance, assuming it's not better to allocate objects only on demand.
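To put a number on that padding overhead, here is a small sketch. `Counter`, `PaddedCounter`, `stride_bytes` and the 64-byte `kCacheLine` are assumptions for illustration; the real cache line size depends on the target CPU.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed cache line size; 64 bytes is common on x86 but not guaranteed.
constexpr std::size_t kCacheLine = 64;

// Unpadded: neighbouring array elements can land on the same cache line,
// so two threads writing adjacent elements may falsely share that line.
struct Counter {
    std::uint64_t value;
};

// Padded: alignas forces each element onto its own cache line, and the
// compiler rounds the size up to a multiple of the alignment.
struct alignas(kCacheLine) PaddedCounter {
    std::uint64_t value;
};

static_assert(alignof(PaddedCounter) == kCacheLine, "one element per line");
static_assert(sizeof(PaddedCounter) == kCacheLine, "size rounded up to a line");

// Returns the distance in bytes between consecutive array elements of T.
template <typename T>
std::size_t stride_bytes() {
    static T items[2];
    const char* first = reinterpret_cast<const char*>(&items[0]);
    const char* second = reinterpret_cast<const char*>(&items[1]);
    std::size_t stride = static_cast<std::size_t>(second - first);
    assert(stride == sizeof(T));
    return stride;
}
```

With these assumptions an 8-byte payload occupies 64 bytes once padded, which is the kind of overhead separate per-stage buffers of small objects could incur.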

#5 "It makes bookkeeping simple and would seem to get the cache affinity right."
Can you explain how this works well for the cache if the object is large compared to cache line size? How do you know that the next use will be on the same thread?

#5 "If the first stage is doing round-robin allocation, the last stage's operator() can even be a noop. It just exists to synchronize recycling. I've attached an example to show this somewhat surprising result."
Presumably because serial, which is deprecated, is understood to be serial_in_order. I don't think that serial_out_of_order would work?
tettema
Beginner

Thanks for the suggestions. To help out, a picture's worth a thousand words, so I illustrated my process, along with approximate sizes of each object. It is essentially a sensor, with serial reads of input (A) from a sensor that outputs batches of sensor data, which can each be processed in parallel. All the translations for a given buffer (B, C, D) happen serially with respect to the given buffer, but can be in parallel with respect to other input buffers. The very last stage correlates results (D) with existing data objects (E) and updates them based on the latest sensor results. This needs to happen in the same order that the input was received.

Based on this diagram, I would guess a pipeline structure like the following would work:

input stage is serial_in_order: reads a buffer, produces Object Bs
processing stage is parallel: reads Object Bs, processes them all the way up to Object Ds
output stage is serial_in_order: reads Object Ds to do the correlation/update to Object Es, then returns NULL

Meanwhile, each stage operates on a single object that stores As, Bs, Cs, and Ds for a given input package that is created or recycled by the input stage.
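As a rough sketch of that layout in plain C++ (no TBB): one `Packet` holds the A, B, C and D data for a single input, and three functions stand in for the three stages. All names, the batch size of 4 and the arithmetic in each stage are placeholders, since the actual translations aren't described here. The stages are called back to back on one reused packet only to show the data flow.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

constexpr std::size_t kBatch = 4;  // hypothetical number of samples per input

// One object per in-flight input, holding the results of every step.
struct Packet {
    std::size_t sequence;
    std::array<int, kBatch> a;     // raw sensor input
    std::array<double, kBatch> b;  // produced by the input stage
    std::array<double, kBatch> c;  // intermediate result
    std::array<double, kBatch> d;  // final per-input result
};

// Input stage (serial, in order): stores A and derives B.
void input_stage(Packet& p, std::size_t sequence,
                 const std::array<int, kBatch>& raw) {
    p.sequence = sequence;
    p.a = raw;
    for (std::size_t i = 0; i < kBatch; ++i) p.b[i] = static_cast<double>(p.a[i]);
}

// Processing stage (could run in parallel): touches only its own packet.
void processing_stage(Packet& p) {
    for (std::size_t i = 0; i < kBatch; ++i) {
        p.c[i] = p.b[i] * 2.0;  // placeholder B -> C translation
        p.d[i] = p.c[i] + 1.0;  // placeholder C -> D translation
    }
}

// Output stage (serial, in order): folds D into the long-lived objects E.
void output_stage(const Packet& p, std::vector<double>& e) {
    assert(e.size() == kBatch);
    for (std::size_t i = 0; i < kBatch; ++i) e[i] += p.d[i];
}

// Runs every batch through the three stages in order and returns E.
std::vector<double> run_batches(
        const std::vector<std::array<int, kBatch> >& batches) {
    std::vector<double> e(kBatch, 0.0);
    Packet packet = Packet();  // reused for every input in this serial sketch
    for (std::size_t n = 0; n < batches.size(); ++n) {
        input_stage(packet, n, batches[n]);
        processing_stage(packet);
        output_stage(packet, e);
    }
    return e;
}
```

Only the output stage touches E, which is why it has to stay serial and in order, while the processing stage reads and writes nothing outside its own packet.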

Sounds reasonable?
RafSchietekat
Valued Contributor III
"The very last stage correlates results (D) with existing data objects (E) and updates them based on the latest sensor results. This needs to happen in the same order that the input was received."
This part still seems somewhat mysterious.

"Sounds reasonable?"
Good enough to give it a try, I suppose.