#define TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR 1 #define TBB_PREVIEW_FLOW_GRAPH_NODES 1 #include #include #include #include #include #include #include #include #include #include #include "nbxDataBuffer.h" using async_node_type = tbb::flow::async_node< double, double>; using gateway_type = async_node_type::gateway_type; using msg_t = nbxDataBuffer*; /// /// Input filter /// class InputFilter { public: InputFilter(nbxDataBufferMetadata md) : next_buffer_index{ 0 }, buffers_counter{ 0 }, BUFFERS_CAPACITY(BUFFERS_CAPACITY_DEFAULT) { //reserve slots for buffers this->buffers.reserve(BUFFERS_CAPACITY); //add to buffer for (auto i = 0; i < this->BUFFERS_CAPACITY; i++) { //push to container this->buffers.emplace_back(std::make_unique()); } //allocate for (auto const& b : this->buffers) { //allocate b->ConfigureResultsBlock(md); } } ///// ///// Copy constructor. ///// //InputFilter(const InputFilter&) : // next_buffer_index{ 0 }, // buffers_counter{ 0 }, // BUFFERS_CAPACITY(BUFFERS_CAPACITY_DEFAULT) //{ //} InputFilter(const InputFilter&) = delete; /// /// Disable assign operator /// InputFilter& operator = (const InputFilter&) = delete; /// /// Move constructor. /// /// Object to move. /// Class instance. InputFilter(InputFilter&& other) noexcept : next_buffer_index{ 0 }, buffers_counter{ 0 }, BUFFERS_CAPACITY(BUFFERS_CAPACITY_DEFAULT) { //invoke move assignement *this = std::move(other); } /// /// Move assignement. /// /// Object to assign to. /// Move object. InputFilter& operator = (InputFilter&& other) noexcept { //check for self assignement if (this != &other) { //move the buffer this->next_buffer_index = other.next_buffer_index; this->buffers_counter = other.buffers_counter; this->buffers = std::move(other.buffers); } //done return *this; } /// /// Destructor. /// ~InputFilter() {}; /// /// Call operator. /// /// Flow graph control. /// Generated message (data buffer). msg_t operator()(tbb::flow_control& fc) const { //get item from circular buffer auto b = this->buffers.at(this->next_buffer_index).get(); //set ID b->SetID(this->buffers_counter); //check flag --> stop reading if (this->buffers_counter > 20) { //log LOG(INFO) << "Input completed."; //end fc.stop(); } //setup next buffer this->next_buffer_index = (this->next_buffer_index + 1) % BUFFERS_CAPACITY; //info LOG(INFO) << "Input node: buffer ID: " << b->GetID(); //increment this->buffers_counter += 1; //done return b; } private: /// /// Buffers capacity. This value decides /// maximum number of threads when executing pipeline. /// int BUFFERS_CAPACITY; /// /// Buffers capacity default capacity /// static const int BUFFERS_CAPACITY_DEFAULT = 15; /// /// Circular buffer. /// std::vector> buffers; /// /// Index of the next buffer to read. /// mutable int next_buffer_index; /// /// Counter of data buffers. /// mutable int buffers_counter; }; int build_and_run_GF_g0() { //graph tbb::flow::graph g; //metadata nbxDataBufferMetadata md(10, 20, 1); //input filter auto input_filter = std::make_shared(md); //InputFilter input_filter(md); // copy constructor works fine //source node tbb::flow::input_node input_node(g, std::cref(input_filter)); //limiter tbb::flow::limiter_node< msg_t > limiter_node(g, 7); //start inpuy input_node.activate(); //wait for completion g.wait_for_all(); //done return 0; } int main(int argc, char* argv[]) { //run graph build_and_run_GF_g0(); }