Features:
- multiple-producer/single-consumer
- strong FIFO (with respect to a single producer)
- unbounded (though not in the demo implementation presented here)
- no atomic RMW operations or memory barriers, both while
enqueuing/dequeuing *and* during node management/reclamation (at least
on x86; other architectures may need a #StoreStore barrier when enqueuing).
The idea.
Before putting a node into the shared buffer, a producer first links the
node into its own local list, and only then stores it into the shared
buffer. In the shared buffer a node can get... lost. You did not mishear.
The node is written to the shared buffer with a plain store, no CAS, so
another producer can overwrite it.
The consumer remembers the last consumed node for every producer. Before
consuming a node, the consumer checks whether it is the direct successor
of the last node consumed from that producer. If it is, all is OK. If it
is not, then some nodes from this producer got lost, and the consumer
restores the lost nodes using the producer's local list of nodes.
Here is the code:
----------------------------------------------------------------------------------------
#include <cstring> // memset

struct producer_local_t;

struct node_t
{
    void* user_data;
    // used to organize producer local
    // list of enqueued nodes
    node_t* local_next;
    // incremented with every produced node
    unsigned tag;
    producer_local_t* producer_local;
    // whether node is already consumed or not
    bool is_consumed;
};

struct producer_local_t
{
    // last consumed node
    node_t* last_node;
    // last consumed node tag
    unsigned last_tag;
};

struct producer_t
{
    producer_local_t local;
    unsigned tag_counter;
    // newest allocated node
    node_t* head;
    // oldest allocated node
    node_t* tail;

    producer_t()
    {
        local.last_node = 0;
        local.last_tag = 0;
        tag_counter = 0;
        head = 0;
        tail = 0;
    }
};

struct mpsc_queue_t
{
    // for simplicity assuming that the buffer is infinite
    static const unsigned infinity = 100;
    node_t* buffer[infinity];
    node_t** consume_pos;
    node_t** produce_pos_hint;

    mpsc_queue_t()
    {
        memset(buffer, 0, sizeof(buffer));
        consume_pos = buffer;
        produce_pos_hint = buffer;
    }

    void enqueue(producer_t* producer, void* data)
    {
        // multiple producers here
        if (0 == producer->local.last_node)
            first_time_setup(producer);
        // allocate node for enqueue
        node_t* node = 0;
        // try to reuse oldest node
        if (producer->tail->is_consumed)
        {
            // reuse oldest node
            node = producer->tail;
            producer->tail = producer->tail->local_next;
        }
        else
        {
            // allocate new node
            node = new node_t;
            node->producer_local = &producer->local;
        }
        // fill node
        node->is_consumed = false;
        node->local_next = 0;
        node->tag = producer->tag_counter++;
        node->user_data = data;
        // link node into the producer local list
        producer->head->local_next = node;
        producer->head = node;
        // #StoreStore
        // get next empty position
        node_t** pos = oracle();
        // hazard store
        // this store potentially can be
        // overwritten by other producer
        *pos = node;
        // one more hazard store
        produce_pos_hint = pos + 1;
    }

    void first_time_setup(producer_t* producer)
    {
        // allocate new (dummy) node with tag 0
        node_t* node = new node_t;
        node->producer_local = &producer->local;
        node->is_consumed = false;
        node->local_next = 0;
        node->tag = producer->tag_counter++;
        node->user_data = 0;
        producer->head = node;
        producer->tail = node;
        producer->local.last_node = node;
    }

    node_t** oracle()
    {
        // for now just find first empty cell in buffer
        // (demo only: no check against the end of the buffer)
        node_t** node = produce_pos_hint;
        while (*node) ++node;
        return node;
    }

    void* dequeue()
    {
        // only single consumer here
        node_t* node = *consume_pos;
        if (0 == node)
            return 0;
        // Only rely on data-dependency here
        // check whether we have lost some nodes
        // since previous consumed node from the producer or not
        producer_local_t* local = node->producer_local;
        if (local->last_tag + 1 == node->tag)
        {
            // producer tags are consistent
            // i.e. we haven't lost any nodes,
            // so just increment pos
            ++consume_pos;
        }
        else
        {
            // producer tags are inconsistent
            // i.e. we have lost some nodes,
            // so get next node from previous
            // consumed node from the producer
            // and _don't_ increment pos
            node = local->last_node->local_next;
        }
        // we don't need last consumed node,
        // so mark the node accordingly
        local->last_node->is_consumed = true;
        // set last consumed node
        local->last_node = node;
        local->last_tag = node->tag;
        return node->user_data;
    }
};
----------------------------------------------------------------------------------------
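To see the recovery path in isolation, here is a minimal single-threaded sketch. It is not the queue itself: the names sim_node, sim_producer, consume_one and run_demo are made up for illustration, the race between producers is simulated by overwriting a slot by hand, and node reuse is left out entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct sim_producer;

struct sim_node {
    int value;             // payload
    unsigned tag;          // per-producer sequence number
    sim_node* local_next;  // link in the producer local list
    sim_producer* owner;
};

struct sim_producer {
    std::deque<sim_node> storage;  // owns the nodes (references stay valid on push_back)
    sim_node* head;                // newest node in the local list
    sim_node* last_node;           // consumer side: last consumed node
    unsigned last_tag;             // consumer side: tag of last consumed node

    sim_producer() {
        storage.push_back(sim_node{0, 0, nullptr, this});  // dummy node, tag 0
        head = last_node = &storage.back();
        last_tag = 0;
    }

    // Link a new node into the local list; the caller publishes it afterwards.
    sim_node* make_node(int value) {
        storage.push_back(sim_node{value, head->tag + 1, nullptr, this});
        head->local_next = &storage.back();
        head = &storage.back();
        return head;
    }
};

// One consumer step: advance pos only if the node in the slot is the
// expected successor, otherwise recover the lost node from the local list.
int consume_one(std::vector<sim_node*>& buffer, std::size_t& pos) {
    sim_node* node = buffer[pos];
    sim_producer* p = node->owner;
    if (p->last_tag + 1 == node->tag)
        ++pos;
    else
        node = p->last_node->local_next;
    p->last_node = node;
    p->last_tag = node->tag;
    return node->value;
}

std::vector<int> run_demo() {
    sim_producer a, b;
    std::vector<sim_node*> buffer(2, nullptr);
    buffer[0] = a.make_node(10);  // A stores into slot 0
    buffer[0] = b.make_node(20);  // B overwrites slot 0: A's node is lost
    buffer[1] = a.make_node(11);  // A stores its next node into slot 1

    std::vector<int> out;
    std::size_t pos = 0;
    while (pos < buffer.size())
        out.push_back(consume_one(buffer, pos));
    return out;
}
```

The consumer first takes B's node from slot 0, then sees A's second node in slot 1, notices the tag gap and pulls A's first node from A's local list before consuming the one in the slot. So A's nodes still come out in FIFO order even though one of them never survived in the shared buffer.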
Comments/suggestions/remarks/criticism are welcome
Dmitriy V'jukov
To make this queue unbounded you can use a nested list as the underlying data structure. The nests must be large enough. To manage the lifetime of the nests you can use any of the PDR techniques: RCU/SMR/ROP/vZOOM/ref-counting. Something like this:
struct nest_t
{
    node_t* buffer[1023];
    nest_t* next;
};
And of course you can use this queue as a bounded one, over a cyclic buffer. Since there is only one consumer, it can freely zero out the cells behind itself.
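As a rough sketch of how the consumer position could move across nests: the nest_cursor type and the count_cells_walked helper below are hypothetical, the capacity constant just names the 1023 from above, and the whole thing is single-threaded. How producers link a new nest and how an exhausted nest is handed to PDR is deliberately left out.

```cpp
#include <cassert>
#include <cstddef>

struct node_t;  // opaque here, only pointers are stored

struct nest_t {
    static const std::size_t capacity = 1023;
    node_t* buffer[capacity];
    nest_t* next;
    nest_t() : buffer(), next(nullptr) {}  // all cells start empty
};

// Hypothetical consumer-side cursor over the chain of nests.
struct nest_cursor {
    nest_t* nest;
    std::size_t index;

    // Returns the current cell, stepping into the next nest when the
    // current one is exhausted. Returns null if no next nest is linked yet.
    node_t** cell() {
        if (index == nest_t::capacity) {
            if (!nest->next)
                return nullptr;
            nest = nest->next;  // the old nest would be retired via PDR here
            index = 0;
        }
        return &nest->buffer[index];
    }

    void advance() { ++index; }
};

// Walks two linked nests and counts the cells the cursor visits.
std::size_t count_cells_walked() {
    nest_t* first = new nest_t;
    first->next = new nest_t;
    nest_cursor cur = {first, 0};
    std::size_t walked = 0;
    while (cur.cell()) {
        cur.advance();
        ++walked;
    }
    delete first->next;
    delete first;
    return walked;
}
```

With two nests the cursor visits 2 * 1023 cells and then stops because no third nest is linked.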
Dmitriy V'jukov
The oracle() function must use the produce_pos_hint variable and some smart algorithm to determine the enqueue position. Something like this:
1. Check the current enqueue position hint.
2. If that fails, check the next several positions.
3. If that fails, use binary search in [enqueue_position_hint, end_of_buffer] to find the right position.
Note: the hint can never point past the actual enqueue position; it can only lag behind it.
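The three steps above could look roughly like the sketch below. The function name find_enqueue_pos, the probe count of 4 and the demo_position helper are my assumptions, and the binary search relies on the buffer having the shape "non-empty cells up to the enqueue position, empty cells after it". Concurrent producers can still race on the returned cell, which the queue tolerates by design.

```cpp
#include <cassert>
#include <cstddef>

struct node_t { void* user_data; };  // reduced to what the sketch needs

// Returns the first empty cell in [hint, end), or end if there is none.
node_t** find_enqueue_pos(node_t** hint, node_t** end) {
    const std::size_t probe_count = 4;  // arbitrary, would need tuning

    // Steps 1 and 2: check the hint, then a few following cells.
    node_t** pos = hint;
    for (std::size_t i = 0; i < probe_count && pos != end; ++i, ++pos) {
        if (*pos == nullptr)
            return pos;
    }

    // Step 3: binary search for the first empty cell in [pos, end).
    node_t** lo = pos;
    node_t** hi = end;
    while (lo != hi) {
        node_t** mid = lo + (hi - lo) / 2;
        if (*mid)
            lo = mid + 1;  // occupied: the position is to the right
        else
            hi = mid;      // empty: the position is here or to the left
    }
    return lo;
}

// Fills the first `filled` cells of a 64-cell buffer and reports the
// index found when starting from `hint_index`.
std::size_t demo_position(std::size_t filled, std::size_t hint_index) {
    static node_t dummy = {nullptr};
    node_t* buffer[64] = {};
    for (std::size_t i = 0; i < filled; ++i)
        buffer[i] = &dummy;
    node_t** pos = find_enqueue_pos(buffer + hint_index, buffer + 64);
    return static_cast<std::size_t>(pos - buffer);
}
```

A stale hint only costs a few probes plus a logarithmic search, so the hint itself can be updated with a plain store like everything else.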
Dmitriy V'jukov
If/when a producer's node is lost, that will not even be
discovered until the next time that the producer manages to produce a
node that *isn't* lost, if ever. So if there are many producers and/or
some producers produce a node only occasionally, lost nodes can sit
unconsumed for an indefinite time.
In some environments this is not a problem at all (I suppose the frequency of node loss will be low enough).
And if this is a problem, then I propose several solutions.
1. The consumer can (periodically/episodically) check the local queues of all producers for lost nodes (a global list of all producers must be maintained).
2. Consumers can periodically enqueue fake nodes into the queue to push stuck nodes through.
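A rough sketch of solution 1, under several assumptions of mine: the sweep_producers function and the vector of registered producers are hypothetical, and the sweep cannot tell a lost node from one that is still sitting in the shared buffer, so it simply consumes everything linked after the last consumed node. That means dequeue() would additionally have to skip buffer entries whose tag is not newer than last_tag, which the code posted above does not do. On non-x86 the producer would also need its node fields to be visible before the node is linked into the local list.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct producer_local_t;

struct node_t {
    void* user_data;
    node_t* local_next;
    unsigned tag;
    producer_local_t* producer_local;
    bool is_consumed;
};

struct producer_local_t {
    node_t* last_node;
    unsigned last_tag;
};

// Walks the local list of every registered producer and consumes all
// nodes linked after the last consumed one. Returns how many were taken.
std::size_t sweep_producers(const std::vector<producer_local_t*>& producers,
                            std::vector<void*>& out) {
    std::size_t recovered = 0;
    for (producer_local_t* local : producers) {
        while (node_t* next = local->last_node->local_next) {
            local->last_node->is_consumed = true;  // old node may be reused
            local->last_node = next;
            local->last_tag = next->tag;
            out.push_back(next->user_data);
            ++recovered;
        }
    }
    return recovered;
}

// One producer with a dummy node and two unconsumed nodes behind it.
std::size_t demo_sweep() {
    int a = 1, b = 2;
    producer_local_t local = {nullptr, 0};
    node_t dummy = {nullptr, nullptr, 0, &local, false};
    node_t n1 = {&a, nullptr, 1, &local, false};
    node_t n2 = {&b, nullptr, 2, &local, false};
    dummy.local_next = &n1;
    n1.local_next = &n2;
    local.last_node = &dummy;

    std::vector<producer_local_t*> producers(1, &local);
    std::vector<void*> out;
    std::size_t recovered = sweep_producers(producers, out);
    assert(out.size() == 2 && out[0] == &a && out[1] == &b);
    assert(local.last_tag == 2 && dummy.is_consumed && n1.is_consumed);
    return recovered;
}
```

The cost of one sweep is proportional to the number of producers, so it makes sense only as an occasional pass, not on every dequeue.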
Dmitriy V'jukov
Yes, this is a reasonable suggestion.
If there are few threads, then yes, it will be a very good solution.
But I am targeting manycore machines with, for example, 100 cores
(Intel promises 80 cores in 5 years). And you can have, for example,
2 threads per core, so 200 threads. Every consumer must poll 200 spsc
queues... And to block when all queues are empty, the consumer must
check all 200 queues twice...
And to determine the total node count in N spsc queues, the consumer has
to check all N queues too. The node count is needed for load balancing,
statistics, feedback, etc.
I am thinking about a solution where each consumer has, for example,
N/10 mpsc queues instead of N spsc queues (N being the number of
threads). That way both the number of queues and the contention would
be moderate.
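Just to make the arithmetic concrete, one possible mapping of producers onto the N/10 queues of a consumer. The names producers_per_queue, queue_count and queue_index are made up, and grouping by consecutive ids is only one option of many.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical grouping factor: 10 producers share one mpsc queue.
const std::size_t producers_per_queue = 10;

// Number of mpsc queues a consumer needs for the given number of producers.
std::size_t queue_count(std::size_t thread_count) {
    return (thread_count + producers_per_queue - 1) / producers_per_queue;
}

// Which of the consumer's queues a given producer enqueues into.
std::size_t queue_index(std::size_t producer_id) {
    return producer_id / producers_per_queue;
}
```

With 200 threads the consumer polls 20 queues instead of 200, and each queue sees stores from at most 10 producers.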
Dmitriy V'jukov