Community
cancel
Showing results for 
Search instead for 
Did you mean: 
Harjot_Gill
Beginner
66 Views

Context ordered filter in pipeline

There are several event driven applications in business/telecom/networking domain which need to order execution of events in a given context and unrelated contexts can continue execution parallel. Usually, to achieve concurrency, context identifiers (based on application) are hashed and events are posted to raw threads. This ensures that FIFO ordering is maintained for same context events. Take an example of call setup messages coming into the system. The context identifier is call ID and all messages of same call need to be executed in order. And unrelated calls can continue execution in parallel.Usually on multicore servers, throughput is achieved by virtualization but TBB can help reduce processing latency when system is not fully loaded. For example, call setup messages in SIP protocol can be chopped up and parsed in parallel. But we have to make sure, ordering of call messages is also maintained.
We found that TBB pipeline framework is well suited for our application and last week we developed a new filter mode in pipeline to support context ordered execution. In a nutshell, when a task enters our filter stage, we first call designate_context() method to find out the effected contexts. The user defines this method in the filter class along with operator(). The designate_context() method is supposed to take a sneak peek into the given item and decide which context(s) does it belong to. Shared resources can be expressed as contexts.
The best part is, we allow user to provide us a set of multiple contexts. Thus, with this filter, atomic operations on a given set of contexts can be performed. This paradigm can be used elsewhere too, as it provides abstraction over locks and lets programmer choose granularity of shared resources without worrying about deadlocks. For example in a proxy server, we might want to perform atomic operations on two half call blocks. In database domain, user can provide a set of keys, she wants to operate on atomically and so on.
We might put in effort to enhance it further to support priority execution in context filters.
We will be glad to share our code. Do let us know your thoughts.
0 Kudos
12 Replies
ARCH_R_Intel
Employee
66 Views

Your design is intriguing generalization of a pipline stage. If I understand your design correctly, a parallel stage can be viewed as "each item has a unique context" and a serial_in_order stage can be viewed as "each item has the same context". Your extension allows intermediate degrees of parallelism that map neatly to resource sharing.

The implementation would seem to be a variation on "class input_buffer" in pipeline.cpp, with some hashing involved, right?

It would be good if you could share your code. The contribution process is described here.

Harjot_Gill
Beginner
66 Views

In our demo app, we are using these stages:
1. Serial In Order Input stage
2. Context Filter for parallel operations
3. Another Context Filter just for logging (can be skipped)
Input stage is same as in conventional pipeline. However, our filter is actually 2 in 1 stage. Serially, it calls designate_context() method. After this, it puts items in buffers based on context. We are redistributing tokens based on context and maintaining separate buffers. These buffers are cleared when the last task of a given context exits the pipeline. Flow control in pipeline is still controlled by input stage as before.
Atomic operations are implemented by spawning tasks only when given contexts are ready.
I was unable to access the link you shared. The page does not exist.
Dmitry_Vyukov
Valued Contributor I
66 Views

> Atomic operations are implemented by spawning tasks only when given contexts are ready.

May you elaborate a bit more here?
If a task is spawned when contexts are ready, it does not mean that the task will be executed when contexts are ready. Consider, the contexts are ready, you spawn 2 independent tasks, when tasks executed they clash.
When contexts became non-ready?
Dmitry_Vyukov
Valued Contributor I
66 Views

How do you work with states of a set of contexts?
Do you have a centralized container of contexts protected by a mutex, then for each token lock the mutex, observe states of interesting contexts, unlock the mutex, spawn a task if necessary?
Andrey_Marochko
New Contributor III
66 Views

Harjot_Gill
Beginner
66 Views

We made modifications mainly in the mechanism tokens are put and tasks are spawned in note_done.
For atomic operations, the task is not spawned until all input_buffers of given set of contexts are ready. When we get such a task, we just put it into input_buffer(s) of given contexts. The design is pretty simple. Tasks now maintain a ready count (atomic). In conventional pipeline, task is spawned by previous task exiting a stage. We spawn only when all buffers are ready i.e. the last context which makes the ready count as 0, executes. When atomic task is done, it is now responsible for spawning next tasks in given input_buffers.
Harjot_Gill
Beginner
66 Views

Just like a input_buffer in conventional pipeline, we now maintain a table of input_buffers (dynamically created as of now based on running contexts) for each stage in pipeline.
There is another table to dispense tokens based on context, which is shared in a pipeline.
These two tables are accessed serially (when getting a token and putting a task into buffer) because before this stage, our filter has serial designate_context() stage.
The only points of contention are when tasks exit a stage or exit the pipeline. When exiting a stage, while next task is being spawned, the context input_buffer is locked. When exiting pipeline, global entry for that context in token dispenser is locked so that contexts can be cleared in context filter stages.
Harjot_Gill
Beginner
66 Views

Code submitted. Thanks.
ozang
Beginner
66 Views

Hello Harjot,

First of all thank you for sharing contex ordered filter approach. I will be very glad to see your demo. I checked the contribution archive but could not see your project:
http://www.threadingbuildingblocks.org/file.php?fid=86
Is it located in somewhere else?
best regards,
ozan
Alexey_K_Intel3
Employee
66 Views

Contributions don't get into the archive automatically, because of some due diligence which we should do to accept the contribution. This one was accepted, and since you asked for it, I will push for it being published soon. Please stay tuned.

Added: By the way we will not publish the demo app which for a reason was excluded from the contribution. Please ask the author for itssource.
Alexey_K_Intel3
Employee
66 Views

I added the contribution to the archive; you can find it there, or go directly to http://www.threadingbuildingblocks.org/ver.php?fid=156
ozang
Beginner
66 Views

Thank you .