Hi, I`m using flow graph to create an image processing pipeline, there is a queue node on input where I put a couple of images and a limiter node directly after that, all processing multifunction nodes connect with each other if the step succeeded and to the limiter decrement if failed. Problem is that decrementer never decrements it`s counter even though i`ve forced the first processing node to fail and it definetely sends a continue_msg, and wait_for_all terminates after processing the amount of nodes that was supplied as a limit to the limit_node constructor. Below is the code: struct eProcessingNodePort { enum Val { Failure, Success }; }; template void ProcessResult(ImageProcessingGraph* g, ScratchImage* result, const InputT& input, PortsT& ports, const ThWideString& stageName) { if (result) { THOR_INF_W("Stage [%ls] has succeeded for input texture %ls")(textureToolsLogTag, stageName.c_str(), input.m_InputPath.c_str()); std::get(ports).try_put(ProcessorInput(input, result)); } else { THOR_ERR_W("Stage [%ls] has failed for input texture %ls")(textureToolsLogTag, stageName.c_str(), input.m_InputPath.c_str()); g->AddFailedTag(input.m_Tag); std::get(ports).try_put(continue_msg()); } } void InitNodes(ThSize maxImagesAtOnce) { m_Source = new SourceNode(*m_Graph); m_Limiter = new LimiterNode(*m_Graph, maxImagesAtOnce); m_Loader = new ImageLoaderNode(*m_Graph, unlimited, [this] (const InputType &input, ImageLoaderNode::output_ports_type &ports) { ScratchImage* result = 0; //if (!IsTagFailed(input.m_Tag)) <----------------------------------------------forced failure // result = LoadImageStep(input.m_InputPath, input.m_Options); if (result && input.m_NumItems > 1) { const TexMetadata& info = result->GetMetadata(); if ( info.arraySize > 1 || info.depth > 1 || info.mipLevels > 1 || info.IsCubemap() ) { THOR_ERR_W("Can't assemble complex surfaces, file = %ls")(textureToolsLogTag, input.m_InputPath.c_str()); delete result; result = 0; } } ProcessResult(this, result, input, ports, L"Load Image"); } ); m_Decompress = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = DecompressStep(input.m_Image); ProcessResult(this, result, input, ports, L"Decompress"); } ); m_FlipRotate = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = FlipRotateStep(input.m_Image, input.m_Options); ProcessResult(this, result, input, ports, L"FlipRotate"); } ); m_Resize = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = ResizeStep(input.m_Image, input.m_Options); ProcessResult(this, result, input, ports, L"Resize"); } ); m_ConvertFormat = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = ConvertStep(input.m_Image, input.m_Options); ProcessResult(this, result, input, ports, L"Convert Format"); } ); m_PremultiplyAlpha = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = PremultiplyAlphaStep(input.m_Image, input.m_Options); ProcessResult(this, result, input, ports, L"Premultiply Alpha"); } ); m_Compress = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = CompressStep(input.m_Image, input.m_Options); ProcessResult(this, result, input, ports, L"Compress"); } ); m_GenerateMipmaps = new ImageProcessorNode(*m_Graph, unlimited, [this] (const ProcessorInput &input, ImageProcessorNode::output_ports_type &ports) { ScratchImage* result = 0; if (!IsTagFailed(input.m_Tag)) result = GenerateMipmapsStep(input.m_Image, input.m_Options); ProcessResult(this, result, input, ports, L"Generate Mipmaps"); } ); m_Output = new OutputNode(*m_Graph, unlimited, [this] (const ProcessorInput& input)->continue_msg { OutputStep(input); return continue_msg(); } ); } virtual void LinkNodes() { make_edge(*m_Source, *m_Limiter); make_edge(*m_Limiter, *m_Loader); make_edge(output_port(*m_Loader), *m_Decompress); make_edge(output_port(*m_Loader), m_Limiter->decrement); LinkProcessors(m_Decompress, m_FlipRotate); LinkProcessors(m_FlipRotate, m_Resize); LinkProcessors(m_Resize, m_ConvertFormat); LinkProcessors(m_ConvertFormat, m_GenerateMipmaps); LinkProcessors(m_GenerateMipmaps, m_PremultiplyAlpha); LinkProcessors(m_PremultiplyAlpha, m_Compress); make_edge(output_port(*m_Compress), *m_Output); make_edge(output_port(*m_Compress), m_Limiter->decrement); make_edge(*m_Output, m_Limiter->decrement); } void LinkProcessors(ImageProcessorNode* from, ImageProcessorNode* to) { make_edge(output_port(*from), *to); make_edge(output_port(*from), m_Limiter->decrement); } typedef std::tuple MultiFunctionPorts; typedef queue_node SourceNode; typedef limiter_node LimiterNode; typedef multifunction_node ImageLoaderNode; typedef multifunction_node ImageProcessorNode; typedef function_node OutputNode; SourceNode* m_Source; LimiterNode* m_Limiter; ImageLoaderNode* m_Loader; ImageProcessorNode* m_Decompress; ImageProcessorNode* m_FlipRotate; ImageProcessorNode* m_Resize; ImageProcessorNode* m_ConvertFormat; ImageProcessorNode* m_PremultiplyAlpha; ImageProcessorNode* m_Compress; ImageProcessorNode* m_GenerateMipmaps; OutputNode* m_Output;