Intel® oneAPI Threading Building Blocks

Design pattern for ordered traversal

dotcsw
Beginner

I'm new to Threading Building Blocks and need a design pattern to parallelize an existing divide-and-conquer loop. Superficially, it works like this:

foreach element in container
    if element can be pruned
        continue
    else if element is divisible
        divide element into 4 smaller elements
    else
        conquer element

So far, no problem for TBB. Just use a parallel_for over a std::vector and a tbb::parallel_invoke (or something) for the subdivision.
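In sketch form, that first mapping might look like this (untested; Element and the helper functions are placeholders, and the lambdas assume a C++11 compiler):

#include <vector>
#include "tbb/parallel_for.h"
#include "tbb/parallel_invoke.h"

struct Element { /* ... */ };
// Placeholders for the real operations:
bool canBePruned(const Element&);
bool isDivisible(const Element&);
void divide(const Element&, Element&, Element&, Element&, Element&);
void conquer(Element&);

void process(Element& e) {
    if (canBePruned(e)) return;                    // prune
    if (isDivisible(e)) {                          // divide into 4 and recurse
        Element a, b, c, d;
        divide(e, a, b, c, d);
        tbb::parallel_invoke([&] { process(a); }, [&] { process(b); },
                             [&] { process(c); }, [&] { process(d); });
    } else {
        conquer(e);                                // the expensive step
    }
}

void processAll(std::vector<Element>& elements) {
    tbb::parallel_for(std::size_t(0), elements.size(),
                      [&](std::size_t i) { process(elements[i]); });
}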

Now for the twist. Note that conquer is an expensive step (in terms of CPU and memory) and pruning elements is a huge win. Whether an element can be pruned or not depends upon the other elements that have already been conquered. We can increase the chances of pruning by processing elements in a sorted order. This is one of those algorithmic optimizations that outweighs the benefits of parallelization.

My first hunch was parallel_sort followed by parallel_for. This is exactly wrong: because parallel_for recursively splits the range across threads, some thread will immediately be working on the lowest-priority end of the vector.

My other notion was a parallel_do over a concurrent_priority_queue, with the subdivide step adding smaller elements back into the queue. However, parallel_do requires begin and end iterators, and concurrent_priority_queue only provides try_pop.

How do you process elements in prioritized order with TBB? I understand that only a serial algorithm will guarantee maximum pruning by conquering the first element before even considering the second, but there must be a way to focus n threads on the n highest-priority elements.

11 Replies
RafSchietekat
Valued Contributor III

I'm not sure that parallel_for() could be the best choice here, even before the sorting requirements come into play. It probably depends on how large the initial container is compared to the total number of elements once division is taken into account, and unless most elements are in the initial container you may well be better off with parallel_do(), which should have fewer wait/steal moments (right?).

I presume that the new elements provided to a parallel_do() feeder have higher priority, although I'm not sure about the details between generations of feeder-provided elements: is that enough for this purpose or do you require global priority to be preserved?

If it has to be global, use a concurrent_priority_queue with a custom iterator (or just the (deprecated) parallel_while() to make this a little easier), but do note that this will not allow for the same level of parallelism as you can expect from parallel_for() or parallel_do/while() with copious use of the feeder, because you're obtaining new elements through an essentially sequential straw. So you have to weigh which will be less expensive: conquering with only partial ordering (parallel_do() with feeder), or near-perfectly ordered conquering through a straw. The latter may be the winner if individual elements have enough work in them for a task, but otherwise the former looks more attractive.

For the iterator, use a default-constructed instance to represent the end, and implement operator!= to only allow comparison with the end, by consulting concurrent_priority_queue::empty(): you may get false positives, for which the workaround would be to wrap the algorithm in a serial loop (do report here if you can confirm that), but you shouldn't get false negatives, because there's only one consumer (could somebody confirm that intuition?). If you wish, I could provide some skeleton code for that, but it sounds scarier than it really is.

(Added) Also don't neglect other solutions that might be cheaper than concurrent_priority_queue: if you can derive information from the current generation, you can throw newly generated elements in the right bin without incurring the overhead of a priority queue.

dotcsw
Beginner

Thanks for the detailed response, Raf. Off the top of my head, I'd estimate dozens of elements in the initial container and perhaps a thousand if all were fully subdivided (and not conquered). I would have to benchmark some actual data sets to get more accurate numbers. Like you say, it's a rather small input for parallel_for given that pruning and subdividing are easy tasks.

I presume that the new elements provided to a parallel_do() feeder have higher priority

No. Typically an element is split into four smaller elements, one of which has the same priority as the parent and the other three having lower priorities.

Another design pattern came to me today. The conquer step is big and naturally modeled as a pipeline. I was planning to use a TBB pipeline there anyway. This subdivide loop gets the whole thing started and is my first opportunity to employ parallelism. What if I make it the first stage, the "producer", of the conquer pipeline? It would look just like a serial loop, popping elements from a concurrent_priority_queue and inserting subdivided elements back into it, except it could be tagged as a filter::parallel. As one of the simplest filters in the pipeline, chances are that only one thread would be assigned to it.
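Roughly like this (an untested sketch; Element, ByPriority, and the helper functions are placeholders, not real names from my code):

#include "tbb/pipeline.h"
#include "tbb/concurrent_priority_queue.h"

struct Element { /* ... */ };
struct ByPriority {                 // your ordering over elements
    bool operator()(const Element* a, const Element* b) const;
};
typedef tbb::concurrent_priority_queue<Element*, ByPriority> Queue;

bool canBePruned(const Element*);
bool isDivisible(const Element*);
void divide(Element*, Queue&);      // pushes the 4 smaller elements back

class DivideFilter : public tbb::filter {
    Queue& queue;
public:
    DivideFilter(Queue& q) : tbb::filter(tbb::filter::parallel), queue(q) {}
    void* operator()(void*) {       // input stage: the argument is unused
        Element* e;
        while (queue.try_pop(e)) {
            if (canBePruned(e)) continue;                    // prune: drop it
            if (isDivisible(e)) { divide(e, queue); continue; }
            return e;                // hand it off to the conquer stages
        }
        return NULL;                 // NULL ends this run of the pipeline
    }
};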

Does that sound like a good design?

0 Kudos
RafSchietekat
Valued Contributor III
553 Views

dotcsw wrote:

I presume that the new elements provided to a parallel_do() feeder have higher priority

No. Typically an element is split into four smaller elements, one of which has the same priority as the parent and the other three having lower priorities.

OK, useful detail… I'll assume that you also cannot put the generated elements in indexed bins (take element from highest bin with index i, possibly put 3 elements in bin i-x for some x, possibly recurse on remaining element)?
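If it is possible, the shape might be something like this (purely hypothetical; Element, levelOf() and the other helpers are placeholders):

#include <vector>
#include "tbb/concurrent_queue.h"
#include "tbb/parallel_do.h"

struct Element { /* ... */ };
int levelOf(const Element*);        // the element's priority level
bool canBePruned(const Element*);
bool isDivisible(const Element*);
void divide(Element*, Element* (&kids)[4]);
void conquer(Element*);

typedef std::vector<tbb::concurrent_queue<Element*> > Bins;

struct BinBody {
    Bins& bins;
    int level;
    void operator()(Element* e, tbb::parallel_do_feeder<Element*>& feeder) const {
        if (canBePruned(e)) return;
        if (!isDivisible(e)) { conquer(e); return; }
        Element* kids[4];
        divide(e, kids);
        for (int k = 0; k != 4; ++k)
            if (levelOf(kids[k]) == level)
                feeder.add(kids[k]);                  // same priority: recycle here
            else
                bins[levelOf(kids[k])].push(kids[k]); // lower: defer to its bin
    }
};

void drainBins(Bins& bins) {
    for (int level = int(bins.size()) - 1; level >= 0; --level) {
        std::vector<Element*> batch;
        for (Element* e; bins[level].try_pop(e); ) batch.push_back(e);
        BinBody body = { bins, level };
        tbb::parallel_do(batch.begin(), batch.end(), body);
    }
}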

dotcsw wrote:

It would look like just like a serial loop, popping elements from a concurrent_priority_queue and inserting subdivided elements back into it, except it could be tagged as a filter::parallel. As one of the simplest filters in the pipeline, chances are that only one thread would be assigned to it.

If the conquer is already like a pipeline, it does make sense to use that fact. With a pipeline you don't have the hypothetical false negative on emptiness, not because the queue is accessed sequentially (as with parallel_do(), if that is enough), but because you use try_pop() directly, so the issue doesn't arise. However, do also wrap this in a serial loop to work around a hypothetical false positive, or the real race between getting the last element out and putting some elements back in. If you make the input stage parallel, you may get multiple threads accessing the queue in parallel, but concurrent_priority_queue can handle that. Note that there's no one-to-one correspondence between threads and stages: instead, a task/thread takes an item across multiple parallel stages, until it has to leave the item behind in a sequential queue for some other task/thread to pick it up at the appropriate time (a task is always executed on a single thread).
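That serial wrapper is tiny; in sketch form, assuming the pipe and queue objects from the earlier sketch (hypothetical names):

// Rerun the pipeline until the queue is truly drained: a late divide may
// repopulate it just as the input filter sees it empty.
do {
    pipe.run(nTokens);       // each run ends when the input filter returns NULL
} while (!queue.empty());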

BTW, perhaps the documentation could fill in some details about the possibility or not of a false negative/positive for empty(), for different cardinalities of producers and consumers, or be more explicit about "due to pending concurrent push or pop operations", which seems to imply that with a single consumer you cannot have a false negative.

jimdempseyatthecove
Honored Contributor III

>> Whether an element can be pruned or not depends upon the other elements that have already been conquered.

If this means that some prior (currently) conquered element can affect the prospects of pruning some future element to be considered, then I suggest you explore:

Make the outer level a parallel pipeline with the number of tokens set to some number, to be determined, that is less than the number of logical processors. This will permit at most that many logical processors to be performing a conquer at once. The divide won't use parallel_invoke if it can place the sub-elements into a queue for processing by the outer-level parallel pipeline. IOW the outer-level parallel pipeline is fed by the original elements plus the split elements. Your input filter could choose which feeder (unprocessed elements or split elements) to use based on selection criteria you devise from test-run observations.

The tuning requirements then reduce to finding the optimal number of tokens and the input-filter selection algorithm. You might find that single-threaded conquer tasks (# tokens running in parallel) run well, at least while your input filter has more content.
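In outline (untested; Element, conquer(), Queue and DivideFilter as sketched earlier in the thread, ConquerFilter hypothetical):

#include "tbb/pipeline.h"
#include "tbb/task_scheduler_init.h"

void conquer(Element*);

class ConquerFilter : public tbb::filter {
public:
    ConquerFilter() : tbb::filter(tbb::filter::parallel) {}
    void* operator()(void* item) {
        conquer(static_cast<Element*>(item));   // the expensive step
        return NULL;                            // last stage: nothing to pass on
    }
};

void runOuterPipeline(Queue& queue) {
    tbb::pipeline pipe;
    DivideFilter input(queue);     // feeds original plus split elements
    ConquerFilter conquerStage;
    pipe.add_filter(input);
    pipe.add_filter(conquerStage);
    // fewer tokens than logical processors caps the concurrent conquers:
    int nTokens = tbb::task_scheduler_init::default_num_threads() - 1; // tune this
    pipe.run(nTokens);
    pipe.clear();                  // detach filters before they are destroyed
}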

Jim Dempsey

dotcsw
Beginner

Thanks for your help, guys. I have enough information to implement the split stage now. Where I really should be focusing my effort is pipelining the conquer part as that is the bottleneck and can really benefit from parallelism.

It turns out that I had some misconceptions about TBB pipeline. I intend to start another topic on this forum about that, since it's a separate subject.

jimdempseyatthecove
Honored Contributor III

If the number of conquers is large relative to the number of threads then, last-level cache locality issues aside, having as many concurrent conquers as there are threads (each non-pipelined, non-parallel_...) would likely provide the best performance. IOW, the further out you move the parallelization, (generally) the better the performance: "parallel outer - vector inner".

Should the number of conquers be on the order of, or less than, the number of logical processors (threads), consider moving (enabling) parallelization at the inner levels.
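"Parallel outer - vector inner" in sketch form (Element and conquerSerially() are placeholders):

#include <vector>
#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"

struct Element { /* ... */ };
void conquerSerially(Element&);   // vectorized inside, no nested parallel_ calls

void conquerAll(std::vector<Element*>& work) {
    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, work.size()),
        [&](const tbb::blocked_range<std::size_t>& r) {
            for (std::size_t i = r.begin(); i != r.end(); ++i)
                conquerSerially(*work[i]);   // parallelism only at the outer level
        });
}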

Jim Dempsey

dotcsw
Beginner

jimdempseyatthecove wrote:

If the number of conquers is large relative to the number of threads, then having as many concurrent conquers as there are threads (each non-pipelined, non-parallel_...) would likely provide the best performance.

That's a good point. Most data sets that are worth parallelizing will have more conquers than threads. There are some simple use cases that have 1 to 4 conquers but those already run fast in the serial algorithm. I'm actually more concerned about scaling up to 64 cores and beyond in the future. A data set with 32 conquers will make good use of 8 threads but will be undersubscribed on next-generation architectures.

The more I look at tbb::pipeline, the less I think it applies to my algorithm. Pipeline is good when you have serial stages and/or stalls due to I/O or something. My algorithm looks like a pipeline but it's CPU bound and all of the stages are parallel. There's an abundance of parallelism and few synchronization constraints. If I were to use tbb::pipeline, a single thread would follow a token all the way through the pipe (due to the depth-first nature of the scheduler). The thread would basically hand the data off to itself - all the overhead of task scheduling with none of the benefit.

So I think you're right, Jim. Concurrent conquers, each non-pipelined is the way to go. As it turns out, the bottleneck stage is naturally vectorizable.

Now I'm thinking about ditching the TBB algorithms (i.e. parallel_for and pipeline) and just spawning tasks manually. Something like this:

foreach element in container
    if element can be pruned
        continue
    else if element is divisible
        divide element into 4 smaller elements
    else
        increment_ref_count
        spawn(conquer task)
wait_for_all

Is this a correct usage of increment_ref_count? All of the examples do set_ref_count(n+1) before spawning n children and waiting for them. Since I don't know in advance how many conquers will be spawned, I'm hoping to do something on the fly like this (and perhaps one more increment_ref_count for the wait).

Raf Schietekat wrote:

For the iterator, use a default-constructed instance to represent the end, and implement operator!= to only allow comparison with the end, by consulting concurrent_priority_queue::empty(): you may get false positives, for which the workaround would be to wrap the algorithm in a serial loop (do report here if you can confirm that), but you shouldn't get false negatives, because there's only one consumer (could somebody confirm that intuition?). If you wish, I could provide some skeleton code for that, but it sounds scarier than it really is.

I would like to see that skeleton code, Raf. It would be nice to parallelize this divide loop as well. Thank you.

jimdempseyatthecove
Honored Contributor III

How about something like this:

atomic<int> ref_count;
ref_count = 0;

foreach element in container
    if element can be pruned
        continue
    else if element is divisible
        divide element into 4 smaller elements
    else
        increment_ref_count
        if(ref_count < nThreads * 2) // some threshold
            spawn(conquer task)
        else
            conquer( task ); // call directly
wait_for_all

void conquer( task ) {
    ...// do work
    --ref_count;
}

Jim Dempsey

jimdempseyatthecove
Honored Contributor III

I might also suggest that, inside the do-work section of conquer, the threads monitor ref_count.

Whenever ref_count is less than nThreads, the conquer task (section) is permitted to spawn tasks (and increment/decrement ref_count in the process). This will provide better load balancing when the number of (remaining) conquer-able elements is less than the number of threads.

Jim Dempsey

dotcsw
Beginner

I thought the mindset of TBB was to create more tasks than cores and let the scheduler balance the load.

Perhaps my question wasn't clear. I'm talking about tbb::task::increment_ref_count(), not a user-defined reference count. This is what I have in mind to spawn a conquer task:

class DivideTask : public task {
    // concurrentPriorityQueue is a tbb::concurrent_priority_queue<Element*>
    // member, initialized elsewhere
public:
    task *execute() {
        Element *element;
        while (concurrentPriorityQueue.try_pop(element)) {
            if (element->canBeConquered()) {
                ConquerTask &c = *new(allocate_child()) ConquerTask(element);
                increment_ref_count();
                spawn(c);
            }
            else {
                // divide element and reinsert subelements into concurrentPriorityQueue
            }
        }
        wait_for_all();
        return NULL;
    }
};

Is this use of increment_ref_count() an acceptable idiom? The TBB examples do set_ref_count(n+1) instead. Other documents say that increment_ref_count() should only be necessary when a task has more than one successor.

My second question is about running DivideTasks in parallel. There's a boundary case where the last element is popped from the priority queue and one task is in the process of dividing it. The other DivideTasks may see that the priority queue is empty and exit prematurely. That one element may generate 16 grandchildren elements after division.

Raf had some ideas about avoiding false positives/negatives on whether the priority queue is empty. It's more important to achieve parallelism than to maintain a strict priority ordering of the elements. Approximate priority order is fine.

RafSchietekat
Valued Contributor III

dotcsw wrote:

The more I look at tbb::pipeline, the less I think it applies to my algorithm. Pipeline is good when you have serial stages and/or stalls due to I/O or something. My algorithm looks like a pipeline but it's CPU bound and all of the stages are parallel. There's an abundance of parallelism and few synchronization constraints. If I were to use tbb::pipeline, a single thread would follow a token all the way through the pipe (due to the depth-first nature of the scheduler). The thread would basically hand the data off to itself - all the overhead of task scheduling with none of the benefit.

Pipeline is essential when you have serial stages; otherwise you might as well use parallel_do() and call each stage yourself. But performance is still best with parallel stages: there is no overhead here, other than implicitly checking for cancellation (which is really cheap). And you should not block in a pipeline, either.

So even if all stages are parallel, you can use a pipeline to structure the code, automatically check for cancellation, and to be prepared for when one or more stages might become serial.

dotcsw wrote:

Is this use of increment_ref_count() an acceptable idiom? The TBB examples do set_ref_count(n+1) instead. Other documents say that increment_ref_count() should only be necessary when a task has more than one successor.

Don't forget initialising the reference count to 1 using set_ref_count(1) (to balance the wait_for_all()), and it's nicer to use allocate_additional_child_of() for the child tasks (implies increment_ref_count() and actually predates its availability).
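In sketch form (untested; Element, Queue and ConquerTask are placeholders, with Queue as defined in the skeleton further down), your execute() with those two changes:

class DivideTask : public tbb::task {
    Queue& concurrentPriorityQueue;
public:
    DivideTask(Queue& q) : concurrentPriorityQueue(q) {}
    tbb::task* execute() {
        set_ref_count(1);   // balances the wait_for_all() below
        Element* element;
        while (concurrentPriorityQueue.try_pop(element)) {
            if (element->canBeConquered()) {
                // allocate_additional_child_of(*this) atomically increments
                // this task's reference count, so no increment_ref_count() here
                ConquerTask& c = *new(allocate_additional_child_of(*this))
                    ConquerTask(element);
                spawn(c);
            } else {
                // divide element and reinsert subelements into the queue
            }
        }
        wait_for_all();     // returns when the count is back down to 1
        return NULL;
    }
};

And here, finally, the skeleton code promised earlier for the parallel_do() route. Again just an untested sketch with placeholder names; note the outer serial loop absorbing the false positives discussed above:

#include <cassert>
#include <cstddef>
#include <iterator>
#include "tbb/concurrent_priority_queue.h"
#include "tbb/parallel_do.h"

struct Element { /* ... */ };
struct ByPriority {
    bool operator()(const Element* a, const Element* b) const; // your ordering
};
typedef tbb::concurrent_priority_queue<Element*, ByPriority> Queue;

class PopIterator {
    Queue* queue;        // NULL in the default-constructed end iterator
    Element* item;
public:
    // the traits parallel_do needs from an input iterator:
    typedef std::input_iterator_tag iterator_category;
    typedef Element* value_type;
    typedef std::ptrdiff_t difference_type;
    typedef Element** pointer;
    typedef Element*& reference;

    PopIterator() : queue(NULL), item(NULL) {}              // the end
    explicit PopIterator(Queue& q) : queue(&q), item(NULL) {}
    Element*& operator*() {
        if (!queue->try_pop(item)) item = NULL;  // false positive: yield NULL
        return item;
    }
    PopIterator& operator++() { return *this; } // operator* already advanced us
    bool operator!=(const PopIterator& other) const {
        assert(!other.queue);    // only comparison with the end is supported
        return !queue->empty();  // may report a false positive (see above)
    }
};

struct Body {   // the parallel_do body; must tolerate a NULL element
    void operator()(Element* e) const {
        if (!e) return;   // failed pop: nothing to do
        // prune, divide (pushing children back into the queue), or conquer
    }
};

void drain(Queue& queue) {
    do {   // the serial wrapper: absorbs false positives and the race
           // between popping the last element and reinserting children
        tbb::parallel_do(PopIterator(queue), PopIterator(), Body());
    } while (!queue.empty());
}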
