Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

Questions about the concurrent containers.

ginr
Beginner

Hello,

I have a few questions about the concurrent containers, specifically concurrent_hash_map and concurrent_vector. Let me start by giving a small overview of my application.

My application has X containers, which can be either hash maps or vectors. Each container holds a set of key:value pairs. The application is a server, so it accepts client operations.

There are two kinds of client operations, writing a key/value and reading a key/value, plus a third operation that is not initiated by a client: flushing a container to disk.

When a key/value write is received, the following happens:

We figure out which concurrent container to put that write in (we maintain X of them, partitioned by keyspace).
We see if that container already has that key (i.e., iterate through it looking for the key if it's a vector, or do a find() for the key if it's a hash map).
If the key is found, we read the current value.
We do something with this value, and then write the new value back to that key.
If the key is not found, we simply write the key/value to the vector/hash map.
Done. (A sketch of this write path follows the list.)
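For the hash-map case, here is a minimal sketch of that read-modify-write path, assuming string keys/values and a hypothetical merge() standing in for the "do something with this value" step. tbb::concurrent_hash_map's accessor holds a write lock on the entry while it is in scope, so the whole update is atomic per key:

[cpp]#include <tbb/concurrent_hash_map.h>
#include <string>

typedef tbb::concurrent_hash_map<std::string, std::string> table_t;

// Hypothetical: combine the existing value with the incoming one.
std::string merge(const std::string& current, const std::string& incoming);

void write_key(table_t& table, const std::string& key, const std::string& value)
{
    table_t::accessor acc;                       // write lock on the entry while in scope
    if (table.insert(acc, key))
        acc->second = value;                     // key was absent: plain write
    else
        acc->second = merge(acc->second, value); // key present: read-modify-write
}[/cpp]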

When a read is received, the following happens:

We figure out which concurrent container to read that key from as explained above.
We look for that key in the container.
--Do some other stuff here, unrelated to the container--
Return.

Finally, we have a third operation, which happens periodically, specifically when the size of the container goes over X (let's say 1,000 values, or 1 MB).
This operation goes like so:

Every container has an associated "flusher" thread.
This thread checks the size of the container in a lazy fashion (every few seconds).
If it's over X, it copies the entire container and then clears it (or it swaps the container with an empty one).
--does various operations on the copied container, flushes to disk, etc--
Done.

We process between 1,000 and 10,000 writes per second, and an equivalent number of reads.

So, my questions are primarily related to the flushing process:

When I call swap() on a vector with an empty vector, what happens to any operations currently in flight on that vector?
I understand it's not safe to call clear() on a vector while operations are in flight; should I surround the vector with a _rw_ lock, or what's the best way to proceed here?
When I call swap() on a hash map with an empty hash map, what happens to any operations currently in flight on the hash map?

If doing a swap() isn't safe, how should I go about "flushing" a container? A flush basically needs to copy a container and empty it, atomically. I don't mind blocking on the copy/clear until any current reads on the container are done. Any help is appreciated :).

Each of my containers won't hold more than approximately 1,000 items at a time (once they reach 1,000, they will be flushed). Should I opt for the hash map (because of the O(1) lookup time), or the vector?

Given my above usage, is there anything I'm doing wrong? Both the vector and the hash map support size(), swap(), and iteration; is there anything else I should take into consideration? (Lookups are O(1) for the concurrent_hash_map and O(n) for the vector, due to the iteration, but I doubt this will be a problem in practice.) Also, concurrent_hash_map is said not to be efficient below a few thousand items, so maybe I should stick with the vector?

I also plan to use concurrent_queue to maintain a processing queue of reads/writes, with N threads processing them and a single thread adding to them (the one receiving requests from clients). Is there anything wrong with this?

Again, any help at all is appreciated; I apologize for the long post.

Thank you!

Dmitry_Vyukov
Valued Contributor I

KISS

If you are able to partition your data then your problems are solved; you do not even need concurrent containers.

Partition the data into X partitions. Create X worker threads. Each thread deals with the corresponding part of the data.

The input IO thread determines the target partition and sends the request to the corresponding worker thread.

Since only one thread ever accesses a partition's data, the data structure (preferably a hash map) can be plain old single-threaded.

As to flushing, a worker thread periodically checks whether a flush has to be done. If so, it swaps the data structure out and then flushes it to disk. Optionally, a worker thread may offload the actual flushing (writing to disk) to a dedicated thread (this way the worker thread is always ready to process user requests).

As to X, you may consider it to be k*N, where N is the number of processors in the system and k = 1, 2, 3..., depending on the evenness of the partitioning and the presence of blocking during processing. If the distribution is even and there is no blocking, set k = 1.
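A minimal sketch of this scheme, assuming a blocking tbb::concurrent_bounded_queue as each worker's mailbox and hash-based routing by key (the request type and the merge/reply logic are placeholders):

[cpp]#include <tbb/concurrent_queue.h>
#include <tbb/concurrent_hash_map.h>  // only for tbb::tbb_hasher
#include <map>
#include <string>

struct request { std::string key, value; bool is_write; };

// One worker owns one partition outright, so the map needs no locking.
struct worker
{
    tbb::concurrent_bounded_queue<request> inbox; // fed by the IO thread
    std::map<std::string, std::string> data;      // plain single-threaded container

    void run()
    {
        for (;;)
        {
            request r;
            inbox.pop(r);  // blocks until the IO thread sends something
            if (r.is_write)
                data[r.key] = r.value;
            // ... lookups, merging, replying to the client, lazy flush check ...
        }
    }
};

// IO thread: route by keyspace so a given key always lands on the same worker.
void dispatch(worker* workers, size_t nworkers, const request& r)
{
    workers[tbb::tbb_hasher(r.key) % nworkers].inbox.push(r);
}[/cpp]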

Anton_M_Intel
Employee

swap() methods are not thread-safe. You could consider swapping atomic pointers to containers instead. The new implementation of the hash map (TBB >= 2.2) is more efficient and works fine with 1,000 elements or fewer. But I concur with Dmitry that if you can avoid global synchronization (even via concurrent containers), it is always better to do so.
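A minimal sketch of the pointer-swap idea, assuming the container is published through a tbb::atomic pointer and a hypothetical flush_to_disk() routine; note the question it leaves open:

[cpp]#include <tbb/atomic.h>
#include <tbb/concurrent_hash_map.h>
#include <string>

typedef tbb::concurrent_hash_map<std::string, std::string> table_t;

void flush_to_disk(const table_t& t);  // hypothetical disk-writing routine

tbb::atomic<table_t*> g_table;  // current container; assume it is initialized at startup

void flush()
{
    table_t* fresh = new table_t;
    // Publish the empty container; other threads pick it up on their next load.
    table_t* old = g_table.fetch_and_store(fresh);
    flush_to_disk(*old);
    // Deleting 'old' is only safe once no thread still holds a pointer to it --
    // which is exactly the question raised in the next reply.
}[/cpp]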

Thanks

Dmitry_Vyukov
Valued Contributor I

> You could consider swapping atomic pointers to containers instead

And when to delete the old container? ;)

ginr
Beginner

Hello,

I am partitioning the data, but the number of worker threads is larger than the number of partitions, because there is a blocking part in the lookup (after the read is done from memory, we also do a read from disk and "merge" the two versions). So we have a few threads per partition (which means not using a concurrent container would cause contention). Right now, threads are not attached to a specific partition; we have two global concurrent queues which are fed by the main thread (the one that accepts connections). One is for read requests, the other is for writes. A free thread pops an item off one of them, processes it (regardless of which partition it's for), and unlocks it. The flusher threads are dedicated to specific partitions.

I could change the design and make it N reader threads per partition, with 1 writer per partition (and 1 flusher). I would still need a concurrent container in this case (or have to wrap it with a _rw_ lock).

Performance with a single IO thread is very low (due to the blocking), so that's not something we want to do just for the convenience of not using a concurrent container.

My current idea is to have N readers, N writers, the two queues mentioned above, Q partitions, and Q flushers.
I'm going to wrap the concurrent container (regardless of whether it's a hash_map or vector) with a _rw_ lock; both reads and writes to the container acquire a reader lock (as it's concurrent, after all). Flushers are dedicated to partitions: they check size() every so often, and when size() > X, they acquire a writer lock on the partition's _rw_ lock, swap() it with an empty partition, unlock it, work on the swapped partition, and eventually destroy the swapped copy.

The writer lock is held for a very small amount of time in this case, and should not affect performance much at all, imo.

The other option is for the flusher to create a new container when the size() of the current container exceeds X, then atomically swap the pointers to the two, as Anton suggests, work on the copy, and eventually destroy it. But I'm not familiar with how to proceed here, so any help is, again, appreciated.

I hope I've stated my case clearly; please tell me if I'm doing it wrong!

And I still haven't decided between the concurrent hash map and the vector; any tips here would help me out quite a bit (or just the downsides of one or the other).

Thanks!

ginr
Beginner

Hello,

Another thing I forgot to mention: I know the above strategy isn't the best for cache performance. It's not something I'm very focused on, and something I'm not very familiar with at all, but if someone could tell me how it could be improved, I will try to implement it in my design.

We generally run on 8/16 core systems.


Thank you!

Anton_M_Intel
Employee
> And when to delete the old container? ;)
I didn't work out the details :) Sure, it can be deleted only at serial points, or when the last reference disappears, so that there are no concurrent accesses. It depends on the use case.
Dmitry_Vyukov
Valued Contributor I

> Sure, it can be deleted only at serial points, or when the last reference disappears, so that there are no concurrent accesses

The problem is that some applications (servers) do not feature any "serial points"; they just process requests continuously. One can get a "stop the world" serial point by stopping all the worker threads and then starting them again, but this may be too costly.

Dmitry_Vyukov
Valued Contributor I

> My current idea is to have N readers, N writers, the two queues mentioned above, Q partitions, and Q flushers.
> I'm going to wrap the concurrent container (regardless of whether it's a hash_map or vector) with a _rw_ lock; both reads and writes to the container acquire a reader lock (as it's concurrent, after all). Flushers are dedicated to partitions: they check size() every so often, and when size() > X, they acquire a writer lock on the partition's _rw_ lock, swap() it with an empty partition, unlock it, work on the swapped partition, and eventually destroy the swapped copy.

That should work.

You can implement this along the lines of:

[cpp]struct part_t
{
  tbb::concurrent_hash_map<Key, Value> map;  // Key/Value: the application's types
  tbb::atomic<int> refcount;
};

part_t* partition;         // current partition
tbb::spin_rw_mutex guard;

part_t* acquire_partition()
{
  guard.lock_read();
  part_t* part = partition;
  ++part->refcount;        // hold a reference for the duration of the request
  guard.unlock();
  return part;
}

void release_partition(part_t* part)
{
  if (0 == --part->refcount)
    delete part;           // last reference is gone, safe to delete
}

part_t* reset_partition()
{
  part_t* fresh = new part_t;
  fresh->refcount = 1;     // this reference is owned by the global 'partition' pointer
  guard.lock();            // writer lock: waits for in-flight acquires to drain
  part_t* prev = partition;
  partition = fresh;
  guard.unlock();
  return prev;             // caller inherits the old partition's reference
}[/cpp]

Then the worker threads and the flusher thread would look like this:

[cpp]void worker_thread()
{
  for (;;)
  {
    part_t* part = acquire_partition();
    process_request(part->map);   // reads/writes via the concurrent container
    release_partition(part);
  }
}

void flusher_thread()
{
  for (;;)
  {
    sleep(1000);                        // lazy, periodic size check
    if (partition->map.size() > 1000)   // only the flusher swaps the pointer, so this is safe
    {
      part_t* part = reset_partition();
      flush_to_disk(part);
      release_partition(part);          // drops the reference inherited from the global pointer
    }
  }
}[/cpp]

Dmitry_Vyukov
Valued Contributor I

> Performance with a single IO thread is very low (due to the blocking), so that's not something we want to do just for the convenience of not using a concurrent container.

Well, if IO is the problem, then you may try to solve the IO problem rather than a container-related problem.

One way is to switch to asynchronous IO. Then you will be able to create as many threads as you need, and not as many as dictated by IO.

Another way is to split the work between the in-memory containers and the on-disk data. Then you will be able to create X threads for the former (possibly using plain single-threaded containers) and Y threads for the latter (depending on disk speed, etc.).

You may simplify the previous variant further as follows. An operation on an in-memory container takes sub-microsecond time, so there is not much sense in parallelizing it. So you may perform the operation on the in-memory container straight in the main thread that accepts incoming requests, and then offload the disk-IO part to a separate pool of worker threads.
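A minimal sketch of that last variant, assuming a blocking tbb::concurrent_bounded_queue and hypothetical io_task/update_in_memory_container/do_disk_io helpers:

[cpp]#include <tbb/concurrent_queue.h>
#include <string>

struct io_task { std::string key, value; };   // hypothetical request payload
void update_in_memory_container(io_task* t);  // hypothetical fast in-memory part
void do_disk_io(io_task* t);                  // hypothetical slow on-disk part

tbb::concurrent_bounded_queue<io_task*> g_io_queue;

// Main (accept) thread: the in-memory operation takes sub-microsecond time,
// so it is done inline; only the slow disk part goes to the pool.
void on_request(io_task* t)
{
    update_in_memory_container(t);
    g_io_queue.push(t);
}

// Pool of Y threads draining the queue, sized to the disk's speed.
void io_worker_thread()
{
    for (;;)
    {
        io_task* t;
        g_io_queue.pop(t);  // blocks until work arrives
        do_disk_io(t);
        delete t;
    }
}[/cpp]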

Chris_M__Thomasson
New Contributor I
> And when to delete the old container? ;)
> I didn't work out the details :) Sure, it can be deleted only at serial points, or when the last reference disappears, so that there are no concurrent accesses. It depends on the use case.

FWIW, here is a fairly straightforward technique that can be used to determine exactly when it's safe to destroy the old container:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/a53f24de178b419f

A simple usage pattern in pseudo-code might look like:

[plain]#define DEFER 1

typedef proxy proxy_type;

static proxy_type g_proxy;
static collection* g_collection = new collection;

void flush()
{
    proxy_type::collector& pc = g_proxy.acquire();
    collection* col = new collection;
    col = ATOMIC_SWAP(&g_collection, col);  // publish the empty collection, take the old one
    g_proxy.collect(pc, col);               // defer destruction until all readers are done
    g_proxy.release(pc);
}

void iterate()
{
    proxy_type::collector& pc = g_proxy.acquire();  // pins the current collection
    g_collection->iterate();
    g_proxy.release(pc);
}[/plain]

The pattern is basically equivalent to Read-Copy-Update (RCU). You can amortize the number of times you need to acquire/release a proxy-collector object by using the following usage patterns:

http://article.gmane.org/gmane.comp.lib.boost.devel/198747

(read all...)

The worker threads and flush threads could look like:

[cpp]static proxy_type g_proxy;
static partition_type* g_part = [...];

void worker_thread()
{
    proxy_type::collector* pc = &g_proxy.acquire();

    for (;;)
    {
        request_type* r = try_to_get_request();

        if (! r)
        {
            // About to block: release the collector so deferred
            // partitions are not held up while we wait.
            g_proxy.release(*pc);

            r = wait_for_a_request();

            pc = &g_proxy.acquire();
        }

        partition_type* part = ATOMIC_LOAD(&g_part);

        process_a_request(r, part);

        // Periodically move to a fresh collector; this amortizes the
        // acquire/release cost while still letting old partitions die.
        pc = &g_proxy.sync(*pc);
    }

    g_proxy.release(*pc);
}

void flush_thread()
{
    for (;;)
    {
        wait_for_flush_interval();

        partition_type* part = new partition_type;

        // Publish the empty partition and take ownership of the old one.
        part = ATOMIC_SWAP(&g_part, part);

        flush_partition_to_disk(part);

        // Defer destruction of the old partition until all readers
        // that might still hold it have released their collectors.
        proxy_type::collector& c = g_proxy.acquire();

        g_proxy.collect(c, part);

        g_proxy.release(c);
    }
}[/cpp]

This would eliminate the need to acquire/release read access on a rw_mutex every single time you need to process a request.
