New Contributor I

parallel_reduce: are the join and the call operator exclusive in the body?

From the TBB docs and examples, it seems the intent is that the body's join() method and function call operator are not reentrant and are never called at the same time. This avoids needing locks in the general case, which is a nice feature.

Is my interpretation of the docs correct?

I've run into a situation where it seems this rule is violated, so I'm wondering if it's a bug or a misunderstanding on my part.

Thanks.

Ahran
Black Belt

operator()() and join() would not be called at the same time for any particular instance, but they should still be reentrant, because they may be called concurrently between different Body instances. That implies that you should synchronise access to any shared data; note that competing for write access to shared data should be avoided if at all possible because it is a performance killer.
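
To make the distinction concrete, here is a minimal sketch that uses only the C++ standard library. The names `toy_reduce`, `toy_sum`, `split_tag` and `sum_body` are hypothetical stand-ins written for illustration; this is not TBB's scheduler or API. Each body instance owns its `sum` and is touched by one thread at a time, so it needs no lock, while the counter shared by all instances is updated atomically.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Shared by every body instance, so access must be synchronised (atomic here).
static std::atomic<int> g_chunks_processed(0);

struct split_tag {};  // hypothetical stand-in for tbb::split

// Hypothetical body in the style of a parallel_reduce Body.
struct sum_body {
    long sum;  // per-instance state: modified without a lock

    sum_body() : sum(0) {}
    sum_body(sum_body&, split_tag) : sum(0) {}

    void operator()(const int* begin, const int* end) {
        for (const int* p = begin; p != end; ++p) sum += *p;
        g_chunks_processed.fetch_add(1);  // shared data: atomic update
    }

    void join(sum_body& rhs) { sum += rhs.sum; }
};

// Toy driver (an assumption for illustration, not how TBB schedules work):
// split the range in half, process the right half on another thread with its
// own body, and join only after that thread has finished.
inline void toy_reduce(const int* begin, const int* end, sum_body& body,
                       std::size_t grain) {
    const std::size_t n = static_cast<std::size_t>(end - begin);
    if (n <= grain || n < 2) {
        body(begin, end);
        return;
    }
    const int* mid = begin + n / 2;
    sum_body right(body, split_tag());
    std::thread worker([&] { toy_reduce(mid, end, right, grain); });
    toy_reduce(begin, mid, body, grain);  // runs concurrently with `right`
    worker.join();                        // right's operator() has finished
    body.join(right);                     // so join() never overlaps it
}

inline long toy_sum(const std::vector<int>& v, std::size_t grain) {
    sum_body body;
    const int* first = v.data();
    toy_reduce(first, first + v.size(), body, grain);
    return body.sum;
}
```

In this sketch, two different `sum_body` instances can run `operator()` at the same moment, but no single instance ever sees `operator()` and `join()` overlap, which is the property the question is about.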
Black Belt

"note that competing for write access to shared data should be avoided if at all possible because it is a performance killer"
Those by-the-way remarks can be dangerous (I now wish I had omitted it, and sorry for any confusion)... I meant anything that interferes with anybody's attempts to get exclusive access, which includes, paradoxically, acquiring a read lock even if nobody is interested in writing the data (the lock action itself would be the problem), or reading an atomic value that another thread might be updating. Concurrent pure reads should be OK, though... except that, strictly speaking, there is no documentation about memory semantics guarantees between tasks, so you can only assume that things work as you would expect. Well, that's how I understand it, so you're most welcome to correct this if needed.
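
A small sketch of the contention point, assuming a hypothetical job that counts even numbers in a chunk of data. Both function names are made up for illustration. The first version performs an atomic read-modify-write on the shared counter for every match, so threads running it side by side would keep competing for the same memory location; the second accumulates in a local variable and touches the shared counter once per chunk.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Contended: one atomic update of the shared counter per matching element.
inline void count_even_contended(const std::vector<int>& chunk,
                                 std::atomic<long>& total) {
    for (std::size_t i = 0; i < chunk.size(); ++i) {
        if (chunk[i] % 2 == 0) total.fetch_add(1);
    }
}

// Less contended: count privately, then publish the result once.
inline void count_even_local(const std::vector<int>& chunk,
                             std::atomic<long>& total) {
    long local = 0;  // private to this call, no synchronisation needed
    for (std::size_t i = 0; i < chunk.size(); ++i) {
        if (chunk[i] % 2 == 0) ++local;
    }
    total.fetch_add(local);  // single access to the shared counter
}
```

Both versions produce the same total. How much the second one helps depends on the hardware and the workload, so treat it as a pattern to measure rather than a guaranteed speed-up.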

New Contributor I

Quoting - Raf Schietekat
operator()() and join() would not be called at the same time for any particular instance, but they should still be reentrant, because they may be called concurrently between different Body instances. That implies that you should synchronise access to any shared data; note that competing for write access to shared data should be avoided if at all possible because it is a performance killer.

I think I'd define "reentrant" in a slightly different way than you for instance methods but I think I agree with what you are saying. In other words, it should be safe to modify instance member variables without a lock in either of these methods. Any access to shared data would need to be locked. Some pseudo-code might help me explain my question better.

I believe that the following assertions should always be true. However, I think I've found a case where they aren't. (The "bug" has to do with task stealing. I'll post the full test case once I figure out how to properly paste in source code.)

class reducer {
bool m_accessing_local_data;

public:
reducer() : m_accessing_local_data(false) {}
reducer(reducer &other, tbb::split): m_accessing_local_data(false) {}

void join(reducer &other) {
assert(!accessing_local_data);
accessing_local_data = true;
// merge other's data
accessing_local_data = false;
}

void operator()(/*...*/) {
assert(!accessing_local_data);
accessing_local_data = true;
// operate on the range
accessing_local_data = false;
}
};
Black Belt

I see nothing wrong with these assumptions. For good measure, what happens if you make the reducer noncopyable (hide otherwise implicitly declared and defined copy constructor and copy assignment operator), and perhaps don't use a default constructor?
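
For reference, a sketch of what such a noncopyable body could look like. The class name `guarded_body` and the `split_tag` type are made up for illustration (`split_tag` stands in for `tbb::split`). This version assumes a C++11 compiler and uses `= delete`; with an older compiler the usual idiom is to declare the copy constructor and copy assignment operator private and leave them undefined.

```cpp
#include <cassert>
#include <type_traits>

struct split_tag {};  // hypothetical stand-in for tbb::split

class guarded_body {
public:
    // No default constructor: an instance must be created deliberately.
    explicit guarded_body(int seed) : m_value(seed) {}

    // Splitting constructor: the only way to derive one body from another.
    guarded_body(guarded_body& other, split_tag) : m_value(0) { (void)other; }

    // Hide the otherwise implicitly declared copy operations.
    guarded_body(const guarded_body&) = delete;
    guarded_body& operator=(const guarded_body&) = delete;

    int value() const { return m_value; }

private:
    int m_value;
};

static_assert(!std::is_copy_constructible<guarded_body>::value,
              "body must not be copy constructible");
static_assert(!std::is_copy_assignable<guarded_body>::value,
              "body must not be copy assignable");
static_assert(!std::is_default_constructible<guarded_body>::value,
              "body must not be default constructible");
```

With the copy operations gone, any accidental copy of the body becomes a compile error, which rules out one class of explanations for two threads appearing to work on "the same" instance.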

New Contributor I

Thanks Raf. Yes. My more complete test case tries to ensure that the body class isn't copyable, etc. I'll post a test case shortly for others to play around with and file a proper bug.
New Contributor I

Here is the example I mentioned. It's simply intended to show what I think is a bug. It doesn't do any useful work.

The intent is that the main thread spawns a bunch of tasks that either call parallel_reduce themselves or spawn another set of tasks that call parallel_reduce or spawn tasks etc. etc. The goal is to get the scheduler to have a few threads stealing tasks while at least one parallel_reduce is going on. This is the solution I came up with first.


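#include <algorithm>  /* assumed: header names were lost; inferred from the identifiers used below */
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <vector>
#include <tbb/blocked_range.h>
#include <tbb/mutex.h>
#include <tbb/parallel_reduce.h>
#include <tbb/task.h>
#include <tbb/task_scheduler_init.h>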

/* stay busy for a little while */
void twiddle_thumbs(int duration) {
  volatile double value = 1;
  for (int i = 0; i < duration*1000; ++i) {
    value *= cos(i) + i;
  }
}

/* Is it a bug to have the call operator and join active at the same time? The
 * docs seem to indicate that it is.  
 *
 * Validate that it is ok to have operator() and join() without a lock. If
 * these two methods are ever called at the same time print a message and
 * abort.  Use a mutex to do the validation because the work-around is to lock
 * access to instance variables.
 */

class reducer {
  tbb::mutex m_mutex;

  /* noncopyable */
  reducer(reducer const& other);
  reducer& operator=(reducer const& other);

  /* assert that the caller has exclusive access to instance variables */
  void assert_acquire(char const* s, tbb::mutex::scoped_lock& lock) {
    if (!lock.try_acquire(m_mutex)) {
      // DEBUG: If the lock is already taken then we've discovered
      // that either our assumptions are incorrect or there is a bug.
      fprintf(stderr, "%s called while lock is active\n", s);
      abort();
      // the presumed work-around if the two lines above are removed.
      lock.acquire(m_mutex);
    }    
  }

public:
  reducer() : m_mutex() { }
  
  reducer(reducer& other, tbb::split) : m_mutex() { }
  
  ~reducer() { 
    tbb::mutex::scoped_lock lock;
    assert_acquire("~reducer()", lock);    
  }
  
  template <typename T>
  void operator()(T const& range) {
    tbb::mutex::scoped_lock lock;
    assert_acquire("operator()", lock);
    
    /* iterate through the range work to stay busy */
    typedef typename T::const_iterator const_iterator;
    for (const_iterator it = range.begin(); it != range.end(); ++it) {
      twiddle_thumbs(1);
    }
  }

  void join(reducer& other) {
    tbb::mutex::scoped_lock lock;
    assert_acquire("join()", lock);    

    /* stay busy for a little while */
    twiddle_thumbs(1000);
  }
};

/* Forward declaration. See below. */
template <typename I>
void spawn_reduce_task_and_wait(int chain_length, I begin, I end);

/* This class chains together root tasks that eventually end up calling
 * parallel_reduce.  The goal is to get tasks in the scheduler that can engage
 * in task stealing. 
 *
 * This would probably be better implemented as two classes.  One for the
 * generic function of spawning another task and one for calling
 * parallel_reduce.  Hopefully the double usage doesn't obscure the intent.
 */
template <typename I>
struct reduce_task : tbb::task {
  I m_begin;
  I m_end;
  int m_chain_length;

  void do_reduce() {
    tbb::blocked_range<I> range(m_begin, m_end);
    reducer body;
    tbb::parallel_reduce(range, body, tbb::auto_partitioner());    
  }

  reduce_task(int chain_length, I begin, I end)
    : m_begin(begin)
    , m_end(end)
    , m_chain_length(chain_length) 
  { }
  
  tbb::task *execute() {
    if (m_chain_length <= 0) {
        do_reduce();
    } else {
        spawn_reduce_task_and_wait(--m_chain_length, m_begin, m_end);
    }
    return 0;
  }
};

/* Wait for a task to complete. */
struct join_tasks {
  void operator()(tbb::task *task) {
    if (task) {
      task->wait_for_all();
      task->destroy(*task);
    }        
  }
};

/* Queue up a parallel_reduce. NOTE: we use root tasks to add work to the
 * global queue of tasks that the scheduler can process.  These tasks are
 * semi-independent so we don't use any of the more complex task chaining or
 * continuation constructs within the task interface. */
template <typename I>
tbb::task* spawn_reduce_task(int chain_length, I begin, I end) {
  tbb::task *root_task = new(tbb::task::allocate_root()) tbb::empty_task;

  tbb::task *work_task = new(root_task->allocate_child()) 
    reduce_task<I>(chain_length, begin, end);

  /* one for the root and one for the work */
  root_task->set_ref_count(2); 
  root_task->spawn(*work_task);
  return root_task;
}

/* Spawn a chain of reduce tasks and wait for all of them to complete */
template <typename I>
void spawn_reduce_task_and_wait(int chain_length, I begin, I end) {
  tbb::task *task = spawn_reduce_task(chain_length, begin, end);
  join_tasks()(task);
}


int main(int argc, char** argv)
{
  tbb::task_scheduler_init init(tbb::task_scheduler_init::automatic);

  /* The problem seems to have something to do with the depth of the recursion
   * in our root tasks. This number controls the number of reductions that are
   * eventually completed and also the maximum number of root tasks spawned in
   * order to get there.
   */
  int const NUM_REDUCE_TASKS = 6;

  int const NUM_OBJECTS = 1000;

  /* Generic data for the reduction. */
  std::vector<int> nums(NUM_OBJECTS);  /* assumed: element type not shown; int matches the 4-byte elements in the gdb trace */

  /* Launch a bunch of reductions in parallel using wrapper tasks to
   * queue them up.  The reductions may happen within one level of
   * tasks or a few levels deep. (See spawn_reduce_task).
   */    
  while (1) { 
    std::vector<tbb::task*> tasks;

    /* Spawn parallel_reduce but queue them up so they'll get serviced as
     * processors are free. */
    for (int i = 0; i < NUM_REDUCE_TASKS; ++i) {
        tasks.push_back( spawn_reduce_task(i, nums.begin(), nums.end()) );
    }

    /* Wait for all of the reductions to complete. */
    std::for_each(tasks.begin(), tasks.end(), join_tasks());

    fprintf(stdout, ".");
    fflush(stdout);
  }

  return 0;
}
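
The detection trick in `assert_acquire` can be reduced to a few lines. The following is a standard-library sketch, not the code from the test above: `overlap_detector` and `scoped_entry` are hypothetical names, it assumes a C++11 compiler, and instead of aborting it records whether any entry overlapped another one.

```cpp
#include <atomic>
#include <cassert>

// Detects whether two guarded sections on the same object ever overlap.
class overlap_detector {
    std::atomic<bool> m_busy;          // true while some section is active
    std::atomic<bool> m_overlap_seen;  // sticky flag: an overlap happened

public:
    overlap_detector() : m_busy(false), m_overlap_seen(false) {}

    // Create one of these at the top of operator() and of join().
    class scoped_entry {
        overlap_detector& m_detector;
        bool m_owner;  // true if nobody else was active when we entered

    public:
        explicit scoped_entry(overlap_detector& d)
            : m_detector(d), m_owner(!d.m_busy.exchange(true)) {
            if (!m_owner) d.m_overlap_seen.store(true);
        }
        ~scoped_entry() {
            // Only the entry that set the flag is allowed to clear it.
            if (m_owner) m_detector.m_busy.store(false);
        }
        scoped_entry(const scoped_entry&) = delete;
        scoped_entry& operator=(const scoped_entry&) = delete;

        bool entered_alone() const { return m_owner; }
    };

    bool overlap_seen() const { return m_overlap_seen.load(); }
};
```

Unlike a blocking lock, this never waits, so it reports the overlap instead of hiding it. Replacing the check with a real lock is the work-around mentioned in the test's comments.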

Quoting - adunsmoor
I believe that the following assertions should always be true. However, I think I've found a case where they aren't. (The "bug" has to do with task stealing. I'll post the full test case once I figure out how to properly paste in source code.)

class reducer {
bool m_accessing_local_data;

public:
reducer() : m_accessing_local_data(false) {}
reducer(reducer &other, tbb::split): m_accessing_local_data(false) {}

void join(reducer &other) {
assert(!accessing_local_data);
accessing_local_data = true;
// merge other's data
accessing_local_data = false;
}

void operator()(/*...*/) {
assert(!accessing_local_data);
accessing_local_data = true;
// operate on the range
accessing_local_data = false;
}
};

Right, the assertions like that should hold.
I added similar guards to test_parallel_reduce and ran it with 1 to 4 threads on a 2-core machine. It passed (100 runs).
Note that your pseudocode uses two different names, with and without m_ prefix. I guess it's a misprint, and you actually meant the same class data member.

Quoting - adunsmoor
Here is the example I mentioned. It's simply intended to show what I think is a bug. It doesn't do any useful work.

I compiled it and ran it on two machines, a dual-core laptop and an 8-core workstation, both running Windows XP. So far both work with no problem; each printed about 500 dots.

What is your environment, and what does the problem look like - does the test appear to hang, or crash, or what?
New Contributor I


Right, the assertions like that should hold.
I added similar guards to test_parallel_reduce and ran it with 1 to 4 threads on a 2-core machine. It passed (100 runs).
Note that your pseudocode uses two different names, with and without m_ prefix. I guess it's a misprint, and you actually meant the same class data member.

Thanks. That's what I get for trying to write code in a web form. Yes, I meant for the variable names to be the same in the snippet.

I took a look at the test you mentioned. The main difference between that and the full example I posted above is that I have to chain a bunch of tasks together in order to get the problem to occur.


New Contributor I

I'm running this test on a Redhat 4 Linux machine with 8 processors. Here is some info from gdb for one test where it aborted very early on. I'm using tbb21_015.


(gdb) info threads
6 Thread 1094732128 (LWP 11283) 0x000000311211f448 in cos () from /lib64/tls/libm.so.6
5 Thread 1090533728 (LWP 11282) 0x000000311211ebd8 in cos () from /lib64/tls/libm.so.6
4 Thread 1086335328 (LWP 11281) 0x000000311270af8b in __lll_mutex_lock_wait () from /lib64/tls/libpthread.so.0
3 Thread 1082136928 (LWP 11280) 0x000000311211f0ef in cos () from /lib64/tls/libm.so.6
2 Thread 1077938528 (LWP 11279) 0x000000311211f160 in cos () from /lib64/tls/libm.so.6
* 1 Thread 182895621920 (LWP 11021) 0x0000003111e2e25d in raise () from /lib64/tls/libc.so.6

(gdb) where
#0 0x0000003111e2e25d in raise () from /lib64/tls/libc.so.6
#1 0x0000003111e2fa5e in abort () from /lib64/tls/libc.so.6
#2 0x0000000000403106 in reducer::assert_acquire (this=0x2a958bf1d8, s=0x404442 "join()", lock=@0x7fbfffcf30) at main.cc:44
#3 0x0000000000403fbc in reducer::join (this=0x2a958bf1d8, other=@0x2a958b37d8) at main.cc:74
#4 0x0000000000403f80 in tbb::internal::finish_reduce::execute (this=0x2a958b37c0) at test/external/include/tbb/parallel_reduce.h:72
#5 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
#6 0x00000000004020a1 in tbb::task::wait_for_all (this=0x2a958d5e40) at test/external/include/tbb/task.h:521
#7 0x0000000000402036 in join_tasks::operator() (this=0x7fbfffd51f, task=0x2a958d5e40) at main.cc:125
#8 0x00000000004031ba in spawn_reduce_task_and_wait<> (chain_length=0, begin={_M_current = 0x50a1d0}, end={_M_current = 0x50b170}) at main.cc:152
#9 0x0000000000402f6a in reduce_task<>::execute (this=0x2a958d5fc0) at main.cc:115
#10 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
#11 0x00000000004020a1 in tbb::task::wait_for_all (this=0x2a958d6140) at test/external/include/tbb/task.h:521
#12 0x0000000000402036 in join_tasks::operator() (this=0x7fbfffdaff, task=0x2a958d6140) at main.cc:125
#13 0x00000000004031ba in spawn_reduce_task_and_wait<> (chain_length=1, begin={_M_current = 0x50a1d0}, end={_M_current = 0x50b170}) at main.cc:152
#14 0x0000000000402f6a in reduce_task<>::execute (this=0x2a958d62c0) at main.cc:115
#15 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
#16 0x00000000004020a1 in tbb::task::wait_for_all (this=0x2a958d6440) at test/external/include/tbb/task.h:521
#17 0x0000000000402036 in join_tasks::operator() (this=0x7fbfffe0df, task=0x2a958d6440) at main.cc:125
#18 0x00000000004031ba in spawn_reduce_task_and_wait<> (chain_length=2, begin={_M_current = 0x50a1d0}, end={_M_current = 0x50b170}) at main.cc:152
#19 0x0000000000402f6a in reduce_task<>::execute (this=0x2a958d65c0) at main.cc:115
#20 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
#21 0x00000000004020a1 in tbb::task::wait_for_all (this=0x2a958d6740) at test/external/include/tbb/task.h:521
#22 0x0000000000402036 in join_tasks::operator() (this=0x7fbfffe6bf, task=0x2a958d6740) at main.cc:125
#23 0x00000000004031ba in spawn_reduce_task_and_wait<> (chain_length=3, begin={_M_current = 0x50a1d0}, end={_M_current = 0x50b170}) at main.cc:152
#24 0x0000000000402f6a in reduce_task<>::execute (this=0x2a958d68c0) at main.cc:115
#25 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
#26 0x00000000004020a1 in tbb::task::wait_for_all (this=0x2a958d6a40) at test/external/include/tbb/task.h:521
#27 0x0000000000402036 in join_tasks::operator() (this=0x7fbfffec9f, task=0x2a958d6a40) at main.cc:125
#28 0x00000000004031ba in spawn_reduce_task_and_wait<> (chain_length=4, begin={_M_current = 0x50a1d0}, end={_M_current = 0x50b170}) at main.cc:152
#29 0x0000000000402f6a in reduce_task<>::execute (this=0x2a958d6bc0) at main.cc:115
#30 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
#31 0x00000000004020a1 in tbb::task::wait_for_all (this=0x2a958d7c40) at test/external/include/tbb/task.h:521
#32 0x0000000000402036 in join_tasks::operator() (this=0x7fbffff2a0, task=0x2a958d7c40) at main.cc:125
#33 0x0000000000402004 in std::for_each<, join_tasks> (__first={_M_current = 0x50b1f0}, __last={_M_current = 0x50b220}, __f={}) at /usr/lib/gcc/x86_64-redhat-linux/3.4.6/../../../../include/c++/3.4.6/bits/stl_algo.h:158
#34 0x000000000040195b in main (argc=1, argv=0x7fbffff458) at main.cc:186

(gdb) thread apply all where 5

Thread 6 (Thread 1094732128 (LWP 11283)):
#0 0x000000311211f448 in cos () from /lib64/tls/libm.so.6
#1 0x00000000004017f0 in twiddle_thumbs (duration=1000) at main.cc:18
#2 0x0000000000403fc6 in reducer::join (this=0x2a958a71d8, other=@0x2a958b3dd8) at main.cc:77
#3 0x0000000000403f80 in tbb::internal::finish_reduce::execute (this=0x2a958b3dc0) at test/external/include/tbb/parallel_reduce.h:72
#4 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
(More stack frames follow...)

Thread 5 (Thread 1090533728 (LWP 11282)):
#0 0x000000311211ebd8 in cos () from /lib64/tls/libm.so.6
#1 0x00000000004017f0 in twiddle_thumbs (duration=1000) at main.cc:18
#2 0x0000000000403fc6 in reducer::join (this=0x41001970, other=@0x2a958be8d8) at main.cc:77
#3 0x0000000000403f80 in tbb::internal::finish_reduce::execute (this=0x2a958be8c0) at test/external/include/tbb/parallel_reduce.h:72
#4 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
(More stack frames follow...)

Thread 4 (Thread 1086335328 (LWP 11281)):
#0 0x000000311270af8b in __lll_mutex_lock_wait () from /lib64/tls/libpthread.so.0
#1 0x0000000000000002 in ?? ()
#2 0x0000000040c029f0 in ?? ()
#3 0x0000003112706e10 in ?? () from /lib64/tls/libpthread.so.0
#4 0x0000000000000000 in ?? ()

Thread 3 (Thread 1082136928 (LWP 11280)):
#0 0x000000311211f0ef in cos () from /lib64/tls/libm.so.6
#1 0x00000000004017f0 in twiddle_thumbs (duration=1) at main.cc:18
#2 0x0000000000403b59 in reducer::operator()<tbb::blocked_range<...> > (this=0x2a958bf1d8, range=@0x2a958b3950) at main.cc:68
#3 0x0000000000403688 in tbb::internal::start_reduce<tbb::blocked_range<...>, reducer, tbb::auto_partitioner>::execute (this=0x2a958b3940) at test/external/include/tbb/parallel_reduce.h:141
#4 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
(More stack frames follow...)

Thread 2 (Thread 1077938528 (LWP 11279)):
#0 0x000000311211f160 in cos () from /lib64/tls/libm.so.6
#1 0x00000000004017f0 in twiddle_thumbs (duration=1) at main.cc:18
#2 0x0000000000403b59 in reducer::operator()<tbb::blocked_range<...> > (this=0x2a958b31d8, range=@0x2a958cb650) at main.cc:68
#3 0x0000000000403688 in tbb::internal::start_reduce<tbb::blocked_range<...>, reducer, tbb::auto_partitioner>::execute (this=0x2a958cb640) at test/external/include/tbb/parallel_reduce.h:141
#4 0x0000002a955855d0 in try_end_0_10 () at ../../src/tbb/task.cpp:2292
(More stack frames follow...)

Thread 1 (Thread 182895621920 (LWP 11021)):
#0 0x0000003111e2e25d in raise () from /lib64/tls/libc.so.6
#1 0x0000003111e2fa5e in abort () from /lib64/tls/libc.so.6
#2 0x0000000000403106 in reducer::assert_acquire (this=0x2a958bf1d8, s=0x404442 "join()", lock=@0x7fbfffcf30) at main.cc:44
#3 0x0000000000403fbc in reducer::join (this=0x2a958bf1d8, other=@0x2a958b37d8) at main.cc:74
#4 0x0000000000403f80 in tbb::internal::finish_reduce::execute (this=0x2a958b37c0) at test/external/include/tbb/parallel_reduce.h:72
(More stack frames follow...)


Quoting - adunsmoor
I'm running this test on a Redhat 4 Linux machine with 8 processors. Here is some info from gdb for one test where it aborted very early on. I'm using tbb21_015.


I am able to reproduce it now, on Windows too. I will look at it. Thanks for reporting the issue!

Using parallel_reduce with affinity_partitioner should not have this problem. You might try it as a temporary workaround.