Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Pipe Line Problem

Jogi_Narain
Beginner
Hi All,
I have created a simple 3 stage pipe-line. The first stage creates some data, the second processes it and the third prints it out.

I am having a problem with the first stage. Instead of generating 10 messages and then quitting, it continues forever (OK, until I hit Ctrl-C after 10 seconds).

If I substitute the "for" loop with a "while" loop reading data from a file, it works fine!

Has anyone got any insight into why the "while" works and the "for" fails? Thanks

Regards
->Jogi

###########

void* operator()(void* item)
{
    int i = 0 ;
    while (!_ifs.eof()) {
        i++ ;
        _ifs >> _datap.name ;
        _ifs >> _datap.val_a ;
        _ifs >> _datap.val_b ;
        _ifs >> _datap.val_c ;

        if (_datap.name == "") break ;

        std::cout << "Rm: " << _counter << " " << _datap.name << " " << _datap.val_a << " " << _datap.val_b << " " << _datap.val_c << std::endl ;

        return &_datap ;
    }

    return NULL ;
}

##########
#include <iostream>
#include <sstream>
#include <string>
#include <fstream>

#include "tbb/task_scheduler_init.h"
#include "tbb/concurrent_hash_map.h"
#include "tbb/pipeline.h"

typedef struct
{
    std::string name ;
    int val_a ;
    double val_b ;
    int val_c ;
} MsgData;

typedef tbb::concurrent_hash_map<std::string, MsgData> MsgMap ;

class ReadMessage : public tbb::filter
{
public:
    ReadMessage(int count = 10) : tbb::filter(serial_in_order), _count(count) {}

private:

    void* operator()(void* item)
    {
        for (int i = 0 ; i < 10 ; i++) {
            std::stringstream strm ;
            strm << "A" << i ;
            _data_msg.name = strm.str() ;
            _data_msg.val_a = i ;
            _data_msg.val_b = i ;
            _data_msg.val_c = i ;

            //std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;

            return &_data_msg ;
        }

        return NULL ;
    }

    int _count ;
    MsgData _data_msg ;
};

class ProcessMessage : public tbb::filter
{
public:
    ProcessMessage(MsgMap& name_map) : tbb::filter(parallel)
    {
    }

private:

    void* operator()(void* item)
    {
        MsgData* datap = static_cast<MsgData*>(item);

        if (datap != NULL) {
            //std::cout << datap->name << " " << datap->val_a << " " << datap->val_b << " " << datap->val_c << std::endl ;
            MsgMap::accessor a ;
            if (_name_map.find(a, datap->name)) {
                a->second = *datap ;
            }
            else {
                _name_map.insert(a, datap->name) ;
                a->second = *datap ;
            }
        }

        return datap ;
    }

    MsgMap _name_map ;
};


class PublishMessage : public tbb::filter
{
public:
    PublishMessage() : tbb::filter(serial_in_order)
    {
    }

private:

    void* operator()(void* item)
    {
        MsgData* datap = static_cast<MsgData*>(item);

        if (datap != NULL) {
            std::cout << datap->name << " " << datap->val_a << " " << datap->val_b << " " << datap->val_c << std::endl ;
        }

        return NULL ;
    }

};

int main(int argc, char* argv[])
{
    MsgMap names ;
    tbb::task_scheduler_init init ;

    // Create the pipeline
    tbb::pipeline pipeline;

    // Create file-reading writing stage and add it to the pipeline
    ReadMessage input_filter(10);
    pipeline.add_filter(input_filter);

    ProcessMessage process_filter(names);
    pipeline.add_filter(process_filter);

    PublishMessage output_filter;
    pipeline.add_filter(output_filter);

    // Run the pipeline
    pipeline.run(1);

    // Remove all filters from the pipeline
    pipeline.clear();

    return 0;
}



3 Replies
Vivek_Rajagopalan
>>>>>
void* operator()(void* item)
{
    for (int i = 0 ; i < 10 ; i++) {
        std::stringstream strm ;
        strm << "A" << i ;
        _data_msg.name = strm.str() ;
        _data_msg.val_a = i ;
        _data_msg.val_b = i ;
        _data_msg.val_c = i ;

        //std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;

        return &_data_msg ;
    }

    return NULL ;
}
>>>>>>

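Your for loop never gets to "return NULL". The pipeline calls the filter once per token, and every call starts the loop again at i = 0 and returns on the first iteration, so the loop body only ever runs once per call even though you have "i < 10".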
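
To make the point above concrete without needing TBB installed, here is a minimal sketch. The names `drain`, `LocalCounterStage` and `MemberCounterStage` are hypothetical and are not part of TBB; `drain` only imitates how the first stage is invoked (call it again and again until it returns NULL), with a safety limit so the broken version cannot spin forever. The second struct shows one possible fix, assuming the counter is moved into a member so it survives between calls.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for the pipeline driver: calls the first stage
// repeatedly and stops as soon as the stage returns NULL (or the safety
// limit is reached).
template <typename Stage>
std::vector<std::string> drain(Stage& stage, std::size_t safety_limit = 1000)
{
    std::vector<std::string> out;
    while (out.size() < safety_limit) {
        const std::string* item = stage();
        if (item == NULL) break;
        out.push_back(*item);
    }
    return out;
}

// Mirrors the posted stage: `i` is a local variable, so every call starts
// at 0 and returns on the first iteration. NULL is never reached.
struct LocalCounterStage {
    std::string msg;
    const std::string* operator()() {
        for (int i = 0; i < 10; i++) {
            std::ostringstream strm;
            strm << "A" << i;
            msg = strm.str();
            return &msg;
        }
        return NULL;
    }
};

// Possible fix: the counter is a member, so progress is kept between calls
// and the stage returns NULL after `count` items.
struct MemberCounterStage {
    int next;
    int count;
    std::string msg;
    explicit MemberCounterStage(int n) : next(0), count(n) { assert(n >= 0); }
    const std::string* operator()() {
        if (next >= count) return NULL;
        std::ostringstream strm;
        strm << "A" << next++;
        msg = strm.str();
        return &msg;
    }
};
```

Draining a `LocalCounterStage` only stops at the safety limit and every item is "A0", while draining a `MemberCounterStage(10)` yields "A0" through "A9" and then ends. In the posted `ReadMessage` class the same idea would mean counting against the existing `_count` member instead of a loop-local `i`.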



Jogi_Narain
Beginner
Hi Vivek, thanks for replying.

I don't understand the answer... I thought it would only call the filter *once* and the for loop would then run 10 times. Why does this work for the while...?






Jogi_Narain
Beginner
OK, I think I am getting this: the first stage of the pipeline will be called repeatedly until it returns NULL. Hence the terminating condition cannot reside in the called function. That is why the "while" version worked: it was reading from an external resource that was counting down the items.

Is my understanding correct?
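
As a rough illustration of the "while" case, the sketch below swaps the file stream for a `std::istringstream` so it needs no input file. `StreamStage` and `count_items` are hypothetical names, not TBB classes. It suggests a slightly narrower reading of the rule: the terminating test can sit inside the called function, as long as it checks state that persists between calls (here the stream's read position, held in a member) rather than a local variable that is reset on every call.

```cpp
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical stand-in for the file-reading stage; an std::istringstream
// replaces the file stream.
class StreamStage {
public:
    explicit StreamStage(const std::string& text) : _ifs(text) {}

    // Returns the next name, or NULL once the stream is exhausted.
    const std::string* operator()() {
        if (_ifs >> _name) return &_name;  // read position advances inside _ifs
        return NULL;                       // extraction failed: no more input
    }

private:
    std::istringstream _ifs;  // member: keeps its position between calls
    std::string _name;
};

// Counts how many items the stage yields before it returns NULL.
inline int count_items(StreamStage& stage) {
    int n = 0;
    while (stage() != NULL) ++n;
    return n;
}
```

Each call consumes one token from the stream, so a stage built from "A0 A1 A2" yields three items and then NULL, with no loop inside the stage at all.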
