Community
cancel
Showing results for 
Search instead for 
Did you mean: 
zoolii
Beginner
115 Views

Number of threads and Pipeline processing

Hi All,


One of my applicationsuses pipeline processing.

When the the input data size is small (below 5K), I set the number of tokens that can be executed parallely
to two. I am executing this application in aneight core machine. When the number of threadsto be executed is chosen by the TBB automatically , TPS is going down. If I set number of threads to be executed totwo explicitly by thetbb:task_scheduler_init object, performance is almost 100%more than the previous case.

Now how can I restrictTBB so thatonlytwo specific threadswill be used for processing (Even if there are 8 threads created automatically) ?


Thank You
0 Kudos
16 Replies
RafSchietekat
Black Belt
115 Views

Have you also experimented setting the number of tokens run by the pipeline instead of the number of threads used by the scheduler?
zoolii
Beginner
115 Views

Yes both cases I use

"pipeline.run(NUM_INPUT_TOKENS ); "

where NUM_INPUT_TOKENS = 2;

Only difference is for scenario where TPS is more , I explicitly restrict the number of

threads createdto two where as the other scenario TBBcreates 8 threads(since there are

eight cores) . Also when there are eight threads , the CPU usage is above 90%. May be this

is an indication that all the threads are used to execute the two tokens - but only two get

the tasks at a time.

Thank you

RafSchietekat
Black Belt
115 Views

But have you tried a higher value for NUM_INPUT_TOKENS, such as a default_num_threads or a small multiple of it? If not, is there a reason you didn't that might be relevant to solving the mystery? What is the topology (how many filters, and what kind)?

If that above-90% CPU usage is from a possible 0-800%, then the high below-100 value might be the main thread quite predictably twiddling its thumbs (TBB doesn't want it to go to sleep entirely), otherwise I wouldn't know.

jimdempseyatthecove
Black Belt
115 Views

The CPU usage from Task Manager (or similar Linux utility) does not indicate the percent of productive work. It tells you the percent of time the CPUs are not in idel state. TBB, OpenMP and other threading toolkits run in a spinwait immediately after completing a task whenever there is no next task to run. Should the remaining working threads enqueue a new task while this thread is in this spin wait period then the spinning thread can immediately resume working (without incurring the overhead of thread suspension and resumption). Only when the spinwait exceeds a predefined period without additional work will the thread suspend itself.

What you want to do is measure your TPS (transactions per second?) under differing circumstances (e.g. 8 threads 2 tokens, etc... with 6 idel threads).

Note, if the 8 threads are busy when you start your pipeline with 2 tokens, then the pipeline may run using one thread (the one issuing the parallel_pipeline.run). And thus exhibit half the TPS.

For testing, try this

a) init with 8 threads in TBB task pool
b) issue dummy parallel_for that is sufficiently large to use all 8 threads
c) after parallel_for (in main/serial code) issue a Sleep() to suspend for 1 or 2 seconds
d) read timestamp at start
e) run parallel pipeline with 2 tokens
f) read timestamp at end
g) compute time and print out TPS

For test 2 use test 1 setup but omit step 3 above (eliminate sleep)

For test 3

a) init with 8 threads in TBB task pool
b) issue dummy parallel_for that is sufficiently large to use all 8 threads
c) issue parallel_invoke with 2 tasks
c.1) issue dummy parallel_for that is sufficiently large to use all 8 threads
and run several times longer than parallel_pipeline
c.2.a)read timestamp at start
c.2.b) run parallel pipeline with 2 tokens
c.2.c) read timestamp at end
d) compute time and print out TPS

This will provide you with some idea of the variance in TPS due to differing load conditions.

Jim Dempsey
zoolii
Beginner
115 Views

Initialy NUM_INPUT_TOKENS= default_numthreads i.e 8

My input size is around 450 bytes. Performance was too low if we execute a pipeline process in which each parallel thread gets 450/8 (approx).Then I found for this scenario NUM_INPUT_TOKENS = 2 gives a significantly higher performance. Then came the observation that if I reduce the number of threads to 2, it further improves the TPS.Now I have a "selecter" function which will decide the "number of input tokens" based on the size of input data. If number of thread can alsobe selected like that, I would have done that.
RafSchietekat
Black Belt
115 Views

Still strange to have this negative scalability. Maybe this is a 2-socket machine and each cache can only handle one data item at a time before thrashing starts, does that make sense to anyone?
jimdempseyatthecove
Black Belt
115 Views

Zoolii,

Can you post your code?

Maybe there is something in the code that you think is innocuous but in fact is negativelyimpacting the scalability of your pipeline.

As Raf says, it seems strange to have somewhat negative scalability.

Jim
zoolii
Beginner
115 Views

Hi Jim,

I am pasting asample program below which shows exactly the same problem.


When I excuted it I got following results (In a dual quad core(2 X Xeon E5640))


TPS (8 Threads,8 Tokens) : 125250
TPS (8 Threads,2 Tokens) : 316944
TPS (2 Threads,2 Tokens) : 559019






[bash]#include "tbb/tick_count.h"
#include "tbb/pipeline.h"
#include "tbb/tick_count.h"
#include "tbb/task_scheduler_init.h"
#include 

using namespace std;
using namespace tbb;

#define MAX_INPUT_TOKEN 8


static char data[] = "Beat the 12 eggs, 2 cups sugar and vanilla or lemon extract together. Stir in the ricotta cheese and the chocolate chips. Set aside.
Preheat oven to 325 degrees F (165 degrees C). Grease two deep dish pie plates.
Combine the flour, baking powder, and 1 cup of the sugar together. Cut in the shortening and mix until the mixture resembles coarse crumbs. Mix in 4 of the eggs and 1 teaspoon of the vanilla. 
Divide dough into 4 balls and chill (if needed)";



 class TextSlice 
{


    const char* pBuffer;
	size_t bufLen;
    size_t count;
	char charToCount;

public:
	TextSlice() {}
	~TextSlice(){}

	void setInput(const char* pBuf, size_t len ,char charToCnt ) 
	{
		pBuffer = pBuf;
		bufLen = len;
		charToCount = charToCnt;
		count = 0;

	}

	void countChar()
	{
		size_t len = bufLen;
		while(len--) {
			if(pBuffer[len] == charToCount) {

				++count;
			}
		}
	}

	size_t getCount() 
	{
		return count;
	}
   
};


 class MyInputFilter: public filter 
{

private:

	TextSlice inputTokens[MAX_INPUT_TOKEN];
	const char* pBuffer;
	size_t bufLen;
	size_t unitLen;
	size_t numInputToken;
	char charToBeFound;
	int index;

public:
	MyInputFilter():  filter(serial_in_order){}
	~MyInputFilter(){}

	void setInput(const char* pBuf, 
					size_t len ,
					size_t numTokens,
					char charToFind	)
	{
		index = -1;
		pBuffer = pBuf;
		bufLen = len;
		numInputToken = numTokens;
		charToBeFound = charToFind;
		unitLen = bufLen/numTokens;
	}

	void* operator()(void* pItem)
	{
		if(bufLen) {

			TextSlice* pChunk = &inputTokens[ ++index % numInputToken];
			if(bufLen >= unitLen) {

				pChunk->setInput(pBuffer,unitLen,charToBeFound);
				pBuffer += unitLen;
				bufLen -=  unitLen;
			} else {

				pChunk->setInput(pBuffer,bufLen,charToBeFound);
				bufLen = 0;
			}
			return pChunk;
		}
		return 0;
	}
};

 

class MyTransformFilter: public filter {

public:
	MyTransformFilter() : filter(parallel) {}
	~MyTransformFilter() {}

	

	void* operator()( void* item ) 
	{
		((TextSlice*) item)->countChar();
		return item;
	}
};



 class MyOutputFilter: public filter 
{
	size_t count;
 
public:
	MyOutputFilter( ):  tbb::filter(serial_in_order) {}


	size_t getCount()
	{
		size_t cnt = count;
		count = 0;
		return cnt;
	}
    void* operator()( void* item )
	{
		count += ((TextSlice*)item)->getCount();
		return item;
	}
};



double measureTPS(size_t numTokens )
{

	MyInputFilter inputFilter;
	MyTransformFilter transformFilter;
	MyOutputFilter  outputFilter;
    
    pipeline pipeline;

    pipeline.add_filter( inputFilter );
    pipeline.add_filter( transformFilter );
    pipeline.add_filter( outputFilter );

	size_t numLoops = 1000000;
		
	double total = 0;
	
	for(size_t i = 0 ; i < numLoops ; i++) {

		tick_count t0 = tick_count::now();
	
		inputFilter.setInput(data,sizeof(data) - 1,numTokens,'a');
		pipeline.run(numTokens);
		outputFilter.getCount();

		tick_count t1 = tick_count::now();
		total += (t1 - t0).seconds();
	}
	
	
	return  numLoops/total;
}


void  measureTPS_8TH_8TK( )
{
	task_scheduler_init schInit(8);
	cout <<" TPS (8 Threads,8 Tokens)  : " << measureTPS(8) <
RafSchietekat
Black Belt
115 Views

Seems like a simple matter of parallel overhead. If you chop the input into little pieces based on the number of tokens assigned to the pipeline,and those pieces arenot big enough to offset the work involved in handling tasks, then most of the work will be spent on those tasks, of which there will now be more. That explains the longer time for more tokens. A fairer comparison would be with equal-size slices of text.

I yield the baton to whoever wants to explain the improved result with fewer threads, but maybe we should first also get the benchmark results for 2 threads and 8 tokens.
jimdempseyatthecove
Black Belt
115 Views

Zoolii,

Your test program is determining the time to:

1) construct a parallel_pipeline
2) run() the parallel_pipeline (initiate tasksfor pipeline)
3) process the data stream for use by the parallel_pipeline
4) terminate the tasks of the parallel_pipeline

A TPS, which I am assuming to mean is transactions per second.

parallel_pipelines do have overhead in setup (steps 1, 2) and in shutdown (step 4). The intended use is to increase the TPS of the data stream (step 3). Therefore their intended use is for data streams of sufficient size to make the overhead of the setup and shutdown and insignificant portion of the process.

If your program requirement is to run short streams of data packets through a pipeline (and where no I/O is involved)then I would suggest you consider using parallel_invoke containing your pipe lists and where the pipes use progress barriers

...
// we are pipe 2, wait for pipe 1
while(!Pipe1Done)
_mm_pause();
...

You may have to experiment with pipe ordering in the parallel_invoke to attain optimal performance.

Jim Dempsey
zoolii
Beginner
115 Views

Hi Jim,


Thank you for your reply. I never explored the parallel_invoketill your reply. Now I employ parallel_invoke and my application performance further improved. Thanks forthe same. However my original problem prevails. i.e for input data of small size, increase in number of thread affects performance negatively ( with constant parallel tokens). Any workaround?

Also in my example the time taken for construction of parallel_pipeline is not included. That is outside the loop.




Thank you
Zooli
RafSchietekat
Black Belt
115 Views

Butwhat's your response to #9, where I pointed out one thing that was wrong in your approach (perhaps eliminated by using parallel_invoke instead?) and asked for a 4th benchmark? In general, you should always be prepared for negative benefits from parallel processing of minimally sizedinput data (that fixed string is far too small to begin with), and follow tbb::parallel_sort's example of just switching to sequential processing instead below a specific threshold (grainsize is not just a parameter to blocked_range and its ilk).
zoolii
Beginner
115 Views

Hi Raf,

If the number of token is more and input size is less, thenthat can reduce the performance and that is quite obvious (which is not the issue I am trying to get solved).If you read the sample program given , it is testing equally sized/sliced text chunks(for 8thread_2token and 2thread_2token ).Also theperformance figuresfor2thread_8Token (the 4Th benchmark ) ismuch less than 2thread_2token scenario.

Input data given for the example islarge enough to get the scale up since it is giving more performancewhen parsed with 2 threads (instead of 1) .

So the question is whyis it giving more performance for 2thread_2token scenario than 8thread_2token scenario ? . Is it because the 'task_stealing' more'intense'since there are 8 threads trying get hold of the task ?.

Thank you

Zooli

RafSchietekat
Black Belt
115 Views

"If you read the sample program given , it is testing equally sized/sliced text chunks (for 8thread_2token and 2thread_2token ) ."
I don't really know what happened there, and I don't want to guess yet (if pressed, I might offer stealing and joining overhead). But the test program is pathetically small, so it hardly seems meaningful.

"Also the performance figures for 2thread_8Token (the 4Th benchmark ) is much less than 2thread_2token scenario ."
Line 88 ("unitLen = bufLen/numTokens;") will make the program use more tasks as the number of tokens increases, each of which has negligible work to do, so the increasing overhead dominates.

"Input data given for the example is large enough to get the scale up since it is giving more performance when parsed with 2 threads (instead of 1) ."
Why don't you print those performance points, and the 8/8 I suggested? Better yet, why not parameterise those measurement functions and run 8 times 8 tests that say exactly how much "more performance" and how it varies?
zoolii
Beginner
115 Views

I am not able toagree with your argument that small programs are meaningless. After all I am explaining a concept and I do not think you need a big program to explain that especially to suchforums.

Yes unitLen = bufLen/numTokens. will divide into same text size for 2thread_2token and 8thread_2token.

Since you were wondering about the 2thread_8Token scenario, I just stated that this scenario gives much less performance.

I measured same before posting the same.I did not post the results for "1thread_1token"and "8thread_8
tokens" because performance was less and that time that was not the issue .

The machin is not available with me. Once it is available I will publish the results.
RafSchietekat
Black Belt
115 Views

"I am not able to agree with your argument that small programs are meaningless. After all I am explaining a concept and I do not think you need a big program to explain that especially to such forums."
I meant that small input is often meaningless because you cannot reliably extrapolate the results in a context where overhead is a big part of the equation. Creating and running a pipeline a million times is not going to increase the amount of work each instance does, it only amortises thread startup.

"Since you were wondering about the 2thread_8Token scenario, I just stated that this scenario gives much less performance."
This is indeed the missing combination I asked for (not 8/8 as I wrote in #14), but, e.g., does it compare to 8/8 like 2/2 compares to 8/2 or is the ratio different? Unless somebody else can say more with what you have already provided (did I miss anything?), I can only ask for more (numerical) information to work with before I could possibly say more, and that's not even exploring realistic data sizes.

Reply