Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrent queue with generics support (MPMCQueueOf) #104

Merged
merged 5 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,20 @@ q.Enqueue("foo")
// optimistic insertion attempt; doesn't block
inserted := q.TryEnqueue("bar")
// consumer obtains an item from the queue
item := q.Dequeue()
item := q.Dequeue() // interface{} pointing to a string
// optimistic obtain attempt; doesn't block
item, ok := q.TryDequeue()
```

Based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers.
`MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later.

```go
q := xsync.NewMPMCQueueOf[string](1024)
q.Enqueue("foo")
item := q.Dequeue() // string
```

The queue is based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers.

The idea of the algorithm is to allow parallelism for concurrent producers and consumers by introducing the notion of tickets, i.e. values of two counters, one per producers/consumers. An atomic increment of one of those counters is the only noticeable contention point in queue operations. The rest of the operation avoids contention on writes thanks to the turn-based read/write access for each of the queue items.

Expand Down
150 changes: 150 additions & 0 deletions mpmcqueueof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//go:build go1.19
// +build go1.19

package xsync

import (
"runtime"
"sync/atomic"
"unsafe"
)

// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent
// queue. It's a generic version of MPMCQueue.
//
// MPMCQueue instances must be created with NewMPMCQueueOf function.
// A MPMCQueueOf must not be copied after first use.
//
// Based on the data structure from the following C++ library:
// https://github.com/rigtorp/MPMCQueue
type MPMCQueueOf[I any] struct {
cap uint64
head uint64
//lint:ignore U1000 prevents false sharing
hpad [cacheLineSize - 8]byte
tail uint64
//lint:ignore U1000 prevents false sharing
tpad [cacheLineSize - 8]byte
slots []slotOfPadded[I]
}

type slotOfPadded[I any] struct {
slotOf[I]
// Unfortunately, proper padding like the below one:
//
// pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte
//
// won't compile, so here we add a best-effort padding for items up to
// 56 bytes size.
//lint:ignore U1000 prevents false sharing
pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
}

type slotOf[I any] struct {
// atomic.Uint64 is used here to get proper 8 byte alignment on
// 32-bit archs.
turn atomic.Uint64
item I
}

// NewMPMCQueueOf creates a new MPMCQueueOf instance with the given
// capacity.
func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] {
if capacity < 1 {
panic("capacity must be positive number")
}
return &MPMCQueueOf[I]{
cap: uint64(capacity),
slots: make([]slotOfPadded[I], capacity),
}
}

// Enqueue inserts the given item into the queue.
// Blocks, if the queue is full.
func (q *MPMCQueueOf[I]) Enqueue(item I) {
head := atomic.AddUint64(&q.head, 1) - 1
slot := &q.slots[q.idx(head)]
turn := q.turn(head) * 2
for slot.turn.Load() != turn {
runtime.Gosched()
}
slot.item = item
slot.turn.Store(turn + 1)
}

// Dequeue retrieves and removes the item from the head of the queue.
// Blocks, if the queue is empty.
func (q *MPMCQueueOf[I]) Dequeue() I {
var zeroedI I
tail := atomic.AddUint64(&q.tail, 1) - 1
slot := &q.slots[q.idx(tail)]
turn := q.turn(tail)*2 + 1
for slot.turn.Load() != turn {
runtime.Gosched()
}
item := slot.item
slot.item = zeroedI
slot.turn.Store(turn + 1)
return item
}

// TryEnqueue inserts the given item into the queue. Does not block
// and returns immediately. The result indicates that the queue isn't
// full and the item was inserted.
func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool {
head := atomic.LoadUint64(&q.head)
for {
slot := &q.slots[q.idx(head)]
turn := q.turn(head) * 2
if slot.turn.Load() == turn {
if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
slot.item = item
slot.turn.Store(turn + 1)
return true
}
} else {
prevHead := head
head = atomic.LoadUint64(&q.head)
if head == prevHead {
return false
}
}
runtime.Gosched()
}
}

// TryDequeue retrieves and removes the item from the head of the
// queue. Does not block and returns immediately. The ok result
// indicates that the queue isn't empty and an item was retrieved.
func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) {
tail := atomic.LoadUint64(&q.tail)
for {
slot := &q.slots[q.idx(tail)]
turn := q.turn(tail)*2 + 1
if slot.turn.Load() == turn {
if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
var zeroedI I
item = slot.item
ok = true
slot.item = zeroedI
slot.turn.Store(turn + 1)
return
}
} else {
prevTail := tail
tail = atomic.LoadUint64(&q.tail)
if tail == prevTail {
return
}
}
runtime.Gosched()
}
}

func (q *MPMCQueueOf[I]) idx(i uint64) uint64 {
return i % q.cap
}

func (q *MPMCQueueOf[I]) turn(i uint64) uint64 {
return i / q.cap
}
Loading