Community
cancel
Showing results for 
Search instead for 
Did you mean: 
Petros_Mamales
Beginner
56 Views

Questio on parallel_reduce

Hi,

I have no experience with mt programming other than with tbb and there i a good chance that i am asking something very naive/not-very-smart.

I have used parallel_reduce to obtain various statistics of vector numerical expressions (like mean and variance).

All seems to work wel, when I have a single random variable (RV).

Tried to extend this for many RVs. However, I notice that two consecutive runs of the calculator give very different results ( not explainable by the order of calculation).

The idea is, that one does a single sweep fo all RVs and calculates per thread a range of te indices.

The attached code snippet is a slimmed down version of my actual app.

there is a buffer of all the random numbers (C-format mcSize x nbrRVs) and "Vector"s (in reality views to this buffer).

Each thread has to have a local buffer equal to the number of RVs. At join the "local" class adds on the buffer of the call arg to the one of the caller.

In the global, everything is added to an external buffer.

Can someone help?

(the effort with the "global" buffer would be a nice to understand - i did not expect this tobe an issue since there is no read/write mixing. Also, any stl sructure hasbeen stripped off in an effort to exclude problem that could come from the stl thread safety).

Thank you in advance for your help,

Petros
ps: apologies, for some reason the Add Files button does nothing ;-))

[cpp]





#define MAXFACTORS 100 //stl #include using namespace std; //tbb #include "tbb\parallel_reduce.h" #include "tbb\blocked_range.h" //define a vector to be a view to data: class Vector{ public: size_t size_; size_t stride_; double * data_; Vector() {} Vector(size_t size, double * const pdata, size_t stride ) :size_(size), data_(pdata), stride_(stride) {} ~Vector() {} double & operator[]( const size_t i ){ return *(data_ + i* stride_ ); } double const & operator[]( const size_t i ) const { return *(data_ + i* stride_ ); } }; //define the class to be used by tbb: class moment_calculator_local{ private: Vector * vars_; const size_t size_; public: double m1_[MAXFACTORS]; moment_calculator_local( Vector * const vars, const size_t size ) :vars_( vars ), size_(size) { for ( size_t i = 0; i != size_; ++i ) m1_ = 0L; } ~moment_calculator_local() {} const size_t size() const { return size_;} void operator()( const tbb::blocked_range< size_t > & r ) { const size_t nVars = size_; size_t jVar; for( size_t i = r.begin(); i != r.end(); ++i ) { for ( jVar = 0; jVar != nVars; ++jVar ) { const Vector & view_j = vars_[jVar]; const double var_j_i = view_j; m1_[ jVar ] += var_j_i ; } } } moment_calculator_local( moment_calculator_local & ec, tbb::split & splt ) :vars_( ec.vars_ ), size_(ec.size_){ memcpy( m1_, ec.m1_, ec.size_* sizeof(double)/sizeof(char) ); } void join ( const moment_calculator_local & ec ) { size_t nVars = size_; for ( size_t j = 0; j != nVars; ++j ) m1_ += ec.m1_; } }; class moment_calculator_global{ private: Vector * vars_; const size_t size_; double * const tgt_; public: double m1_[MAXFACTORS]; moment_calculator_global( Vector * const vars, const size_t size, double * const tgt ) :vars_( vars ), size_(size), tgt_(tgt) { for ( size_t i = 0; i != size_; ++i ) m1_ = 0L; } ~moment_calculator_global() {} const size_t size() const { return size_;} void operator()( const tbb::blocked_range< size_t > & r ) { const size_t nVars = size_; size_t jVar; for( size_t i = r.begin(); i != r.end(); ++i ) { for ( jVar = 0; jVar != nVars; ++jVar ) { const Vector & view_j = vars_[jVar]; const double var_j_i = view_j; m1_[ jVar ] += var_j_i ; } } } moment_calculator_global( moment_calculator_global & ec, tbb::split & splt ) :vars_( ec.vars_ ), size_(ec.size_), tgt_(ec.tgt_){ memcpy( m1_, ec.m1_, ec.size_* sizeof(double)/sizeof(char) ); } void join ( const moment_calculator_global & ec ) { size_t nVars = size_; for ( size_t j = 0; j != nVars; ++j ) tgt_ += ec.m1_; } }; int main(){ const size_t dimension = 5; const size_t mcSize = 10000; const size_t sampleSize = dimension * mcSize; double * pdata = new double[ sampleSize ]; //create uniform random data in [0,1] for ( size_t i = 0; i != sampleSize; ++i ) pdata = double(rand())/double(RAND_MAX) - 0.5; //create Vectors corresponding to the columns of the sample Vector vars[dimension]; for (size_t i = 0; i != dimension; ++i ) vars = Vector( mcSize, pdata + i, dimension ); moment_calculator_local mc( vars, dimension ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mc, tbb::auto_partitioner() ); moment_calculator_local mc2( vars, dimension ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mc2, tbb::auto_partitioner() ); cout << "Adding the results locally:" << endl; cout << "\t1st:\t\t2nd:" << endl; cout << "\t===:\t\t===:" << endl; for ( size_t i = 0; i != dimension; ++i ) cout << i << "\t" << mc.m1_ << "\t\t" << mc2.m1_ << endl; cout << endl; double m1Tgt[dimension]; std::fill( m1Tgt, m1Tgt + dimension, 0.L ); double m2Tgt[dimension]; std::fill( m2Tgt, m2Tgt + dimension, 0.L ); moment_calculator_global mc21( vars, dimension, m1Tgt ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mc21, tbb::auto_partitioner() ); moment_calculator_global mc22( vars, dimension, m2Tgt ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mc22, tbb::auto_partitioner() ); cout << "Adding the results to gloabal target buffer:" << endl; cout << "\t1st:\t\t2nd:" << endl; cout << "\t===:\t\t===:" << endl; for ( size_t i = 0; i != dimension; ++i ) cout << i << "\t" << m1Tgt << "\t\t" << m2Tgt << endl; return 1; } [/cpp]

0 Kudos
16 Replies
Andrey_Marochko
New Contributor III
56 Views

You apparently have a classic data race in your code. Look at your "global" functor:

  1. voidoperator()(consttbb::blocked_range&r){
  2. constsize_tnVars=size_;
  3. size_tjVar;
  4. for(size_ti=r.begin();i!=r.end();++i){
  5. for(jVar=0;jVar!=nVars;++jVar){
  6. constVector&view_j=vars_[jVar];
  7. constdoublevar_j_i=view_j;
  8. m1_[jVar]+=var_j_i;
  9. }
  10. }
  11. }
Variable jVar takes the values from the same range for all concurrently executed functor instances. And it indexes the elements being updated...

Petros_Mamales
Beginner
56 Views

Andrey, thank you for your response.
I don't want to insist but maybe you could help me understand something. I was assuming that the body of the operator()(range..) was run by a single thread that holds an instantiation of the class. If this is correct then the variables jVar and the class variables m1_ would be accessible only by the thread, that hosts the class object.
Where am I wrong?
Thank you again for your response.
RafSchietekat
Black Belt
56 Views

That assumption seems valid.

(Replaced) Sorry, please forget what you may have read here before.
Petros_Mamales
Beginner
56 Views

Hi Raf,
Thank you for responding. May I suggest that the indication "topic replied" is changed, so hopefully someonewith an idea to suggest picks it up? Thx again, P-
Andrey_Marochko
New Contributor III
56 Views

Ah, yes, as Raf said this assumption is correct. But there are still other races:

  1. Body splitting may happen concurrently with the operator function execution. As the result the split off body will get a copy of partially processed m1_. Besides memcpy() depending on its implementation may copy machine words non-atomically, so you may even get garbage in some of m1_ elements.
  2. Array tgt_ seems to be shared between all body instances, and as joins on different pairs of bodies can happen concurrently, there is a race on its elements.
At last I do not see how m1_ of the original body is joined with its tgt_ array.

And finally, a general performance note, using large member arrays in bodies is definitely a bad idea. Especially in case of parallel_for that does splitting much more often than parallel_reduce. Though even with parallel_reduce such copying may substantially increase overhead.
Petros_Mamales
Beginner
56 Views

Andrey,
Thank you very much fo the response!
On 2) you are right, apologies, my bad. When I slimmed down my code, I omitted to put it on the operator(range..) part. The whole idea was tohave a moment calculator that would traverse vectors(and vector expressions) only once and accelerate evaluation. I am surprised that there is a data race problem, since there is only writting to this buffer - not reading. Apparently, I am too green on this ;-), but the issue seems to be the granularity of the thread safety of the write operation.


On, 1) seems to imply that there is no way to use parallel_reduce for more than one targets? I have used very similar code extensively for one target with no problem.

Wouldn't the fact that body splitting may happen concurrently with the operator function execution present a problem for any use of parallel_reduce?

To my mind, this would imply that it is the user's responsibility to guard for concurrency issues, which is really something that tbb was supposed to do since it runs this code in one thread. So, if it splits then it should protect all the data when it copies them across, no?
Otherwise, I really don't see how one can then use the operator on a blocked range safely.
Is it possible that this data integrity is not maintained for arrays?

The memcpy observation had actually never occured to me ( I trusted that I am using the mt rt - ms vs2010.).
But then how can one ever trust function calls from an mt crt?

Finally, thank you for all the advice on arrays within threads, I really appreciate the effort. Iwas suspectingthat this would be an issue, but I was hoping either for the "global" version to work, or to do it with only a few threads (i.e. segment the range coarsly).

Thank you very much for all your help, P-.

Petros_Mamales
Beginner
56 Views

Andrey,
One more thing.
I did manage to run successfully the "global" version, by utilizing a queuing_rw_mutex (had to exchange the order of the loops, so the mutex blocking happens on a per array occurence, instead of per range element).
Thank you very much again, P-
RafSchietekat
Black Belt
56 Views

For 1), it seems that the split-constructed body should start out with zeroed values in m1_ (using memset), not with a copy from an undeterministic intermediate value from some other body (using memcpy). If the split constructor does not use any data that is being written at the same time by operator() or join() of the argument, only data that is constant for the duration of the parallel_reduce, there is no need to worry about a race.

"I am surprised that there is a data race problem, since there is only writting to this buffer - not reading."
A race is simultaneous access among operations, at least one of which is a write, so two writes to the same location make a race.

If the body needs a vector to perform its function, then so be it, but try to keep it minimal by not using MAXFACTOR=100 places if 5 is enough.

If I overlooked or misinterpreted anything essential, do tell.
Petros_Mamales
Beginner
56 Views

Hi Raf, Thank you very much for your remarks! I did correct the split ctor and the local version works too now. The MAXFACTOR 100 was just a placeholder when I slimmed down from an stl vector ;-)). And I really learned something useful.. Thank you both for all the help. Have a nice weekend
Andrey_Marochko
New Contributor III
56 Views

BTW, one more suggestion. You do not seem to need both m1_ and tgt_ members. Throw out tgt_ and use zero-initialized (as Raf suggested) m1_ array. After parallel_reduce finishes, mc22.m1_ will contain your reduction result.

Petros_Mamales
Beginner
56 Views

Andrey, thank you for the heads-up. Iknow there is a lot of cleaning I need to do, but this app is mainly a proof of concept.
BTW, is there a concept/availability of tls in tbb? I saw some comments on the web saying no, but these were a bit old and hope always dies last.
Sadly enough, the solution with the mutex seems to have issues as well. Besides, the problem that it deadlocks sometimes (which is an indication that the design choice is bad bad bad ), if it is not an imposition, I wish I could understand why (leaving aside, for now,the suboptimality of the design choice ).Below is the code with the mutex (in the
main, repeated attempts of rerunnung the tasks are made until failure, which is then reported).
Also tried the concurrent_vector as the "global" buffer, but this failed as well (almost expectedly, but hoping ..).
Thank you for all the help.

[cpp]#define MAXFACTORS 100
//stl
#include 
#include 
using namespace std;
//tbb
#include 
#include 
#include 
#include 

typedef tbb::concurrent_vector TBBVectorDouble;
typedef tbb::spin_rw_mutex MyMtx;

//define a vector to be a view to data:
class Vector{
public:
	size_t size_;
	size_t stride_;
	double * data_;

	Vector()
	{}
	Vector(size_t size, double * const pdata, size_t stride )
		:size_(size),
		data_(pdata),
		stride_(stride)
	{}
	~Vector()
	{}
	double & operator[]( const size_t i ){ return *(data_ + i* stride_ ); }
	double const & operator[]( const size_t i ) const { return *(data_ + i* stride_ ); }
};

//A class that uses the mutex:
class moment_calculator_mtx{
private:
	Vector * vars_;
	const size_t size_;
	double  * const tgt_;
	MyMtx ossMutex_;
public:
	double m1_[MAXFACTORS];
	moment_calculator_mtx( Vector * const vars, const size_t size, double  * const tgt, MyMtx & ossMutex )
		:vars_( vars ),
		size_(size),
		tgt_(tgt),
		ossMutex_(ossMutex) { 
			for ( size_t i = 0; i != size_; ++i ) m1_ = 0L;
	}
	~moment_calculator_mtx(){}
	void operator()( const tbb::blocked_range< size_t > & r ) {
		const size_t nVars = size_;
		size_t jVar;
		for ( jVar = 0; jVar != nVars; ++jVar ) {
			const Vector & view_j = vars_[jVar];
			double localsum = 0;
			for( size_t i = r.begin(); i != r.end(); ++i ) {
				const double var_j_i	= view_j; 
				localsum += var_j_i ;
			}
			MyMtx::scoped_lock writeLock;
			writeLock.acquire( ossMutex_, true );
			tgt_[ jVar ] += localsum;
			writeLock.release();
		}
	} 
	moment_calculator_mtx( moment_calculator_mtx & ec, tbb::split & splt )
		:vars_( ec.vars_ ),
		size_(ec.size_),
		tgt_(ec.tgt_),
		ossMutex_( ec.ossMutex_ )
	{}
	void join ( const moment_calculator_mtx & ec ) 
	{}
};

MyMtx ossMutex1, ossMutex2;

int main(int argc, char * argv[] ){
	const size_t dimension = 5;
	const size_t mcSize =20000;
	const size_t sampleSize = dimension * mcSize;
	double * pdata = new double[ sampleSize ];

//create uniform random data in [0,1]
	for ( size_t i = 0; i != sampleSize; ++i )
		pdata = double(rand())/double(RAND_MAX) - 0.5;
//create Vectors corresponding to the columns of the sample
	Vector vars[dimension];
	for  (size_t i = 0; i != dimension; ++i )
		vars = Vector( mcSize, pdata + i, dimension );

//	U S I N G	T H E	M U T E X	A P P R O A C H :
//initialize the target buffers:
	double m1Tgt[dimension]; std::fill( m1Tgt, m1Tgt + dimension, 0.L );
	double m2Tgt[dimension]; 
// 1st calculation:
	moment_calculator_mtx mcMtx1( vars, dimension, m1Tgt, ossMutex1 );
	tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), 
		mcMtx1, 
		tbb::auto_partitioner() );
// 2nd calculation:
	moment_calculator_mtx *pmcMtx2;
	size_t i;
	for ( i = 0; i != 1000; ++i ){
		cout << i << endl;
		pmcMtx2 = new moment_calculator_mtx( vars, dimension, m2Tgt, ossMutex2 );
		moment_calculator_mtx & mcMtx2 = *pmcMtx2;
		std::fill( m2Tgt, m2Tgt + dimension, 0.L );
		tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), 
			mcMtx2, 
			tbb::auto_partitioner() );
		bool bIsDifferent = false;
		size_t id;
		for ( id = 0; id != dimension; ++ id ){
			if ( fabs( m2Tgt[id] - m1Tgt[id] ) > 10E-10){
				bIsDifferent = true;
				break;
			}
		}
		if ( bIsDifferent ){
			cout << "found difference on the " << i + 1 << " iteration for the " << id +1 << " dimension ( index = " << id <<" )";
			cout << endl << endl;
			break;
		} else {
			delete pmcMtx2;
		}
	}
	if ( i == 1000 )
		cout << "Succeeded!! passed the 1000 waves test!!" << endl << endl;
	
	cout << "Adding the results to global target buffer (mutex):" << endl; 	
	cout << "t1st:tt2nd:" << endl;
	cout << "t===:tt===:" << endl;
	for ( size_t i = 0; i != dimension; ++i )
		cout << i << "t" << m1Tgt << "tt" << m2Tgt << endl;

	cout << "the single threaded result is:" << endl;
	vector stsums(dimension, 0);
	for ( size_t i = 0; i != dimension; ++i )
		for ( size_t ii = 0; ii != mcSize; ++ii )
			stsums += vars[ii];
	for ( size_t i = 0; i != dimension; ++i )
		cout << i << "t" << stsums << endl;

	return 1;
} 
[/cpp]
Andrey_Marochko
New Contributor III
56 Views

Well, you seem to have misunderstood my suggestion.

You do not need m1_ in the body class at all. And tgt_ should become a zero-initialized member array just as m1_ is now (you should try to initialize tgt_ with a pointer to a global array). You do not need mutex either. Just return the reduction in the join() method where it was originally.
Petros_Mamales
Beginner
56 Views

Andrey,
Thank you for the response. However, I am confused now. In order to return the reduction in the join, I have to put it in a buffer (like m1_) , when the operator(range..) is called.
You don't mean to perform the sumation loops in the join, correct? O/wise, I have to let the join, know the limits of the sumation..
Thank you for your help.
Petros
RafSchietekat
Black Belt
56 Views

It seems that you have two methods, a local one and a global one, and the global one has a race. You could solve that with atomics in the global one, but that only makes it correct, not performant. Stick with the local one.

(Added) Also some information may be lost with the global method (assuming the race is eliminated using atomics or locks, both quite slow compared to the local method), because only joined information is added to the global array.

(Replaced) Don't do the computations in the join, though! I think some miscommunication is going on there between you and Andrey.
Petros_Mamales
Beginner
56 Views

Raf,
Thank you for your response. I agree I should stick with the local in the end.
Have a nice weekend. P-
Andrey_N_Intel
Employee
56 Views

Hello Petros,

I noticed that you dovarious computationsincluding generation of random numbers (discussed on MKL forum)and computation of statistical estimates like mean or variance.
Starting with MKL 10.3 Statistical Component of MKL is extended with Summary Statistics functionality intended for initial statistical analysis of the datasets. This technology includes various algorithms for estimation of statistical moments, covariance, quantiles, detection of outliers, robust estimation of correlation, stabilization of correlation. The algorithms are highly optimized/parallelized for multi-core CPUs. The additional details and descriptionof the feature are available in MKL Manual and Summary Statistics Application Notes http://software.intel.com/sites/products/documentation/hpc/mkl/ssl/sslnotes.pdf

You might want to evaluate this feature of MKL 10.3 from perspective of development and supportof your applications. Please, feel free to ask questions on Statistical Component of MKL on MKL web forum.

Best,
Andrey
Reply