// inclusive_scan.cpp
//
// Input:  A[] = {1,2,3,4,5};
// Output: B[] = {1,3,6,10,16};

#include "stdafx.h"
#include <iostream>     // std::cout
#include <algorithm>    // std::min
#include <atomic>       // std::atomic
#include <omp.h>        // OpenMP
#include <immintrin.h>  // AVX intrinsics, _mm_pause, _mm_mfence, __rdtsc

int nThreadsMax = -1;    // Init()
int nThreadsGlobal = -1; // Varies during test
const int CacheLineSize = 64; // hard wired, should be determined at run time
const int VectorWidthAVXfloat = sizeof(__m256) / sizeof(float);
const int floatsPerCacheLine = CacheLineSize / sizeof(float);
unsigned int CacheSize_L1_KB = 0;  // Init()
unsigned int CacheSize_L2_KB = 0;  // Init()
unsigned int CacheSize_L3_KB = 0;  // Init()
unsigned int CacheSize_LLC_KB = 0; // Init()
int CacheCutoffInFloats = 0;       // Varies during test

const int Nmax = 4096*4096;
__declspec(align(CacheLineSize)) float Ainput[Nmax];
__declspec(align(CacheLineSize)) float BoutputReference[Nmax]; // Result array of Simple_inclusive_scan
__declspec(align(CacheLineSize)) float Boutput[Nmax];          // Result array of variation under test

// Once only initialization
void Init()
{
  if(nThreadsMax < 0)
  {
#pragma omp parallel
    nThreadsGlobal = nThreadsMax = omp_get_num_threads();

    CacheSize_L1_KB = __cacheSize(1);
    CacheSize_L2_KB = __cacheSize(2);
    CacheSize_L3_KB = __cacheSize(3);
    CacheSize_LLC_KB = __cacheSize(4);
    // fall back to the largest cache level actually reported
    if(CacheSize_LLC_KB == 0)
    {
      if((CacheSize_LLC_KB = CacheSize_L3_KB) == 0)
      {
        if((CacheSize_LLC_KB = CacheSize_L2_KB) == 0)
        {
          CacheSize_LLC_KB = CacheSize_L1_KB;
        }
      }
    }
  }
}

// Used in traditional manner to produce the inclusive scan
// Returns the last value
float Simple_inclusive_scan(float b[], float a[], int n)
{
  __assume_aligned(a,CacheLineSize);
  __assume_aligned(b,CacheLineSize);
  float carry = 0.0f;
  for(int i=0; i < n; ++i)
  {
    b[i] = (carry += a[i]);
  }
  return carry;
}

// Used by Parallel_Simple_inclusive_scan to propagate carry
// Returns the last value
float Simple_inclusive_scan_Carry(float b[], int n, float carryIn)
{
  __assume_aligned(b,CacheLineSize);
  for(int i=0; i < n; ++i)
  {
    b[i] += carryIn;
  }
  return b[n-1];
}
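// Illustrative example (added for exposition, not called by the benchmark):
// the parallel versions below compose the two helpers above. Each partition
// is scanned independently, then the total of all preceding partitions is
// folded into the trailing partition as a carry.
void Example_TwoPartition_inclusive_scan()
{
  // scan the first cache line worth of floats; carry0 = its total
  float carry0 = Simple_inclusive_scan(Boutput, Ainput, floatsPerCacheLine);
  // scan the second cache line independently ...
  Simple_inclusive_scan(&Boutput[floatsPerCacheLine], &Ainput[floatsPerCacheLine], floatsPerCacheLine);
  // ... then fold in the carry; Boutput[0..31] now equals the serial scan of Ainput[0..31]
  Simple_inclusive_scan_Carry(&Boutput[floatsPerCacheLine], floatsPerCacheLine, carry0);
  // (the split point is a multiple of floatsPerCacheLine, so the
  //  __assume_aligned declarations in the helpers hold for both regions)
}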
// Parallel version of Simple_inclusive_scan
// Returns the last value
float Parallel_Simple_inclusive_scan(float b[], float a[], int n)
{
  __assume_aligned(a,CacheLineSize);
  __assume_aligned(b,CacheLineSize);
  int nCacheLines = n / floatsPerCacheLine; // (there may be a partial remainder)
  int MinimumCacheLinesPerThread = 1; // change this later to most efficient
  int ThreadsToUse = std::min(nThreadsGlobal, nCacheLines / MinimumCacheLinesPerThread);

  // See if more efficient to use 1 thread
  if(ThreadsToUse <= 1)
    return Simple_inclusive_scan(b, a, n);

  // Use more than one thread
  // b[] and a[] are partitioned into ThreadsToUse number of partitions
  // Each thread to perform Simple_inclusive_scan and place its partition total into
  float PartitionSums[ThreadsToUse]; // (variable length array: Intel compiler extension)
  // List of done flags for ThreadsToUse number of partitions
  std::atomic<int> PartitionSequenceDone[ThreadsToUse];

  // Initialize the PartitionSums and PartitionSequenceDone indicators
  for(int i=0; i < ThreadsToUse; ++i)
  {
    PartitionSums[i] = 0.0f;
    PartitionSequenceDone[i] = -1;
  }

  int nCacheLinesPerThreadBase = nCacheLines / ThreadsToUse;
  int nCacheLinesRemainder = nCacheLines % ThreadsToUse;

#pragma omp parallel num_threads(ThreadsToUse)
  {
    int nThreads = omp_get_num_threads(); // may be less than the globally scoped thread count
    int iThread = omp_get_thread_num();
    int mySequence = 0;
    int CacheLineStart = nCacheLinesPerThreadBase * iThread;
    int startOfRegion = CacheLineStart * floatsPerCacheLine;
    int endOfRegion = startOfRegion + (nCacheLinesPerThreadBase * floatsPerCacheLine);
    if(nCacheLinesRemainder)
    {
      // The first nCacheLinesRemainder number of threads get 1 more cache line
      if(iThread >= nCacheLinesRemainder)
      {
        startOfRegion += nCacheLinesRemainder * floatsPerCacheLine; // skip the additional cache lines taken
        endOfRegion += nCacheLinesRemainder * floatsPerCacheLine;   // skip the additional cache lines taken
      }
      else
      {
        startOfRegion += iThread * floatsPerCacheLine;     // skip the additional cache lines taken before my thread
        endOfRegion += (iThread + 1) * floatsPerCacheLine; // ... plus the additional one for my thread
      }
    }
    if(iThread == nThreads - 1) endOfRegion = n; // last thread also takes any partial cache line
    if(endOfRegion > n) endOfRegion = n;
    int my_n = endOfRegion - startOfRegion;
    PartitionSums[iThread] = Simple_inclusive_scan(&b[startOfRegion], &a[startOfRegion], my_n);
    PartitionSequenceDone[iThread] = mySequence; // signal other threads we completed our partition
    if(iThread == 0)
    { // thread 0: its partition needs no carry; wait for the remaining threads
      for(int i=1; i < nThreads; ++i)
      {
        while(PartitionSequenceDone[i] != mySequence)
          _mm_pause(); // wait for thread i to finish its partition
      }
    }
    else
    { // not thread 0: sum the totals of all preceding partitions ...
      float mySum = 0.0f;
      for(int i=0; i < iThread; ++i)
      {
        while(PartitionSequenceDone[i] != mySequence)
          _mm_pause(); // wait for thread i to finish its partition
        mySum += PartitionSums[i];
      }
      // ... then propagate that carry across our partition
      Simple_inclusive_scan_Carry(&b[startOfRegion], my_n, mySum);
    }
  } // #pragma omp parallel
  return b[n-1];
}
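// Worked example of the cache-line split used by the parallel versions (added
// for exposition): with 4 threads and nCacheLines = 35, base = 35/4 = 8 and
// remainder = 35%4 = 3, so threads 0..2 each take 9 cache lines and thread 3
// takes 8:
//   thread 0: lines  0..8     thread 1: lines  9..17
//   thread 2: lines 18..26    thread 3: lines 27..34
// Splitting on cache-line boundaries keeps each thread's writes to b[] in
// distinct cache lines, avoiding false sharing between threads.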
// Sliced parallel version of Simple_inclusive_scan
// The array is processed in slices small enough to remain in cache, so the
// carry propagation pass reuses cached data
// Returns the last value
float Parallel_Simple_inclusive_scan_Slice(float b[], float a[], int n)
{
  __assume_aligned(a,CacheLineSize);
  __assume_aligned(b,CacheLineSize);
  int nCacheLines = n / floatsPerCacheLine; // (there may be a partial remainder)
  int MinimumCacheLinesPerThread = 1; // change this later to most efficient
  int ThreadsToUse = std::min(nThreadsGlobal, nCacheLines / MinimumCacheLinesPerThread);

  // See if more efficient to use 1 thread
  if(ThreadsToUse <= 1)
    return Simple_inclusive_scan(b, a, n);

  // Use more than one thread
  int SliceSize = std::min(CacheCutoffInFloats, n);
  int nSlices = (n + SliceSize - 1) / SliceSize; // so that nSlices * SliceSize >= n

  // Each thread to perform Simple_inclusive_scan and place its partition total into
  float PartitionSums[ThreadsToUse];
  // List of sequence (progress) indicators for each thread
  std::atomic<int> PartitionSequenceDone[ThreadsToUse];

  // Initialize the PartitionSums and PartitionSequenceDone indicators
  for(int i=0; i < ThreadsToUse; ++i)
  {
    PartitionSums[i] = 0.0f;
    PartitionSequenceDone[i] = -1;
  }

  std::atomic<int> Sequence = 0; // number of slices whose CarryIn has been published
  volatile float CarryIn = 0.0f; // carry into the current slice

  // choose a suitable number for each thread to use
  nCacheLines = SliceSize / floatsPerCacheLine;
  int nCacheLinesPerThreadBase = nCacheLines / ThreadsToUse;
  int nCacheLinesRemainder = nCacheLines % ThreadsToUse;

#pragma omp parallel num_threads(ThreadsToUse)
  {
    int nThreads = omp_get_num_threads(); // may be less than the globally scoped thread count
    int iThread = omp_get_thread_num();
    int mySequence = 0;
    for(int iSlice = 0; iSlice < nSlices; ++iSlice)
    {
      int SliceBase = SliceSize * iSlice;                // slice start (in floats)
      int SliceEnd = std::min(SliceBase + SliceSize, n); // slice end (in floats)
      int CacheLineStart = SliceBase / floatsPerCacheLine + nCacheLinesPerThreadBase * iThread;
      int nCacheLines = nCacheLinesPerThreadBase; // new locally scoped nCacheLines
      int startOfRegion = CacheLineStart * floatsPerCacheLine;
      int endOfRegion = startOfRegion + (nCacheLines * floatsPerCacheLine);
      if(nCacheLinesRemainder)
      {
        // The first nCacheLinesRemainder number of threads get 1 more cache line
        if(iThread >= nCacheLinesRemainder)
        {
          startOfRegion += nCacheLinesRemainder * floatsPerCacheLine; // skip the additional cache lines taken
          endOfRegion += nCacheLinesRemainder * floatsPerCacheLine;   // skip the additional cache lines taken
        }
        else
        {
          startOfRegion += iThread * floatsPerCacheLine;     // skip the additional cache lines taken
          endOfRegion += (iThread + 1) * floatsPerCacheLine; // ... + 1
        }
      }
      if(endOfRegion > n) endOfRegion = n;
      int my_n = endOfRegion - startOfRegion;
      PartitionSums[iThread] = Simple_inclusive_scan(&b[startOfRegion], &a[startOfRegion], my_n);
      float mySum = CarryIn; // capture this slice's carry before signaling completion
      PartitionSequenceDone[iThread] = mySequence; // signal other threads we completed our slice of Parallel_Simple_inclusive_scan_Slice
      if(iThread == 0)
      {
        if(mySum != 0.0f)
          mySum = Simple_inclusive_scan_Carry(&b[startOfRegion], my_n, mySum); // returns our region's last value
        else
          mySum = PartitionSums[0]; // no carry: the region already holds its scan
        for(int i=1; i < nThreads; ++i)
        {
          while(PartitionSequenceDone[i] != mySequence)
            _mm_pause(); // wait for thread i to finish this slice
          mySum += PartitionSums[i];
        }
        CarryIn = mySum; // publish the carry for the next slice
        ++Sequence;
      }
      else
      { // not thread 0: include the preceding thread carries into mySum
        for(int i=0; i < iThread; ++i)
        {
          while(PartitionSequenceDone[i] != mySequence)
            _mm_pause(); // wait for thread i to finish this slice
          mySum += PartitionSums[i];
        }
        if(mySum != 0.0f)
          Simple_inclusive_scan_Carry(&b[startOfRegion], my_n, mySum);
        while(Sequence == mySequence)
          _mm_pause(); // wait for thread 0 to publish CarryIn for the next slice
      }
      ++mySequence;
    } // for(int iSlice...)
  } // #pragma omp parallel
  return b[n-1];
}

// Inclusive scan of Nv whole AVX vectors (8 floats each)
// Returns the last value
float Vector_inclusive_scan_AVX(__m256* Bv, __m256* Av, int Nv)
{
  __m256 Zero = _mm256_setzero_ps();
  __m256 Carry = Zero;
  __m256 mask_F0F0F0F0 = _mm256_castsi256_ps(_mm256_set_epi32(
    0xFFFFFFFF,0x00000000,0xFFFFFFFF,0x00000000,0xFFFFFFFF,0x00000000,0xFFFFFFFF,0x00000000));
  __m256 mask_FF00FF00 = _mm256_castsi256_ps(_mm256_set_epi32(
    0xFFFFFFFF,0xFFFFFFFF,0x00000000,0x00000000,0xFFFFFFFF,0xFFFFFFFF,0x00000000,0x00000000));
  if(Nv-- > 1) // see if more than one vector, post-decrement Nv
  {
    __m256 Out = Av[0];
#pragma unroll(2)
    for(int i = 0; i < Nv; ++i)
    {
      __m256 OutNext = Av[i+1]; // prefetch
      // Out= 8, 7, 6, 5, 4, 3, 2, 1 (AVX register notation High:Low)
      __m256 t1 = _mm256_permute_ps(Out,(2<<6)+(2<<4)+(0<<2)+0); // t1 = 7, 7, 5, 5, 3, 3, 1, 1
      t1 = _mm256_and_ps(t1, mask_F0F0F0F0);                     // t1 = 7, 0, 5, 0, 3, 0, 1, 0
      Out = _mm256_add_ps(Out, t1);                              // Out=15, 7,11, 5, 7, 3, 3, 1
      __m256 t2 = _mm256_permute_ps(Out,(1<<6)+(1<<4)+(1<<2)+0); // t2 =11,11,11, 5, 3, 3, 3, 1
      t2 = _mm256_and_ps(t2,mask_FF00FF00);                      // t2 =11,11, 0, 0, 3, 3, 0, 0
      Out = _mm256_add_ps(Out, t2);                              // Out=26,18,11, 5,10, 6, 3, 1
      __m256 t3 = _mm256_permute2f128_ps(Out,Zero,3+(0<<4));     // t3 =10, 6, 3, 1, 0, 0, 0, 0
      t3 = _mm256_permute_ps(t3,3+(3<<2)+(3<<4)+(3<<6));         // t3 =10,10,10,10, 0, 0, 0, 0
      Out = _mm256_add_ps(Out,t3);                               // Out=36,28,21,15,10, 6, 3, 1
      Bv[i] = Out = _mm256_add_ps(Out,Carry);                    // Out=36,28,21,15,10, 6, 3, 1 + Carry
      Carry = _mm256_permute_ps(Out, 3+(3<<2)+(3<<4)+(3<<6));    // Carry=36,36,36,36,10,10,10,10
      Carry = _mm256_permute2f128_ps(Carry,Zero,1+(1<<4));       // Carry=36,36,36,36,36,36,36,36
      Out = OutNext;
    } // for(int i = 0; i < Nv; ++i)
  } // if(Nv-- > 1)
  __m256 Out = Av[Nv]; // last vector (Nv was decremented earlier)
  // Out= 8, 7, 6, 5, 4, 3, 2, 1 (AVX register notation High:Low)
  __m256 t1 = _mm256_permute_ps(Out,(2<<6)+(2<<4)+(0<<2)+0); // t1 = 7, 7, 5, 5, 3, 3, 1, 1
  t1 = _mm256_and_ps(t1, mask_F0F0F0F0);                     // t1 = 7, 0, 5, 0, 3, 0, 1, 0
  Out = _mm256_add_ps(Out, t1);                              // Out=15, 7,11, 5, 7, 3, 3, 1
  __m256 t2 = _mm256_permute_ps(Out,(1<<6)+(1<<4)+(1<<2)+0); // t2 =11,11,11, 5, 3, 3, 3, 1
  t2 = _mm256_and_ps(t2,mask_FF00FF00);                      // t2 =11,11, 0, 0, 3, 3, 0, 0
  Out = _mm256_add_ps(Out, t2);                              // Out=26,18,11, 5,10, 6, 3, 1
  __m256 t3 = _mm256_permute2f128_ps(Out,Zero,3+(0<<4));     // t3 =10, 6, 3, 1, 0, 0, 0, 0
  t3 = _mm256_permute_ps(t3,3+(3<<2)+(3<<4)+(3<<6));         // t3 =10,10,10,10, 0, 0, 0, 0
  Out = _mm256_add_ps(Out,t3);                               // Out=36,28,21,15,10, 6, 3, 1
  Bv[Nv] = Out = _mm256_add_ps(Out,Carry);                   // Out=36,28,21,15,10, 6, 3, 1 + Carry
  return Out.m256_f32[7];
}
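// Scalar cross-check (added for exposition, not called by the benchmark): the
// permute/and/add steps above compute the same result as this plain
// Hillis-Steele loop over 8 floats. For input 1..8 both produce
// 1,3,6,10,15,21,28,36, matching the lane values in the comments above.
void Example_ScalarEquivalentOfAVXScan(float out[8], const float in[8])
{
  for(int i = 0; i < 8; ++i)
    out[i] = in[i];
  for(int stride = 1; stride < 8; stride *= 2) // strides 1, 2, 4
  {
    // walk downward so each read sees the value from the previous stride
    for(int i = 7; i >= stride; --i)
      out[i] += out[i - stride];
  }
}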
float Vector_inclusive_scan_AVX(float b[], float a[], int n)
{
  int nVectors = n / VectorWidthAVXfloat; // get number of whole vectors
  if(nVectors == 0)
    return Simple_inclusive_scan(b, a, n);
  float carry = Vector_inclusive_scan_AVX((__m256*)b, (__m256*)a, nVectors);
  // process remainder if any
  for(int i = nVectors * VectorWidthAVXfloat; i < n; ++i)
  {
    b[i] = (carry += a[i]);
  }
  return carry;
}

// Propagate a carry across Nv whole AVX vectors
// Returns the last value
float Vector_inclusive_scan_AVX_Carry(__m256* Bv, int Nv, float CarryIn)
{
  if(CarryIn != 0.0f)
  {
    __m256 Carry = _mm256_set1_ps(CarryIn);
    for(int i = 0; i < Nv; ++i)
    {
      Bv[i] = _mm256_add_ps(Bv[i], Carry);
    }
  }
  return Bv[Nv-1].m256_f32[7];
}

// Propagate a carry across n floats (whole vectors plus remainder)
// Returns the last value
float Vector_inclusive_scan_AVX_Carry(float b[], int n, float CarryIn)
{
  if(CarryIn != 0.0f)
  {
    int Nv = n / VectorWidthAVXfloat;
    if(Nv)
      Vector_inclusive_scan_AVX_Carry((__m256*)b, Nv, CarryIn);
    for(int i=Nv * VectorWidthAVXfloat; i < n; ++i)
      b[i] += CarryIn;
  }
  return b[n-1];
}

// Same in-register algorithm as Vector_inclusive_scan_AVX above
// Returns the last value
float Parallel_Vector_inclusive_scan_AVX(__m256* Bv, __m256* Av, int Nv)
{
  __m256 Zero = _mm256_setzero_ps();
  __m256 Carry = Zero;
  __m256 mask_F0F0F0F0 = _mm256_castsi256_ps(_mm256_set_epi32(
    0xFFFFFFFF,0x00000000,0xFFFFFFFF,0x00000000,0xFFFFFFFF,0x00000000,0xFFFFFFFF,0x00000000));
  __m256 mask_FF00FF00 = _mm256_castsi256_ps(_mm256_set_epi32(
    0xFFFFFFFF,0xFFFFFFFF,0x00000000,0x00000000,0xFFFFFFFF,0xFFFFFFFF,0x00000000,0x00000000));
  if(Nv-- > 1) // see if more than one vector, post-decrement Nv
  {
    __m256 Out = Av[0];
#pragma unroll(2)
    for(int i = 0; i < Nv; ++i)
    {
      __m256 OutNext = Av[i+1]; // prefetch
      // Out= 8, 7, 6, 5, 4, 3, 2, 1 (AVX register notation High:Low)
      __m256 t1 = _mm256_permute_ps(Out,(2<<6)+(2<<4)+(0<<2)+0); // t1 = 7, 7, 5, 5, 3, 3, 1, 1
      t1 = _mm256_and_ps(t1, mask_F0F0F0F0);                     // t1 = 7, 0, 5, 0, 3, 0, 1, 0
      Out = _mm256_add_ps(Out, t1);                              // Out=15, 7,11, 5, 7, 3, 3, 1
      __m256 t2 = _mm256_permute_ps(Out,(1<<6)+(1<<4)+(1<<2)+0); // t2 =11,11,11, 5, 3, 3, 3, 1
      t2 = _mm256_and_ps(t2,mask_FF00FF00);                      // t2 =11,11, 0, 0, 3, 3, 0, 0
      Out = _mm256_add_ps(Out, t2);                              // Out=26,18,11, 5,10, 6, 3, 1
      __m256 t3 = _mm256_permute2f128_ps(Out,Zero,3+(0<<4));     // t3 =10, 6, 3, 1, 0, 0, 0, 0
      t3 = _mm256_permute_ps(t3,3+(3<<2)+(3<<4)+(3<<6));         // t3 =10,10,10,10, 0, 0, 0, 0
      Out = _mm256_add_ps(Out,t3);                               // Out=36,28,21,15,10, 6, 3, 1
      Bv[i] = Out = _mm256_add_ps(Out,Carry);                    // Out=36,28,21,15,10, 6, 3, 1 + Carry
      Carry = _mm256_permute_ps(Out, 3+(3<<2)+(3<<4)+(3<<6));    // Carry=36,36,36,36,10,10,10,10
      Carry = _mm256_permute2f128_ps(Carry,Zero,1+(1<<4));       // Carry=36,36,36,36,36,36,36,36
      Out = OutNext;
    } // for(int i = 0; i < Nv; ++i)
  } // if(Nv-- > 1)
  __m256 Out = Av[Nv]; // last vector (Nv was decremented earlier)
  // Out= 8, 7, 6, 5, 4, 3, 2, 1 (AVX register notation High:Low)
  __m256 t1 = _mm256_permute_ps(Out,(2<<6)+(2<<4)+(0<<2)+0); // t1 = 7, 7, 5, 5, 3, 3, 1, 1
  t1 = _mm256_and_ps(t1, mask_F0F0F0F0);                     // t1 = 7, 0, 5, 0, 3, 0, 1, 0
  Out = _mm256_add_ps(Out, t1);                              // Out=15, 7,11, 5, 7, 3, 3, 1
  __m256 t2 = _mm256_permute_ps(Out,(1<<6)+(1<<4)+(1<<2)+0); // t2 =11,11,11, 5, 3, 3, 3, 1
  t2 = _mm256_and_ps(t2,mask_FF00FF00);                      // t2 =11,11, 0, 0, 3, 3, 0, 0
  Out = _mm256_add_ps(Out, t2);                              // Out=26,18,11, 5,10, 6, 3, 1
  __m256 t3 = _mm256_permute2f128_ps(Out,Zero,3+(0<<4));     // t3 =10, 6, 3, 1, 0, 0, 0, 0
  t3 = _mm256_permute_ps(t3,3+(3<<2)+(3<<4)+(3<<6));         // t3 =10,10,10,10, 0, 0, 0, 0
  Out = _mm256_add_ps(Out,t3);                               // Out=36,28,21,15,10, 6, 3, 1
  Bv[Nv] = Out = _mm256_add_ps(Out,Carry);                   // Out=36,28,21,15,10, 6, 3, 1 + Carry
  return Out.m256_f32[7]; // last value (highest lane)
}

float Parallel_Vector_inclusive_scan_AVX(float b[], float a[], int n)
{
  __assume_aligned(b,CacheLineSize);
  __assume_aligned(a,CacheLineSize);
  int nCacheLines = n / floatsPerCacheLine; // (there may be a partial remainder)
  int MinimumCacheLinesPerThread = 1; // change this later to most efficient
  int ThreadsToUse = std::min(nThreadsGlobal, nCacheLines / MinimumCacheLinesPerThread);

  // See if more efficient to use 1 thread
  if(ThreadsToUse <= 1)
    return Simple_inclusive_scan(b, a, n);

  // Use more than one thread
  // b[] and a[] are partitioned into ThreadsToUse number of partitions
  // Each thread to perform Vector_inclusive_scan_AVX and place its partition total into
  float PartitionSums[ThreadsToUse];
  // List of done flags for ThreadsToUse number of partitions
  std::atomic<int> PartitionSequenceDone[ThreadsToUse];

  // Initialize the PartitionSums and PartitionSequenceDone indicators
  for(int i=0; i < ThreadsToUse; ++i)
  {
    PartitionSums[i] = 0.0f;
    PartitionSequenceDone[i] = -1;
  }

  int nCacheLinesPerThreadBase = nCacheLines / ThreadsToUse;
  int nCacheLinesRemainder = nCacheLines % ThreadsToUse;

#pragma omp parallel num_threads(ThreadsToUse)
  {
    int nThreads = omp_get_num_threads(); // may be less than the globally scoped thread count
    int iThread = omp_get_thread_num();
    int mySequence = 0;
    int CacheLineStart = nCacheLinesPerThreadBase * iThread;
    int startOfRegion = CacheLineStart * floatsPerCacheLine;
    int endOfRegion = startOfRegion + (nCacheLinesPerThreadBase * floatsPerCacheLine);
    if(nCacheLinesRemainder)
    {
      // The first nCacheLinesRemainder number of threads get 1 more cache line
      if(iThread >= nCacheLinesRemainder)
      {
        startOfRegion += nCacheLinesRemainder * floatsPerCacheLine; // skip the additional cache lines taken
        endOfRegion += nCacheLinesRemainder * floatsPerCacheLine;   // skip the additional cache lines taken
      }
      else
      {
        startOfRegion += iThread * floatsPerCacheLine;     // skip the additional cache lines taken before my thread
        endOfRegion += (iThread + 1) * floatsPerCacheLine; // ... plus the additional one for my thread
      }
    }
    if(iThread == nThreads - 1) endOfRegion = n; // last thread also takes any partial cache line
    if(endOfRegion > n) endOfRegion = n;
    int my_n = endOfRegion - startOfRegion;
    PartitionSums[iThread] = Vector_inclusive_scan_AVX(&b[startOfRegion], &a[startOfRegion], my_n);
    PartitionSequenceDone[iThread] = mySequence; // signal other threads we completed our partition
    if(iThread == 0)
    { // thread 0: its partition needs no carry; wait for the remaining threads
      for(int i=1; i < nThreads; ++i)
      {
        while(PartitionSequenceDone[i] != mySequence)
          _mm_pause(); // wait for thread i to finish its partition
      }
    }
    else
    { // not thread 0: sum the totals of all preceding partitions ...
      float mySum = 0.0f;
      for(int i=0; i < iThread; ++i)
      {
        while(PartitionSequenceDone[i] != mySequence)
          _mm_pause(); // wait for thread i to finish its partition
        mySum += PartitionSums[i];
      }
      // ... then propagate that carry across our partition
      Vector_inclusive_scan_AVX_Carry(&b[startOfRegion], my_n, mySum);
    }
  } // #pragma omp parallel
  return b[n-1];
}
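// Why the sliced variant below helps for large n: in the unsliced versions each
// thread first writes its whole partition of b[], then re-reads and re-writes
// it to add the carry. When a partition exceeds the caches, that second pass is
// served from memory. The sliced variant bounds each pass to
// SliceSize = CacheCutoffInFloats floats so the carry pass finds its data still
// cached, with the per-slice total handed forward through CarryIn/Sequence:
//   nSlices = (n + SliceSize - 1) / SliceSize // ceiling division, nSlices * SliceSize >= n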
// Sliced parallel version of Vector_inclusive_scan_AVX
// Returns the last value
float Parallel_Vector_inclusive_scan_AVX_Sliced(float b[], float a[], int n)
{
  __assume_aligned(b,CacheLineSize);
  __assume_aligned(a,CacheLineSize);
  int nCacheLines = n / floatsPerCacheLine; // (there may be a partial remainder)
  int MinimumCacheLinesPerThread = 1; // change this later to most efficient
  int ThreadsToUse = std::min(nThreadsGlobal, nCacheLines / MinimumCacheLinesPerThread);

  // See if more efficient to use 1 thread
  if(ThreadsToUse <= 1)
    return Vector_inclusive_scan_AVX(b, a, n);

  // Use more than one thread
  int SliceSize = std::min(CacheCutoffInFloats, n);
  int nSlices = (n + SliceSize - 1) / SliceSize; // so that nSlices * SliceSize >= n

  // Each thread to perform Vector_inclusive_scan_AVX and place its partition total into
  float PartitionSums[ThreadsToUse];
  // List of sequence (progress) indicators for each thread
  std::atomic<int> PartitionSequenceDone[ThreadsToUse];

  // Initialize the PartitionSums and PartitionSequenceDone indicators
  for(int i=0; i < ThreadsToUse; ++i)
  {
    PartitionSums[i] = 0.0f;
    PartitionSequenceDone[i] = -1;
  }

  std::atomic<int> Sequence = 0; // number of slices whose CarryIn has been published
  volatile float CarryIn = 0.0f; // carry into the current slice

  // choose a suitable number for each thread to use
  int nCacheLinesSlice = SliceSize / floatsPerCacheLine;
  int nCacheLinesPerThreadBase = nCacheLinesSlice / ThreadsToUse;
  int nCacheLinesRemainder = nCacheLinesSlice % ThreadsToUse;

#pragma omp parallel num_threads(ThreadsToUse)
  {
    int nThreads = omp_get_num_threads(); // may be less than the globally scoped thread count
    int iThread = omp_get_thread_num();
    int mySequence = 0;
    for(int iSlice = 0; iSlice < nSlices; ++iSlice)
    {
      int SliceBaseCacheLine = nCacheLinesSlice * iSlice;
      int SliceEndCacheLine = std::min(SliceBaseCacheLine + nCacheLinesSlice, nCacheLines); // (in cache lines)
      int CacheLineStart = SliceBaseCacheLine + nCacheLinesPerThreadBase * iThread;
      int nCacheLinesThread = nCacheLinesPerThreadBase;
      int startOfRegion = CacheLineStart * floatsPerCacheLine;
      int endOfRegion = startOfRegion + (nCacheLinesThread * floatsPerCacheLine);
      if(nCacheLinesRemainder)
      {
        // The first nCacheLinesRemainder number of threads get 1 more cache line
        if(iThread >= nCacheLinesRemainder)
        {
          startOfRegion += nCacheLinesRemainder * floatsPerCacheLine; // skip the additional cache lines taken
          endOfRegion += nCacheLinesRemainder * floatsPerCacheLine;   // skip the additional cache lines taken
        }
        else
        {
          startOfRegion += iThread * floatsPerCacheLine;     // skip the additional cache lines taken before my thread
          endOfRegion += (iThread + 1) * floatsPerCacheLine; // ... plus the additional one for my thread
        }
      }
      if(endOfRegion > n) endOfRegion = n;
      int my_n = endOfRegion - startOfRegion;
      PartitionSums[iThread] = Vector_inclusive_scan_AVX(&b[startOfRegion], &a[startOfRegion], my_n);
      float myCarry = CarryIn; // capture this slice's carry before signaling completion
      _mm_mfence();
      PartitionSequenceDone[iThread] = mySequence; // signal other threads we completed our slice
      if(iThread == 0)
      {
        if(myCarry != 0.0f)
          Vector_inclusive_scan_AVX_Carry(&b[startOfRegion], my_n, myCarry);
        float mySum = myCarry + PartitionSums[0]; // our region's last value
        for(int i=1; i < nThreads; ++i)
        {
          while(PartitionSequenceDone[i] != mySequence)
            _mm_pause(); // wait for thread i to finish this slice
          mySum += PartitionSums[i];
        }
        CarryIn = mySum; // publish the carry for the next slice
        ++Sequence;
      }
      else
      { // not thread 0
        float mySum = myCarry;
        // include the thread carries into mySum
        for(int i=0; i < iThread; ++i)
        {
          while(PartitionSequenceDone[i] != mySequence)
            _mm_pause(); // wait for thread i to finish this slice
          mySum += PartitionSums[i];
        }
        if(mySum != 0.0f)
          Vector_inclusive_scan_AVX_Carry(&b[startOfRegion], my_n, mySum);
        while(Sequence == mySequence)
          _mm_pause(); // wait for thread 0 to publish CarryIn for the next slice
      }
      ++mySequence;
    } // for(int iSlice...)
  } // #pragma omp parallel
  return b[n-1];
}
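// Minimal usage sketch for the sliced scan (added for exposition; the value
// chosen for CacheCutoffInFloats here is only an assumed starting point, the
// sweep in _tmain below searches for a good one):
//   Init();
//   Populate(Nmax);
//   nThreadsGlobal = nThreadsMax;
//   CacheCutoffInFloats = CacheSize_L2_KB * 1024 / sizeof(float) / 2; // half of L2
//   float last = Parallel_Vector_inclusive_scan_AVX_Sliced(Boutput, Ainput, Nmax);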
// Fill the input array; values of 1.0f keep every partial sum exactly
// representable in float (Nmax = 2^24), so Check can compare for equality
void Populate(int n)
{
  for(int i=0; i < n; ++i)
    Ainput[i] = 1.0f;
}

// Verify the variation under test against the reference scan
void Check(float b[], float bRef[], int n)
{
  for(int i=0; i < n; ++i)
  {
    if(b[i] != bRef[i])
    {
      std::cout << "Fail at " << i << ": " << b[i] << " != " << bRef[i] << std::endl;
      return;
    }
  }
}

typedef float (*InclusiveScanFn)(float b[], float a[], int n);

// Time fn(b, a, n) four times (in clock ticks)
// Note: as written this returns the best of the four runs; the average of the
// best three, which gives the function its name, is left below but unreachable
__int64 AverageBestThreeOutOfFour(InclusiveScanFn fn, float b[], float a[], int n)
{
  __int64 times[4];
  __int64 best = 0, worst = 0, sum = 0;
  for(int i=0; i < 4; ++i)
  {
    __int64 start = __rdtsc();
    fn(b, a, n);
    times[i] = __rdtsc() - start;
    if(i == 0)
      best = worst = times[i];
    if(times[i] < best) best = times[i];
    if(times[i] > worst) worst = times[i];
    sum += times[i];
  }
  return best;
  return (sum - worst) / 3; // (unreachable) average of the best three runs
}

int _tmain(int argc, _TCHAR* argv[])
{
  Init();
  CacheCutoffInFloats = Nmax; // no cutoff
  for(int N=1; N <= Nmax; N *= 2)
  {
    Populate(N);
    __int64 Simple, VectorAVX;
    __int64 Parallel_Simple[nThreadsMax+1];
    __int64 Parallel_VectorAVX[nThreadsMax+1];
    __int64 Parallel_VectorAVX_Sliced[nThreadsMax+1];
    for(int i=1; i <= nThreadsMax; ++i)
      Parallel_Simple[i] = Parallel_VectorAVX[i] = Parallel_VectorAVX_Sliced[i] = -1;

    Simple = AverageBestThreeOutOfFour(Simple_inclusive_scan, BoutputReference, Ainput, N);
    VectorAVX = AverageBestThreeOutOfFour(Vector_inclusive_scan_AVX, Boutput, Ainput, N);
    Check(Boutput, BoutputReference, N);
    for(nThreadsGlobal = 1; nThreadsGlobal <= nThreadsMax; ++nThreadsGlobal)
    {
      Parallel_Simple[nThreadsGlobal] = AverageBestThreeOutOfFour(Parallel_Simple_inclusive_scan, Boutput, Ainput, N);
      Check(Boutput, BoutputReference, N);
      Parallel_VectorAVX[nThreadsGlobal] = AverageBestThreeOutOfFour(Parallel_Vector_inclusive_scan_AVX, Boutput, Ainput, N);
      Check(Boutput, BoutputReference, N);
      Parallel_VectorAVX_Sliced[nThreadsGlobal] = AverageBestThreeOutOfFour(Parallel_Vector_inclusive_scan_AVX_Sliced, Boutput, Ainput, N);
      Check(Boutput, BoutputReference, N);
    }
    std::cout << N << "," << Simple << "," << VectorAVX;
    for(int i=1; i <= nThreadsMax; ++i)
      std::cout << "," << Parallel_Simple[i] << "," << Parallel_VectorAVX[i]
                << "," << Parallel_VectorAVX_Sliced[i];
    std::cout << std::endl;
  }

  std::cout << std::endl;
  std::cout << "Find CacheCutoffInFloats" << std::endl;
  nThreadsGlobal = nThreadsMax;
  int CacheLineBump = 1024 / sizeof(float);
  for(CacheCutoffInFloats = CacheLineBump;
      CacheCutoffInFloats <= CacheSize_LLC_KB * 1024 / sizeof(float);
      CacheCutoffInFloats += CacheLineBump)
  {
    std::cout << CacheCutoffInFloats << ", "
              << AverageBestThreeOutOfFour(Parallel_Vector_inclusive_scan_AVX_Sliced, Boutput, Ainput, Nmax)
              << std::endl;
    Check(Boutput, BoutputReference, Nmax);
    if(CacheCutoffInFloats * sizeof(float) > CacheSize_L2_KB * 1024)
      CacheLineBump = 1024 * 16 / sizeof(float); // step in larger increments beyond L2
  }
  std::cout << "Done" << std::endl;
  return 0;
}
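// Build notes (assumptions, not taken from the original source): the code uses
// Intel C++ extensions (__assume_aligned, #pragma unroll), OpenMP, AVX
// intrinsics, the __cacheSize() helper (assumed available in the build
// environment), and a Visual Studio project template (stdafx.h, _tmain), so
// something like
//   icl /Qopenmp /QxAVX inclusive_scan.cpp
// on an AVX-capable Windows system is the expected build line.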