Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Optimizing a TBB pipeline

Sensei_S_

Dear all,

I'm currently using a parallel pipeline with I/O-bound filters. Let me remind you of my project, if you can bear with me.

A serial filter reads a string, a parallel filter transforms it, and a serial filter writes the result back. To get better performance, or so I thought, the writer and transformer threads don't communicate directly. Instead, the transformer filter pushes to a concurrent queue, and the writer pops from it.

I launch the program with a fixed number of threads set in main, in my case tbb::task_scheduler_init init(6). However, Xcode shows only two threads active, and the pipeline is slow. I expected some slowness, since the transformer is actually an optimizer with O(n^3) complexity, but I think I'm doing something really wrong here.
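One possible explanation for seeing fewer busy threads than requested is the pipeline's token limit: the number of items in flight caps how many threads can work in the parallel filter, whatever value is passed to task_scheduler_init. The following is only a back-of-envelope sketch of that effect in plain C++, not TBB code. The struct, the function name, and the per-item costs are all hypothetical, and the result is a rough upper bound rather than a prediction.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical per-item stage costs, in seconds (illustrative names only).
struct StageCosts {
    double read_s;       // serial input stage
    double transform_s;  // parallel middle stage
    double write_s;      // serial output stage
};

// Rough upper bound on items per second in steady state.
// At most min(tokens, threads) items can be inside the parallel stage at
// once, and each serial stage handles one item at a time.
double estimated_items_per_second(const StageCosts& c, int tokens, int threads) {
    assert(tokens > 0 && threads > 0);
    assert(c.read_s > 0 && c.transform_s > 0 && c.write_s > 0);
    const int in_flight = std::min(tokens, threads);
    const double parallel_rate = in_flight / c.transform_s;
    const double serial_rate = std::min(1.0 / c.read_s, 1.0 / c.write_s);
    return std::min(parallel_rate, serial_rate);
}
```

With a costly transformer and cheap I/O, this model says that raising the token count helps until it reaches the thread count, and that beyond that point only a faster transformer or more threads can help.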

So how can I profile my program to see what is slowing down the pipeline? I'd really like to optimize it as best I can.
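Short of a full profiler, one low-tech way to see where the time goes is to accumulate wall-clock time per stage and compare the totals. The sketch below uses only the C++ standard library. StageStats, ScopedStageTimer, and timed_transform are hypothetical names introduced for illustration, and timed_transform merely stands in for the body of a real filter.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>

// Accumulated wall-clock time and call count for one pipeline stage.
// Atomics allow several threads of a parallel filter to update it safely.
struct StageStats {
    std::atomic<long long> nanos{0};
    std::atomic<long long> calls{0};

    double mean_microseconds() const {
        const long long n = calls.load();
        return n == 0 ? 0.0 : (nanos.load() / 1000.0) / n;
    }
};

// RAII helper: adds the lifetime of this object to the given StageStats.
class ScopedStageTimer {
public:
    explicit ScopedStageTimer(StageStats& stats)
        : stats_(stats), start_(std::chrono::steady_clock::now()) {}
    ~ScopedStageTimer() {
        const auto elapsed = std::chrono::steady_clock::now() - start_;
        assert(elapsed.count() >= 0);  // steady_clock never goes backwards
        stats_.nanos +=
            std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
        stats_.calls += 1;
    }
    ScopedStageTimer(const ScopedStageTimer&) = delete;
    ScopedStageTimer& operator=(const ScopedStageTimer&) = delete;
private:
    StageStats& stats_;
    std::chrono::steady_clock::time_point start_;
};

// Stand-in for a filter body: a real filter would do its work in this scope.
inline int timed_transform(StageStats& stats, int x) {
    ScopedStageTimer timer(stats);
    return x * x;
}
```

Keeping one StageStats per filter and printing the totals after the run gives a first hint: a serial stage whose total approaches the overall wall-clock time is the likely bottleneck, whereas a dominant transformer total points at the O(n^3) work itself.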

I could also use a queue between the reader and transformer threads, if that would help. Or the problem could lie in the writer, but I don't know how to check whether that is the case...
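One way to tell whether the writer is the slow side is to watch the depth of the queue between transformer and writer. A queue that keeps growing suggests the writer cannot keep up, while a queue that is usually empty suggests the writer is waiting on the transformer. The sketch below is a minimal mutex-based queue from the standard library, used here only so the example is self-contained. MonitoredQueue and its members are hypothetical names, not part of TBB.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Minimal mutex-protected queue that records its largest observed depth
// and how often a consumer found it empty.
template <typename T>
class MonitoredQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(value));
        if (items_.size() > high_water_) high_water_ = items_.size();
        assert(high_water_ >= items_.size());
    }

    // Non-blocking pop: returns false (and counts it) when the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            ++empty_pops_;
            return false;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    std::size_t high_water() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return high_water_;
    }

    std::size_t empty_pops() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return empty_pops_;
    }

private:
    mutable std::mutex mutex_;
    std::deque<T> items_;
    std::size_t high_water_ = 0;
    std::size_t empty_pops_ = 0;
};
```

A large high_water() after a run points toward the writer, and a large empty_pops() points toward the producer side. The same two counters could be kept around an existing concurrent queue instead of replacing it.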

Any hints would be really appreciated!

Cheers!

jimdempseyatthecove

Here is a sample program from an older version of TBB:

TBB parallel_pipeline

// Filter that writes each buffer to a file.
class MyOutputFilter: public tbb::filter
{
    FILE* my_output_file;
public:
    MyOutputFilter( FILE* output_file );
    /*override*/void* operator()( void* item );
};

MyOutputFilter::MyOutputFilter( FILE* output_file ) :
    tbb::filter(/*is_serial=*/true),
    my_output_file(output_file)
{
}

void* MyOutputFilter::operator()( void* item )
{
    MyBuffer& b = *static_cast<MyBuffer*>(item);
    fwrite( b.begin(), 1, b.size(), my_output_file );
    return NULL;
}

// Filter that changes the first letter of each word
// from lower case to upper case.
class MyTransformFilter: public tbb::filter
{
public:
    MyTransformFilter();
    /*override*/void* operator()( void* item );
};

MyTransformFilter::MyTransformFilter() :
    tbb::filter(/*serial=*/false)
{}

/*override*/void* MyTransformFilter::operator()( void* item )
{
    MyBuffer& b = *static_cast<MyBuffer*>(item);
    // b.begin()[-1] holds the last character of the previous buffer
    // (stored there by the input filter).
    bool prev_char_is_space = b.begin()[-1]==' ';
    for( char* s=b.begin(); s!=b.end(); ++s )
    {
        if( prev_char_is_space && islower(*s) )
            *s = toupper(*s);
        prev_char_is_space = isspace(*s);
    }
    return &b;
}

// Filter that reads the input file into a small ring of buffers.
class MyInputFilter: public tbb::filter
{
public:
    static const size_t n_buffer = 4;
    MyInputFilter( FILE* input_file_ );
private:
    FILE* input_file;
    size_t next_buffer;
    char last_char_of_previous_buffer;
    MyBuffer buffer[n_buffer];
    /*override*/ void* operator()(void*);
};

// Initializers are listed in member declaration order.
MyInputFilter::MyInputFilter( FILE* input_file_ ) :
    filter(/*is_serial=*/true),
    input_file(input_file_),
    next_buffer(0),
    last_char_of_previous_buffer(' ')
{
}

void* MyInputFilter::operator()(void*)
{
    MyBuffer& b = buffer[next_buffer];
    next_buffer = (next_buffer+1) % n_buffer;
    size_t n = fread( b.begin(), 1, b.max_size(), input_file );
    if( !n )
    {
        // end of file
        return NULL;
    }
    else
    {
        // Carry the previous buffer's last character across the boundary.
        b.begin()[-1] = last_char_of_previous_buffer;
        last_char_of_previous_buffer = b.begin()[n-1];
        b.set_end( b.begin()+n );
        return &b;
    }
}

 

// Create the pipeline
tbb::pipeline pipeline;

// Create file-reading stage
MyInputFilter input_filter( input_file );
// and add it to the pipeline
pipeline.add_filter( input_filter );

// Create capitalization stage
MyTransformFilter transform_filter;
// and add it to the pipeline
pipeline.add_filter( transform_filter );

// Create file-writing stage
MyOutputFilter output_filter( output_file );
// and add it to the pipeline
pipeline.add_filter( output_filter );

// Run the pipeline with at most n_buffer items in flight
pipeline.run( MyInputFilter::n_buffer );

// Remove filters from pipeline before they are implicitly destroyed.
pipeline.clear();
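The sample passes MyInputFilter::n_buffer to run(), and the input filter reuses its buffers round-robin. A plausible reading is that the token limit is what keeps a buffer from being overwritten while an earlier item still uses it. The sketch below is a small standalone simulation of that reasoning, not TBB code. The function name is hypothetical, and it assumes items leave the pipeline in the order they entered, as with a serial in-order output stage.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simulates an input stage that hands out buffers round-robin while at most
// max_tokens items are in flight and items leave the pipeline in order.
// Returns true if no buffer is handed out again while still in flight.
bool circular_reuse_is_safe(std::size_t n_buffer, std::size_t max_tokens,
                            std::size_t items = 100) {
    assert(n_buffer > 0 && max_tokens > 0);
    std::vector<bool> in_use(n_buffer, false);
    std::size_t oldest = 0;  // index of the oldest item still in flight
    for (std::size_t k = 0; k < items; ++k) {
        // Token limit reached: the oldest item must finish before item k starts.
        if (k - oldest == max_tokens) {
            in_use[oldest % n_buffer] = false;
            ++oldest;
        }
        const std::size_t slot = k % n_buffer;
        if (in_use[slot]) return false;  // buffer would be overwritten in flight
        in_use[slot] = true;
    }
    return true;
}
```

Under these assumptions the reuse is safe when the token count does not exceed the number of buffers, so raising the token count to keep more threads busy would also mean raising n_buffer.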

 

Jim Dempsey

robert-reed

I have a blog series that is pretty old now (and thus may be a little decrepit), but it is all about using Intel TBB pipelines to overlap processing with serial I/O, so it might still have some relevance to your current problem. It starts here:

https://software.intel.com/en-us/blogs/2007/08/23/overlapping-io-and-processing-in-a-pipeline 
