Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Streaming data processing

Kohn-Sham
New Contributor I
493 Views

Hi.

 

I wrote paralllelized streaming data processing program using flow graph.

 

Flow graph is really simple right now. (but will be complicated, so prefer flow graph than parallel_pipeline)

"

 (input_node) make_test_dataset-> (function_node) data_process_node -> (function_node) store_processed_data_and_free_chunk_memory

"

 

Problem is that output frame order is shuffled.

I found this problem is originated by OS scheduling, maybe interrupt or something.

The process time of function_node is varying so latter frame can be finished sometimes.

Setting core affinity severely mitigates frame shuffling problem.

 

Solution may be indexing of each frame and re-ordering, but are there solutions with tbb's functions?

 

 

0 Kudos
1 Solution
Mark_L_Intel
Moderator
444 Views

Hello @Kohn-Sham, please provide a reproducer. Have you looked at sequencer_node

View solution in original post

0 Kudos
2 Replies
Mark_L_Intel
Moderator
445 Views

Hello @Kohn-Sham, please provide a reproducer. Have you looked at sequencer_node

0 Kudos
Kohn-Sham
New Contributor I
409 Views

Hello.

I'm sorry but, I found mistakes in original source codes during writing reproducer.

I had to monitor the compute_node (function_node) but monitored output_node yesterday.

Problem is solved with sequencer_node, but I attached simple reproducer with mistake.

 

 

#define _USE_MATH_DEFINES
#include <mkl.h>
#include <oneapi/tbb/flow_graph.h>
#include <cmath>
#include <cstring>
#include <iostream>
#include <unistd.h>


class src_body{
    MKL_LONG m_F,m_f,m_N;
    MKL_Complex8* m_arr_all_frame;

public:
    src_body(MKL_LONG F,MKL_LONG N,MKL_Complex8* arr_all_frame):m_F(F),m_N(N),m_arr_all_frame(arr_all_frame){};
    MKL_Complex8* operator()(oneapi::tbb::flow_control& fc){
        if(m_f<m_F){
            MKL_Complex8* arr{(MKL_Complex8*)mkl_malloc(m_N*8,64)};
            memcpy(arr,m_arr_all_frame+m_N*m_f,m_N*8);
            m_f++;
            return arr;
        }else{
            fc.stop();
            return nullptr;
        }
    }
};

int main(){
    MKL_LONG            N{1048576},F{500};
    MKL_Complex8*       arr_all_frame{(MKL_Complex8*)mkl_malloc(N*F*8,64)};
    MKL_Complex8*       arr_out{(MKL_Complex8*)mkl_malloc(N*F*8,64)};
    oneapi::tbb::flow::graph g;
    std::atomic<int>    f_compute_node{},f_out_node{};

    // init test dataset
    for(MKL_LONG f=0;f<F;f++){
        for(MKL_LONG n=0;n<N;n++){
            arr_all_frame[f*N+n].real=sin(2*M_PI*10*f*n*1e-6);
            arr_all_frame[f*N+n].imag=0;
        }
    }
    
    // define node
    oneapi::tbb::flow::input_node<MKL_Complex8*> read_adc_node(g,src_body(F,N,arr_all_frame));
    oneapi::tbb::flow::function_node<MKL_Complex8*,MKL_Complex8*> compute_node(g,4,[&](MKL_Complex8* arr){
        int i_max=rand();
        for(int i=0;i<i_max;i++){
            float f=i+1;
        }
        f_compute_node++;
        // std::cout<<f_compute_node++<<std::endl; // monitor frame order1
        return arr;
    });
    oneapi::tbb::flow::function_node<MKL_Complex8*,int> output_node(g,1,[&](MKL_Complex8* arr){
        std::cout<<f_compute_node<<std::endl; // monitor frame order2 (Mistake: monitored here)
        memcpy(arr_out+N*f_out_node++,arr,N*8);
        mkl_free(arr);
        return 0;
    });

    // connect edge
    oneapi::tbb::flow::make_edge(read_adc_node,compute_node);
    oneapi::tbb::flow::make_edge(compute_node,output_node);

    // flow data
    std::chrono::high_resolution_clock::time_point t0{std::chrono::high_resolution_clock::now()};
    read_adc_node.activate();
    g.wait_for_all();
    std::chrono::duration<float,std::milli> d{std::chrono::high_resolution_clock::now()-t0};
    std::cout<<"process time : "<<d.count()<<" ms"<<std::endl;
    
    mkl_free(arr_out);
    mkl_free(arr_all_frame);
    return 0;
}

 

 

0 Kudos
Reply