Hi.
I wrote a parallelized streaming data-processing program using a flow graph.
The flow graph is very simple right now (it will become more complicated, which is why I prefer a flow graph over parallel_pipeline):
(input_node) make_test_dataset -> (function_node) data_process_node -> (function_node) store_processed_data_and_free_chunk_memory
The problem is that the output frame order is shuffled.
I found that this is caused by OS scheduling, maybe interrupts or something similar.
The processing time of the function_node varies, so a later frame can sometimes finish before an earlier one.
Setting core affinity greatly reduces the frame shuffling.
One solution would be to index each frame and reorder the frames afterwards, but is there a solution using TBB's own functionality?
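For reference, the index-and-reorder idea could look roughly like the sketch below. It assumes every frame gets a sequence number starting at 0 when it is created; `ReorderBuffer` and `push` are hypothetical names made up for illustration, not part of TBB.

```cpp
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Hypothetical helper (not a TBB class): releases items strictly in
// index order 0, 1, 2, ... regardless of the order they arrive in.
template <typename T>
class ReorderBuffer {
    std::map<std::size_t, T> pending_;  // frames that arrived too early
    std::size_t next_ = 0;              // index of the next frame to release
public:
    // Accept frame `index` and return every frame that can now be
    // released, in index order (possibly none).
    std::vector<T> push(std::size_t index, T value) {
        pending_.emplace(index, std::move(value));
        std::vector<T> ready;
        for (auto it = pending_.find(next_); it != pending_.end();
             it = pending_.find(next_)) {
            ready.push_back(std::move(it->second));
            pending_.erase(it);
            ++next_;
        }
        return ready;
    }
};
```

This buffer is not thread-safe, so it would have to live inside a node that runs with concurrency 1 (for example the final store node), or be protected by a lock.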
Hello.
I'm sorry, but I found mistakes in my original source code while writing the reproducer.
I should have monitored compute_node (a function_node), but yesterday I monitored output_node instead.
The problem is solved with sequencer_node, but I have attached a simple reproducer that still contains the mistake.
#define _USE_MATH_DEFINES
#include <mkl.h>
#include <oneapi/tbb/flow_graph.h>
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <unistd.h>
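// Source body: hands out one freshly allocated copy of each frame, in order.
class src_body{
    MKL_LONG m_F,m_f,m_N; // frame count, current frame index, samples per frame
    MKL_Complex8* m_arr_all_frame;
public:
    // m_f must start at 0; it was left uninitialized before.
    src_body(MKL_LONG F,MKL_LONG N,MKL_Complex8* arr_all_frame):m_F(F),m_f(0),m_N(N),m_arr_all_frame(arr_all_frame){}
    MKL_Complex8* operator()(oneapi::tbb::flow_control& fc){
        if(m_f<m_F){
            MKL_Complex8* arr{(MKL_Complex8*)mkl_malloc(m_N*sizeof(MKL_Complex8),64)};
            memcpy(arr,m_arr_all_frame+m_N*m_f,m_N*sizeof(MKL_Complex8));
            m_f++;
            return arr;
        }else{
            fc.stop();
            return nullptr;
        }
    }
};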
int main(){
    MKL_LONG N{1048576},F{500};
    MKL_Complex8* arr_all_frame{(MKL_Complex8*)mkl_malloc(N*F*8,64)};
    MKL_Complex8* arr_out{(MKL_Complex8*)mkl_malloc(N*F*8,64)};
    oneapi::tbb::flow::graph g;
    std::atomic<int> f_compute_node{},f_out_node{};
    // init test dataset
    for(MKL_LONG f=0;f<F;f++){
        for(MKL_LONG n=0;n<N;n++){
            arr_all_frame[f*N+n].real=sin(2*M_PI*10*f*n*1e-6);
            arr_all_frame[f*N+n].imag=0;
        }
    }
    // define node
    oneapi::tbb::flow::input_node<MKL_Complex8*> read_adc_node(g,src_body(F,N,arr_all_frame));
    // up to 4 frames are processed concurrently, so they can finish out of order
    oneapi::tbb::flow::function_node<MKL_Complex8*,MKL_Complex8*> compute_node(g,4,[&](MKL_Complex8* arr){
        // dummy work with a random duration (may be optimized away in release builds)
        int i_max=rand();
        for(int i=0;i<i_max;i++){
            float f=i+1;
        }
        f_compute_node++;
        // std::cout<<f_compute_node++<<std::endl; // monitor frame order1
        return arr;
    });
    // serial node: stores each frame in arrival order and frees the chunk
    oneapi::tbb::flow::function_node<MKL_Complex8*,int> output_node(g,1,[&](MKL_Complex8* arr){
        std::cout<<f_compute_node<<std::endl; // monitor frame order2 (Mistake: monitored here)
        memcpy(arr_out+N*f_out_node++,arr,N*8);
        mkl_free(arr);
        return 0;
    });
    // connect edge
    oneapi::tbb::flow::make_edge(read_adc_node,compute_node);
    oneapi::tbb::flow::make_edge(compute_node,output_node);
    // flow data
    std::chrono::high_resolution_clock::time_point t0{std::chrono::high_resolution_clock::now()};
    read_adc_node.activate();
    g.wait_for_all();
    std::chrono::duration<float,std::milli> d{std::chrono::high_resolution_clock::now()-t0};
    std::cout<<"process time : "<<d.count()<<" ms"<<std::endl;
    mkl_free(arr_out);
    mkl_free(arr_all_frame);
    return 0;
}