Intel® oneAPI Threading Building Blocks

Adding new work to parallel_do

Franco_M_
Beginner

Dear all,

In a previous post (this link) I asked whether TBB could help me write my software, and it turns out to be perfect for my job. However, I have a problem adding new work to a parallel_do call.

Let me recap the objective: reconstructing a particle path with an iterative solution (this image may help you):

  • each path point has an initial position
  • when the first solution for its coordinates is found, the next point starts
  • after the new point has been added, the current point's coordinates are optimized
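The three steps above imply a simple dependency: point k+1 can start as soon as point k has its first solution, while point k's optimization overlaps the later points. A minimal sketch of the resulting timeline, assuming an idle thread is always available for every point (TBB makes no such promise); `timeline` and `schedule` are hypothetical names used only for illustration:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper types/names for illustration only (not from TBB).
// All durations are in milliseconds.
struct timeline {
    std::vector<int> start;  // when each point begins initializing
    std::vector<int> end;    // when each point finishes optimizing
    int makespan;            // when the last point finishes
};

// Assumes init.size() == opt.size() and an idle thread for every point.
timeline schedule(const std::vector<int>& init, const std::vector<int>& opt) {
    timeline t;
    t.makespan = 0;
    int next_start = 0;
    for (std::size_t k = 0; k < init.size(); ++k) {
        t.start.push_back(next_start);
        next_start += init[k];                 // first solution found: point k+1 may start
        t.end.push_back(next_start + opt[k]);  // point k optimizes in the meantime
        t.makespan = std::max(t.makespan, t.end.back());
    }
    return t;
}
```

For example, initialization times of 100, 200 and 300 ms with 1000 ms of optimization each give start times of 0, 100 and 300 ms and a makespan of 1600 ms, versus 3600 ms if the points ran strictly one after another.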

The following code is a simulation of what I'd like to have. Each path node has an integer ID, and I'd like to have 10 points, just to understand how I can parallelize the whole thing. Moreover, as a bonus, I'd like to keep all the path nodes in a vector (or any other container), but I don't know which container is best for this.

I must be making some silly mistakes, since I always get ID zero.

This is my code:

#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

#include <tbb/task_scheduler_init.h>
#include <tbb/parallel_for.h>
#include <tbb/parallel_do.h>
#include <tbb/concurrent_vector.h>

class pathnode
{
    int id_;
    
    std::mutex &m_;
    
    tbb::concurrent_vector<pathnode> &v_;
    
public:
    
    // Take the vector by reference: binding v_ to a by-value parameter would leave it dangling
    pathnode(int id, tbb::concurrent_vector<pathnode> &v, std::mutex &mtx) : id_(id), m_(mtx), v_(v)
    {
        // NOP
    }
    
    void operator()(pathnode item, tbb::parallel_do_feeder<pathnode> &feeder) const
    {
        {
            std::lock_guard<std::mutex> l(m_);
            std::cout << "START thread id " << std::this_thread::get_id() << " ID " << id_ << std::endl;
        }
        
        // Simulate initializing the path coordinates
        {
            std::mt19937_64 eng{std::random_device{}()};
            std::uniform_int_distribution<> dist{100, 1000};
            auto p = std::chrono::milliseconds{dist(eng)};
            std::this_thread::sleep_for(p);
        }
        
        // Add more work to perform
        if (id_ < 10)
        {
            feeder.add(pathnode(id_ + 1, v_, m_));
        }

        // Simulate optimizing coordinates
        {
            std::mt19937_64 eng{std::random_device{}()};
            std::uniform_int_distribution<> dist{1000, 2000};
            auto p = std::chrono::milliseconds{dist(eng)};
            std::this_thread::sleep_for(p);
        }

        {
            std::lock_guard<std::mutex> l(m_);
            std::cout << "ENDED thread id " << std::this_thread::get_id() << " ID " << id_ << std::endl;
        }
    }
};

int main(int argc, char** argv)
{   
    std::mutex m;

    tbb::concurrent_vector<pathnode> trajectory;
    
    pathnode initial(0, trajectory, m);
    
    trajectory.push_back(initial);
    
    tbb::parallel_do(trajectory.begin(), trajectory.end(), trajectory.back());
    
    return 0;
}

The output isn't particularly good:

START thread id 0x104c03000 ID 0
START thread id 0x700000804000 ID 0
START thread id 0x700000c07000 ID 0
ENDED thread id 0x104c03000 ID 0
START thread id 0x700000401000 ID 0
START thread id 0x104c03000 ID 0
ENDED thread id 0x700000804000 ID 0
START thread id 0x700000804000 ID 0
ENDED thread id 0x700000c07000 ID 0
ENDED thread id 0x700000401000 ID 0
START thread id 0x700000c07000 ID 0
ENDED thread id 0x104c03000 ID 0
START thread id 0x104c03000 ID 0
START thread id 0x700000401000 ID 0
ENDED thread id 0x700000804000 ID 0
START thread id 0x700000804000 ID 0
ENDED thread id 0x700000c07000 ID 0
ENDED thread id 0x104c03000 ID 0
START thread id 0x700000c07000 ID 0
START thread id 0x104c03000 ID 0
Program ended with exit code: 9
^
|
I stopped it.

Can you help me?

Thanks!
     Franco

Alexei_K_Intel
Employee

Hi Franco,

In my opinion, the main issue is that the code is a bit complicated: the Item and Body functionality are merged into one class. That is not an error in itself, nor anything unusual, but consider the line:

std::cout << "START thread id " << this_thread::get_id() << " ID " << id_ << std::endl;

We can see that "id_" is accessed through "this" (i.e. the Body), not through "item" (the object that was previously added to the feeder).

On the line:

tbb::parallel_do(trajectory.begin(), trajectory.end(), trajectory.back());

The third argument is the Body, and it is used for all invocations of

void operator()(pathnode item, tbb::parallel_do_feeder<pathnode> &feeder) const

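So "this->id_" is always 0 (because "initial.id_" == 0). The condition "id_ < 10" reads the same Body member, so it is always true and every invocation feeds another item; that is also why the program never finished on its own.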
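To see the Body/Item distinction in isolation, here is a TBB-free, single-threaded sketch of the calling convention described above: one Body object is reused for every item, and new items arrive through a feeder. `Feeder`, `run_worklist`, `node`, `BuggyBody` and `FixedBody` are hypothetical stand-ins, not TBB's implementation; the `max_items` cap exists only so the buggy case terminates.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical stand-in for tbb::parallel_do_feeder<Item>: it only queues items.
template <class Item>
class Feeder {
public:
    explicit Feeder(std::deque<Item>& queue) : queue_(queue) {}
    void add(const Item& item) { queue_.push_back(item); }
private:
    std::deque<Item>& queue_;
};

// Hypothetical sequential driver with the same calling convention as
// parallel_do: ONE body object is reused for every item. max_items is a
// safety cap so a body that never stops feeding cannot loop forever.
template <class Item, class Body>
std::size_t run_worklist(const std::vector<Item>& initial, const Body& body,
                         std::size_t max_items) {
    std::deque<Item> queue(initial.begin(), initial.end());
    Feeder<Item> feeder(queue);
    std::size_t processed = 0;
    while (!queue.empty() && processed < max_items) {
        Item item = queue.front();
        queue.pop_front();
        body(item, feeder);
        ++processed;
    }
    return processed;
}

struct node { int id; };

// Like the original code: reads the id from the body ("this"), not the item.
struct BuggyBody {
    int id;
    std::vector<int>* seen;
    void operator()(node /*item*/, Feeder<node>& feeder) const {
        seen->push_back(id);                    // always the body's id
        if (id < 10) feeder.add(node{id + 1});  // always true: never stops
    }
};

// Fixed: everything comes from the item that was fed in.
struct FixedBody {
    std::vector<int>* seen;
    void operator()(node item, Feeder<node>& feeder) const {
        seen->push_back(item.id);
        if (item.id < 10) feeder.add(node{item.id + 1});
    }
};
```

Starting from a single `node{0}` with the cap at 100, `BuggyBody{0, &v}` processes 100 items that all report ID 0, while `FixedBody` stops on its own after IDs 0 through 10.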

I'd suggest adding a separate class for the Body:

struct Body {
    void operator()(pathnode item, tbb::parallel_do_feeder<pathnode> &feeder) const {
        item( feeder );
    }
};

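And use it in parallel_do (don't forget to change pathnode's own functor accordingly: Body calls item(feeder), so pathnode needs an operator() that takes only the feeder):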

tbb::parallel_do(trajectory.begin(), trajectory.end(), Body());
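One possible shape for that pathnode change, sketched here as an assumption rather than the answer's own code: the work moves into a call operator that takes only the feeder and reads only the item's own members. It is templated on the feeder type purely so the sketch compiles without TBB, and the shared `std::vector<int>` log is a hypothetical stand-in for the original output and storage.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Sketch: a pathnode whose call operator uses only its OWN state.
// The std::vector<int> "log" is a hypothetical stand-in for the original
// console output / storage; the mutex guards it.
class pathnode {
public:
    pathnode(int id, std::mutex& mtx, std::vector<int>& log)
        : id_(id), m_(mtx), log_(log) {}

    int id() const { return id_; }

    // Feeder is any type with add(pathnode). With TBB, it is deduced as
    // tbb::parallel_do_feeder<pathnode> when Body calls item(feeder).
    template <class Feeder>
    void operator()(Feeder& feeder) const {
        {
            std::lock_guard<std::mutex> lock(m_);
            log_.push_back(id_);  // this item's id, not the Body's
        }
        // ... initialize coordinates here ...
        if (id_ < 10) {
            feeder.add(pathnode(id_ + 1, m_, log_));  // feed the next point
        }
        // ... optimize coordinates here ...
    }

private:
    int id_;
    std::mutex& m_;
    std::vector<int>& log_;
};
```

With TBB, the answer's `Body` calls `item(feeder)`, and `Feeder` is deduced as `tbb::parallel_do_feeder<pathnode>`.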

 

As for concurrent_vector, it will work as long as you use a container whose push_back (or similar interface) does not invalidate iterators. concurrent_vector is such a container, so it can be used as in the example.

 

One more remark: parallel_do also accepts the whole container instead of a pair of iterators, e.g.

tbb::parallel_do(trajectory, Body());
Franco_M_
Beginner

Thanks Alexei, it worked!

I'm not sure whether what I did is dangerous:

        // Add more work to perform
        if (id_ < 10)
        {
            auto p = v_.push_back(pathnode(id_ + 1, v_, m_));
            feeder.add(*p);
        }

I need to keep all the points in the vector, but I don't know whether those lines could introduce a race condition. Do you think that code is OK?

Thanks!
