Community
cancel
Showing results for 
Search instead for 
Did you mean: 
Highlighted
New Contributor I
325 Views

OneAPI callbacks and async operations

Can OneAPI/SYCL use callbacks like OpenCL?

The purpose is to:
1. enqueue kernel operation
2. async the runtime calls the callback function when the kernel finishes
3. callback function: do some management and enqueue read operation
4. async the runtime calls the second callback function when the read op finishes
5. second callback function: do some management, continue in 1 or do other stuff.

How can be done this with OneAPI? What could be the most performant way? I need to do such decisions and management as soon as the op finishes, and dynamically do new ops.
I searched for events and callbacks in oneAPI and none of the examples show any similar (neither the guide)

0 Kudos
6 Replies
Highlighted
Moderator
307 Views

Hi,

 

SYCL is a high-level abstraction over OpenCL. It abstracts the kernel enqueue operation, performed by the command group handler.

 

As per SYCL 1.2.1 specifications currently, there is no direct functionality in SYCL to trigger a host callback from a device function. However, Developers can manually wait on an event and call the callback function afterward.

 

find the below skeleton code which mimics the callback functionality.

 

 

//test_event.cpp

#include <future>
#include <CL/sycl.hpp>
#include <array>
#include <iostream>

using namespace cl::sycl;
class test_event;

int main(){

{ //SYCL scope
cpu_selector device_selector;
//buffers
queue q(device_selector);
auto submit_event=q.submit([&](handler &h) {
//accessors
range<1> num_items{100};
h.parallel_for<class test_event>(num_items, [=](id<1> i) {
// your logic
});
});

auto fut = std::async([&]() {
  // wait for the kernels to finish
  submit_event.wait();
  // calling the callback function after the kernel execution
  //your own callback funtion
std::cout<<"This is a callback function"<<std::endl;
});
}
}

 

 

 

Command to compile the code:

$dpcpp test_event.cpp -lpthread -o test_event

$./test_event

 

For more information on the event class, you can refer to the SYCL 1.2.1 specs document. page: 81

https://www.khronos.org/registry/SYCL/specs/sycl-1.2.1.pdf

 

Thanks & Regards

Goutham

 

0 Kudos
Highlighted
New Contributor I
269 Views

Hi, Goutham.

Thanks for answering and for the example.

I tried to continue with your example, providing the usage of callbacks to get the functionality I described in the post, but it was a mess with async functions (not recursive). What I would like to know is better code (performance) with oneAPI, it does not matter if its critic.

What I have done as a test, is using C++ threads to support parallelism between devices (two different queues), and doing management after every kernel+read operation. So, I can select custom chunks or policies.

I have two questions:

1. Is there any way to say in oneAPI/sycl to perform the kernel operation but not the buffer writing, and then to perform the buffer writing? So, I can do operations after the kernel ops, and after the buffer ops. Now it only works with both kernel+writing buffs (after the submit).

2. My code below shows how I am doing now the management (the comment line: // TODO: Logic after executing/writing), where I could use mutex, etc to control the policies between the queues/devices. But I did not achieve following the async path (callbacks). What I want is the better approach regarding less overhead. Am I doing right?

Currently, the overheads are really high compared with pure cpu or pure gpu execution. The double queue (one per device), even using threads, is really slow, so, no parallelism is exploited or the overheads are higher (eg. N = 1024 * 1024 * 200 gives 7-7.3s in GPU+CPU, but only 5.6 in CPU and 7 in GPU.) If it would be parallel and in a perfect scenario (minimum overheads) it would need in CPU+GPU around 3s. Why is not achieved?

 

#include <future>
#include <CL/sycl.hpp>
#include <array>
#include <iostream>

using namespace cl::sycl;

class test_event_cpu;

class test_event_gpu;

auto start = std::chrono::high_resolution_clock::now();

enum class Mode {
    CPU, GPU, CPUGPU
};

void process(bool cpu, int *ptr1, int *ptr2, int psize, int poffset) {
    { // SYCL scope
        std::cout << "selecting\n";
        device_selector *sel;
        if (cpu) {
            sel = new cpu_selector();
            std::cout << "cpu selector\n";
        } else {
            sel = new gpu_selector();
            std::cout << "gpu selector\n";
        }

        queue q(*sel);
        bool cont = true;

        int rest_size = psize;
        int offset = poffset;
        int min_split = psize * 0.5 > 256 ? psize * 0.5 : 256;

        while (cont) {

            int size = 0;
            if (rest_size > 0) {
                if (rest_size >= min_split) {
                    size = min_split;
                    rest_size -= min_split;
                } else {
                    size = rest_size;
                    rest_size = 0;
                }
            } else {
                cont = false;
                break;
            }

            auto R = sycl::range<1>(size);
            sycl::buffer<int, 1> bufv1((ptr1 + offset), R);
            sycl::buffer<int, 1> bufv2((ptr2 + offset), R);

            auto end = std::chrono::high_resolution_clock::now();
            std::chrono::duration<double> diff = end - start;
            if (cpu) {
                std::cout << diff.count() << " cpu submit\n";
            } else {
                std::cout << diff.count() << " gpu submit\n";
            }

            std::cout << "Running on: " << q.get_device().get_info<sycl::info::device::name>() << " size: " << size
                      << " offset: " << offset << "\n";
            auto submit_event = q.submit([&](handler &h) {
                auto av1 = bufv1.get_access<sycl::access::mode::read_write>(h);
                auto av2 = bufv2.get_access<sycl::access::mode::read_write>(h);

//        range<1> num_items{100};
                h.parallel_for<class test_event>(R, [=](id<1> i) {
                    av1[i] += av2[i]; //Storing output in Input 1
                });
            });

            submit_event.wait();

            // TODO: Logic after executing/writing

            end = std::chrono::high_resolution_clock::now();
            diff = end - start;
            if (cpu) {
                std::cout << diff.count() << " cpu callback function\n";
            } else {
                std::cout << diff.count() << " gpu callback function\n";
            }

            offset += size;

        }

        free(sel);
    }
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> diff = end - start;
    std::cout << diff.count() << " exit\n";
}

int main(int argc, char *argv[]) {
    std::string type;
    std::string n;
    Mode mode = Mode::CPU;
    bool check = false;
    float prop = 0.5;
    int N = 0;
    if (argc > 1) {
        type = argv[1];
    }
    if (argc > 2) {
        N = std::stoi(argv[2]);
    }
    if (argc > 3) {
        prop = std::stof(argv[3]);
        if (prop <= 0.0 || prop >= 1.0) {
            std::cout << "prop cpu should be between 0.0 and 1.0 (both exclusive)\n";
            return 1;
        }
    }
    if (argc > 4) {
        check = std::string(argv[4]) == "check";
    }
    bool valid = false;
    if (N > 0) {
        if (type == "cpugpu") {
            mode = Mode::CPUGPU;
            valid = true;
        } else if (type == "cpu") {
            mode = Mode::CPU;
            valid = true;
        } else if (type == "gpu") {
            mode = Mode::GPU;
            valid = true;
        }
    }

    if (!valid) {
        std::cout << "usage: (cpu|gpu|cpugpu) <N> [<prop float> (check)]\n";
        std::cout << "   prop cpu: 0.5 (default)\n";
        return 1;
    }

    std::vector < int > v1(N); //Input Vector 1
    std::vector<int> v2(N); //Input Vector 2
    for (int i = 0; i < N; i++) {
        v1[i] = i;
        v2[i] = N - i;
    }

//  for(int i =0; i < 10; i++)
//    std::cout<< v1[i] << " " << v2[i] << "\n";

    if (mode == Mode::CPU) {
        process(true, v1.data(), v2.data(), N, 0);
    } else if (mode == Mode::GPU) {
        process(false, v1.data(), v2.data(), N, 0);
    } else {
        std::thread t1(process, false, v1.data(), v2.data(), N / 2, 0);
        process(true, v1.data(), v2.data(), N / 2, N / 2);
        t1.join();
    }
    if (check) {
        std::vector<int> validate_vector(N, N); //Validating
        validate_vector == v1 ? std::cout << "Vector addition: Success\n" : std::cout << "Vector addition: Failure\n";
    }
}

 

 

Executions:

 

$ /usr/bin/time ./vector-add cpu $(( 1024 * 1024 * 200 ))
selecting
cpu selector
4.951 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 104857600 offset: 0
5.30803 cpu callback function
5.30845 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 104857600 offset: 104857600
5.41896 cpu callback function
5.47205 exit
5.27user 0.92system 0:05.64elapsed 109%CPU (0avgtext+0avgdata 1811968maxresident)k
0inputs+0outputs (0major+426445minor)pagefaults 0swaps

$ /usr/bin/time ./vector-add gpu $(( 1024 * 1024 * 200 ))
selecting
gpu selector
4.91563 gpu submit
Running on: Intel(R) Gen9 size: 104857600 offset: 0
6.25979 gpu callback function
6.46781 gpu submit
Running on: Intel(R) Gen9 size: 104857600 offset: 104857600
7.12051 gpu callback function
7.28631 exit
5.21user 2.23system 0:07.50elapsed 99%CPU (0avgtext+0avgdata 1790868maxresident)k
0inputs+0outputs (0major+433372minor)pagefaults 0swaps



$ /usr/bin/time ./vector-add cpugpu $(( 1024 * 1024 * 200 )) 0.5
selecting
cpu selector
selecting
gpu selector
5.06096 gpu submit
Running on: Intel(R) Gen9 size: 52428800 offset: 0
5.39732 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 52428800 offset: 104857600
6.02278 gpu callback function
6.38588 cpu callback function
6.39302 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 52428800 offset: 157286400
6.46705 gpu submit
Running on: Intel(R) Gen9 size: 52428800 offset: 52428800
6.54794 cpu callback function
7.0073 gpu callback function
7.10593 exit
7.11082 exit
6.18user 2.05system 0:07.38elapsed 111%CPU (0avgtext+0avgdata 1886576maxresident)k
0inputs+0outputs (0major+444851minor)pagefaults 0swaps

$ /usr/bin/time ./vector-add cpugpu $(( 1024 * 1024 * 200 )) 0.3
selecting
cpu selector
selecting
gpu selector
4.88729 gpu submit
Running on: Intel(R) Gen9 size: 52428800 offset: 0
5.26138 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 52428800 offset: 104857600
5.80888 gpu callback function
6.13874 cpu callback function
6.14046 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 52428800 offset: 157286400
6.24131 gpu submit
Running on: Intel(R) Gen9 size: 52428800 offset: 52428800
6.24927 cpu callback function
6.66753 gpu callback function
6.75738 exit
6.7659 exit
6.09user 1.88system 0:07.01elapsed 113%CPU (0avgtext+0avgdata 1886944maxresident)k
0inputs+0outputs (0major+444864minor)pagefaults 0swaps
$ /usr/bin/time ./vector-add cpugpu $(( 1024 * 1024 * 200 )) 0.7
selecting
cpu selector
selecting
gpu selector
4.73818 gpu submit
Running on: Intel(R) Gen9 size: 52428800 offset: 0
5.08337 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 52428800 offset: 104857600
5.65442 gpu callback function
6.11104 cpu callback function
6.11228 cpu submit
Running on: Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz size: 52428800 offset: 157286400
6.14915 gpu submit
Running on: Intel(R) Gen9 size: 52428800 offset: 52428800
6.20512 cpu callback function
6.65678 gpu callback function
6.72001 exit
6.75553 exit
5.81user 1.89system 0:07.07elapsed 109%CPU (0avgtext+0avgdata 1886596maxresident)k
0inputs+0outputs (0major+444859minor)pagefaults 0swaps

 

0 Kudos
Highlighted
Moderator
230 Views

Hi,

We are working on your query internally. We will get back to you about the same soon.


Thanks & Regards

Goutham


0 Kudos
Highlighted
Moderator
204 Views

Hi,

Apologize for the delay!

 

Regarding your 1st query " Is there any way to say in oneAPI/sycl to perform the kernel operation but not the buffer writing":

You can use a queue.wait() method, which ensures that the kernel gets completed but the data doesn't get copied back until the buffer destruction(SYCL Scope) gets called or you explicitly use host accessor to access the data.

 

So, you can use the queue.wait() method after the submit call and perform your operations which you wanted to execute before the buffer gets copied back to the host, after that, either use host accessor to access the updated host data or directly use the host data after buffer destruction (end SYCL Scope).

 

Please find the below sample code with buffer destructor(SYCL Scope).

 

#include <CL/sycl.hpp>

#define SIZE 1024

using namespace cl::sycl;

int main() {
  std::vector<int> a(SIZE), b(SIZE), c(SIZE);
  for (int i = 0; i<SIZE; ++i) {
    a[i] = i;
    b[i] = -i;
    c[i] = i;
     }

  std::cout<<"Data initialization \n a    b     c "<<std::endl;
  for(int i =0;i<10;i++){
        std::cout<<" "<<a[i]<<"    "<<b[i]<<"    "<<c[i]<<std::endl;
     }

  { // SYCL SCOPE
    range<1> a_size{SIZE};

    queue d_queue(sycl::default_selector{}); // default_selector

    std::cout<<"Running on: "<<d_queue.get_device().get_info<sycl::info::device::name>()<<std::endl;

    buffer<int, 1> a_device(a.data(), a_size);
    buffer<int, 1> b_device(b.data(), a_size);
    buffer<int, 1> c_device(c.data(), a_size);

    d_queue.submit([&](handler &cgh) {
      auto c_res= c_device.get_access<access::mode::write>(cgh);
      auto a_in = a_device.get_access<access::mode::read>(cgh);
      auto b_in = b_device.get_access<access::mode::read>(cgh);

        cgh.parallel_for<class ex1>(a_size,[=](id<1> idx) {
        c_res[idx] = a_in[idx] + b_in[idx];

        });

     });

   d_queue.wait(); //Waiting for kernel to finish (but the buffer data c_device is not copied back to to vector c yet)

  std::cout<<"Before buffer destruction: old values"<<std::endl;
  for(int i =0;i<10;i++){
        std::cout<<c[i]<<" "; //OLD VALEUS
     }
   std::cout<<std::endl;
} //SYCL END (Buffer destruction)

  std::cout<<"After buffer destruction: updated values"<<std::endl;
  for(int i =0;i<10;i++){
        std::cout<<c[i]<<" ";
     }
printf("\n");


}

 

Output:

Data initialization
 a    b     c
 0    0    0
 1    -1    1
 2    -2    2
 3    -3    3
 4    -4    4
 5    -5    5
 6    -6    6
 7    -7    7
 8    -8    8
 9    -9    9
Running on: Intel(R) Gen9
Before buffer destruction old values
0 1 2 3 4 5 6 7 8 9
After buffer destruction updated values
0 0 0 0 0 0 0 0 0 0

 

Is this what you are looking for? If not, could you please elaborate more on what exactly you are looking for.

 

Regarding the execution time of two different device queues:

Currently as the CPU "Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz" and iGPU "Intel(R) Gen9" are on two different platforms, hence two different contexts are getting created for them.

As they are two different SYCL contexts and any buffer you create will be a part of a context, so sync has to happen across contexts. Here, in your code sample, this context overhead is significant compared to kernel execution. Hence we see the time difference.

 

For a more detailed explanation on this, please refer to the below comment posted by @RahulV_intel

 

https://community.intel.com/t5/Intel-oneAPI-Base-Toolkit/Basic-example-of-parallel-execution-2-devic...

 

Please let us know if this helps.

 

 

Thanks & Regards

Goutham

 

Highlighted
Moderator
136 Views

Hi,

Could you please confirm if your issue is resolved? If not let us know the same so that we can help you resolve it.


Thanks & Regards

Goutham


0 Kudos
Highlighted
Moderator
124 Views

Hi,

As we haven't heard back from you, we are considering that your issue has been resolved and we have answered all your queries. So we will no longer respond to this thread.

If you require any additional assistance from Intel, please start a new thread.

Any further interaction in this thread will be considered community only 


Have a Good day!


Thanks & Regards

Goutham


0 Kudos