Intel® oneAPI Threading Building Blocks

Tutorial for concurrent_queue with parallel_while?

ijfk
Beginner
As stated before, I'm quite new to TBB and trying to write some example projects to get familiar and measure performance.

One thing I'd like to do is to utilize "parallel_while" in combination with a concurrent_queue. The idea is that a network server constantly adds items to the queue, which is in turn processed in parallel by parallel_while.

I cannot seem to figure it out, and I also cannot seem to find any documentation.

I'm currently stuck at trying to get an iterator to the queue, doing something like this:

concurrent_queue<std::string> fileQueue;
concurrent_queue<std::string>::iterator myIt = fileQueue.begin();

which yields

error C2039: 'begin' : is not a member of 'tbb::strict_ppl::concurrent_queue<T>'
with
[
T=std::string
]

I'm not sure why there wouldn't be a begin() member. Are there any examples somebody could point me to? Again, I'd just like to process queue items (order of processing is not relevant) in parallel.

Thanks.

RafSchietekat
Valued Contributor III
I guess you mean parallel_do() instead of parallel_while(). It used to be that parallel_while() worked with concurrent_queue() the way you intend to use them, but that code has been deprecated, although you could consult the Reference for the details on how to still use it. Alternatively, you could write an adapter iterator type yourself.

ijfk
Beginner
Thank you.

OK, I would use parallel_do then.

In regards to the queue, is the only way to get items out of the queue to call try_pop() or to use the const_iterator?

Alexey-Kukanov
Employee
The methods to obtain an iterator over concurrent_queue are called unsafe_begin() and unsafe_end(). And iterators should not be used concurrently with pushing and popping data to/from the queue; that's why the methods have unsafe_ prefix.

parallel_do + concurrent_queue will probably not work for you, because a) iterators are not concurrency-safe, and b) parallel_do will stop when encountering end() for the first time, despite your network server being able to add more work later.
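
Point b) can be pictured with a small sketch. It does not use TBB at all: simple_queue is a hypothetical stand-in that offers only push() and try_pop(), and drain_until_empty() is an invented helper that mimics a single [begin, end) pass. The pass returns as soon as the queue is empty for a moment, so anything pushed afterwards waits until somebody starts another pass.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for a concurrent queue: only push() and try_pop().
template <typename T>
class simple_queue {
    std::deque<T> items_;
    std::mutex mutex_;
public:
    typedef T value_type;

    void push(const T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(value);
    }

    // Returns false (leaving 'out' untouched) when the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }
};

// Processes items until the first failed try_pop(), then returns the number
// of items handled. This mirrors a traversal that ends when it reaches end().
template <typename Source, typename Body>
std::size_t drain_until_empty(Source& source, Body body) {
    typename Source::value_type item;
    std::size_t handled = 0;
    while (source.try_pop(item)) {
        body(item);
        ++handled;
    }
    return handled;
}
```

An item pushed after drain_until_empty() has returned stays in the queue; with a server that keeps producing, the caller would have to loop and decide for itself when the input is really finished.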

pipeline/parallel_pipeline or the new flow graph API might be a better fit for your scenario.

ijfk
Beginner
I see. Is there an example for the graph api? I found the reference, but I can't make much of it by reading it. An example would definitely be helpful.

I tried to access several examples from the intel web site, all of which returned a 404 not found error.

At this point I really don't know any more than when I started.

I seem to understand how a pipeline works, but it seems as if it's only useful if I have a fixed data set (e.g. a vector with 1000 items).

I figured I could just create a global concurrent queue, and have the network server populate the queue. Then, the first filter of the pipeline (say the Producer class) would obtain items from that queue and pass them to the other filters using try_pop(). The problem is that popping the item permanently removes it, so the other filters can't access it anymore. Storing it in a member variable of the Producer class doesn't work either, since the items are being pulled asynchronously by the pipeline.

Is there something I am missing?

RafSchietekat
Valued Contributor III
#2 "OK, I would use parallel_do then."
I meant that parallel_do is the one that takes 2 iterators. The problem you describe didn't apply to parallel_while().

#3 "parallel_do + concurrent_queue will probably not work for you, because a) iterators are not concurrency-safe, and b) parallel_do will stop when encountering end() for the first time, despite your network server being able to add more work later."
An input iterator class comparable to std::istream_iterator can be written on top of concurrent_queue::try_pop(), either by the user or, preferably, provided with the library. If provided with the library, it could be called tbb::concurrent_queue_input_iterator (a bit unwieldy) or tbb::concurrent_queue::input_iterator (probably better), or it could be an opaque type obtained from new tbb::concurrent_queue operations pop_begin()/pop_end() or so (with the obvious conclusion about push_begin()/push_end()). The user of such an iterator would of course be responsible for consuming any element whose presence was detected, but there would also be no limitations on concurrency. If you like the idea, I could contribute such code.

#4 "I seem to understand how a pipeline works, but it seems as if it's only useful if I have a fixed data set (e.g. a vector with 1000 items)."
A concurrent_queue() is a perfectly good source of input, even if, as you rightly surmised, to continue work in subsequent filters you would need to copy popped values onto the heap or another convenient location like a ring buffer, or, preferably, pop them directly into such a location (although you should probably not copy directly to new'ed storage unless most attempts are expected to be successful).
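
A minimal sketch of that hand-off, written without TBB so that it stands on its own. string_source, produce() and consume() are hypothetical names made up for the illustration. The first stage pops into a local variable and allocates heap storage only after a successful pop, which is one way to read the caveat about not popping straight into new'ed storage.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical single-threaded stand-in for a queue with try_pop().
struct string_source {
    std::deque<std::string> items;

    bool try_pop(std::string& out) {
        if (items.empty()) return false;
        out = std::move(items.front());
        items.pop_front();
        return true;
    }
};

// First stage: pop into a local and allocate only after a successful pop,
// so an empty queue costs no heap allocation.
std::unique_ptr<std::string> produce(string_source& source) {
    std::string local;
    if (!source.try_pop(local)) return nullptr; // nothing available right now
    return std::unique_ptr<std::string>(new std::string(std::move(local)));
}

// Later stage: takes ownership of the heap item and frees it on return.
std::size_t consume(std::unique_ptr<std::string> item) {
    assert(item); // precondition: produce() returned an item
    return item->size();
}
```

The popped value is gone from the queue, but the later stage still reaches it through the pointer. A real pipeline filter would pass a raw pointer between stages instead of a unique_ptr; the smart pointer is used here only to keep the sketch leak-free.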

Alexey-Kukanov
Employee
Quoting RafSchietekat
An input iterator class comparable to std::istream_iterator can be written on top of concurrent_queue::try_pop(), either by the user or, preferably, provided with the library. If provided with the library, it could be called tbb::concurrent_queue_input_iterator (a bit unwieldy) or tbb::concurrent_queue::input_iterator (probably better), or it could be an opaque type obtained from new tbb::concurrent_queue operations pop_begin()/pop_end() or so (with the obvious conclusion about push_begin()/push_end()). The user of such an iterator would of course be responsible for consuming any element whose presence was detected, but there would also be no limitations on concurrency. If you like the idea, I could contribute such code.


Before we added the parallel_pipeline() function and the flow graph API, I wanted for some time an interface that provides a [begin, end) iterator pair on top of a potentially endless data source, i.e. the thing that parallel_do lacks compared to parallel_while. Now I am not sure whether it is necessary, because a two-stage parallel_pipeline and a relatively simple flow graph are quite good alternatives I think. The exception may be an algorithm that also produces new items on the fly: those can be added to parallel_do() through its feeder interface but not to parallel_pipeline(), though adding on the fly is also possible with the graph.

So now I think the TBB team at Intel is unlikely to develop such a feature. However, we will accept a contribution, and will make it available as an Extra download at the very least. If you (or anybody else) decide to develop such a thing, my recommendation is to make it a template class that accepts any input source meeting a certain set of requirements (such as having try_pop()), rather than tying it to concurrent_queue only.


Alexey-Kukanov
Employee
Quoting ijfk
I see. Is there an example for the graph api? I found the reference, but I can't make much of it by reading it. An example would definitely be helpful.


Latest TBB open-source packages contain a couple of examples. Also, there are a few blog articles about the graph, though the API may have changed a bit since those were published.

Quoting ijfk
I seem to understand how a pipeline works, but it seems as if it's only useful if I have a fixed data set (e.g. a vector with 1000 items).

I figured I could just create a global concurrent queue, and have the network server populate the queue. Then, the first filter of the pipeline (say the Producer class) would obtain items from that queue and pass them to the other filters using try_pop(). The problem is that popping the item permanently removes it, so the other filters can't access it anymore. Storing it in a member variable of the Producer class doesn't work either, since the items are being pulled asynchronously by the pipeline.


Why do you think pipeline is only useful with a fixed data set? There is the problem with the master thread busy waiting if a worker thread is blocked in a pipeline filter; but other than that, it should work without imposing any limitation on the number of items available in the input source.
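
To make the "no fixed data set" point concrete, here is a small sketch of what an input stage has to distinguish. It is not TBB code: closable_source and poll_result are hypothetical names. The idea is that "the queue is empty right now" and "no more input will ever arrive" are different answers, and only the second should end the pipeline.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Three possible answers when the input stage asks for work.
enum class poll_result { item, empty_for_now, finished };

// Hypothetical input source: the producer calls push() and, eventually, close().
template <typename T>
class closable_source {
    std::deque<T> items_;
    bool closed_ = false;
    std::mutex mutex_;
public:
    void push(const T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(value);
    }

    // Signals that no further items will be pushed.
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
    }

    // Distinguishes "nothing yet" from "no more input will ever arrive".
    // Items pushed before close() are still delivered after close().
    poll_result poll(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!items_.empty()) {
            out = items_.front();
            items_.pop_front();
            return poll_result::item;
        }
        return closed_ ? poll_result::finished : poll_result::empty_for_now;
    }
};
```

An input filter built on such a source would signal end of input only on finished. The empty_for_now case is where the waiting problem mentioned above shows up, since the filter has to either block or spin until the server delivers more.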

As for storing the data to pass to subsequent filters: as Raf noted, you need to allocate memory for an item, and pass the pointer to that memory to the next filter. And with the parallel_pipeline() function, it's even easier than that: its filters are typed (i.e. they accept and return variables of any type, rather than just pointers-to-void) and allocate/deallocate memory as necessary for data passing, so you don't even have to bother.


RafSchietekat
Valued Contributor III
#6 "a two-stage parallel_pipeline and a relatively simple flow graph are quite good alternatives I think"
They would do the trick, but why withhold STL-standard glue that could also connect any number of existing algorithms (subject to testing as described below)?

#6 "my recommendation is to make it a template class that accepts any input source meeting a certain set of requirements (such as having try_pop()), rather than tying it to concurrent_queue only"
With 3 classes providing try_pop() (concurrent_queue, concurrent_bounded_queue, concurrent_priority_queue), and since a higher-level selection mechanism can always be provided on top later on, that would be quite appropriate... unless the underlying interface ought to be extended as described below.

It is interesting to consider the responsibilities here. try_pop() imposes the allocation responsibility on the caller and merges probing and fetching in a single operation, so an iterator adaptor class has to resolve the impedance mismatch (engineering term) by providing temporary internal storage (involving a copy operation behind the scene), and there is a non-STL-like requirement (on top of the basic input iterator requirements) for the client to consume any input whose presence was probed through that same iterator instance (normally a client could test for end of input before testing for, e.g., number of elements in the current batch, but now it needs to short-circuit after any other test than end of input before fetching the element). With a bespoke iterator with privileged access beyond the try_pop() operation, the queue could assume more of those responsibilities and be safer with untested clients, perhaps leveraging tbb::enumerable_thread_specific, but that would require a study on the performance impact. Otherwise maybe there should be something stricter than "input iterator" in STL to reflect the additional usage requirement.
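
The consumption requirement can be shown with a deliberately tiny sketch. int_source and popping_iterator are hypothetical names, and the code is single-threaded and int-only to keep it short. Since the only way to probe for end of input is try_pop(), the probed element has already left the queue and lives in the iterator's internal buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical single-threaded stand-in for a queue with try_pop().
struct int_source {
    std::deque<int> items;

    bool try_pop(int& out) {
        if (items.empty()) return false;
        out = items.front();
        items.pop_front();
        return true;
    }
};

// Minimal adaptor: probing for end requires a try_pop(), so the probed
// element is copied into the iterator's own buffer.
class popping_iterator {
    int_source* source_; // null means "end"
    int buffered_;
public:
    popping_iterator() : source_(nullptr), buffered_(0) {}
    explicit popping_iterator(int_source& s) : source_(&s), buffered_(0) { ++*this; }

    bool at_end() const { return source_ == nullptr; }

    int operator*() const {
        assert(source_); // precondition: not at end
        return buffered_;
    }

    popping_iterator& operator++() {
        assert(source_); // precondition: not at end
        if (!source_->try_pop(buffered_)) source_ = nullptr;
        return *this;
    }
};
```

After constructing popping_iterator on a source holding two items, only one item remains in the source. A client that checks at_end() and then walks away without dereferencing silently drops the buffered element, which is the extra obligation on top of the usual input iterator rules.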

I'll see what I can come up with later today for a simple adaptor on top of try_pop().

RafSchietekat
Valued Contributor III
"Otherwise maybe there should be something stricter than "input iterator" in STL to reflect the additional usage requirement."
The specification seems vaguer than I thought, and even istream_iterator seems to buffer one value internally, so...

Here's a self-contained proof of concept, tested with Linux/g++/tbb30_20110419oss (latest tweak around 2011-07-25 17:20 UTC):
[cpp]// ANSI C
#include <assert.h> // assert()
#include <stdlib.h> // EXIT_SUCCESS
// C++
#include <algorithm> // std::copy
#include <iostream> // std::cout
#include <iterator> // std::iterator, iterator tags
// TBB
#define TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE 1
#include "tbb/concurrent_queue.h"
#include "tbb/concurrent_priority_queue.h"
#include "tbb/mutex.h"
#include "tbb/parallel_for_each.h"

/**
 * @param Q Queue type with value_type and push().
 */
template <typename Q>
class push_iterator : public std::iterator<std::output_iterator_tag, void, void, void, void> {
    private: // state
        typedef typename Q::value_type T;
        Q * m_Q;
    public:
        push_iterator(Q & ar_Q) : m_Q(&ar_Q) {}

        void operator=(const T& value) { m_Q->push(value); }

        push_iterator & operator* ()    { return *this; }
        push_iterator & operator++()    { return *this; }
        push_iterator & operator++(int) { return *this; }
};

/**
 * @param Q Queue type with value_type and try_pop().
 */
template <typename Q>
class try_pop_iterator : public std::iterator<std::input_iterator_tag, typename Q::value_type> {
    private: // state
        typedef typename Q::value_type T;
        Q * m_Q;
        T m_T;
    public:
        try_pop_iterator()         : m_Q(NULL ) {}
        try_pop_iterator(Q & ar_Q) : m_Q(&ar_Q) { ++(*this); }
        bool operator==(const try_pop_iterator& rhs) const { return m_Q == rhs.m_Q; }
        bool operator!=(const try_pop_iterator& rhs) const { return !operator==(rhs); }
        const T& operator*() const { return *operator->(); }
        const T* operator->() const {
            assert(m_Q); // precondition
            return &m_T;
        }
        try_pop_iterator & operator++() {
            assert(m_Q); // precondition
            if (!m_Q->try_pop(m_T)) m_Q = NULL;
            return *this;
        }
};

tbb::mutex g_mutex;

struct Body {
    void operator()(const int & arg) const {
        tbb::mutex::scoped_lock l_anonymous(g_mutex);
        std::cout << arg << std::endl ;
    }
};

int main(int argc, char *argv[]) {
    // also works with tbb::concurrent_bounded_queue or tbb::concurrent_priority_queue
    typedef tbb::concurrent_queue<int> typeof_l_queue; typeof_l_queue l_queue;

    typedef    push_iterator<typeof_l_queue> lt_push;
    typedef try_pop_iterator<typeof_l_queue> lt_pop ;

    int l_array[] = {10, 20, 30, 40, 50};
    std::copy(&l_array[0], &l_array[sizeof(l_array)/sizeof(*l_array)], lt_push(l_queue));

    // also works with std::for_each or tbb::parallel_do, same arguments
    tbb::parallel_for_each(lt_pop(l_queue), lt_pop(), Body());

    return EXIT_SUCCESS;
}[/cpp]
