Intel® oneAPI Threading Building Blocks

Function node doesn't process messages sequentially

gabor_totharsanyi
1,416 Views

Hello TBB devs,

We're working on a new project for which we chose TBB (2020 U3) as the main message-processing framework. The project is basically feature complete, but we recently discovered a bug that occurs about once in every 10,000 executions. After tracking it down, we found that the cause is the order of message processing in the function node: it seems that messages are not always processed in sequence.

We wrote a dedicated test, which clearly shows the issue. We enter a sequence of numbers from 0 to N into a chain of 3 function nodes and we expect the same sequence of numbers at the end of the chain.
(Note that all the nodes have the default policy, which is queueing).

Here's the code:

 

#include <gtest/gtest.h>
#include <tbb/flow_graph.h>
#include <vector>

TEST(TBB, function_node_message_order)
{
  const int MESSAGE_COUNT = 1'000'000;

  std::vector<int> received;
  received.reserve(MESSAGE_COUNT);
  tbb::flow::graph g;

  tbb::flow::function_node<int, int> funcNode1{
    g, tbb::flow::serial, [&](auto&& inp_) {
      return inp_;
    }
  };

  tbb::flow::function_node<int, int> funcNode2{
    g, tbb::flow::serial, [&](auto&& inp_) {
      return inp_;
    }
  };

  tbb::flow::function_node<int, int> funcNode3{
    g, tbb::flow::serial, [&](auto&& inp_) {
      received.push_back(inp_);
      return 0;
    }
  };

  tbb::flow::make_edge(funcNode1, funcNode2);
  tbb::flow::make_edge(funcNode2, funcNode3);

  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    funcNode1.try_put(i);
  }

  g.wait_for_all();

  ASSERT_EQ(received.size(), MESSAGE_COUNT);
  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    ASSERT_EQ(i, received[i]) << "Item " << i << " swapped with " << received[i];
  }
}

 

The last assertion fails at random positions.

We tried using an input node as well to feed the chain, but it didn't solve the issue.

When we replaced the function nodes with multifunction nodes without changing the functionality, the issue disappeared:

 

#include <gtest/gtest.h>
#include <tbb/flow_graph.h>
#include <tuple>
#include <vector>

TEST(TBB, multifunction_node_message_order)
{
  const int MESSAGE_COUNT = 1'000'000;

  std::vector<int> received;
  received.reserve(MESSAGE_COUNT);
  tbb::flow::graph g;

  using multifunction_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
  multifunction_node multiNode1{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      std::get<0>(outp_).try_put(inp_);
    }
  };

  multifunction_node multiNode2{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      std::get<0>(outp_).try_put(inp_);
    }
  };

  multifunction_node multiNode3{
    g, tbb::flow::serial,
    [&](auto&& inp_, [[maybe_unused]] auto&& outp_) {
      received.push_back(inp_);
    }
  };

  tbb::flow::make_edge(multiNode1, multiNode2);
  tbb::flow::make_edge(multiNode2, multiNode3);

  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    multiNode1.try_put(i);
  }

  g.wait_for_all();

  ASSERT_EQ(received.size(), MESSAGE_COUNT);
  for (int i = 0; i < MESSAGE_COUNT; ++i)
  {
    ASSERT_EQ(i, received[i]) << "Item " << i << " swapped with " << received[i];
  }
}

 

Is this the expected behavior of the function node (in which case we misunderstood the documentation), or is it a bug or known issue?

I have found a similar report here:
https://community.intel.com/t5/Intel-oneAPI-Threading-Building/Nondeterministic-processing-order-for-function-node-with/td-p/1164061 


Thanks in advance!

1 Solution
Kevin_O_Intel1
Employee
1,298 Views

Some new insight:

When using a:


tbb::flow::function_node<int, int> my_node{g, tbb::flow::serial, …};


The “serial” policy guarantees that the body of the node will not execute concurrently with another copy of that body. It does not, however, make any guarantee about the order in which the messages are received in downstream nodes. To enforce ordering before a node, developers can use tbb::flow::sequencer_node.


How does this out-of-order behavior occur in practice? In your reproducer, for example, it could be that funcNode2 finished executing its body on 995 on thread 1 before funcNode2 started executing its body on 996 on thread 2. But the execution of the body and the send/receive of the output message are not done atomically. So it's possible that after thread 1 finished the funcNode2 body on 995, thread 2 could start and finish the funcNode2 body on 996 and also send the result from 996, all before thread 1 completes the send of the output generated when the body was applied to 995. This is unlikely, but possible, which is why you see that most of the time things arrive in order.
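
The interleaving described above can be replayed deterministically in plain C++. This is only a hypothetical model, not TBB code: the Step, Action, and Replay types and both functions are invented for illustration, and each message is reduced to a body phase followed by a separate send step.

```cpp
#include <cassert>
#include <vector>

// Hypothetical model: a message goes through a body phase (serialized by the
// "serial" policy) and then a separate send step that is not atomic with it.
enum class Action { BodyStart, BodyEnd, Send };

struct Step
{
  int thread;   // which worker thread performs the step
  Action action;
  int message;  // the value being processed
};

struct Replay
{
  std::vector<int> body_order;     // order in which bodies started
  std::vector<int> arrival_order;  // order in which the successor received
  bool bodies_overlapped;          // true if two bodies ever ran concurrently
};

// Walks a schedule step by step and records what the node and its successor
// would each observe.
Replay replay(const std::vector<Step>& schedule)
{
  Replay result{{}, {}, false};
  bool body_active = false;
  for (const Step& step : schedule)
  {
    switch (step.action)
    {
      case Action::BodyStart:
        if (body_active) { result.bodies_overlapped = true; }
        body_active = true;
        result.body_order.push_back(step.message);
        break;
      case Action::BodyEnd:
        body_active = false;
        break;
      case Action::Send:
        result.arrival_order.push_back(step.message);
        break;
    }
  }
  return result;
}

// The schedule from the explanation: thread 1 finishes the body on 995, then
// thread 2 runs the body on 996 and sends it before thread 1 sends 995.
Replay replay_interleaving_from_post()
{
  return replay({
    {1, Action::BodyStart, 995},
    {1, Action::BodyEnd, 995},
    {2, Action::BodyStart, 996},
    {2, Action::BodyEnd, 996},
    {2, Action::Send, 996},
    {1, Action::Send, 995}});
}
```

In this replay the bodies never overlap and run in the order 995, 996, yet the successor receives 996 before 995. That is the combination seen in the reproducers: the serial guarantee holds while the downstream order is swapped.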



6 Replies
Kevin_O_Intel1
Employee
1,382 Views

Hi,

I will duplicate the issue and file a bug report.

Thanks for reporting the issue.

Regards


Kevin_O_Intel1
Employee
1,334 Views

It would be helpful if you could attach a test case that I could execute. Snippets of code are great but I'd like to observe the behavior during execution.

Thanks


0 Kudos
GergelyGocza
Beginner
1,325 Views

Hi Kevin,

I could reproduce the problem with TBB 2020 U3 on Arch Linux on both AMD and Intel CPUs (Ryzen 3700X and 8th-gen Core i5).

Everything was compiled with GCC 10.2.0 using the C++17 standard.

The following code shows a problem similar to the one in the opening post:

#include <tbb/flow_graph.h>
#include <stdexcept>
#include <string>

using namespace tbb::flow;

int main()
{
  graph g;

  input_node<int> input{
    g,
    [](tbb::flow_control& fc_)
    {
      static int i{0};

      if (i > 1'000'000) { fc_.stop(); }

      return i++;
    }};

  function_node<int, int> f1{
    g,
    serial,
    [](auto&& input_)
    {
      static int i = 0;

      if (input_ != i)
      {
        throw std::runtime_error("PROBLEM IN F1 at " + std::to_string(i));
      }

      i++;
      return input_;
    }
  };

  function_node<int> f2{
    g,
    serial,
    [](auto&& input_)
    {
      static int i = 0;

      if (input_ != i)
      {
        throw std::runtime_error("PROBLEM IN F2 at " + std::to_string(i));
      }

      i++;
    }
  };

  make_edge(input, f1);
  make_edge(f1, f2);

  input.activate();
  g.wait_for_all();

  return 0;
}

It fails at various positions with output like this:

terminate called after throwing an instance of 'std::runtime_error'
  what():  PROBLEM IN F2 at 35183

 

Thanks,
Gergely

Kevin_O_Intel1
Employee
1,303 Views

Hi,

I duplicated what you are seeing with something similar. I was only able to get it to report diffs when I used a large number of iterations, around 1000. Let me check with our architects.


0 Kudos
gabor_totharsanyi
1,288 Views

Hi Kevin,

Thanks for the reply! I have one comment regarding the documentation, though. Although the documentation doesn't state that messages would be processed sequentially with the queueing policy and serial concurrency, these two terms combined can easily mislead a developer, since nothing in the documentation is explicit about this (or at least I couldn't find it). The many online examples in which people assume that messages are processed in order also support this point.

Thanks,
Gabor
