Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Asynchronous tasks: can TBB help?

Franco_M_
Beginner
1,579 Views

Dear all,

I am planning to parallelize an iterative process, or better, I hope to parallelize it. I don't know if I can use TBB for this, I have a minimal training on TBB, using its containers and algorithms, but nothing like this.

The idea is simple, I have a path to reconstruct, and each point on the path is a task that changes the position of it via an iterative scheme. When a good solution is obtained the task will "signal" its neighbors that its position has changed, and they will start a new computation with the new adjacent point.

Can TBB be of help here? The essential part is to create independent tasks, signal adjacent ones, and when a signal is received, stop the computation and restart with the new information. 

Thanks for the help.
    /fm

0 Kudos
1 Solution
Alexei_K_Intel
Employee
1,579 Views

@Jim, TBB does guarantee any concurrency (with a small exception for tbb::task::enqueue). So using TBB parallel constructions with an assumption about the number of threads may lead to performance inefficiency or even a deadlock (depending on the algorithm). In some particular cases when your know about the whole application behavior and some internal behavior it is possible to know about real concurrency level; however, in any case, TBB popularizes task-based approach and OpenMP like approaches are not always suitable for it.

@Franco, I still suppose that the algorithm looks like wave-front for 1D case. I reworked the example from 2D case to 1D case. Consider the code snippet:

const int blockSize = 64;
// Initialize predecessor counts for blocks.
int numBlocks = points.size()/blockSize;
if ( points.size()%blockSize != 0 ) ++numBlocks;
std::vector<std::atomic<char>> counts(numBlocks, 3), counts2 (numBlocks);
*counts.begin() = *counts.cbegin() = 2;

// Create the second array for the points.
typedef decltype(points) Points;
Points points2(points.size());

// Prepare initial blocks for processing.
struct Block {
    size_t index;
    Points &points1;
    Points &points2;
    std::vector<std::atomic<char>> &counts1;
    std::vector<std::atomic<char>> &counts2;
};
std::vector<Block> blocks(numBlocks);
for ( size_t i = 0; i < numBlocks; ++i ) blocks = { i, points, points2, counts1, counts2 };

tbb::parallel_do( points.begin(), points.end(),
    [blockSize]( const Block& b, tbb::parallel_do_feeder<Block>&feeder ) {
        size_t numPoints = b.points1.size();
        // Extract bounds on block.
        size_t l = b.index * blockSize;
        size_t u = min(b.index * blockSize, b.points1.size());
        
        // Process the block.
        for (int i = l; i < u; i++) {
            int iter = 0;
            // Copy our point not to interfere with other tasks.
            b.points2 = b.points1;
            while (iter++ < maxIter && energyNorm > tol)
                b.points2.optimize(); // Optimize will use just b.points1, b.points1[i-1], and b.points1[i+1].
        }
        
        //Finish calculation if we meet a criterion.
        if ( globalNorm(curGlobalStress - prevGlobalStress) < tol )
            return;
                
        // Prepare a counter for the next epoch. It should be done before we release our reference.
        b.counts2[b.index] = b.index==0 || b.index==numPoints-1 ? 2 : 3;           

        // Account for successors.
        if( b.index-1>=0 && --b.counts1[b.index-1]==0 )
            feeder.add( Block( { b.index-1, b.points2, b.points1, b.counts2, b.counts1 } );

        if( --b.counts1[b.index]==0 )
            feeder.add( Block( { b.index, b.points2, b.points1, b.counts2, b.counts1 } );

        if( b.index+1<numPoints && --b.counts1[b.index+1]==0 )
            feeder.add( Block( { b.index+1, b.points2, b.points1, b.counts2, b.counts1 } );
    }
);

The idea is quite simple, we split points into blocks (the blockSize==1 is also possible). Each block is processed independently and the result is stored to a second array. We track dependencies in Count array (one counter for each block). When the current block is processed we decrement references to neighbors and ourself. If there are no more references to a block, we restart it with an updated array.

View solution in original post

0 Kudos
13 Replies
Vladimir_P_1234567890
1,579 Views

Hello, it looks you you are looking for async_node and flow graph https://software.intel.com/en-us/node/589743

--Vladimir

0 Kudos
Alexei_K_Intel
Employee
1,579 Views

Hi, could you describe the problem in some details, please? Is it stencil? Can the wave-front idea be applied?

Regards, Alex

0 Kudos
Franco_M_
Beginner
1,579 Views

Thanks for the pointers. Here's a simple drawing of what I need to do.

Let each point i on a path be aware of its predecessor and successor. Using the positions of i-1 and i+1 it will optimize its position (via an energy norm, but it's not important). When a "good position" is attained, it will raise a signal. Then its neighbors may themselves optimize their position based on the new i.

@Vladimir, this async_node solution seems very interesting. Do you think it can fit within my problem?

@Alex, I really hope I made my problem clearer now, let me know if I didn't explain it well enough. It's a path-optimization problem on a 2D, domain, however it's still monodimensional in its essence.

0 Kudos
Alexei_K_Intel
Employee
1,579 Views

Thank you for the explanation. Could you describe how is the algorithm implemented serially, e.g.

for ( int iter = 0; iter < maxIter; ++iter )
    for ( auto point = Points.begin()+1; point != Points.end()-1; ++point )
        point->optimizePosition( *(point-1), *(point+1) );

0 Kudos
Franco_M_
Beginner
1,579 Views

Sure, Alexei, the fact is that, using an energy norm, the serial algorithm would be more like this (with more details):

// Globally I will end when the energy norm is sufficiently stable
while (globalNorm(curGlobalStress - prevGlobalStress) < tol)
{
   // Optimize each point, independently
   for (int i = 0; i < points.size(); i++)
   {
      // Here I will optimize, bad code follows to explain what will happen
      int iter = 0;

      // The optimization will stop when stable, or because it takes too long
      while (iter++ < maxIter && energyNorm > tol)
         points.optimize(); // Optimize will use just i, i-1, and i+1
   }
}

I hope it helps. It's a "wave" movement from the first point to the last and back again, until I reach a good solution (globally). Points are optimized locally, not globally, so I just need the adjacent ones. I don't really care if an optimization, when parallelized, will use an "old" adjacent best position, I might check their "version number" to see if something has changed, and just in that case recompute.

Thanks for your help!
    Franco

0 Kudos
jimdempseyatthecove
Honored Contributor III
1,579 Views

This might be easier done with OpenMP. Sketch:

atomic<int> new_position;
new_position = 0;
atomic<bool> Done;
Done = false;

#pragma omp parallel
{
 int my_position = -1; // initialize to trigger restart
 while (!Done)
 {
  while (my_position != new_position)
  {
   my_position = new_position;
   // your code here to initialize/reset position info
   ...
   // have new position info
   // loop in the event of new_position
  } // while (my_position != new_position)
  // begin/advance a step in the position
  ...
  if (WeMoved())
   ++new_position; // atomic increment
  if (TerminateCondition())
   Done = true; // *** do not use Done = TerminateCondition(); ***
 } // while (!Done)
}

Jim Dempsey

0 Kudos
Alexei_K_Intel
Employee
1,579 Views

@Franco, have you considered using additional memory and usual parallel_for? e.g.

// Globally I will end when the energy norm is sufficiently stable
while (globalNorm(curGlobalStress - prevGlobalStress) < tol)
{
   // Optimize each point, independently
   tbb::parallel_for ( 0, points.size(), []( int i )
   {
      // Here I will optimize, bad code follows to explain what will happen
      int iter = 0;
 
      // The optimization will stop when stable, or because it takes too long
      newPoints = points;
      while (iter++ < maxIter && energyNorm > tol)
         newPoints.optimize(); // Optimize will use just i, i-1, and i+1
   } );
   swap(newPoints, points);
}

 

0 Kudos
Franco_M_
Beginner
1,579 Views

@Alexei, yes, but then I would lose the asynchronous part. I'd like to make everything independent as much as possible, if I have to swap points in bulk I need a barrier (forgive this MPI-jargon). And it would be no fun :)

@Jim of course, but I am stuck with clang on OSX, so OpenMP isn't an option.

Thanks!

0 Kudos
jimdempseyatthecove
Honored Contributor III
1,579 Views

The use of OpenMP was for sketch code. The same sketch will work using TBB constructs. You would replace the "#pragma omp parallel: with a TBB construct of your choice that constructs a team of all the available hardware theads. For example parallel_for_each using a range of [0:numberOfHardwareThreads).

The remainder of the code remains the same.

Jim

0 Kudos
Alexei_K_Intel
Employee
1,580 Views

@Jim, TBB does guarantee any concurrency (with a small exception for tbb::task::enqueue). So using TBB parallel constructions with an assumption about the number of threads may lead to performance inefficiency or even a deadlock (depending on the algorithm). In some particular cases when your know about the whole application behavior and some internal behavior it is possible to know about real concurrency level; however, in any case, TBB popularizes task-based approach and OpenMP like approaches are not always suitable for it.

@Franco, I still suppose that the algorithm looks like wave-front for 1D case. I reworked the example from 2D case to 1D case. Consider the code snippet:

const int blockSize = 64;
// Initialize predecessor counts for blocks.
int numBlocks = points.size()/blockSize;
if ( points.size()%blockSize != 0 ) ++numBlocks;
std::vector<std::atomic<char>> counts(numBlocks, 3), counts2 (numBlocks);
*counts.begin() = *counts.cbegin() = 2;

// Create the second array for the points.
typedef decltype(points) Points;
Points points2(points.size());

// Prepare initial blocks for processing.
struct Block {
    size_t index;
    Points &points1;
    Points &points2;
    std::vector<std::atomic<char>> &counts1;
    std::vector<std::atomic<char>> &counts2;
};
std::vector<Block> blocks(numBlocks);
for ( size_t i = 0; i < numBlocks; ++i ) blocks = { i, points, points2, counts1, counts2 };

tbb::parallel_do( points.begin(), points.end(),
    [blockSize]( const Block& b, tbb::parallel_do_feeder<Block>&feeder ) {
        size_t numPoints = b.points1.size();
        // Extract bounds on block.
        size_t l = b.index * blockSize;
        size_t u = min(b.index * blockSize, b.points1.size());
        
        // Process the block.
        for (int i = l; i < u; i++) {
            int iter = 0;
            // Copy our point not to interfere with other tasks.
            b.points2 = b.points1;
            while (iter++ < maxIter && energyNorm > tol)
                b.points2.optimize(); // Optimize will use just b.points1, b.points1[i-1], and b.points1[i+1].
        }
        
        //Finish calculation if we meet a criterion.
        if ( globalNorm(curGlobalStress - prevGlobalStress) < tol )
            return;
                
        // Prepare a counter for the next epoch. It should be done before we release our reference.
        b.counts2[b.index] = b.index==0 || b.index==numPoints-1 ? 2 : 3;           

        // Account for successors.
        if( b.index-1>=0 && --b.counts1[b.index-1]==0 )
            feeder.add( Block( { b.index-1, b.points2, b.points1, b.counts2, b.counts1 } );

        if( --b.counts1[b.index]==0 )
            feeder.add( Block( { b.index, b.points2, b.points1, b.counts2, b.counts1 } );

        if( b.index+1<numPoints && --b.counts1[b.index+1]==0 )
            feeder.add( Block( { b.index+1, b.points2, b.points1, b.counts2, b.counts1 } );
    }
);

The idea is quite simple, we split points into blocks (the blockSize==1 is also possible). Each block is processed independently and the result is stored to a second array. We track dependencies in Count array (one counter for each block). When the current block is processed we decrement references to neighbors and ourself. If there are no more references to a block, we restart it with an updated array.

0 Kudos
Franco_M_
Beginner
1,579 Views

Thanks, Alexei.

I struggled a little to understand what will happen in a parallel_do routine. And I still don't, honestly, I've never used anything different from a parallel_pipeline, or a parallel_for.

So you partition the list of points in blocks, so that they can be processed in parallel. Basically is the same as saying that each point will act independently from all the others.

Then, you call parallel_do on the list of points. The lambda will optimize the block of points.

However, I don't understand lines 42-53. As far as I understand, each block has a count property, but I fail to see how this is really used. It will keep track of adjacent nodes, but my understanding of the tbb::parallel_do_feeder is not at par with what it should be. When, for instance in line 42, we have the preceding block (index >= 1) with count zero, we change the count, and add another block to the list, maybe?

So is count is tracking how many blocks are currently using the i-th block?

Would you be so kind to explain it to me?

Thanks, and sorry to bother with these very basic questions!
     Franco

0 Kudos
Alexei_K_Intel
Employee
1,579 Views

You understand the idea correctly. count is tracking how many references to a block exist (up two from heighbours and one from itself). The idea is that we have two epochs: the current and the next. When we finish with the current epoch we remove the references to ourself and neighbours. When count (number of references) reaches zero, it means that the block is not more required on the current epoch and can be process again on the next epoch (we add the block to the feeder).

parallel_do can be considered as parallel_for_each but it has a possibility to add additional work via the feeder.

If you still have questions feel free to ask.

P.S. I realized that the code is incorrect because there are should be two arrays of counters (like two arrays of points for each epoch) because next epoch will interfere with the previous one. I fixed the code in the previous comment.

0 Kudos
Franco_M_
Beginner
1,579 Views

Thanks, Alexei, I tried a parallel_do and I think it suits my needs without too much efforts.

Thank you!

0 Kudos
Reply