Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

help, how to stop or terminate parallel_for and parallel reduce

mblizz
Beginner

Is there a way to terminate an iteration when doing parallel_for or parallel_reduce? For example, something like the "break" in a normal for loop?

Thanks in advance for reading this, and even more so for any feedback. God bless.

jimdempseyatthecove
Honored Contributor III

The CAS only occurs for a new best fit. Depending on the frequency of matches, the CAS may occur only once.

Using parallel_reduce as Raf outlined would cause threads that did not make the first find (not necessarily the earliest index) to bail out without regard to whether their search precedes the first found index. parallel_reduce could work with the following modification. Any thread making a find overwrites whatever is in the shared found variable, without regard to whether its found index precedes or follows the currently stored found index. Other threads, detecting the last written found index (not necessarily the earliest index), compare it against their search index. Should their search position follow this found index, they bail out; otherwise they continue searching. The lowest index can be chosen in the reduction operator.

The benefit is eliminating the CAS.
The cost is that, should an earlier found index be overwritten by a later found index (before other threads see the earlier found index), the bailout is delayed. That delay could exceed the cost of the CAS. You also have the cost of performing the reduction.
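
A minimal sketch of the overwrite-then-reduce variant described above, written with standard C++ threads rather than TBB. The name findFirstWithHint, the contiguous slicing, and the per-thread result vector are assumptions made for illustration; none of them come from the thread. A thread bails out only when the last published find lies before its own position, so the thread that owns the earliest match always reaches it.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: returns the lowest index i with data[i] == target,
// or data.size() if there is no match.
long findFirstWithHint(const std::vector<long>& data, long target, int nThreads)
{
    assert(nThreads > 0);
    const long n = static_cast<long>(data.size());
    std::atomic<long> hint(n);                 // last written find, not necessarily the lowest
    std::vector<long> localBest(nThreads, n);  // one private result per thread
    std::vector<std::thread> team;

    for (int t = 0; t < nThreads; ++t) {
        team.emplace_back([&, t] {
            // Contiguous slice for thread t.
            const long begin = n * t / nThreads;
            const long end = n * (t + 1) / nThreads;
            for (long i = begin; i < end; ++i) {
                // Bail out only if the last published find lies before us.
                if (hint.load(std::memory_order_relaxed) < i) return;
                if (data[i] == target) {
                    localBest[t] = i;
                    hint.store(i, std::memory_order_relaxed);  // plain overwrite, no CAS
                    return;
                }
            }
        });
    }
    for (std::thread& th : team) th.join();

    // Reduction step: the lowest private result wins.
    return *std::min_element(localBest.begin(), localBest.end());
}
```

Because the hint may briefly hold a later index than one already found, some threads can run longer than they would with the CAS version; the final answer is unaffected because it is taken from the private results.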

An alternate method (for a small number of threads, <= 8) would be to have a table of found indexes, one for each thread. A thread bails out when it makes a find (after poking its mailbox) or if any found index in any mailbox precedes its search point.

Note that when a thread observes a change in the found table at a position lower than its current search position, the overhead will be equivalent to the CAS. Therefore, elaborating the technique may be of little value. Without running a test it would be hard to prove or disprove this assumption.
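
The mailbox table could look roughly like the sketch below, again in standard C++ rather than TBB. The name findFirstWithMailboxes, the polling interval of 100, and the slicing are illustrative assumptions. Each thread writes only its own slot, so no CAS is needed, and the slots are stored contiguously.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: one "found index" mailbox per thread.
long findFirstWithMailboxes(const std::vector<long>& data, long target, int nThreads)
{
    assert(nThreads > 0);
    const long n = static_cast<long>(data.size());
    // Contiguous slots; the value n means "nothing found yet".
    std::vector<std::atomic<long>> mailbox(nThreads);
    for (int t = 0; t < nThreads; ++t) mailbox[t].store(n);

    std::vector<std::thread> team;
    for (int t = 0; t < nThreads; ++t) {
        team.emplace_back([&, t] {
            const long begin = n * t / nThreads;
            const long end = n * (t + 1) / nThreads;
            for (long i = begin; i < end; ++i) {
                if ((i % 100) == 0) {  // poll the table every 100 iterations
                    for (int m = 0; m < nThreads; ++m)
                        if (mailbox[m].load(std::memory_order_relaxed) < i) return;
                }
                if (data[i] == target) {
                    // Only this thread ever writes this slot.
                    mailbox[t].store(i, std::memory_order_relaxed);
                    return;
                }
            }
        });
    }
    for (std::thread& th : team) th.join();

    // The lowest mailbox entry is the first occurrence.
    long best = n;
    for (int t = 0; t < nThreads; ++t) best = std::min(best, mailbox[t].load());
    return best;
}
```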

I think this is a case where CAS is more effective than the alternatives.

Jim Dempsey


RafSchietekat
Valued Contributor III
I think you slightly misinterpreted my proposal (there would be no bailout that might miss the ultimate result), but you've also convinced me to withdraw it, thanks.
jimdempseyatthecove
Honored Contributor III

Raf,

>>(there would be no bailout that might miss the ultimate result)

This would be true when you remove the inner-loop bail-out test and perform the test after the inner loops complete their runs (at leap intervals), which requires synchronizing the threads at leap intervals.

e.g. 4 threads, iteration space divided into 40 zones: 4 threads work on the 1st 4 zones, test the flag, advance to the next 4 zones, ...

Your technique requires a thread that has not found an index to advance through the remainder of its zone, since it would not know whether its zone preceded or followed the other threads' zones (except with the separate mailbox/found flag technique outlined earlier). This would cause an exit latency after a find. This latency can be reduced by increasing the number of zones (reducing the size of each zone); however, this also increases the number of synchronization points.

The method I proposed has no synchronization points, i.e. each thread can pick its zone from the group of 4 zones, then leap to its next zone, etc., with no synchronization between leaps. Should any thread in the team of 4 get preempted (to run some other application), the three other threads won't get held up at the boundary of the group of 4 zones.
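
To make the leap pattern concrete, here is a small sketch of the index arithmetic only, with no threading. The function name zoneStartsForThread and the zone size of 10 in the usage note are assumptions for illustration. Each thread derives its zones purely from its own thread number, which is why no synchronization between leaps is needed.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper: start indexes of the zones that thread threadNum
// visits when zones of zoneSize elements are dealt out round-robin.
std::vector<long> zoneStartsForThread(int threadNum, int nThreads,
                                      long iBegin, long iEnd, long zoneSize)
{
    assert(nThreads > 0 && zoneSize > 0 && threadNum >= 0 && threadNum < nThreads);
    std::vector<long> starts;
    // First zone is offset by the thread number; each leap skips the
    // zones that belong to the other threads.
    for (long iOuter = iBegin + zoneSize * threadNum; iOuter < iEnd;
         iOuter += zoneSize * nThreads)
    {
        starts.push_back(iOuter);
    }
    return starts;
}
```

With 4 threads, an iteration space of 0..400 and zones of 10 elements (40 zones in total), zoneStartsForThread(1, 4, 0, 400, 10) yields 10, 50, 90, and so on up to 370.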

Don't count out your technique too soon. When the match test is very short, continuing through to end of zone might be faster than having a test for found after each match test. Combinations of running for bursts within your zone without found test might work well too.

There is no one best way for all situations.

Keep up your good contributions.

Jim Dempsey
RafSchietekat
Valued Contributor III
"This would be true when you remove the inner loop bail-out test and perform the test after the completion of the run of the inner loops (at leap intervals) and requires synchronization of threads at leap intervals."
Well, I wasn't thinking of a uniform work distribution with gates along the way, which would only work on a dedicated system: each thread would process pieces at its own pace, and start reducing as soon as it was refused a new one. Your idea to distribute information about where a solution has been found would probably already improve overall performance and decrease latency because some tasks could bail out earlier. Still, to win against CAS (with implicit reduction), pieces might need to be so small that their execution (tasks) and distribution (fetch-and-add) would carry too much overhead, plus the increased cost of the reduction phase, so it doesn't seem a likely winner anymore. Well, maybe I should unwithdraw it anyway, and then somebody could try to put some numbers on it, to confirm this or surprise us. :-)
jimdempseyatthecove
Honored Contributor III

Case 1: item searched for not in list - # CAS = 0

Case 2: 1st thread to find a match finds the lowest index - 1 thread performs CAS; the others "suffer" a cache miss on reading the found variable and bail out. Note, this suffering is equivalent to the cache miss on reading a bool bailout flag. # CAS = 1

Case 3: 1st thread to find a match finds the 2nd lowest index - 1 thread performs CAS; the others "suffer" a cache miss on reading the found variable and all but one of these threads bail out; the remaining thread performs CAS. # CAS = 2

...

So there is a potential of having anywhere from 0 to nThreads CAS operations.
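
The counting in these cases can be checked with a small sketch that uses std::atomic in place of the Windows intrinsic. The helper publishIfLower is a hypothetical name; it issues a CAS only when its index is lower than the one currently stored and returns how many CAS operations it attempted.

```cpp
#include <atomic>

// Hypothetical helper: publish i as the found index if it is lower than
// the current one. Returns the number of CAS operations this call issued.
int publishIfLower(std::atomic<long>& foundIndex, long i)
{
    int casCount = 0;
    long oldIndex = foundIndex.load();
    while (i < oldIndex) {
        ++casCount;
        // On failure, oldIndex is refreshed with the current value and
        // the loop condition is tested again.
        if (foundIndex.compare_exchange_strong(oldIndex, i)) break;
    }
    return casCount;
}
```

Called sequentially in arrival order, a first find at the lowest index costs one CAS and later, higher finds cost none (Case 2). If the 2nd lowest index arrives first and the lowest second, two CAS operations are issued in total (Case 3).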

Your urge to avoid CAS, IMHO, is unfounded.

Assume you replace the CAS with a mailbox, 1 per thread, where threads examine each other's mailboxes.

Your "training" tells you to place mailboxes on separate cache lines, whereas the most efficient method (in this case) would be to place them in the same cache line (or in as few cache lines as possible). This is because any thread making a non-first-occurrence find will invalidate the other threads' cache line when writing to its mailbox. The other threads thus experience a cache miss. Should two finds occur ~concurrently, the still-searching threads experience 2 cache misses (with mailboxes in separate cache lines) or 1 cache miss (with mailboxes in the same cache line).

Whatever is used for signaling, whether CAS or write-through, will cause cache eviction and a cache miss on other threads. The CAS will delay the thread issuing the CAS, but with the benefit of reducing the latency of the other threads completing the task (either to bail out or to decide to continue). The optimization should be focused on multi-threaded task completion, not on individual runtimes (that is, not on the perspective of a single thread as opposed to the impact on the other team members).

Tests do need to be made. Or rather, tweaks need to be made to optimize the code.

Jim



RafSchietekat
Valued Contributor III
"Your urge to avoid CAS, IMHO, is unfounded."
I feel I'd better play dead now to survive this. :-)
robert_jay_gould
Beginner
Quoting - mblizz

Is there a way to terminate an iteration when doing parallel_for or parallel_reduce? For example, something like the "break" in a normal for loop?

Thanks in advance for reading this, and even more so for any feedback. God bless.


Although what I'm going to propose is kind of an odd solution, and isn't really as effective as canceling by hand, maybe blocked_range could have a cancel() method or something like that...

Honestly, I can't think of a good, non-smelly reason why blocked_range would have a cancel method, but it would make the library a bit simpler and safer to use, as it removes the burden on the user of breaking out of loops in a thread-safe way.

Another option would be to provide a cancelable_for_loop that stops once operator() returns false (using a bool operator()(range r) functor).

But either of these solutions would still need to wait until the longest currently running range finishes processing, so they wouldn't be as good as a hand-break.
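
A rough sketch of what such a cancelable loop might look like, built on standard C++ threads. cancelableFor is a hypothetical helper and not part of TBB; the chunked work distribution via fetch_add is likewise an assumption. The body is called concurrently, so it must be thread-safe, and chunks already running when the flag is set still run to completion, as noted above.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper, not part of TBB: runs body(begin, end) on chunks of
// [first, last) and stops handing out chunks once any call returns false.
template <typename Body>
bool cancelableFor(long first, long last, long chunk, int nThreads, Body body)
{
    assert(chunk > 0 && nThreads > 0);
    std::atomic<long> next(first);
    std::atomic<bool> cancelled(false);
    std::vector<std::thread> team;

    for (int t = 0; t < nThreads; ++t) {
        team.emplace_back([&] {
            while (!cancelled.load()) {
                const long begin = next.fetch_add(chunk);  // claim the next chunk
                if (begin >= last) return;
                const long end = std::min(begin + chunk, last);
                if (!body(begin, end)) cancelled.store(true);
            }
        });
    }
    for (std::thread& th : team) th.join();
    return !cancelled.load();  // true if no body call asked to stop
}
```

A body that returns true for every chunk processes the whole range; a body that returns false stops each thread after at most one more chunk.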

jimdempseyatthecove
Honored Contributor III

Correct Robert,

Conceptually, parallel_for "needs" an optional terminator in addition to the standard iterator. The "hand-break" code is placed into the terminator. You could also have a new class of iterator that contains the standard iterator plus the terminator, and member functions that coordinate when to call the terminator test.

Either you hide the hand-break inside the classes or you expose the hand-break. Go the hide-in-class route when you need reusability (and after you standardize just what you want to do).

Jim
jimdempseyatthecove
Honored Contributor III

Raf,

Below is a sample of break-out code, followed by runtimes.

Note, the search is on an array of longs. This is a memory bandwidth problem. Typically a bail-out technique will be used when the parallel_for step cost is relatively large (it is extremely small in this example).

Also, the sample is coded in QuickThread as opposed to TBB or OpenMP. Should be easy enough to adapt.

[cpp]const long nCells = 1000000;
long	Array[nCells];

// parallel_for task to find value in array
//
void doFind(long iBegin, long iEnd, long A[], long find, volatile long* foundIndex)
{
 if(*foundIndex < iBegin) return;  // other thread found first occurrence
 for(long i=iBegin; i < iEnd; ++i)
 {
  if((i%100)==0)	// arbitrarily look at memory every 100 iterations
  {
   if(*foundIndex < i) return;  // other thread found first occurrence
  }
  if(A[i] == find)
  {
   for(;;)
   {
    long oldIndex = *foundIndex;
    if(oldIndex < i) return;  // other thread found first occurrence
    if(_InterlockedCompareExchange(foundIndex, i, oldIndex) == oldIndex)
     return;
   }
  }
 }
}

// parallel_distribute task to find value in array
void doFindDistribute(
  int threadNum, int nThreads, long iBegin, long iEnd,
  long A[], long find, volatile long* foundIndex)
{
  int	L2cacheLineSize = (int)CacheLevelLineSize(2);
  int	runCount = L2cacheLineSize / sizeof(A[0]);
  for(long iOuter = iBegin + (runCount * threadNum); iOuter < iEnd; iOuter += runCount * nThreads)
  {
    if(*foundIndex < iOuter) return;  // other thread found first occurrence
    long iEndInner = iOuter + runCount;
    if(iEndInner > iEnd) iEndInner = iEnd;
    for(long i = iOuter; i < iEndInner; ++i)
    {
      if(A[i] == find)
      {
        for(;;)
        {
          long oldIndex = *foundIndex;
          if(oldIndex < i) return;  // other thread found first occurrence
          if(_InterlockedCompareExchange(foundIndex, i, oldIndex) == oldIndex)
            return;
        }
      }
    }
  }
}

void test()
{
  // serially initialize data
  for(long i = 0; i < nCells; ++i)
    Array[i] = rand() * rand();

  for(long i=0; i < nCells; i+=100000)
  {
    __int64 BeginTick, EndTick, ElapseTicks;

    long find = Array[i];
    volatile long foundIndex;
    long Chunk = 50000;
    foundIndex = nCells; // initialize beyond end of array
    BeginTick = _rdtsc();
    for(long j = 0; j < nCells; ++j)
    {
      if(Array[j] == find)
      {
        foundIndex = j;
        break;
      }
    }
    EndTick = _rdtsc();
    ElapseTicks = EndTick - BeginTick;
    if(foundIndex < nCells)
    {
      std::cout << "Ser " << find << " at "
                << foundIndex << " Ticks "
                << ElapseTicks << std::endl;
    }
    else
    {
      std::cout << "Ser " << find
                << " not found Ticks "
                << ElapseTicks << std::endl;
    }

    foundIndex = nCells; // initialize beyond end of array
    BeginTick = _rdtsc();
    parallel_for_v1(Chunk, doFind, 0, nCells, Array, find, &foundIndex);
    EndTick = _rdtsc();
    ElapseTicks = EndTick - BeginTick;
    if(foundIndex < nCells)
    {
      std::cout << "Par " << find << " at " 
                << foundIndex << " Ticks "
                << ElapseTicks << std::endl;
    }
    else
    {
      std::cout << "Par " << find
                << " not found Ticks " 
                << ElapseTicks << std::endl;
    }

    foundIndex = nCells; // initialize beyond end of array
    BeginTick = _rdtsc();
    parallel_distribute_v1(AllThreads$, doFindDistribute, 0, nCells, Array, find, &foundIndex);
    EndTick = _rdtsc();
    ElapseTicks = EndTick - BeginTick;
    if(foundIndex < nCells)
    {
      std::cout << "Dis " << find << " at " 
                << foundIndex << " Ticks "
                << ElapseTicks << std::endl;
    }
    else
    {
      std::cout << "Dis " << find << " not found Ticks " 
                << ElapseTicks << std::endl;
    }

  }
}


Ser    757147 at      0 Ticks   12501
Par    757147 at      0 Ticks  106767
Dis    757147 at      0 Ticks   96138
Ser 388526383 at 100000 Ticks  295803
Par 388526383 at 100000 Ticks  434907
Dis 388526383 at 100000 Ticks  324747
Ser 115913288 at 200000 Ticks  422262
Par 115913288 at 200000 Ticks  469719
Dis 115913288 at 200000 Ticks  223155
Ser 206495010 at 300000 Ticks  654147
Par 206495010 at 300000 Ticks  892719
Dis 206495010 at 300000 Ticks  400932
Ser  76467996 at 400000 Ticks  934578
Par  76467996 at 400000 Ticks 1156104
Dis  76467996 at 400000 Ticks  723879
Ser 230797512 at 500000 Ticks 1056978
Par 230797512 at 500000 Ticks 1310562
Dis 230797512 at 500000 Ticks  705096
Ser  71493905 at 600000 Ticks 1454517
Par  71493905 at 600000 Ticks 1575054
Dis  71493905 at 600000 Ticks  783054
Ser 692857960 at 700000 Ticks 1678023
Par 692857960 at 700000 Ticks 1674198
Dis 692857960 at 700000 Ticks 1170072
Ser  72972742 at 800000 Ticks 1838934
Par  72972742 at 800000 Ticks 1980621
Dis  72972742 at 800000 Ticks 1075095
Ser 194646209 at 900000 Ticks 1904148
Par 194646209 at 900000 Ticks 1983087
Dis 194646209 at 900000 Ticks 1168587
[/cpp]
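
For readers without QuickThread, one possible adaptation of doFind to standard C++ is sketched below. It is an assumption-laden illustration, not the code that produced the timings above: doFindPortable and findFirstPortable are hypothetical names, std::atomic replaces _InterlockedCompareExchange, and a plain std::thread per chunk stands in for parallel_for_v1.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Scans [iBegin, iEnd) and publishes the lowest matching index via CAS.
void doFindPortable(long iBegin, long iEnd, const std::vector<long>& A,
                    long find, std::atomic<long>& foundIndex)
{
    if (foundIndex.load() < iBegin) return;  // another thread found an earlier occurrence
    for (long i = iBegin; i < iEnd; ++i) {
        // Look at the shared variable every 100 iterations.
        if ((i % 100) == 0 && foundIndex.load() < i) return;
        if (A[i] == find) {
            long oldIndex = foundIndex.load();
            // Retry until i is stored or a lower index is already present.
            while (i < oldIndex &&
                   !foundIndex.compare_exchange_weak(oldIndex, i)) {}
            return;
        }
    }
}

// Hypothetical stand-in for parallel_for_v1: one std::thread per chunk.
long findFirstPortable(const std::vector<long>& A, long find, long chunk)
{
    assert(chunk > 0);
    const long n = static_cast<long>(A.size());
    std::atomic<long> foundIndex(n);  // initialize beyond end of array
    std::vector<std::thread> team;
    for (long b = 0; b < n; b += chunk)
        team.emplace_back(doFindPortable, b, std::min(b + chunk, n),
                          std::cref(A), find, std::ref(foundIndex));
    for (std::thread& th : team) th.join();
    return foundIndex.load();  // n means "not found"
}
```

Spawning one thread per chunk is only reasonable for a handful of chunks; a real task scheduler would reuse a fixed pool of workers.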

Jim