- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

Hi,

I have no experience with mt programming other than with tbb and there i a good chance that i am asking something very naive/not-very-smart.

I have used parallel_reduce to obtain various statistics of vector numerical expressions (like mean and variance).

All seems to work wel, when I have a single random variable (RV).

Tried to extend this for many RVs. However, I notice that two consecutive runs of the calculator give very different results ( not explainable by the order of calculation).

The idea is, that one does a single sweep fo all RVs and calculates per thread a range of te indices.

The attached code snippet is a slimmed down version of my actual app.

there is a buffer of all the random numbers (C-format mcSize x nbrRVs) and "Vector"s (in reality views to this buffer).

Each thread has to have a local buffer equal to the number of RVs. At join the "local" class adds on the buffer of the call arg to the one of the caller.

In the global, everything is added to an external buffer.

Can someone help?

(the effort with the "global" buffer would be a nice to understand - i did not expect this tobe an issue since there is no read/write mixing. Also, any stl sructure hasbeen stripped off in an effort to exclude problem that could come from the stl thread safety).

Thank you in advance for your help,

Petros

ps: apologies, for some reason the Add Files button does nothing ;-))

[cpp]

#define MAXFACTORS 100 //stl #includeusing namespace std; //tbb #include "tbb\parallel_reduce.h" #include "tbb\blocked_range.h" //define a vector to be a view to data: class Vector{ public: size_t size_; size_t stride_; double * data_; Vector() {} Vector(size_t size, double * const pdata, size_t stride ) :size_(size), data_(pdata), stride_(stride) {} ~Vector() {} double & operator[]( const size_t i ){ return *(data_ + i* stride_ ); } double const & operator[]( const size_t i ) const { return *(data_ + i* stride_ ); } }; //define the class to be used by tbb: class moment_calculator_local{ private: Vector * vars_; const size_t size_; public: double m1_[MAXFACTORS]; moment_calculator_local( Vector * const vars, const size_t size ) :vars_( vars ), size_(size) { for ( size_t i = 0; i != size_; ++i ) m1_ = 0L; } ~moment_calculator_local() {} const size_t size() const { return size_;} void operator()( const tbb::blocked_range< size_t > & r ) { const size_t nVars = size_; size_t jVar; for( size_t i = r.begin(); i != r.end(); ++i ) { for ( jVar = 0; jVar != nVars; ++jVar ) { const Vector & view_j = vars_[jVar]; const double var_j_i = view_j; m1_[ jVar ] += var_j_i ; } } } moment_calculator_local( moment_calculator_local & ec, tbb::split & splt ) :vars_( ec.vars_ ), size_(ec.size_){ memcpy( m1_, ec.m1_, ec.size_* sizeof(double)/sizeof(char) ); } void join ( const moment_calculator_local & ec ) { size_t nVars = size_; for ( size_t j = 0; j != nVars; ++j ) m1_+= ec.m1_ ; } }; class moment_calculator_global{ private: Vector * vars_; const size_t size_; double * const tgt_; public: double m1_[MAXFACTORS]; moment_calculator_global( Vector * const vars, const size_t size, double * const tgt ) :vars_( vars ), size_(size), tgt_(tgt) { for ( size_t i = 0; i != size_; ++i ) m1_ = 0L; } ~moment_calculator_global() {} const size_t size() const { return size_;} void operator()( const tbb::blocked_range< size_t > & r ) { const size_t nVars = size_; size_t jVar; for( size_t i = r.begin(); i != r.end(); ++i ) { for ( jVar = 0; jVar != nVars; ++jVar ) { const Vector & view_j = vars_[jVar]; const double var_j_i = view_j; m1_[ jVar ] += var_j_i ; } } } moment_calculator_global( moment_calculator_global & ec, tbb::split & splt ) :vars_( ec.vars_ ), size_(ec.size_), tgt_(ec.tgt_){ memcpy( m1_, ec.m1_, ec.size_* sizeof(double)/sizeof(char) ); } void join ( const moment_calculator_global & ec ) { size_t nVars = size_; for ( size_t j = 0; j != nVars; ++j ) tgt_+= ec.m1_ ; } }; int main(){ const size_t dimension = 5; const size_t mcSize = 10000; const size_t sampleSize = dimension * mcSize; double * pdata = new double[ sampleSize ]; //create uniform random data in [0,1] for ( size_t i = 0; i != sampleSize; ++i ) pdata = double(rand())/double(RAND_MAX) - 0.5; //create Vectors corresponding to the columns of the sample Vector vars[dimension]; for (size_t i = 0; i != dimension; ++i ) vars= Vector( mcSize, pdata + i, dimension ); moment_calculator_local mc( vars, dimension ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mc, tbb::auto_partitioner() ); moment_calculator_local mc2( vars, dimension ); tbb::parallel_reduce( tbb::blocked_range (0, mcSize, 1000 ), mc2, tbb::auto_partitioner() ); cout << "Adding the results locally:" << endl; cout << "\t1st:\t\t2nd:" << endl; cout << "\t===:\t\t===:" << endl; for ( size_t i = 0; i != dimension; ++i ) cout << i << "\t" << mc.m1_ << "\t\t" << mc2.m1_<< endl; cout << endl; double m1Tgt[dimension]; std::fill( m1Tgt, m1Tgt + dimension, 0.L ); double m2Tgt[dimension]; std::fill( m2Tgt, m2Tgt + dimension, 0.L ); moment_calculator_global mc21( vars, dimension, m1Tgt ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mc21, tbb::auto_partitioner() ); moment_calculator_global mc22( vars, dimension, m2Tgt ); tbb::parallel_reduce( tbb::blocked_range (0, mcSize, 1000 ), mc22, tbb::auto_partitioner() ); cout << "Adding the results to gloabal target buffer:" << endl; cout << "\t1st:\t\t2nd:" << endl; cout << "\t===:\t\t===:" << endl; for ( size_t i = 0; i != dimension; ++i ) cout << i << "\t" << m1Tgt << "\t\t" << m2Tgt<< endl; return 1; } [/cpp]

Link Copied

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

- voidoperator()(consttbb::blocked_range
&r){ - constsize_tnVars=size_;
- size_tjVar;
- for(size_ti=r.begin();i!=r.end();++i){
- for(
**jVar**=0;jVar!=nVars;**++jVar**){ - constVector&view_j=vars_[jVar];
- constdoublevar_j_i=view_j
*;* - m1_[
**jVar**]+=var_j_i; - }
- }
- }

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

I don't want to insist but maybe you could help me understand something. I was assuming that the body of the operator()(range..) was run by a single thread that holds an instantiation of the class. If this is correct then the variables jVar and the class variables m1_ would be accessible only by the thread, that hosts the class object.

Where am I wrong?

Thank you again for your response.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

(Replaced) Sorry, please forget what you may have read here before.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

Thank you for responding. May I suggest that the indication "topic replied" is changed, so hopefully someonewith an idea to suggest picks it up? Thx again, P-

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

- Body splitting may happen concurrently with the operator function execution. As the result the split off body will get a copy of partially processed m1_. Besides memcpy() depending on its implementation may copy machine words non-atomically, so you may even get garbage in some of m1_ elements.
- Array tgt_ seems to be shared between all body instances, and as joins on different pairs of bodies can happen concurrently, there is a race on its elements.

And finally, a general performance note, using large member arrays in bodies is definitely a bad idea. Especially in case of parallel_for that does splitting much more often than parallel_reduce. Though even with parallel_reduce such copying may substantially increase overhead.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

Thank you very much fo the response!

On 2) you are right, apologies, my bad. When I slimmed down my code, I omitted to put it on the operator(range..) part. The whole idea was tohave a moment calculator that would traverse vectors(and vector expressions) only once and accelerate evaluation. I am surprised that there is a data race problem, since there is only writting to this buffer - not reading. Apparently, I am too green on this ;-), but the issue seems to be the granularity of the thread safety of the write operation.

On, 1) seems to imply that there is no way to use parallel_reduce for more than one targets? I have used very similar code extensively for one target with no problem.

Wouldn't the fact that body splitting may happen concurrently with the operator function execution present a problem for any use of parallel_reduce?

To my mind, this would imply that it is the user's responsibility to guard for concurrency issues, which is really something that tbb was supposed to do since it runs this code in one thread. So, if it splits then it should protect all the data when it copies them across, no?

Otherwise, I really don't see how one can then use the operator on a blocked range safely.

Is it possible that this data integrity is not maintained for arrays?

The memcpy observation had actually never occured to me ( I trusted that I am using the mt rt - ms vs2010.).

But then how can one ever trust function calls from an mt crt?

Finally, thank you for all the advice on arrays within threads, I really appreciate the effort. Iwas suspectingthat this would be an issue, but I was hoping either for the "global" version to work, or to do it with only a few threads (i.e. segment the range coarsly).

Thank you very much for all your help, P-.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

One more thing.

I did manage to run successfully the "global" version, by utilizing a queuing_rw_mutex (had to exchange the order of the loops, so the mutex blocking happens on a per array occurence, instead of per range element).

Thank you very much again, P-

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

*"I am surprised that there is a data race problem, since there is only writting to this buffer - not reading."*

A race is simultaneous access among operations, at least one of which is a write, so two writes to the same location make a race.

If the body needs a vector to perform its function, then so be it, but try to keep it minimal by not using MAXFACTOR=100 places if 5 is enough.

If I overlooked or misinterpreted anything essential, do tell.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

BTW, is there a concept/availability of tls in tbb? I saw some comments on the web saying no, but these were a bit old and hope always dies last.

Sadly enough, the solution with the mutex seems to have issues as well. Besides, the problem that it deadlocks sometimes (which is an indication that the design choice is bad bad bad ), if it is not an imposition, I wish I could understand why (leaving aside, for now,the suboptimality of the design choice ).Below is the code with the mutex (in the

main, repeated attempts of rerunnung the tasks are made until failure, which is then reported).

Also tried the concurrent_vector as the "global" buffer, but this failed as well (almost expectedly, but hoping ..).

Thank you for all the help.

[cpp]#define MAXFACTORS 100 //stl #include#include using namespace std; //tbb #include #include #include #include typedef tbb::concurrent_vector TBBVectorDouble; typedef tbb::spin_rw_mutex MyMtx; //define a vector to be a view to data: class Vector{ public: size_t size_; size_t stride_; double * data_; Vector() {} Vector(size_t size, double * const pdata, size_t stride ) :size_(size), data_(pdata), stride_(stride) {} ~Vector() {} double & operator[]( const size_t i ){ return *(data_ + i* stride_ ); } double const & operator[]( const size_t i ) const { return *(data_ + i* stride_ ); } }; //A class that uses the mutex: class moment_calculator_mtx{ private: Vector * vars_; const size_t size_; double * const tgt_; MyMtx ossMutex_; public: double m1_[MAXFACTORS]; moment_calculator_mtx( Vector * const vars, const size_t size, double * const tgt, MyMtx & ossMutex ) :vars_( vars ), size_(size), tgt_(tgt), ossMutex_(ossMutex) { for ( size_t i = 0; i != size_; ++i ) m1_ = 0L; } ~moment_calculator_mtx(){} void operator()( const tbb::blocked_range< size_t > & r ) { const size_t nVars = size_; size_t jVar; for ( jVar = 0; jVar != nVars; ++jVar ) { const Vector & view_j = vars_[jVar]; double localsum = 0; for( size_t i = r.begin(); i != r.end(); ++i ) { const double var_j_i = view_j; localsum += var_j_i ; } MyMtx::scoped_lock writeLock; writeLock.acquire( ossMutex_, true ); tgt_[ jVar ] += localsum; writeLock.release(); } } moment_calculator_mtx( moment_calculator_mtx & ec, tbb::split & splt ) :vars_( ec.vars_ ), size_(ec.size_), tgt_(ec.tgt_), ossMutex_( ec.ossMutex_ ) {} void join ( const moment_calculator_mtx & ec ) {} }; MyMtx ossMutex1, ossMutex2; int main(int argc, char * argv[] ){ const size_t dimension = 5; const size_t mcSize =20000; const size_t sampleSize = dimension * mcSize; double * pdata = new double[ sampleSize ]; //create uniform random data in [0,1] for ( size_t i = 0; i != sampleSize; ++i ) pdata= double(rand())/double(RAND_MAX) - 0.5; //create Vectors corresponding to the columns of the sample Vector vars[dimension]; for (size_t i = 0; i != dimension; ++i ) vars= Vector( mcSize, pdata + i, dimension ); // U S I N G T H E M U T E X A P P R O A C H : //initialize the target buffers: double m1Tgt[dimension]; std::fill( m1Tgt, m1Tgt + dimension, 0.L ); double m2Tgt[dimension]; // 1st calculation: moment_calculator_mtx mcMtx1( vars, dimension, m1Tgt, ossMutex1 ); tbb::parallel_reduce( tbb::blocked_range(0, mcSize, 1000 ), mcMtx1, tbb::auto_partitioner() ); // 2nd calculation: moment_calculator_mtx *pmcMtx2; size_t i; for ( i = 0; i != 1000; ++i ){ cout << i << endl; pmcMtx2 = new moment_calculator_mtx( vars, dimension, m2Tgt, ossMutex2 ); moment_calculator_mtx & mcMtx2 = *pmcMtx2; std::fill( m2Tgt, m2Tgt + dimension, 0.L ); tbb::parallel_reduce( tbb::blocked_range (0, mcSize, 1000 ), mcMtx2, tbb::auto_partitioner() ); bool bIsDifferent = false; size_t id; for ( id = 0; id != dimension; ++ id ){ if ( fabs( m2Tgt[id] - m1Tgt[id] ) > 10E-10){ bIsDifferent = true; break; } } if ( bIsDifferent ){ cout << "found difference on the " << i + 1 << " iteration for the " << id +1 << " dimension ( index = " << id <<" )"; cout << endl << endl; break; } else { delete pmcMtx2; } } if ( i == 1000 ) cout << "Succeeded!! passed the 1000 waves test!!" << endl << endl; cout << "Adding the results to global target buffer (mutex):" << endl; cout << "t1st:tt2nd:" << endl; cout << "t===:tt===:" << endl; for ( size_t i = 0; i != dimension; ++i ) cout << i << "t" << m1Tgt << "tt" << m2Tgt<< endl; cout << "the single threaded result is:" << endl; vectorstsums(dimension, 0); for ( size_t i = 0; i != dimension; ++i ) for ( size_t ii = 0; ii != mcSize; ++ii ) stsums += vars[ii]; for ( size_t i = 0; i != dimension; ++i ) cout << i << "t" << stsums<< endl; return 1; } [/cpp]

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

You do not need m1_ in the body class at all. And tgt_ should become a zero-initialized member array just as m1_ is now (you should try to initialize tgt_ with a pointer to a global array). You do not need mutex either. Just return the reduction in the join() method where it was originally.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

Thank you for the response. However, I am confused now. In order to return the reduction in the join, I have to put it in a buffer (like m1_) , when the operator(range..) is called.

You don't mean to perform the sumation loops in the join, correct? O/wise, I have to let the join, know the limits of the sumation..

Thank you for your help.

Petros

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

(Added) Also some information may be lost with the global method (assuming the race is eliminated using atomics or locks, both quite slow compared to the local method), because only joined information is added to the global array.

(Replaced) Don't do the computations in the join, though! I think some miscommunication is going on there between you and Andrey.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

Thank you for your response. I agree I should stick with the local in the end.

Have a nice weekend. P-

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content

I noticed that you dovarious computationsincluding generation of random numbers (discussed on MKL forum)and computation of statistical estimates like mean or variance.

Starting with MKL 10.3 Statistical Component of MKL is extended with Summary Statistics functionality intended for initial statistical analysis of the datasets. This technology includes various algorithms for estimation of statistical moments, covariance, quantiles, detection of outliers, robust estimation of correlation, stabilization of correlation. The algorithms are highly optimized/parallelized for multi-core CPUs. The additional details and descriptionof the feature are available in MKL Manual and Summary Statistics Application Notes http://software.intel.com/sites/products/documentation/hpc/mkl/ssl/sslnotes.pdf

You might want to evaluate this feature of MKL 10.3 from perspective of development and supportof your applications. Please, feel free to ask questions on Statistical Component of MKL on MKL web forum.

Best,

Andrey

- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page