Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.
2464 Discussions

help, how to stop or terminate parallel_for and parallel reduce

mblizz
Beginner
1,919 Views

Is there a way to terminate an iteration when doing parallel_for or parallel reduce?? Like for example the "break" in a normal for loop??

In advance, thanks for reading this and much more if there is feedback. Godbless..

0 Kudos
29 Replies
Anton_Pegushin
New Contributor II
1,563 Views
Quoting - mblizz

Is there a way to terminate an iteration when doing parallel_for or parallel reduce?? Like for example the "break" in a normal for loop??

In advance, thanks for reading this and much more if there is feedback. Godbless..

Hello, there is a functionality in TBB called "Task cancellation". From what you ask, this is exactly what you need. It's described in section 8.3.8 of the Reference manual and here is a short example on how you use it:

[cpp]class FindElemAndCancel {
    const vector& intVec;
    const int valToFind;
    int * const idx;
public:
    FindElemAndCancel(const vector& intVec_, const int valToFind_, int *idx_)
        : intVec(intVec_), valToFind(valToFind_), idx(idx_) {}

    void operator() (const tbb::blocked_range& r) const {
        int r_end = r.end();
        for ( int i = r.begin(); i < r_end; ++i ) {
            if ( intVec == valToFind ) {
                tbb::task::self().cancel_group_execution();
            }
        }
    }
};

tbb::parallel_for( tbb::blocked_range(0, problemSize), FindElemAndCancel(intVec, valToFind, &valIdx), tbb::auto_partitioner() );

[/cpp]


0 Kudos
Alexey-Kukanov
Employee
1,563 Views
Quoting - mblizz

Is there a way to terminate an iteration when doing parallel_for or parallel reduce?? Like for example the "break" in a normal for loop??

In advance, thanks for reading this and much more if there is feedback. Godbless..


Read sections about task cancellation and task_group_context in the TBB Reference manual (Reference.pdf). Also a series of blogs on exception handling and cancellation by Andrey Marochkocould provide additional information.

0 Kudos
jimdempseyatthecove
Honored Contributor III
1,563 Views

Hello, there is a functionality in TBB called "Task cancellation". From what you ask, this is exactly what you need. It's described in section 8.3.8 of the Reference manual and here is a short example on how you use it:

class FindElemAndCancel { const vector& intVec; const int valToFind; int * const idx; public: FindElemAndCancel(const vector& intVec_, const int valToFind_, int *idx_) : intVec(intVec_), valToFind(valToFind_), idx(idx_) {} void operator() (const tbb::blocked_range& r) const { int r_end = r.end(); for ( int i = r.begin(); i < r_end; ++i ) { if ( intVec == valToFind ) { tbb::task::self().cancel_group_execution(); } } } }; tbb::parallel_for( tbb::blocked_range(0, problemSize), FindElemAndCancel(intVec, valToFind, &valIdx), tbb::auto_partitioner() );



Anton,

Since all instances of the parallel_for are using the same referenced iterator why cann't the thread discovering the termination condition simply set the end to the begin or add a member function to do this (e.g. terminate()). In this manner you do not need to add additional code other than a call to r.terminate().

Jim Dempsey

0 Kudos
RafSchietekat
Valued Contributor III
1,563 Views

"all instances of the parallel_for are using the same referenced iterator" That would not appear to be so.

0 Kudos
Alexey-Kukanov
Employee
1,563 Views
Anton,

Since all instances of the parallel_for are using the same referenced iterator why cann't the thread discovering the termination condition simply set the end to the begin or add a member function to do this (e.g. terminate()). In this manner you do not need to add additional code other than a call to r.terminate().

Jim Dempsey


Such a solution was described in Arch Robison's blog: "Have a fish - how break from a parallel loop in TBB".

0 Kudos
Anton_Pegushin
New Contributor II
1,563 Views
Quoting - Raf Schietekat

"all instances of the parallel_for are using the same referenced iterator" That would not appear to be so.

Hi, I'm guessing what Jim meant was that all instances of parallel_for use the same blocked_range object by reference. That is true and that is what Arch Robison used for his sample code that Alexey mentions. Arch wrote that code and that blog entrybeforetask cancellation functionality was added to TBB, but we already had at least two customers requesting it.

Sample code that I posted in my original comment was aimed only to help understand how cancellation can be used. Justifying cancellation and making it the only possible solution would require coming up with a much more complex problem, which might have defeated the purpose of that example.

0 Kudos
RafSchietekat
Valued Contributor III
1,563 Views

"all instances of parallel_for use the same blocked_range object by reference" Who are you and what have you done to Anton Pegushin? :-)

0 Kudos
RafSchietekat
Valued Contributor III
1,563 Views

I wrote "Who are you and what have you done to Anton Pegushin?" I should probably have reserved that for another situation...

More straightforwardly, the described technique had to rely on explicit inheritance of a reference/pointer to a common external switch (which should probably be tbb::atomic instead), because each Body::operator()invocation uses a distinct Range instance (a corrollary of the splitting-constructor API).

0 Kudos
Anton_Pegushin
New Contributor II
1,563 Views
Quoting - Raf Schietekat

"all instances of parallel_for use the same blocked_range object by reference" Who are you and what have you done to Anton Pegushin? :-)

:) you're right. What I said was incorrect in so many ways... what was I thinking.

What I actually meant was that parallel_for algorithm iterates over a range, which can be implemented to reference a global "cancel this operation" flag which is monitored to, when signalled, set the end of the range to the beginning of the range or report the range as empty.

0 Kudos
jimdempseyatthecove
Honored Contributor III
1,563 Views

:) you're right. What I said was incorrect in so many ways... what was I thinking.

What I actually meant was that parallel_for algorithm iterates over a range, which can be implemented to reference a global "cancel this operation" flag which is monitored to, when signalled, set the end of the range to the beginning of the range or report the range as empty.

The parallal_for, which creates the block ranged iterators for each thread (with different ranges) could construct a terminationable class of iterator which point back (instead of to a global termination flag) to a common variable constructed by the parallel_for (call it a parent interator control structure if you wish). The next() for thistype of iterator could then test the termination flag in addition to the limit on the range. You would also need a terminate() or have the dtor of the (component) thread's copy of the iterator perform an implicit terminate().

Conceptually you could view this as a shared reduction variable (as opposed to private copy to be reduced at end of task). When modified, it indicates solution found (or other termination condition).

Using a shared variablewould be less of an impact on the system than a terminate thread (or task)(when each pass of iteration has relatively low computation requirements).

Jim Dempsey

0 Kudos
aftershock4
Beginner
1,563 Views

Hello, there is a functionality in TBB called "Task cancellation". From what you ask, this is exactly what you need. It's described in section 8.3.8 of the Reference manual and here is a short example on how you use it:

class FindElemAndCancel { const vector& intVec; const int valToFind; int * const idx; public: FindElemAndCancel(const vector& intVec_, const int valToFind_, int *idx_) : intVec(intVec_), valToFind(valToFind_), idx(idx_) {} void operator() (const tbb::blocked_range& r) const { int r_end = r.end(); for ( int i = r.begin(); i < r_end; ++i ) { if ( intVec == valToFind ) { tbb::task::self().cancel_group_execution(); } } } }; tbb::parallel_for( tbb::blocked_range(0, problemSize), FindElemAndCancel(intVec, valToFind, &valIdx), tbb::auto_partitioner() );




I have been looking just for this example. I do wonder if it is correct.
void operator() (const tbb::blocked_range& r) const {
int r_end = r.end();
for ( int i = r.begin(); i < r_end; ++i ) {
if ( intVec == valToFind ) {
tbb::task::self().cancel_group_execution();
*idx=i; // <--- This was missing
}
}
}

It was missing the line *idx=i;
Correct me if I am wrong.. Would that be the way to return information from the for loop?

0 Kudos
RafSchietekat
Valued Contributor III
1,563 Views
Some "details" were indeed left out. It might be clearer to put that missing assignment before the cancel_group_instruction(), and then you might as well also break/return. If you're sure that only one index will be found, you can probably get away with a pointer/reference to a plain int, if you only read its value after returning from parallel_for; otherwise make that int atomic. Also don't forget to occasionally poll is_cancelled() when not relying on a suitable grainsize.
0 Kudos
Anton_Pegushin
New Contributor II
1,563 Views
Quoting - Raf Schietekat
Some "details" were indeed left out. It might be clearer to put that missing assignment before the cancel_group_instruction(), and then you might as well also break/return. If you're sure that only one index will be found, you can probably get away with a pointer/reference to a plain int, if you only read its value after returning from parallel_for; otherwise make that int atomic. Also don't forget to occasionally poll is_cancelled() when not relying on a suitable grainsize.
Hi,

Raf is right. Index assignment should be before the call to "cancel" and guarded too. Also keep in mind that when searching in parallel like in the example above, you're not guaranteed to obtain an index of the first occurence. An index will be pointing to _some_ element in the array, where one of the worker threads found it first.
0 Kudos
aftershock4
Beginner
1,563 Views
Quoting - Raf Schietekat
Some "details" were indeed left out. It might be clearer to put that missing assignment before the cancel_group_instruction(), and then you might as well also break/return. If you're sure that only one index will be found, you can probably get away with a pointer/reference to a plain int, if you only read its value after returning from parallel_for; otherwise make that int atomic. Also don't forget to occasionally poll is_cancelled() when not relying on a suitable grainsize.

What do you mean by occasionally? You mean is_cancelled() expensive? How slow is it?

So that is how it should look.
[cpp]void operator() (const tbb::blocked_range& r) const { 
int r_end = r.end(); 
for ( int i = r.begin(); i < r_end; ++i ) { 
if (tbb::task::self().is_cancelled())
break;
if ( intVec == valToFind ) { 
*idx*i;
tbb::task::self().cancel_group_execution(); 
break;
} 
} 
}[/cpp]
0 Kudos
RafSchietekat
Valued Contributor III
1,563 Views

I don't know how expensive is_cancelled() really is, but it's not inline, and some processors may incur an extrapenalty for the associated memory semantics, so you may perhaps do well by only calling it every 100 iterations or so (benefit or not, and exact number of iterations, to be determined empirically).

I didn't think it is necessary to put the result assigment before the cancellation, though, just clearer.

0 Kudos
Alexey-Kukanov
Employee
1,563 Views
Quoting - Raf Schietekat

I don't know how expensive is_cancelled() really is, but it's not inline, and some processors may incur an extrapenalty for the associated memory semantics, so you may perhaps do well by only calling it every 100 iterations or so (benefit or not, and exact number of iterations, to be determined empirically).

I didn't think it is necessary to put the result assigment before the cancellation, though, just clearer.


Actually, task::is_cancelled() is inline in task.h; task::self() is out-of-line however, and it contains access to thread local storage which is more expensive than reading e.g. stack variables. So Raf is right; run a few experiments with representative workloads to findgoodbalance between the cost of excessive computations and overhead of the cancellation check.
0 Kudos
jimdempseyatthecove
Honored Contributor III
1,563 Views

>>you're not guaranteed to obtain an index of the first occurence. An index will be pointing to _some_ element in the array, where one of the worker threads found it first.

This is a reason why you would favor a termination state variable in the iterator (or pointed to by the iterator). Should you issue a cancel_group() function then the first thread to find a match (not necessarily the earliest index) would terminate the search (note, you may also have multiple threads concurrently issuing the cancel_group).

With the termination state variable, a thread can test not only if other thread found a fish, but also if the found fish was further up/down stream of where you are now looking. Depending on placement the thread can decide NOT to terminate just yet, and continue searching until it passes the position in the stream where the other fish was found.

For this type of search (lowest/highest index) you would want to chunk the range into smaller pieces such that all threads work in smaller sections all progressingfront to back or back to front through the data set.

Jim Dempsey
0 Kudos
RafSchietekat
Valued Contributor III
1,563 Views
"Actually, task::is_cancelled() is inline in task.h;"
Yes, but that's only the tip of the ice cube, :-) which in an inner loop could perhaps make a bit of a difference, although the acquire is probably more important, even on Intel's own hardware (Itanium).

"task::self() is out-of-line however, and it contains access to thread local storage which is more expensive than reading e.g. stack variables."
Of course... sorry, I just replied to the question where I really should have looked at the code itself. Unfortunately Body and Range are just duck-typed, so there's nothing to easily redefine underneath (nod to Jim Dempsey).

"So Raf is right; run a few experiments with representative workloads to find good balance between the cost of excessive computations and overhead of the cancellation check."
Another idea, instead of or rather in addition to amortisation (sic?), is testing a shared atomic, perhaps the result itself, instead of calling is_cancelled(). You should find that the optimum number of iterations for amortisation will be lower, but it should still be beneficial. cancel_group_execution() would still serve to more efficiently tear down the parallel_for itself (I presume?).
0 Kudos
jimdempseyatthecove
Honored Contributor III
1,563 Views

Testing is important. Through testing you canderive ageneralized function for determining partitioning of the problem. This function can then be used in your production code. Note, the partitioning function may be reduced to a series of if test just prior to the parallel_for.

Data (array) xxx, thread partitioning tn

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|t0|t1|t2|t3|t0|t1|t2|t3|t0|t1|t2|t3|t0|t1|t2|t3|...

or

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
| t0 | t1 | t2 | t3 | t0 | t1 | t2 | t3 | t0 | t1 |...

The shorter the run time (|tn|) the lower the latency (from found to all threads terminating)
However, the shorter the run time per (|tn|) the higher the number of session start/stop times.

The recommend technique (my recommendation) would be to provide a6 valued iterator

iBegin, iEnd, iRunLengt, iSkipLength, iThreadNum, foundIndex*

Where

iBegin, iEnd are the full half open range
iRunLength is a consecutive index count
iSkipLength, is the number of elements to skip between consecutive index runs
iThreadNum (0 base team member number working on parallel_for)
foundIndex* where you store the result (atomic and viewable by all team members)

The iRunLength should also be tuned for cache sensitivity.

With the above type of iterator, the parallel_for instantiates the thread team once, then the iterator takes care of parceling out the array slices.

When operating in sections of array prior to any thread finding a match then the *foundIndex will remain unchanged and thus remain in cache (likely L1) and provide very little overhead to test. When a thread makes a find, it performs a CAS only when its found index is .lt. (or .gt.) other found or not found indicator. When thread has not found condition, it test *foundIndex, if not found or if a found condition is seen then if found condition preceeds your position - quit, if found position follows your position - continue.

The above iterator should have relatively little overhead as a two level loop

for(i=iBegin + (iRunLength*iThreadNum); i{
jEnd = min(i+iRunLength, iEnd);
for(j=i; j {
if(*foundIndex < i) return; // lowest match foundIndex initialize toint max
if(match(??))
{
// return our index or new found index whichever is less
for(;;)
{
int t = *foundIndex;
if(i>t) return;
if(CAS(foundIndex, t, i)) return;
// foundIndex changed, try again
}
}
}
}

And a slightly different version for searching from high to low.

Stick the above into a common iterator type once you standardize the functionality.

Jim Dempsey
0 Kudos
RafSchietekat
Valued Contributor III
1,389 Views

If you want to solve the smallest/largest-index problem, should you really use CAS, I wonder? I'd rather try parallel_reduce() and a simple "item found" signal. Just be careful about how to split a Range (each time getting a new discontiguous piece of a shared state, or something cleverer, see below), and never use task cancellation (which would only be correct with nested contexts in a bespoke algorithm, and would then probably not bring any benefit anyway, or worse). I think that should be strictly faster than the CAS solution, although not necessarily by a lot. Would you agree with that, Jim?

Just giving out discontiguous pieces may not yet be the most efficient way to distribute the work, because the winner may be near the end of a piece, and just making the pieces smaller implies more task scheduling overhead. Another solution (I'm just brainstorming now), assuming good cache-line alignment andN worker threads, would be to give out pieces of the P items at indices (i div N)*N*P+(i mod N)+N*[0,P[, with i growing from 0(maybe some items could be packed together for better alignment). Next question is how to determine good values for N and P at run time, and perhaps vary them during the computation by deviating from the regularity of that formula.

0 Kudos
Reply