Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Pipeline items in flight

Vivek_Rajagopalan
530 Views
So I have a vanilla pipeline organized as follows:

f1 (serial) ----> f2 (parallel) --> f3(parallel) .. etc .. -> fn(serial output)

This usually hums along, but once in a while the input filter f1 needs to do some work that requires all other filters to be idle. Is there an easy way for f1 to check whether any 'tokens' are still in flight?

At present I am shutting down the pipeline whenever f1 needs to do the work that requires the pipeline to be clear. This is not working out too well because of the work involved in restarting the pipeline.

Another minor issue: the online documentation (Doxygen) does not seem to have been updated for 2.2.

Thanks,

Vivek


----
Some background :

1. The application is network packet processing
2. All filters co-operate to update a set of data structures
3. These data structures balloon up after a while (depends on current data rate and variety)
4. When they get big, the data structure needs to be flushed - this requires all filters to be quiet until flushing is complete
5. The flush involves pruning the data structure of least recent items
6. The pruned items are backed up to a sqlite database
7. I want to avoid synchronization primitives as much as possible (I guess this is the TBB way!)
0 Kudos
9 Replies
Vivek_Rajagopalan
530 Views
I think I solved this one, it was embarrassingly obvious!


The serial input filter stamps each token with an ID, and the output filter (also serial) records the ID of the last token it processed. When a flush is needed, the input filter compares the ID of the "please flush" token with the last ID recorded by the output filter, then spins until the difference is 1.

I don't think this would work well with parallel input / output filters because there would be contention to update the last seen token ID.
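A minimal sketch of the scheme described above, assuming a serial input filter and a serial in-order output filter. `FlightTracker` and its method names are hypothetical and not part of TBB. One deviation from the description: rather than consuming an ID for the flush request, the check compares the ID that would be stamped next with the last retired ID, so back-to-back flushes with no tokens in between still see a difference of 1.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper (not a TBB class). Assumes a serial input filter and a
// serial in-order output filter, so each method has a single caller thread.
class FlightTracker {
public:
    // Input filter: stamp the token about to enter the pipeline. IDs start at 1.
    std::uint64_t stamp() { return next_id_++; }

    // Output filter: record the token that just left the pipeline.
    void retire(std::uint64_t id) {
        // In-order delivery means IDs arrive consecutively.
        assert(id == last_retired_.load(std::memory_order_relaxed) + 1);
        last_retired_.store(id, std::memory_order_release);
    }

    // Input filter: true when the ID a flush request would receive is exactly
    // one past the last retired ID, i.e. nothing is in flight.
    bool drained() const {
        return next_id_ - last_retired_.load(std::memory_order_acquire) == 1;
    }

private:
    std::uint64_t next_id_ = 1;                   // touched by the input filter only
    std::atomic<std::uint64_t> last_retired_{0};  // written by output, read by input
};
```

The input filter would call `drained()` in its wait loop before starting the flush. Only one atomic is needed because `next_id_` never leaves the input filter.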

Thanks,
0 Kudos
Alexey-Kukanov
Employee
530 Views
Indeed, a good solution.
If your app is intended to be long-running, take care of ID wraparound: once the source of IDs reaches the maximum value of its underlying integer type, the next "increment" wraps it to the minimum value (0 for unsigned types, -2^31 for a signed 32-bit integer), and the comparison should take that into account.
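One way to make the comparison wraparound-safe, sketched under the assumption that both IDs come from the same 32-bit unsigned counter (the function name `in_flight` is made up for illustration). Unsigned subtraction in C++ is defined modulo 2^32, so the difference stays correct across the wrap, whereas overflow of a plain signed integer is undefined behavior and is best avoided here.

```cpp
#include <cstdint>

// Number of tokens stamped but not yet retired. `next_id` is the ID the input
// filter would hand out next; `last_retired` is the last ID the output filter
// recorded. The result is valid as long as fewer than 2^32 tokens are in flight.
constexpr std::uint32_t in_flight(std::uint32_t next_id,
                                  std::uint32_t last_retired) {
    // Modulo-2^32 arithmetic: correct even after next_id has wrapped past 0.
    return static_cast<std::uint32_t>(next_id - last_retired - 1u);
}
```

For example, `in_flight(0, 0xFFFFFFFFu)` is 0 (the counter has just wrapped and everything has retired), while a naive `next_id > last_retired` test would give the wrong answer at that point.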
0 Kudos
jimdempseyatthecove
Honored Contributor III
530 Views

The output filter may receive tokens out of order, and therefore you may see your difference of 1 while one or more tokens are still in flight (assuming your output filter is permitted to write out of order). It might be better to use sequence numbers generated independently of the IDs, e.g. a count of input records read and a count of output records written. When the two are equal, it is safe to assume the pipeline is empty. This does not account for records rejected, should that be a feature of your pipeline. In that case you could use records read == (records written + records rejected - records injected), or something like that.
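The counting approach could look roughly like the sketch below. `PipelineCounters` is a hypothetical struct, not a TBB facility, and it leaves out the "injected" term for brevity. Because it only compares totals, it does not depend on the order in which tokens reach the output filter.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical bookkeeping shared by the filters. Each filter increments its
// counter only after it has finished touching the shared data structures.
struct PipelineCounters {
    std::atomic<std::uint64_t> read{0};      // records taken in by the input filter
    std::atomic<std::uint64_t> written{0};   // records emitted by the output filter
    std::atomic<std::uint64_t> rejected{0};  // records dropped along the way

    // Intended to be called from the input filter, so `read` is stable while
    // it waits; `written` and `rejected` can only grow towards it.
    bool empty() const {
        return written.load() + rejected.load() == read.load();
    }
};
```

Atomic increments are still a form of synchronization, but they are cheap compared with locks and remain correct with parallel filters.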

Your output filter could check for out-of-order token IDs and buffer the out-of-order tokens. On receipt of each additional token, check whether it is next in sequence to output. If not, add it to the buffer of out-of-order tokens; if so, output it and then check the buffer for further tokens that are now next in sequence. BTW, this is how QuickThread performs its pipeline (when you specify that you want to maintain buffer sequence).
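The buffering scheme can be sketched as follows. This is an illustration only: `Resequencer` is a hypothetical class, the payload type is a plain string, sequence numbers are assumed to start at 0 with no gaps, and the class is not thread-safe, so it is meant to be driven from a single serial filter.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Accepts (sequence number, payload) pairs in any order and releases the
// payloads strictly in sequence.
class Resequencer {
public:
    // Returns the payloads that became ready to output, in order.
    std::vector<std::string> push(std::uint64_t seq, std::string payload) {
        std::vector<std::string> ready;
        if (seq != next_) {                   // out of order: park it
            pending_.emplace(seq, std::move(payload));
            return ready;
        }
        ready.push_back(std::move(payload));  // next in sequence: emit it
        ++next_;
        // Drain parked tokens that have now become next in sequence.
        for (auto it = pending_.find(next_); it != pending_.end();
             it = pending_.find(next_)) {
            ready.push_back(std::move(it->second));
            pending_.erase(it);
            ++next_;
        }
        return ready;
    }

    std::size_t parked() const { return pending_.size(); }

private:
    std::uint64_t next_ = 0;                        // next sequence number to emit
    std::map<std::uint64_t, std::string> pending_;  // tokens that arrived early
};
```

Pushing sequence numbers 1 and 2 returns nothing; pushing 0 afterwards returns all three payloads in order.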

Jim Dempsey
0 Kudos
Vivek_Rajagopalan
530 Views


I thought tbb takes care of this if the output filter is declared to be serial. I vaguely remember reading about it in the Reinders book. If not, I have to do the sequencing as you have described. Funny thing is I added the serial output filter only to have tbb handle the sequencing. The filter does not actually do any other real work.

So far it seems to work on my test rig, which is a lowly dual core. If I have to do the sequencing, I might actually think about shutting down the pipeline, doing the maintenance work, and restarting it. The problem I found with restarting is that the supporting application machinery takes some time to come up. Also, I do not know how heavy a tbb pipeline object is.

Thanks,


0 Kudos
robert-reed
Valued Contributor II
530 Views
TBB does resequence parallel tokens upon arrival at a serial filter stage, an advertised feature since the first implementations of pipeline that I'm aware of. I think your code should be fine.

0 Kudos
RafSchietekat
Valued Contributor III
530 Views
The original "serial" is now deprecated in favour of the more precise new name "serial_in_order" (for now, "serial" conservatively equals "serial_in_order"), distinct from the new "serial_out_of_order". Both serialise item processing by the filter, but relative order between items is only guaranteed to be preserved across "serial_in_order" filters, while "serial_out_of_order" filters do not need to wait for any missing intermediate items and can proceed immediately with any items that are already there.
0 Kudos
knmaheshy2k
Beginner
530 Views
I have a simple question. Let's say I have a 6-stage pipeline.

(initial stage) S1 - S2 - S3 - S4 - S5 - S6 (final stage)

These stages are independent of each other, so they can be executed in any order. There are no data dependencies or race conditions to take care of. I need to execute this whole pipeline M times. The only constraint is that before executing the pipeline for the Nth time, we must make sure the (N-1)th execution is completely finished.

Any suggestions? Will the pipeline with serial_out_of_order option serve my purpose? Do I need to do some explicit synchronization?
0 Kudos
Alexey-Kukanov
Employee
530 Views
I am not sure if I understand. By saying that you need to execute pipeline M times, do you mean you need to pass M data items through the pipeline, or you need to apply pipeline to M different input streams of arbitrary length?
Also, what do you mean by stages being independent? The idea of the pipeline is to process data stage by stage. In TBB, the order of stages is set up before the pipeline starts execution, and it can't change.
0 Kudos
knmaheshy2k
Beginner
530 Views

@Alex: Thanks for replying.

I actually meant applying the pipeline to M different input streams of fixed length. I found that with a pipeline of 6 filters, all configured for serial_in_order execution, I can make the program work this way:

.....M+1 M M M M M M M-1 M-1........
......S1- S6- S5- S4- S3- S2- S1- S6- S5- ........


Is my understanding correct?

I have one more question. If I want to stop the pipeline after M different tokens in the above case, I should return NULL from S1 once the number of tokens exceeds M. If I do this, will the pipeline stop at once, or will it complete execution of the tokens already in flight and then stop?

Also, is there a more efficient way of doing this?

Thanks in advance!
0 Kudos