Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

parallel_pipeline implementation

Singh_Jasdeep
Beginner

Hi,

I am using the parallel_pipeline function in my code. Sometimes the pipeline stops when my stop condition is met, but sometimes it does not. I am unable to find the reason. Please help me find where it gets stuck.

The code I am using is:

#include <iostream>
#include <sstream>
#include <fstream>
#include <vector>
#include <algorithm>
#include <cmath>
#include <tbb/pipeline.h>
#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
#include <tbb/compat/thread>
#include <tbb/tbbmalloc_proxy.h>
#include <tbb/tick_count.h>
using namespace std;
using namespace tbb;

#define pi 3.141593
#define FILTER_LEN 265

class MyBuffer
{
public:
double *acc;
double *buffer;
int start,end;
static int j;

MyBuffer()
{
start=0;
end=0;

buffer=new double[150264];
acc=new double[150000];
fill_n(buffer,150264,0);

}

~MyBuffer()
{
delete[] buffer;
delete[] acc;
}

int startnumber()
{
return start;
}

int endnumber()
{
return end;
}

};

typedef concurrent_bounded_queue<MyBuffer> QueueMyBufferType;
QueueMyBufferType chunk_queue;

int MyBuffer::j=0;

int queueloopcount=30;

void input_function()
{
stop_flag = false;

cout<<"thread reached to call input function " <<endl;
ofstream o("testing sinewave.csv");

int counter=0;
while(counter<150000)
{
// cout<<"value of counter is \t" <<counter << endl;

MyBuffer *b=new MyBuffer;
b->start=(FILTER_LEN-1+(counter));
b->end=(5264+(counter));

// cout<<"value of b.start is and b.end is "<<b->start<<"\t" <<b->end<<endl;

for(int i =b->startnumber(); i <b->endnumber(); i++)
{
b->buffer[i] = sin(700 * (2 * pi) * (i / 5000.0));
o<<b->buffer[i]<<endl;
}
chunk_queue.push(*b);

counter+=5000;
// cout<<"value of queueloopcount is "<< queueloopcount << endl;
}

cout<<"all data is perfectly generated" <<endl;
}

int main()
{

int ntokens = 8;

thread inputfunc(input_function);
tick_count t1,t2;
ofstream o("filter700Hz.csv");
t1=tick_count::now();

bool stop_pipeline = false;
stop_filter=false;

/* MyBuffer *b=new MyBuffer;

while(queueloopcount!=0)
{
chunk_queue.pop(*b);
{
cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl;
queueloopcount--;
}

} */

inputfunc.join();

parallel_pipeline(ntokens,make_filter<void,MyBuffer*>
(
filter::parallel,[&](flow_control& fc)->MyBuffer*
{
if(queueloopcount==0)
{
fc.stop();
cout<<"pipeline stopped"<<endl;
}
else
{
MyBuffer *b=new MyBuffer;
chunk_queue.pop(*b);
{
cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl;
queueloopcount--;
}
return b;
}
}
)&

make_filter<MyBuffer*, void>
(
filter::serial,[&](MyBuffer* b)
{
cout<<"value of second filter start is and end is \t "<< b->startnumber() << "\t" << b->endnumber() <<endl;
}
)
);
 

t2=tick_count::now();

cout << "\n Time elapsed is \n\n" <<(t2-t1).seconds()<<endl;

return 0;
}

2 Replies
MLema2
New Contributor I

I might be wrong, but it seems strange to me to have a parallel first filter. I'm not sure the flow_control object is designed to be used this way.

You should probably have a serial first filter that only creates an empty MyBuffer* and stops the pipeline when no more work is due. Then have a parallel second filter that performs the real work, and finally a serial (in-order?) output stage.
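
The three-stage shape described above can be sketched without TBB. This is only a standard-library analogue, not the parallel_pipeline API: `Chunk`, `process`, and `run_stages` are hypothetical names, `std::async` stands in for the parallel middle filter, and there is no token limit like `ntokens`.

```cpp
#include <cassert>
#include <future>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical stand-in for MyBuffer: a chunk identified by its start index.
struct Chunk {
    int start;
    std::vector<double> data;
};

// Stage 2 (safe to run in parallel): touches only its own chunk.
double process(Chunk c) {
    return std::accumulate(c.data.begin(), c.data.end(), 0.0);
}

std::vector<double> run_stages(int chunks, int chunk_len) {
    // Stage 1 (serial): a single thread creates the chunks and decides
    // when the input ends, so the stop condition is never raced on.
    std::vector<std::future<double>> in_flight;
    for (int k = 0; k < chunks; ++k) {
        Chunk c{k * chunk_len, std::vector<double>(chunk_len, 1.0)};
        in_flight.push_back(
            std::async(std::launch::async, process, std::move(c)));
    }

    // Stage 3 (serial, in order): results are collected in submission order.
    std::vector<double> results;
    for (auto& f : in_flight) {
        results.push_back(f.get());
    }
    return results;
}
```

Calling `run_stages(30, 5000)` mirrors the 30 chunks of 5000 samples in the question. The point of the layout is that only one thread ever evaluates the "is there more input?" condition.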

Singh_Jasdeep
Beginner

@Michek Lemay Yes, this made my code work. However, it sometimes also works with the first filter in parallel, and the Network Router Emulator sample, which uses this approach, has flow_control in a parallel first stage and it works. So the reason is that the code blocks when it tries to pop in a parallel first filter, because the pop used here is a blocking call.
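
A likely mechanism, judging from the posted code: queueloopcount is a plain int that several threads test and decrement without synchronization, so more threads can pass the `queueloopcount==0` check than there are items left, and the extra one waits forever in the blocking pop. A minimal sketch of one way to avoid that, using only the standard library rather than TBB, is to claim an item atomically before popping. `claim_item` and `count_claims` are hypothetical helpers, not part of any library.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: try to claim one of the remaining items.
// Returns true if the caller may pop exactly one item from the queue.
bool claim_item(std::atomic<int>& remaining) {
    int current = remaining.load();
    while (current > 0) {
        // On failure compare_exchange_weak reloads `current`, so the
        // bound is re-checked before every retry.
        if (remaining.compare_exchange_weak(current, current - 1)) {
            return true;
        }
    }
    return false;  // nothing left: the caller should stop instead of popping
}

// Runs `workers` threads that claim items until none are left and
// returns how many claims succeeded in total.
int count_claims(int items, int workers) {
    std::atomic<int> remaining(items);
    std::atomic<int> claimed(0);
    std::vector<std::thread> pool;
    for (int w = 0; w < workers; ++w) {
        pool.emplace_back([&remaining, &claimed] {
            while (claim_item(remaining)) {
                ++claimed;  // a real filter would pop and process here
            }
        });
    }
    for (auto& t : pool) {
        t.join();
    }
    return claimed.load();
}
```

With 30 items and 8 workers, exactly 30 claims succeed, so no thread is left waiting on an empty queue. In a parallel first filter, the same check would decide between calling fc.stop() and popping.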
