Intel® oneAPI Threading Building Blocks

task_scheduler_observer is no longer called per-thread

jefffaust
New Contributor I
2,538 Views

This question is a follow-up to https://community.intel.com/t5/Intel-oneAPI-Threading-Building/task-scheduler-observer-behavior-change-after-upgrade/m-p/1416026

 

We use task_scheduler_observer to initialize thread-local storage for a 3rd party component. This component builds up a cache for each thread. Previously, on_scheduler_entry was called once per thread. Now it is called many times per thread, causing us to lose that hard-earned cache.

 

I can't seem to upload the test code. I get the error 

"The attachment's test.cpp content type (text/plain) does not match its file extension and has been removed."

It's definitely a text file. I even saved as ASCII in case unicode was not allowed.

 

I'll try attaching afterwards.

 

-Jeff

16 Replies
jefffaust
New Contributor I
2,529 Views

I was not able to upload a file. Here's the code:

 

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <iostream>
#include <oneapi/tbb/parallel_pipeline.h>
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/task_scheduler_observer.h>
#include <random>
#include <span>
#include <utility>
#include <vector>

// Environment:
// OS: Windows 10
// TBB Version: 2021.6.0
// Compiler: Visual Studio 2022 17.2.3 with /std:c++20

// a nice but slightly tortured exercise for parallel_pipeline, inspired from https://en.wikipedia.org/wiki/Monte_Carlo_method.
class CalculatePi
{
  public:
    struct Position
    {
        double x;
        double y;
    };

    using Sample = Position;
    struct Result
    {
        bool inUnitCircle;
    };

    using Measurement = std::pair<Sample, Result>;
    using Measurements = std::span<Measurement>;

    explicit CalculatePi(size_t numSamples)
        : m_N(numSamples)
    {
    }

    size_t generate(Measurements measurements)
    {
        if (m_generated >= m_N)
        {
            return 0;
        }

        // only generate up to m_N samples
        const auto count = std::min(measurements.size(), static_cast<size_t>(m_N - m_generated));
        measurements = { measurements.begin(), count };

        // generate random points in a unit square
        for (auto& [sample, result] : measurements)
        {
            sample.x = m_distrib(m_mt);
            sample.y = m_distrib(m_mt);
        }

        m_generated += count;
        return count;
    }

    void calculate(Measurement& m)
    {
        calculate(m.first, m.second);
    }

    void calculate(const Position& p, Result& r)
    {
        r.inUnitCircle = p.x * p.x + p.y * p.y < 1.0;
    }

    void aggregate(Measurements ms)
    {
        m_total += ms.size();
        m_inCircle += std::ranges::count_if(ms, [](Measurement& m) { return m.second.inUnitCircle; });
    }

    double pi() const
    {
        return 4 * static_cast<double>(m_inCircle) / m_total;
    }

  private:
    std::mt19937_64 m_mt{ 0 };
    std::uniform_real_distribution<> m_distrib{ 0., 1. };

    size_t m_total{};
    size_t m_inCircle{};
    const size_t m_N;
    size_t m_generated{};
};

class TaskObserver : public tbb::task_scheduler_observer
{
  public:
    TaskObserver() = default;
    TaskObserver(tbb::task_arena& a)
        : tbb::task_scheduler_observer(a)
    {
    }

    void on_scheduler_entry(bool is_worker) override
    {
        if (is_worker)
        {
            // This call is to set up TLS for a 3rd party component. The intent is that this will be called one time for each new
            // TBB thread, and eventually stopTLSForModeller will be called for each. Even though these two calls in themselves are
            // not expensive, the modeler caches information, and this cache is expensive to populate, and we don't want to destroy
            // it until we need to.

            // The old behavior is that totalEntry/Exit would equal the number of threads.
            ++totalEntry;
        }
    }

    void on_scheduler_exit(bool is_worker) override
    {
        if (is_worker)
        {
            ++totalExit;
        }
    }

    std::atomic<size_t> totalEntry{};
    std::atomic<size_t> totalExit{};
};

void runPipeline()
{
    using Range = std::pair<size_t, size_t>;

    const size_t TokensPerThread = 4;
    const size_t SamplesPerToken = 128;
    auto numTokens = tbb::this_task_arena::max_concurrency() * TokensPerThread;
    auto bufferSize = SamplesPerToken * numTokens;

    // circular buffer for pipeline
    auto measurements = std::vector<CalculatePi::Measurement>(bufferSize);

#ifdef _DEBUG
    const size_t count = 1'000'000;
#else
    const size_t count = 100'000'000;
#endif

    auto calculator = CalculatePi(count);
    size_t samplesGenerated{};

    auto generate = [&](tbb::flow_control& fc) {
        auto startIdx = samplesGenerated % bufferSize;
        auto count = calculator.generate(std::span(measurements.begin() + startIdx, SamplesPerToken));
        if (count == 0)
        {
            fc.stop();
        }

        samplesGenerated += count;
        return Range(startIdx, startIdx + count);
    };

    auto calculate = [&](Range r) {
        for (auto i = r.first; i < r.second; ++i)
        {
            calculator.calculate(measurements[i]);
        }
        return r;
    };

    auto aggregate = [&](Range r) {
        calculator.aggregate(std::span(measurements.begin() + r.first, r.second - r.first));
    };

    auto filters = tbb::filter<void, Range>(tbb::filter_mode::serial_in_order, generate)
                   & tbb::filter<Range, Range>(tbb::filter_mode::parallel, calculate)
                   & tbb::filter<Range, void>(tbb::filter_mode::serial_in_order, aggregate);

    tbb::parallel_pipeline(numTokens, filters);

    std::cout << "Pi is " << calculator.pi() << std::endl;
}

void runPipelineWithoutTaskArena()
{
    std::cout << "Without Task Arena" << std::endl;
    TaskObserver observer;
    observer.observe();
    runPipeline();
    std::cout << "Entry/Exit: " << observer.totalEntry << '/' << observer.totalExit << std::endl << std::endl;
}

void runPipelineWithTaskArena()
{
    std::cout << "With Task Arena" << std::endl;
    tbb::task_arena arena;
    TaskObserver observer(arena);
    observer.observe();
    arena.execute([] { runPipeline(); });
    std::cout << "Entry/Exit: " << observer.totalEntry << '/' << observer.totalExit << std::endl;
}

int main()
{
    runPipelineWithoutTaskArena();
    runPipelineWithTaskArena();
}

 

NoorjahanSk_Intel
Moderator
2,514 Views

Hi,


Thanks for reaching out to us.


Could you please provide us with the expected and observed results? 

Also please let us know how you are checking the number of times on_scheduler_entry was called.


Thanks & Regards,

Noorjahan.


jefffaust
New Contributor I
2,504 Views

Hi Noorjahan,

 

I expect entry/exit to be called once per thread, so 10 times on my machine. At least, that was the previous behavior we were relying on. The current number changes from run to run and scales with the number of samples. With 1e6 samples, there are ~3500 calls to entry/exit. With 1e8 samples, there are around 150k calls.

I'm counting by incrementing simple std::atomic<size_t> members.

 

-Jeff

jefffaust
New Contributor I
2,413 Views

Hi Noorjahan,

 

Any feedback on this? Do you have the information you need?

 

-Jeff

 

Mark_L_Intel
Moderator
2,410 Views

Hello Jeffrey,


Your findings have been confirmed. A possible resolution has been discussed.




Mark_L_Intel
Moderator
2,389 Views

Hello Jeffrey,

 

It is confirmed that during the revamp effort (moving to oneTBB), the specific observer behavior you mention was dropped. It seems unlikely that the old behavior will be restored. The current plan is to add a new observer class or an alternative callback mechanism; the current estimate for that implementation is sometime next year.

 

The workaround is to use a thread_local variable (e.g. of type bool) to track whether a particular thread has already run the observer callback.
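
A minimal sketch of the thread_local flag idea, using plain std::thread instead of TBB so it can be run on its own. The names onEntry, initThirdPartyTLS and simulateEntries are hypothetical; onEntry stands in for the body of on_scheduler_entry(), which may run many times on the same thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Counts how often the (hypothetical) expensive setup actually runs.
std::atomic<std::size_t> g_initCount{0};

// Hypothetical stand-in for the third-party TLS setup call.
void initThirdPartyTLS()
{
    ++g_initCount;
}

// Stand-in for the body of on_scheduler_entry(): it may be called many times
// on the same thread, but performs the setup only on the first call.
void onEntry()
{
    thread_local bool initialized = false;
    if (!initialized)
    {
        initThirdPartyTLS();
        initialized = true;
    }
}

// Simulates numThreads workers that each "enter the scheduler"
// entriesPerThread times, and returns how many setups were performed.
std::size_t simulateEntries(std::size_t numThreads, std::size_t entriesPerThread)
{
    assert(numThreads > 0 && entriesPerThread > 0);
    g_initCount = 0;

    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < numThreads; ++t)
    {
        workers.emplace_back([entriesPerThread] {
            for (std::size_t i = 0; i < entriesPerThread; ++i)
            {
                onEntry();
            }
        });
    }
    for (auto& w : workers)
    {
        w.join();
    }
    return g_initCount.load();
}
```

Calling simulateEntries(4, 1000) should perform the setup four times, once per thread. The flag only covers initialization; it does not tell a thread when it is finished with the library.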

 

 

 

jefffaust
New Contributor I
2,383 Views

Hi Mark,

 

I'm happy this has been identified as a problem, and that it will be addressed.

 

Unfortunately, the workaround is not sufficient. We would need to know when the final on_scheduler_exit is invoked so we can free the subsystem data. We can't clean up this information later because the cleanup has to run on each of the threads. It's a 3rd party library, so we can't simply iterate over that TLS data.

 

For a workaround, we would need a way to invoke a function on existing TBB threads. Is there a way to do this?

 

-Jeff

 

Alexey-Kukanov
Employee
2,358 Views

Hello Jeff,

I think the best way to achieve your goals would be to create a special class for thread-local initialization and destruction, e.g. like this:

 

 

class third_party_init_and_cleanup {
    bool init_done = false;
public:
    void init() {
        // do the necessary initialization
        init_done = true;
    }
    void cleanup() {
        // do the necessary cleanup
        init_done = false;
    }
    bool ready() { 
        return init_done;
    }
    third_party_init_and_cleanup() {
        init();
    }
    ~third_party_init_and_cleanup() {
        if (init_done)
            cleanup();
    }
};

thread_local third_party_init_and_cleanup tl_init_clean;

 

 

With a thread-local variable of this type, the language implementation should guarantee that the destructor is called on each thread. To make sure the variable (and the library of interest) is initialized, you could use it in the TBB observer's on_scheduler_entry() method, e.g. like this:

 

//  within on_scheduler_entry
    if (!tl_init_clean.ready())
        tl_init_clean.init();
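
To illustrate the destructor guarantee this approach relies on, here is a small sketch with plain std::thread workers that are joined (not TBB threads). ThreadGuard, onEntry and runJoinedThreads are hypothetical names; the guard plays the role of third_party_init_and_cleanup.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

std::atomic<std::size_t> g_inits{0};
std::atomic<std::size_t> g_cleanups{0};

// Hypothetical per-thread guard: the constructor sets up, the destructor tears down.
struct ThreadGuard
{
    ThreadGuard() { ++g_inits; }
    ~ThreadGuard() { ++g_cleanups; }
    void touch() {}  // gives callers something to invoke on the guard
};

// Stand-in for on_scheduler_entry(): the guard is constructed the first time
// a given thread reaches this declaration and destroyed when that thread exits.
void onEntry()
{
    thread_local ThreadGuard guard;
    guard.touch();
}

struct Counts
{
    std::size_t inits;
    std::size_t cleanups;
};

// Runs numThreads joined threads that each call onEntry() repeatedly.
Counts runJoinedThreads(std::size_t numThreads, std::size_t entriesPerThread)
{
    g_inits = 0;
    g_cleanups = 0;

    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < numThreads; ++t)
    {
        workers.emplace_back([entriesPerThread] {
            for (std::size_t i = 0; i < entriesPerThread; ++i)
            {
                onEntry();
            }
        });
    }
    for (auto& w : workers)
    {
        w.join();
    }

    // Once the threads are joined, every constructed guard has been destroyed.
    assert(g_cleanups.load() == g_inits.load());
    return { g_inits.load(), g_cleanups.load() };
}
```

With joined threads, runJoinedThreads(4, 100) should report four setups and four cleanups. Whether the same holds for threads owned by a library depends on how that library ends its threads.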

 

 

Mark_L_Intel
Moderator
2,300 Views

Hello Jeff,


Any feedback on Alexey Kukanov's post from 10-28-2022 above?


jefffaust
New Contributor I
2,294 Views

I think it will work, but I have yet to try it, sorry. I have other priorities at the moment, but I expect to give it a try in the next couple of days.

 

-Jeff

 

jefffaust
New Contributor I
2,279 Views

Hi Alexey and Mark,

 

I don't see the destructor being called for the thread_local object for any threads created by TBB. I tried this first in our product, and then in a standalone application.

Is there something I'm supposed to call to shut down TBB?

 

Alexey-Kukanov
Employee
2,175 Views

Hello Jeff,

 

It seems that the C++ guarantees about thread-local destructors do not automatically apply to detached threads, which TBB uses by default. The TLS-based solution can be extended to this case; perhaps the simplest way is to use TBB's feature to wait for thread completion, as Mark suggested below. In that case, the threads are joined rather than detached. A couple more references with information and examples: https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Migration_Guide/Task_Scheduler_Init.html#terminating-onetbb-scheduler and https://spec.oneapi.io/versions/latest/elements/oneTBB/source/task_scheduler/scheduling_controls/task_scheduler_handle_cls.html. If you use TBB from multiple application threads or create additional task_arenas, you'd need to make sure that all task_arenas are terminated and that app threads are either complete or also call tbb::finalize().

An alternative approach to ensure that TLS destructors are called in detached threads is to use https://en.cppreference.com/w/cpp/thread/notify_all_at_thread_exit or https://en.cppreference.com/w/cpp/thread/promise/set_value_at_thread_exit. You'd need to call one of these methods in each TBB thread (presumably, in the on_scheduler_entry() observer), and then in the main thread wait for the futures obtained from the promises/packaged tasks, as shown in the examples at cppreference.
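
A sketch of the set_value_at_thread_exit variant, using detached std::thread workers as a stand-in for TBB threads. ThreadGuard and runDetachedThreads are hypothetical names; in a TBB setting the call to set_value_at_thread_exit() would presumably go in on_scheduler_entry(), guarded so that it runs once per thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <future>
#include <thread>
#include <vector>

std::atomic<std::size_t> g_inits{0};
std::atomic<std::size_t> g_cleanups{0};

// Hypothetical per-thread guard standing in for the third-party setup/teardown.
struct ThreadGuard
{
    ThreadGuard() { ++g_inits; }
    ~ThreadGuard() { ++g_cleanups; }
    void touch() {}
};

// Starts numThreads detached threads and waits until each one has fully
// exited, including destruction of its thread_local objects.
// Returns the number of cleanups observed.
std::size_t runDetachedThreads(std::size_t numThreads)
{
    g_inits = 0;
    g_cleanups = 0;

    // One promise per thread; the vector is never resized, so the references
    // handed to the threads stay valid.
    std::vector<std::promise<void>> promises(numThreads);
    std::vector<std::future<void>> exited;
    exited.reserve(numThreads);

    for (auto& p : promises)
    {
        exited.push_back(p.get_future());
        std::thread([&p] {
            thread_local ThreadGuard guard;
            guard.touch();
            // The future becomes ready only when this thread exits, after its
            // thread_local objects have been destroyed.
            p.set_value_at_thread_exit();
        }).detach();
    }

    for (auto& f : exited)
    {
        f.wait();
    }

    assert(g_cleanups.load() == g_inits.load());
    return g_cleanups.load();
}
```

Here runDetachedThreads(4) should return 4: waiting on the futures gives the main thread a point after which every detached thread's guard has been destroyed.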

 

Hope that helps. We'd appreciate your response.

Mark_L_Intel
Moderator
2,252 Views

Could you provide a snippet that illustrates what you are seeing versus what you expect?


FYI, oneTBB supports explicit library termination:

https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Initializing_and_Terminating_the_Library.html


Mark_L_Intel
Moderator
2,190 Views

Could you please reply? Are you still interested in this issue?


jefffaust
New Contributor I
2,144 Views

Hi Mark,

 

I'm on vacation until next week, but this is high on my list. I need to try what you recommended above. 

Thanks for your patience,

 

-Jeff

 

jefffaust
New Contributor I
2,130 Views

Calling tbb::finalize was the last missing piece. Everything is now working like it did before.

 

Thanks for the help.
