Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Pipelines and instrumentation

gnl29
Beginner
Hello everyone,

I have a classical TBB pipeline that processes various input streams in parallel and saves the results to memory. Let's say the pipeline is simply:

input filter (f0) ----> parallel process filter (f1) ----> output filter (f2)

The input filter creates tokens that are aligned on (for instance) a newline character. Each token contains a list of lines, and each of these lines is parsed by f1. Sometimes f1 discards lines, in which case the output filter must not consider them. That pattern is implemented and works well.
What we want to achieve is a bit of instrumentation that guarantees that N valid lines have been processed by f1 (and thus f2). If fewer than N have been processed, the input filter is asked to create new tokens, and so on, until N has been reached (or the input filter has no more data to process).
I'd like to know if such a pattern can easily be done with what currently exists in TBB. The different ways I am considering are:
  • a thread that launches the pipeline repeatedly, asking f0 to create N0 lines each time. It reads a counter incremented by f2 and stops the pipeline when N is reached (by passing a user-supplied task_group_context to pipeline::run or parallel_pipeline)
  • my understanding is that a pipeline can be considered a TBB task, so maybe we can play with the task scheduler to implement this (by creating "pipeline tasks" on purpose)
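
The first option can be sketched with the standard library alone. In the sketch below, `run_round` is a hypothetical stand-in for one complete pipeline run (in a real program it would wrap a call such as `tbb::parallel_pipeline`), and `is_valid_line` is an assumed validity rule, not the poster's parser. Note that this approach can overshoot N, because a round always finishes the N0 lines it started with.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical validity rule standing in for the real parsing done by f1.
bool is_valid_line(const std::string& line) {
    return !line.empty() && line[0] != '#';
}

struct RoundResult {
    std::size_t read = 0;   // lines consumed by f0 in this round
    std::size_t valid = 0;  // lines that reached f2 in this round
};

// Stand-in for one pipeline run: f0 reads at most n0 lines,
// f1 filters them, f2 counts the survivors.
RoundResult run_round(std::istream& in, std::size_t n0) {
    RoundResult r;
    std::string line;
    while (r.read < n0 && std::getline(in, line)) {
        ++r.read;
        if (is_valid_line(line)) ++r.valid;
    }
    return r;
}

// Driver: relaunch rounds until n valid lines were seen or input ran out.
// Returns the number of valid lines actually processed (may exceed n).
std::size_t run_until(std::istream& in, std::size_t n, std::size_t n0) {
    assert(n0 > 0 && "each round must read at least one line");
    std::size_t total = 0;
    while (total < n) {
        const RoundResult r = run_round(in, n0);
        if (r.read == 0) break;  // input exhausted
        total += r.valid;
    }
    return total;
}
```

For example, with the input lines `a`, `#x`, `b`, `c`, `d`, `e`, calling `run_until(in, 2, 4)` on a `std::istringstream` processes three valid lines rather than two, since the first round of four lines already contains three valid ones.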

Thanks for any help !
3 Replies
jimdempseyatthecove
Honored Contributor III
f0:
collects/produces 0 to x lines of data. 0 lines are produced after input "eof" (no more data).
When the output filter (f2) completes N lines of data, a shared flag is set to indicate that input should shut down (pipeline complete).

f1:
receives a token with 0 to x lines, processes the lines (possibly removing some), and outputs 0 to y lines. f1 may test the output filter's done indicator to skip processing of additional lines.

f2:
accepts a token with 0 to y processed lines, outputs up to N lines in total, and sets the shared done indicator after the N'th output.

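If you observe a done flag as opposed to an output record count, you will encounter fewer cache line misses: the flag is written only once, whereas a counter is written on every output and invalidates the readers' cached copy each time.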
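
A minimal sketch of the three stage bodies described above, written as plain functions so that it needs only the standard library. The names `keep_line`, `f0`, `f1`, `f2` and `collect_valid` are illustrative assumptions; in a TBB program each body would go into the corresponding filter (serial input, parallel middle, serial output), and the simple loop in `collect_valid` merely stands in for the pipeline.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

using Token = std::vector<std::string>;  // one token = a batch of lines

// Hypothetical validity rule standing in for the real parser.
bool keep_line(const std::string& line) {
    return !line.empty() && line[0] != '#';
}

// f0: fills `out` with up to x lines. Returns false at end of input
// or once the done flag has been set, which ends the pipeline.
bool f0(std::istream& in, std::size_t x, const std::atomic<bool>& done, Token& out) {
    out.clear();
    if (done.load(std::memory_order_acquire)) return false;
    std::string line;
    while (out.size() < x && std::getline(in, line)) out.push_back(line);
    return !out.empty();
}

// f1: drops invalid lines; stops early if f2 has already signalled done.
Token f1(Token in, const std::atomic<bool>& done) {
    Token out;
    for (std::string& line : in) {
        if (done.load(std::memory_order_relaxed)) break;  // optional bail-out
        if (keep_line(line)) out.push_back(std::move(line));
    }
    return out;
}

// f2: serial stage, so `kept` needs no lock. Sets the flag once, at the n'th line.
void f2(Token in, std::size_t n, Token& kept, std::atomic<bool>& done) {
    for (std::string& line : in) {
        if (kept.size() >= n) break;
        kept.push_back(std::move(line));
    }
    if (kept.size() >= n) done.store(true, std::memory_order_release);
}

// Serial stand-in for the pipeline: pulls tokens until f0 reports the end.
Token collect_valid(std::istream& in, std::size_t n, std::size_t x) {
    assert(x > 0 && "a token must hold at least one line");
    std::atomic<bool> done{n == 0};
    Token kept, token;
    while (f0(in, x, done, token)) f2(f1(std::move(token), done), n, kept, done);
    return kept;
}
```

The line count lives only in f2 (as `kept.size()`), so the other stages never read it; they poll the flag, which changes value exactly once.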

Jim Dempsey
gnl29
Beginner
Hi Jim,

Thanks for your answer.

I think this is a reasonable approach. The other thing is that I'd like to be able to stop the pipeline whenever "I like", but this is another problem (and IIRC the answer is already somewhere in this forum).

Have a nice day !
jimdempseyatthecove
Honored Contributor III
You can set the "done" flag anytime, anywhere you like. The pipeline won't stop immediately, as the tokens already in flight need to percolate back around to the input stage. However, you can shorten the latency by inserting bail-out tests in your processing code (at the expense of the overhead of the test).
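
One way to picture the bail-out trade-off is a processing loop that polls the flag only every few lines. This is a sketch under assumptions: `process_token`, `check_every` and the `per_line` callback are hypothetical names, with the callback standing in for the real per-line work in f1.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Processes the lines of one token until the token is exhausted or the
// done flag is observed. The flag is polled once every `check_every`
// lines: a smaller value stops sooner, a larger one costs fewer tests.
// Returns the number of lines actually processed.
template <class PerLine>
std::size_t process_token(const std::vector<std::string>& lines,
                          const std::atomic<bool>& done,
                          std::size_t check_every,
                          PerLine&& per_line) {
    assert(check_every > 0 && "polling interval must be at least 1");
    std::size_t processed = 0;
    for (std::size_t i = 0; i < lines.size(); ++i) {
        // Bail-out test, amortized over check_every lines.
        if (i % check_every == 0 && done.load(std::memory_order_relaxed)) break;
        per_line(lines[i]);
        ++processed;
    }
    return processed;
}
```

If the flag is raised while the third line of a six-line token is being processed, polling on every line stops after three lines, while polling every four lines lets a fourth line through before the next test.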

Jim Dempsey