/* Copyright 2005-2008 Intel Corporation. All Rights Reserved. This file is part of Threading Building Blocks. Threading Building Blocks is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 2 as published by the Free Software Foundation. Threading Building Blocks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Threading Building Blocks; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA As a special exception, you may use this file as part of a free software library without restriction. Specifically, if other files instantiate templates or use macros or inline functions from this file, or you compile this file and link it with other files to produce an executable, this file does not by itself cause the resulting executable to be covered by the GNU General Public License. This exception does not however invalidate any other reasons why the executable file might be covered by the GNU General Public License. */ /* Changes: - all member variables now have prefix m_ and are declared first (after friends) - abbreviate filter::{prev,next}_filter_in_pipeline to filter::m_{prev,next} - reorder filter members as m_pipeline, m_prev, m_next before the others - rename pipeline::filter_{list,end} to pipeline::m_filter_{first,last} with appropriate description - move stage_task above ordered_buffer to make ordered_buffer::put_token a non-template function - ordered_buffer::grow back inline for simplicity - eliminate not_in_pipeline() because NULL==m_pipeline now plays that role - get rid of current_version, __TBB_PIPELINE_VERSION, version_mask, pipeline::inject_token - added some curly braces to reduce risk of introducing bugs; who's->whose; corrsponding->corresponding - rename filter_ parameter to f and other changes that are not listen in such detail anymore - pipeline::m_end_of_input must be atomic (otherwise a race by definition) */ #ifndef __TBB_pipeline_H #define __TBB_pipeline_H #include "atomic.h" #include "task.h" #include namespace tbb { class pipeline; class filter; //! @cond INTERNAL namespace internal { typedef unsigned long Token; typedef long tokendiff_t; class stage_task; class ordered_buffer; } // namespace internal //! @endcond //! A stage in a pipeline. /** @ingroup algorithms */ class filter { friend class internal::stage_task; friend class pipeline; private: // state //! Pointer to the pipeline; NULL if not currently in a pipeline. pipeline* m_pipeline; //! Pointer to any previous filter in the pipeline; NULL otherwise. filter* m_prev; //! Pointer to any next filter in the pipeline; NULL otherwise. filter* m_next; //! Input buffer for filter that requires serial input; NULL otherwise. internal::ordered_buffer* m_input_buffer; //! Internal storage for is_serial() and is_ordered() const unsigned char m_filter_mode; private: //! m_filter_mode mask for parallel vs. serial static const unsigned char filter_is_serial = 0x1<<0; //! m_filter_mode mask for order // The bit was not set for parallel filters in TBB 2.1 and earlier, // but is_ordered() function always treats parallel filters as out of order static const unsigned char filter_is_out_of_order = 0x1<<1; public: enum mode { //! processes multiple items in parallel and in no particular order parallel = filter_is_out_of_order, //! processes items one at a time; all such filters process items in the same order serial_in_order = filter_is_serial, //! processes items one at a time and in no particular order serial_out_of_order = filter_is_serial | filter_is_out_of_order, //! @deprecated use serial_in_order instead serial = serial_in_order }; protected: filter( bool is_serial_ ) : m_pipeline(NULL), m_prev(NULL), m_next(NULL), m_input_buffer(NULL), m_filter_mode(static_cast(is_serial_ ? serial : parallel)) {} filter( mode filter_mode ) : m_pipeline(NULL), m_prev(NULL), m_next(NULL), m_input_buffer(NULL), m_filter_mode(static_cast(filter_mode)) {} public: //! True if filter is serial. bool is_serial() const { return bool( m_filter_mode & filter_is_serial ); } // ! True if filter must receive stream in order. bool is_ordered() const { return (m_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial; } //! Operate on an item from the input stream, and return item for output stream. /** Returns NULL if filter is a sink. */ virtual void* operator()( void* item ) = 0; //! Destroy filter. /** If the filter was added to a pipeline, the pipeline must be destroyed first. */ virtual __TBB_EXPORTED_METHOD ~filter(); }; //! A processing pipeling that applies filters to items. /** @ingroup algorithms */ class pipeline { friend class internal::stage_task; friend class filter; private: // state //! Pointer to any first filter in the pipeline; NULL otherwise. filter* m_filter_first; //! Pointer to any last filter in the pipeline; NULL otherwise. filter* m_filter_last; //! task whose reference count is used to determine when all stages are done. empty_task* m_end_counter; //! Number of idle tokens waiting for input stage. atomic m_input_tokens; //! Number of tokens created so far. internal::Token m_token_counter; //! False until fetch_input returns NULL. (TODO: what is fetch_input?) // TODO: this was not atomic before (which made it technically a race, though), so maybe relaxed_atomic is enough atomic m_end_of_input; public: //! Construct empty pipeline. __TBB_EXPORTED_METHOD pipeline(); /** Though the current implementation declares the destructor virtual, do not rely on this detail. The virtualness is deprecated and may disappear in future versions of TBB. */ virtual __TBB_EXPORTED_METHOD ~pipeline(); //! Add filter to end of pipeline. void __TBB_EXPORTED_METHOD add_filter( filter& f ); //! Run the pipeline to completion. void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens ); //! Remove all filters from the pipeline void __TBB_EXPORTED_METHOD clear(); private: //! Remove filter from pipeline (for use by ~filter()) void remove_filter( filter& f ); }; } // tbb #endif /* __TBB_pipeline_H */