- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi All,
I have created a simple 3-stage pipeline. The first stage creates some data, the second processes it and the third prints it out.
I am having a problem with the first stage. Instead of generating 10 messages and then quitting, it continues forever (ok -- until I ctrl-c after 10 secs).
If I substitute the "for" loop with a "while" loop reading data from a file it works fine!
Has anyone got any insight into why the "while" works and the "for" fails? Thanks
Regards
->Jogi
###########
// Input-stage filter body (file-reading variant): reads one record from _ifs
// into _datap and returns its address, or NULL when _ifs is already at EOF or
// the name that was read is empty.
// NOTE(review): _ifs, _datap and _counter are not declared in this snippet;
// presumably they are members of the enclosing filter class.
void* operator()(void* item)
{
// Incremented below but never read anywhere in this function.
int i = 0 ;
// Every path through the body either returns or breaks, so the body runs at
// most once per call; the loop condition acts as an "if not at EOF" guard.
while (!_ifs.eof()) {
i++ ;
_ifs >> _datap.name ;
_ifs >> _datap.val_a ;
_ifs >> _datap.val_b ;
_ifs >> _datap.val_c ;
// An empty name is treated as "no more records": leave the loop and return NULL.
if (_datap.name == "") break ;
std::cout << "Rm: " << _counter << " " << _datap.name << " " << _datap.val_a << " " << _datap.val_b << " " << _datap.val_c << std::endl ;
// The same _datap object is handed out on every call and refilled by the next one.
return &_datap ;
}
// Reached when the stream is at EOF on entry or the name read was empty.
return NULL ;
}
##########
#include <iostream>
#include <sstream>
#include <string>
#include <fstream>
#include "tbb/task_scheduler_init.h"
#include "tbb/concurrent_hash_map.h"
#include "tbb/pipeline.h"
// One message travelling through the pipeline: a name plus three numeric values.
struct MsgData
{
    std::string name;   // message identifier
    int         val_a;
    double      val_b;
    int         val_c;
};
// Concurrent map from message name to the MsgData stored for that name.
// NOTE(review): the template arguments were lost in the posted text; they are
// reconstructed from how the map is used (looked up by datap->name, and
// a->second is assigned a MsgData). Older TBB releases may additionally need an
// explicit HashCompare argument -- confirm against the TBB version in use.
typedef tbb::concurrent_hash_map<std::string, MsgData> MsgMap ;
// First pipeline stage: synthesizes a fixed number of MsgData messages.
//
// The pipeline calls operator() once per token and keeps calling it until it
// returns NULL, so the "how many have been produced so far" state has to live
// in the object. A loop counter local to operator() restarts at 0 on every
// call and therefore can never terminate the stream.
class ReadMessage : public tbb::filter
{
public:
    // count: number of messages to emit before signalling end of input.
    ReadMessage(int count = 10) : tbb::filter(serial_in_order), _count(count), _emitted(0) {}
private:
    // Produces the next message ("A<n>" with all three values set to n), or
    // NULL once _count messages have been emitted.
    //
    // The returned pointer refers to the single _data_msg member, which the
    // next call overwrites; this is only safe while at most one token is in
    // flight in the pipeline.
    void* operator()(void* item)
    {
        (void)item ;  // the first stage receives no input token

        if (_emitted >= _count) {
            return NULL ;  // end of input: lets the pipeline finish
        }

        const int i = _emitted++ ;
        std::stringstream strm ;
        strm << "A" << i ;
        _data_msg.name = strm.str() ;
        _data_msg.val_a = i ;
        _data_msg.val_b = i ;
        _data_msg.val_c = i ;
        return &_data_msg ;
    }
    int _count ;         // total number of messages to produce
    int _emitted ;       // number of messages produced so far
    MsgData _data_msg ;  // buffer handed to the downstream stages
};
// Second pipeline stage: records each message in a name -> MsgData map and
// forwards the message unchanged to the next stage.
class ProcessMessage : public tbb::filter
{
public:
    // name_map: map updated by this stage. It is held by reference so the
    // caller's map is the one that gets filled (previously the argument was
    // ignored and a private, separate map was updated instead). The map must
    // outlive this filter.
    ProcessMessage(MsgMap& name_map) : tbb::filter(parallel), _name_map(name_map)
    {
    }
private:
    // Stores a copy of the message under its name, replacing any earlier entry,
    // and returns the same pointer. A NULL item is passed through untouched.
    void* operator()(void* item)
    {
        MsgData* datap = static_cast<MsgData*>(item) ;
        if (datap != NULL) {
            MsgMap::accessor a ;
            // insert() positions the accessor on the element whether it was
            // newly created or already present, so no separate find() is needed.
            _name_map.insert(a, datap->name) ;
            a->second = *datap ;
        }
        return datap ;
    }
    MsgMap& _name_map ;  // caller-owned map
};
// Final pipeline stage: prints each message to std::cout, one per line.
class PublishMessage : public tbb::filter
{
public:
    PublishMessage() : tbb::filter(serial_in_order)
    {
    }
private:
    // Prints the name and the three values of the message separated by spaces.
    // Always returns NULL: nothing is passed on from this stage.
    void* operator()(void* item)
    {
        // The target type of the cast was lost in the posted text; MsgData* is
        // what the variable being initialised requires.
        MsgData* datap = static_cast<MsgData*>(item) ;
        if (datap != NULL) {
            std::cout << datap->name << " " << datap->val_a << " " << datap->val_b << " " << datap->val_c << std::endl ;
        }
        return NULL ;
    }
};
// Builds the three-stage pipeline (generate -> record -> print) and runs it
// with a single token in flight.
int main(int argc, char* argv[])
{
    tbb::task_scheduler_init init ;
    MsgMap names ;

    // The three stages, in pipeline order.
    ReadMessage    input_filter(10) ;
    ProcessMessage process_filter(names) ;
    PublishMessage output_filter ;

    tbb::pipeline pipeline ;
    pipeline.add_filter(input_filter) ;
    pipeline.add_filter(process_filter) ;
    pipeline.add_filter(output_filter) ;

    // Run with at most one token in flight.
    pipeline.run(1) ;

    // Detach the filters from the pipeline again.
    pipeline.clear() ;
    return 0 ;
}
I have created a simple 3-stage pipeline. The first stage creates some data, the second processes it and the third prints it out.
I am having a problem with the first stage. Instead of generating 10 messages and then quitting, it continues forever (ok -- until I ctrl-c after 10 secs).
If I substitute the "for" loop with a "while" loop reading data from a file it works fine!
Has anyone got any insight into why the "while" works and the "for" fails? Thanks
Regards
->Jogi
###########
// Input-stage filter body (file-reading variant): reads one record from _ifs
// into _datap and returns its address, or NULL when _ifs is already at EOF or
// the name that was read is empty.
// NOTE(review): _ifs, _datap and _counter are not declared in this snippet;
// presumably they are members of the enclosing filter class.
void* operator()(void* item)
{
// Incremented below but never read anywhere in this function.
int i = 0 ;
// Every path through the body either returns or breaks, so the body runs at
// most once per call; the loop condition acts as an "if not at EOF" guard.
while (!_ifs.eof()) {
i++ ;
_ifs >> _datap.name ;
_ifs >> _datap.val_a ;
_ifs >> _datap.val_b ;
_ifs >> _datap.val_c ;
// An empty name is treated as "no more records": leave the loop and return NULL.
if (_datap.name == "") break ;
std::cout << "Rm: " << _counter << " " << _datap.name << " " << _datap.val_a << " " << _datap.val_b << " " << _datap.val_c << std::endl ;
// The same _datap object is handed out on every call and refilled by the next one.
return &_datap ;
}
// Reached when the stream is at EOF on entry or the name read was empty.
return NULL ;
}
##########
#include <iostream>
#include <sstream>
#include <string>
#include <fstream>
#include "tbb/task_scheduler_init.h"
#include "tbb/concurrent_hash_map.h"
#include "tbb/pipeline.h"
// One message travelling through the pipeline: a name plus three numeric values.
struct MsgData
{
    std::string name;   // message identifier
    int         val_a;
    double      val_b;
    int         val_c;
};
// Concurrent map from message name to the MsgData stored for that name.
// NOTE(review): the template arguments were lost in the posted text; they are
// reconstructed from how the map is used (looked up by datap->name, and
// a->second is assigned a MsgData). Older TBB releases may additionally need an
// explicit HashCompare argument -- confirm against the TBB version in use.
typedef tbb::concurrent_hash_map<std::string, MsgData> MsgMap ;
// First pipeline stage: synthesizes a fixed number of MsgData messages.
//
// The pipeline calls operator() once per token and keeps calling it until it
// returns NULL, so the "how many have been produced so far" state has to live
// in the object. A loop counter local to operator() restarts at 0 on every
// call and therefore can never terminate the stream.
class ReadMessage : public tbb::filter
{
public:
    // count: number of messages to emit before signalling end of input.
    ReadMessage(int count = 10) : tbb::filter(serial_in_order), _count(count), _emitted(0) {}
private:
    // Produces the next message ("A<n>" with all three values set to n), or
    // NULL once _count messages have been emitted.
    //
    // The returned pointer refers to the single _data_msg member, which the
    // next call overwrites; this is only safe while at most one token is in
    // flight in the pipeline.
    void* operator()(void* item)
    {
        (void)item ;  // the first stage receives no input token

        if (_emitted >= _count) {
            return NULL ;  // end of input: lets the pipeline finish
        }

        const int i = _emitted++ ;
        std::stringstream strm ;
        strm << "A" << i ;
        _data_msg.name = strm.str() ;
        _data_msg.val_a = i ;
        _data_msg.val_b = i ;
        _data_msg.val_c = i ;
        return &_data_msg ;
    }
    int _count ;         // total number of messages to produce
    int _emitted ;       // number of messages produced so far
    MsgData _data_msg ;  // buffer handed to the downstream stages
};
// Second pipeline stage: records each message in a name -> MsgData map and
// forwards the message unchanged to the next stage.
class ProcessMessage : public tbb::filter
{
public:
    // name_map: map updated by this stage. It is held by reference so the
    // caller's map is the one that gets filled (previously the argument was
    // ignored and a private, separate map was updated instead). The map must
    // outlive this filter.
    ProcessMessage(MsgMap& name_map) : tbb::filter(parallel), _name_map(name_map)
    {
    }
private:
    // Stores a copy of the message under its name, replacing any earlier entry,
    // and returns the same pointer. A NULL item is passed through untouched.
    void* operator()(void* item)
    {
        // The target type of the cast was cut off in the posted text; MsgData*
        // is what the variable being initialised requires.
        MsgData* datap = static_cast<MsgData*>(item) ;
        if (datap != NULL) {
            MsgMap::accessor a ;
            // insert() positions the accessor on the element whether it was
            // newly created or already present, so no separate find() is needed.
            _name_map.insert(a, datap->name) ;
            a->second = *datap ;
        }
        return datap ;
    }
    MsgMap& _name_map ;  // caller-owned map
};
// Final pipeline stage: prints each message to std::cout, one per line.
class PublishMessage : public tbb::filter
{
public:
    PublishMessage() : tbb::filter(serial_in_order)
    {
    }
private:
    // Prints the name and the three values of the message separated by spaces.
    // Always returns NULL: nothing is passed on from this stage.
    void* operator()(void* item)
    {
        // The target type of the cast was cut off in the posted text; MsgData*
        // is what the variable being initialised requires.
        MsgData* datap = static_cast<MsgData*>(item) ;
        if (datap != NULL) {
            std::cout << datap->name << " " << datap->val_a << " " << datap->val_b << " " << datap->val_c << std::endl ;
        }
        return NULL ;
    }
};
// Builds the three-stage pipeline (generate -> record -> print) and runs it
// with a single token in flight.
int main(int argc, char* argv[])
{
    tbb::task_scheduler_init init ;
    MsgMap names ;

    // The three stages, in pipeline order.
    ReadMessage    input_filter(10) ;
    ProcessMessage process_filter(names) ;
    PublishMessage output_filter ;

    tbb::pipeline pipeline ;
    pipeline.add_filter(input_filter) ;
    pipeline.add_filter(process_filter) ;
    pipeline.add_filter(output_filter) ;

    // Run with at most one token in flight.
    pipeline.run(1) ;

    // Detach the filters from the pipeline again.
    pipeline.clear() ;
    return 0 ;
}
Link Copied
3 Replies
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
>>>>>
// Input-stage filter body as posted: fills _data_msg and returns its address.
void* operator()(void* item)
{
// i is a local variable, so it starts again at 0 on every call.
for (int i = 0 ; i < 10 ; i++) {
std::stringstream strm ;
strm << "A" << i ;
_data_msg.name = strm.str() ;
_data_msg.val_a = i ;
_data_msg.val_b = i ;
_data_msg.val_c = i ;
//std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;
// Unconditional return inside the first iteration: i++ never executes, so
// every call produces the message "A0" with all values 0.
return &_data_msg ;
}
// Never reached: 0 < 10 always holds on entry and the loop body always returns.
return NULL ;
}
>>>>>>
Your for loop never reaches "return NULL". Every time the filter is invoked, the loop body runs only once (it returns on the first iteration), even though the condition is "i < 10".
// Input-stage filter body as posted: fills _data_msg and returns its address.
void* operator()(void* item)
{
// i is a local variable, so it starts again at 0 on every call.
for (int i = 0 ; i < 10 ; i++) {
std::stringstream strm ;
strm << "A" << i ;
_data_msg.name = strm.str() ;
_data_msg.val_a = i ;
_data_msg.val_b = i ;
_data_msg.val_c = i ;
//std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;
// Unconditional return inside the first iteration: i++ never executes, so
// every call produces the message "A0" with all values 0.
return &_data_msg ;
}
// Never reached: 0 < 10 always holds on entry and the loop body always returns.
return NULL ;
}
>>>>>>
Your for loop never reaches "return NULL". Every time the filter is invoked, the loop body runs only once (it returns on the first iteration), even though the condition is "i < 10".
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - Vivek Rajagopalan
Hi Vivek, thanks for replying.
I don't understand the answer... I thought it would only call the filter *once* and the for loop would then run 10 times. Why does this work for the while...?
>>>>>
// Input-stage filter body as posted: fills _data_msg and returns its address.
void* operator()(void* item)
{
// i is a local variable, so it starts again at 0 on every call.
for (int i = 0 ; i < 10 ; i++) {
std::stringstream strm ;
strm << "A" << i ;
_data_msg.name = strm.str() ;
_data_msg.val_a = i ;
_data_msg.val_b = i ;
_data_msg.val_c = i ;
//std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;
// Unconditional return inside the first iteration: i++ never executes, so
// every call produces the message "A0" with all values 0.
return &_data_msg ;
}
// Never reached: 0 < 10 always holds on entry and the loop body always returns.
return NULL ;
}
>>>>>>
Your for loop never reaches "return NULL". Every time the filter is invoked, the loop body runs only once (it returns on the first iteration), even though the condition is "i < 10".
I don't understand the answer... I thought it would only call the filter *once* and the for loop would then run 10 times. Why does this work for the while...?
>>>>>
// Input-stage filter body as posted: fills _data_msg and returns its address.
void* operator()(void* item)
{
// i is a local variable, so it starts again at 0 on every call.
for (int i = 0 ; i < 10 ; i++) {
std::stringstream strm ;
strm << "A" << i ;
_data_msg.name = strm.str() ;
_data_msg.val_a = i ;
_data_msg.val_b = i ;
_data_msg.val_c = i ;
//std::cout << i << " " << _data_msg.name << " " << _data_msg.val_a << " " << _data_msg.val_b << " " << _data_msg.val_c << std::endl ;
// Unconditional return inside the first iteration: i++ never executes, so
// every call produces the message "A0" with all values 0.
return &_data_msg ;
}
// Never reached: 0 < 10 always holds on entry and the loop body always returns.
return NULL ;
}
>>>>>>
Your for loop never reaches "return NULL". Every time the filter is invoked, the loop body runs only once (it returns on the first iteration), even though the condition is "i < 10".
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - Jogi Narain
Ok -- I think I am getting this... the start of the pipeline will be called until it returns NULL. Hence the terminating condition cannot reside in the called function. Hence that's why the while worked: it was calling an external resource that was counting down items.
Is my understanding correct?
Is my understanding correct?

Reply
Topic Options
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page