Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

parallel std::partition

Burr__Adam
Beginner
1,643 Views

Does anyone have any tips for efficiently parallelizing std::partition using TBB? Has this been done already?

0 Kudos
37 Replies
Anton_M_Intel
Employee
518 Views

Adam,

parallel_invoke can be a little (very little) bit faster than task_group and structured_task_group since it knows in advance the number of tasks to be created (i.e. it's a matter of one atomic increment per task).

Continuation passing styles are certainly good approaches but if you can abstract your algorithm to fit the Range concept (a specialized view of divide-and-conquer concept) to be applied with parallel_for or parallel_reduce, it's the best way since it can benefit from partitioner algorithms and avoid clunky low-level API.

0 Kudos
Burr__Adam
Beginner
518 Views

I'm trying to use parallel_reduce with my own range class and I can get it to compile but so far I can't get it to map to my algorithm correctly. I'm using the functional form now but I could try the imperative form if that would make a difference. In the docs it shows 2 possible execution pattern examples, the first where only consecutive sub-ranges are merged, and the second where non-consecutive sub-ranges are merged. For my algorithm to work I need it to follow a consecutive-only merging pattern, but I don't see any way to force parallel_reduce to do that. Am I missing anything or should I stick to doing my own recursion?

0 Kudos
RafSchietekat
Valued Contributor III
518 Views

About non-consecutive execution: parallel_reduce() is not making an assumption of commutativity, if that's what you are thinking. Each Body keeps adding information to the right of the range that it has executed and/or joined, until it is joined by another Body and is destroyed (there is no recycling).

Moreover, an execution is equivalent to the following: for each element in the chunk, construct a Body, execute it on the element, join the Body executing the chunk with the singleton Body. The only difference is in performance, where TBB aims for fewer Body instances executing more elements.

I hope that this clears things up, but if it is not relevant I would be very interested to know how a different execution of parallel_reduce() would make a difference here.

0 Kudos
Burr__Adam
Beginner
518 Views

Yes, I can see that behavior in the execution and I understand why it would be efficient for most reductions.

The issue for my algorithm is that there are limitations to which reduced ranges I can merge. I can only merge the reduced results from two ranges if they have been split from the same "parent" range. This is because the reduction result is a special "strided" iterator, not a typical sum. I cannot merge two iterators with different parent ranges because I would get a complex stride pattern that would be more expensive to manage.

I don't see this limitation as a problem with tbb::parallel_reduce, I just don't think my algorithm maps to it well. Doing the recursion/reduction myself is not a problem, I already have that code, I was just experimenting with parallel_reduce out of curiosity because it would make my code much simpler and more compact.

A few other projects need attention so it may be a few days before I can continue working on this...

0 Kudos
RafSchietekat
Valued Contributor III
518 Views

It seems we have a use for a new parallel_reduce_by_clades() or somesuch, then...

Partitioners are nice, but they only work locally inside a limited choice of black boxes, and each such box pretends to own the world.

(Added) Try parallel_deterministic_reduce() for now. It only works with a simple_partitioner, though, so you'll be responsible for tuning. But then you might as well inherit a depth in a parallel_invoke() tree. The only thing you get out of it is that the code is ready for a drop-in replacement from a future parallel_reduce_by_clades().

0 Kudos
Burr__Adam
Beginner
518 Views

It looks like parallel_deterministic_reduce works, thanks for the tip! I will post some timing results in a few days when I wrap up some other work...

0 Kudos
RafSchietekat
Valued Contributor III
518 Views

Just to be sure you didn't miss that: you'll be responsible for tuning, while Anton's suggestion was about freeing you from it.

0 Kudos
Burr__Adam
Beginner
518 Views

yes, I think I will have to manage the tuning either way because I'm using my own range class...

0 Kudos
jimdempseyatthecove
Honored Contributor III
518 Views

Here are some rambling thoughts:

a) Use parallel_reduce (or for) to count the number of matches or mismatches. This produces a pivot point.

b) partition each section on either side of the pivot point a TBD number of chunks. The number dependent on number of items in data set and number of threads available. For large data sets it would be (my guess) significantly larger than 2x number of threads.

d) each task, in order, takes the same relative chunk from each side of the pivot point.

e) Each available thread works on the pair of chunks in a slightly different way. The partitioning only tests and exchanges between chunks and not within chunks. Note, at end of process you have 3 possible outcomes: 1) both sides fully qualify the condition, 2) left side qualifies, right side not, 3) left side not qualified, right side qualified.

f) from e) above, either 0 or one of the sides are entered into one of two re-do queues: 1) left side didn't qualify, or 2) right side didn't qualify

g) at the end of the first pass of division by b), the threads then pick (in an orderly manner) one entry from each redo queue. If queue empty, and process not done, thread yield-waits for entry in appropriate queue(s) and/or for done condition.

The advantage of this technique is there is only one exchange about a pivot point and there is no thread interlock issues (excepting for task fetching and re-do queue push/pop.

The disadvantage, is the comparison test is performed twice.

edit) note, the token placed into the redo queue can specify a begin() and end() of those records that did not qualify. 

Jim Dempsey

0 Kudos
Burr__Adam
Beginner
518 Views

interesting, thanks. what do you mean by "thread interlock"? I assume you just mean that no locks are used?

0 Kudos
jimdempseyatthecove
Honored Contributor III
518 Views

What I mean something on the order of mutex or sync_fetch_and_add

The former is a lock, the latter is not (though it contains a LOCK prefix explicit or implicit). But both have significant overhead as compared to "standard" code sequences.

Inserting data into a MPMC queue could use either, though I think the TBB concurrent queues use a sync_fetch_and_add (like) construction. This "wait-free" programming.

Jim Dempsey

0 Kudos
Burr__Adam
Beginner
518 Views

I've continued to work on this sporadically when I've had gaps in other work. I'm pretty happy with the results and I'm ready to start using it in production code. I just uploaded an image showing it's thread scaling performance while partitioning an array of 100000000 pointers with a very simple predicate (pointer deference and operator<). If the the data size, array size or predicate complexity increases then it scales even better. It easily beats GCC's "parallel mode" std::partition (__gnu_parallel::partition) which topped out at <2x speedup on the same problem with 16 cores.

0 Kudos
jimdempseyatthecove
Honored Contributor III
518 Views

Adam,

Could you post your test program? (preferably with both a simple predicate and typical predicate)

Jim Dempsey

0 Kudos
Burr__Adam
Beginner
518 Views

Here is how it scales with array size, log scale. Again, this is an array of pointers, same deference operator< predicate, on 16 cores. The shape of the curve seems strange to me, I don't understand why it plateaus at 10x.

0 Kudos
Burr__Adam
Beginner
518 Views

Sure, here is the test code for that last graph, the predicate is simple but it is representative of my intended application. Please excuse any sloppiness here, it is just throwaway test code:

#include <algorithm>
#include <array>
#include <chrono>
#include <ctime>
#include <iostream>
#include <random>
#include <vector>
#include <tbb/tbb.h>

#include <bssParallelPartition.h>

typedef std::array< float, 6 > bbox;
typedef std::vector< bbox > bbox_array;
typedef std::vector< const bbox* > bbox_pointer_array;

void generate_bboxes (bbox_array& bboxes, bbox_pointer_array& bbox_pointers,
        unsigned long long seed)
{
    const int threads = tbb::task_scheduler_init::default_num_threads();
    const unsigned streams = 4 * threads;
    std::vector< unsigned long long > seeds (streams);
    std::default_random_engine engine;
    engine.seed (seed);
    std::uniform_int_distribution< unsigned long long >
            distribution (0, std::numeric_limits< unsigned long long >::max ());
    for (unsigned i = 0; i < streams; ++i) {
        seeds = distribution (engine);
    }
    typedef tbb::blocked_range< unsigned > blocked_range;
    auto rand_items_func = [&](const blocked_range& range) {
        for (unsigned t = range.begin(); t < range.end(); ++t) {
            std::default_random_engine engine;
            std::uniform_real_distribution< float > distribution (0, 1.);
            engine.seed (seeds);
            for (std::size_t i = t; i < bboxes.size(); i += streams) {
                bboxes[0] = distribution (engine);
                bboxes[1] = distribution (engine);
                bbox_pointers = &bboxes;
            }
        }
    };
    tbb::task_scheduler_init scheduler (threads);
    tbb::parallel_for (blocked_range (0, streams), rand_items_func);
}

double time_std_partition (bbox_pointer_array bbox_pointers, std::size_t& p)
{
    const bbox** begin = bbox_pointers.begin().base();
    const bbox** end = bbox_pointers.end().base();
    auto part_func = [] (const bbox* i) {return (*i)[0] + (*i)[1] < 1.;};
    auto start_time = std::chrono::high_resolution_clock::now();
    const bbox** pivot = std::partition (begin, end, part_func);
    auto end_time = std::chrono::high_resolution_clock::now();
    p = pivot - begin;
    std::chrono::duration< double > elapsed_time = end_time - start_time;
    return elapsed_time.count();
}

double time_parallel_partition (bbox_pointer_array bbox_pointers, std::size_t p,
        unsigned grain_size, unsigned max_tasks)
{
    const bbox** begin = bbox_pointers.begin().base();
    const bbox** end = bbox_pointers.end().base();
    auto part_func = [] (const bbox* i) {return (*i)[0] + (*i)[1] < 1.;};
    auto start_time = std::chrono::high_resolution_clock::now();
    const bbox** pivot =
            bss::parallel_partition (begin, end, part_func, grain_size, max_tasks);
    auto end_time = std::chrono::high_resolution_clock::now();
    assert (begin + p == pivot);
    std::chrono::duration< double > elapsed_time = end_time - start_time;
    return elapsed_time.count();
}

int main (int, char**)
{
    unsigned long long seed = 1408737560227051;
    std::size_t max_size = 2000000000;
    std::size_t min_size = 1000;
    double size_scale = .85;
    unsigned max_threads = 32;
    unsigned min_threads = 32;
    unsigned grain_size = 10000;
    unsigned passes = 4;
    
    bbox_array bboxes (max_size);
    bbox_pointer_array bbox_pointers (max_size);
    generate_bboxes (bboxes, bbox_pointers, seed);
    for (std::size_t size = max_size; size >= min_size; size = size * size_scale) {
        bboxes.resize (size);
        bbox_pointers.resize (size);
        std::size_t p;
        double serial_time = 0.;
        for (unsigned pass = 0; pass < passes; ++pass) {
            serial_time += time_std_partition (bbox_pointers, p);
        }
        serial_time /= passes;
        tbb::task_scheduler_init scheduler (tbb::task_scheduler_init::deferred);
        for (unsigned threads = max_threads; threads >= min_threads; threads -= 1) {
            scheduler.initialize (threads);
            unsigned max_tasks = threads > 1 ? 32 * threads : 1;
            double parallel_time = 0.;
            for (unsigned pass = 0; pass < passes; ++pass) {
                parallel_time += time_parallel_partition (bbox_pointers, p, grain_size, max_tasks);
            }
            parallel_time /= passes;
            float speedup = serial_time / parallel_time;
            float efficiency = speedup / threads;
            std::cout << bbox_pointers.size() << ", " << threads << ", " << grain_size << ", " <<
                    serial_time << ", " << parallel_time << ", " <<
                    speedup << ", " << efficiency << std::endl;
            scheduler.terminate();
        }
    }
    return 0;
}

 

0 Kudos
Burr__Adam
Beginner
518 Views

Here it is on a 24 core machine compared to the 12 core machine, looks like it tops out near 12x. There is a dramatic drop in speed when the array gets bigger than ~400000000 elements on the 24 core machine, I have no idea why, they both have the same amount of memory. But it is unlikely I will ever have an array that big.

0 Kudos
Burr__Adam
Beginner
518 Views

I revisited GCC's __gnu_parallel::partition and discovered I didn't have OpenMP set up correctly. Here it is now, compared to mine. Even if it was faster it still might not be practice for me because I need to run multiple partitions in parallel inside of tbb tasks which might cause over-subscription. This is the same test code as before on 16 cores.

0 Kudos
Reply