Intel® oneAPI Threading Building Blocks

Sometimes messages are not forwarded by a broadcast_node to a multifunction_node

gabor_totharsanyi

Hello TBB devs,

We're working on a new project based on TBB flow graphs. We found an issue recently that I already reported in this post:
https://community.intel.com/t5/Intel-oneAPI-Threading-Building/Function-node-doesn-t-process-messages-sequentially/m-p/1216316#M14710 

Now it seems we have found another issue: sometimes messages submitted to a broadcast_node are not forwarded to the connected multifunction_node. The issue occurs very rarely, roughly once in every 50,000 executions.

To reproduce the problem, we created a test that sequentially feeds the broadcast node with new data and waits for the data to arrive at the multifunction node, so that consecutive steps never overlap.
Here's the test code:

 

#include <gtest/gtest.h>
#include <tbb/flow_graph.h>

#include <atomic>
#include <chrono>
#include <future>
#include <thread>

using namespace std::chrono_literals;  // needed for the 5s timeouts below

TEST(TBB, broadcast_node_multifunction_node_message_passing_1)
{
  const int MESSAGE_COUNT = 1'000'000;

  tbb::flow::graph g;

  tbb::flow::broadcast_node<int> broadcastNode{g};

  std::promise<int> output;
  using multifunction_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
  multifunction_node multiNode{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      output.set_value(inp_);
    }
  };

  tbb::flow::make_edge(broadcastNode, multiNode);

  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    const auto task = std::async([&] {
      //gtest macros are thread-safe only on systems where pthreads is available
      ASSERT_TRUE(broadcastNode.try_put(i));
    });
    task.wait();
    auto fut = output.get_future();
    ASSERT_EQ(fut.wait_for(5s), std::future_status::ready) << "Item " << i << " not processed";
    ASSERT_EQ(i, fut.get());
    output = std::promise<int>{};
  }

  g.wait_for_all();
}

 

The second assert (the future status test) fails at random positions.
Note that try_put still returns true.

We implemented the same test using busy waits and a single dedicated thread to feed the graph, and got the same result:

 

TEST(TBB, broadcast_node_multifunction_node_message_passing_2)
{
  const int MESSAGE_COUNT = 5'000'000;

  tbb::flow::graph g;

  tbb::flow::broadcast_node<int> broadcastNode{g};

  std::atomic_int output = -1;
  using multifunction_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
  multifunction_node multiNode{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      output.store(inp_);
    }
  };

  tbb::flow::make_edge(broadcastNode, multiNode);

  std::atomic_bool canProceed = false;
  std::atomic_int inputData = -1;
  auto producer = std::thread([&] {
    while (true)
    {
      if (canProceed.load())
      {
        canProceed.store(false);
        const auto val = inputData.load();
        //gtest macros are thread-safe only on systems where pthreads is available
        ASSERT_TRUE(broadcastNode.try_put(val));
        if (val == MESSAGE_COUNT - 1)
        {
          break;
        }
      }
      std::this_thread::yield();
    }
  });

  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    inputData.store(i);
    canProceed.store(true);
    const auto start = std::chrono::high_resolution_clock::now();
    while (output.load() != i && std::chrono::high_resolution_clock::now() - start < 5s)
    {
      std::this_thread::yield();
    }
    ASSERT_EQ(output.load(), i) << "Item " << i << " not processed";
  }

  g.wait_for_all();

  if (producer.joinable()) { producer.join(); }
}

 


The issue seems to disappear when the broadcast_node is replaced with a lightweight multifunction_node, without changing the functionality:

 

TEST(TBB, broadcast_node_multifunction_node_message_passing)
{
  const int MESSAGE_COUNT = 100'000;

  tbb::flow::graph g;

  using lightweight_multifunction_node =
    tbb::flow::multifunction_node<int, std::tuple<int>, tbb::flow::lightweight>;

  lightweight_multifunction_node lwMultiNode{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      std::get<0>(outp_).try_put(inp_);
    }
  };

  std::promise<int> output;
  using multifunction_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
  multifunction_node multiNode{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      output.set_value(inp_);
    }
  };

  tbb::flow::make_edge(tbb::flow::output_port<0>(lwMultiNode), multiNode);

  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    const auto task = std::async([&] {
      //gtest macros are thread-safe only on systems where pthreads is available
      ASSERT_TRUE(lwMultiNode.try_put(i));
    });
    task.wait();
    auto fut = output.get_future();
    ASSERT_EQ(fut.wait_for(5s), std::future_status::ready) << "Item " << i << " not processed";
    ASSERT_EQ(i, fut.get());
    output = std::promise<int>{};
  }

  g.wait_for_all();
}

 

The TBB version we're using is 2020.U3, and we could reproduce the issue on multiple x86_64 platforms, on CPUs ranging from a few cores to many.
We also tested version 2018.U9, and the issue seems to be present there as well.


I'm looking forward to receiving your reply.

Thanks in advance!

gabor_totharsanyi

It turned out that calling try_put alone does not always cause the submitted tasks to execute, so running wait_for_all on the graph concurrently in a separate thread solved the problem. (I'm not sure whether this is the intended behavior, though.)

Kevin_O_Intel1
Employee

Hi.

I will reproduce the issue and get you an answer.

Regards


Kevin_O_Intel1
Employee

Seems like the answer I provided on the other thread covers this as well. Can you confirm?

Thanks


gabor_totharsanyi

Hi Kevin!

I think this is a totally separate problem.

Basically, my question regarding this phenomenon is whether calling try_put on a node always ensures that the submitted data will be processed, without having to call wait_for_all on the graph.

In other words: if I have, for example, a multifunction_node with a queueing policy and I call try_put on it with some input data, will the framework ensure that the submitted data gets processed by the node's body as an effect of the try_put call alone, or do I need to call wait_for_all to guarantee that the processing takes place?

This is what the originally posted example is about: sometimes the call to try_put alone doesn't trigger the processing.

I wasn't sure whether this behavior was correct. But if this is the expected behavior, then I assume wait_for_all has to be called each time I have input that I want the graph to process before it receives any additional input. In that case, it would make sense to run wait_for_all in a loop in a separate thread.

Thanks

Kevin_O_Intel1
Employee

Thank you for clarifying. 

My understanding is that this is the intended behavior.

But let me know if you have additional questions.
