From 0f98a66ba41767136a42b2438f0710726cb09da9 Mon Sep 17 00:00:00 2001 From: Andrey Pechkurov Date: Sun, 20 Aug 2023 20:48:45 +0300 Subject: [PATCH 1/5] Add concurrent queue with generics support (MPMCQueueOf) --- README.md | 12 +- mpmcqueueof.go | 142 +++++++++++++++++++ mpmcqueueof_test.go | 326 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 478 insertions(+), 2 deletions(-) create mode 100644 mpmcqueueof.go create mode 100644 mpmcqueueof_test.go diff --git a/README.md b/README.md index 54f5fdb..011ac05 100644 --- a/README.md +++ b/README.md @@ -99,12 +99,20 @@ q.Enqueue("foo") // optimistic insertion attempt; doesn't block inserted := q.TryEnqueue("bar") // consumer obtains an item from the queue -item := q.Dequeue() +item := q.Dequeue() // interface{} pointing at a string // optimistic obtain attempt; doesn't block item, ok := q.TryDequeue() ``` -Based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers. +`MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.18 or later. + +```go +q := xsync.NewMPMCQueueOf[string](1024) +q.Enqueue("foo") +item := q.Dequeue() // string +``` + +The queue is based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers. The idea of the algorithm is to allow parallelism for concurrent producers and consumers by introducing the notion of tickets, i.e. values of two counters, one per producers/consumers. An atomic increment of one of those counters is the only noticeable contention point in queue operations. The rest of the operation avoids contention on writes thanks to the turn-based read/write access for each of the queue items. diff --git a/mpmcqueueof.go b/mpmcqueueof.go new file mode 100644 index 0000000..5266950 --- /dev/null +++ b/mpmcqueueof.go @@ -0,0 +1,142 @@ +//go:build go1.18 +// +build go1.18 + +package xsync + +import ( + "runtime" + "sync/atomic" + "unsafe" +) + +// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent +// queue. It's a generic version of MPMCQueue. +// +// MPMCQueue instances must be created with NewMPMCQueueOf function. +// A MPMCQueueOf must not be copied after first use. +// +// Based on the data structure from the following C++ library: +// https://github.com/rigtorp/MPMCQueue +type MPMCQueueOf[I any] struct { + cap uint64 + head uint64 + //lint:ignore U1000 prevents false sharing + hpad [cacheLineSize - 8]byte + tail uint64 + //lint:ignore U1000 prevents false sharing + tpad [cacheLineSize - 8]byte + slots []slotOfPadded[I] +} + +type slotOfPadded[I any] struct { + slotOf[I] + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - (unsafe.Sizeof(slot{}) % cacheLineSize)]byte +} + +type slotOf[I any] struct { + turn uint64 + item I +} + +// NewMPMCQueueOf creates a new MPMCQueueOf instance with the given +// capacity. +func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { + if capacity < 1 { + panic("capacity must be positive number") + } + return &MPMCQueueOf[I]{ + cap: uint64(capacity), + slots: make([]slotOfPadded[I], capacity), + } +} + +// Enqueue inserts the given item into the queue. +// Blocks, if the queue is full. +func (q *MPMCQueueOf[I]) Enqueue(item I) { + head := atomic.AddUint64(&q.head, 1) - 1 + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + for atomic.LoadUint64(&slot.turn) != turn { + runtime.Gosched() + } + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) +} + +// Dequeue retrieves and removes the item from the head of the queue. +// Blocks, if the queue is empty. +func (q *MPMCQueueOf[I]) Dequeue() I { + var zeroedI I + tail := atomic.AddUint64(&q.tail, 1) - 1 + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + for atomic.LoadUint64(&slot.turn) != turn { + runtime.Gosched() + } + item := slot.item + slot.item = zeroedI + atomic.StoreUint64(&slot.turn, turn+1) + return item +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { + head := atomic.LoadUint64(&q.head) + for { + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) + return true + } + } else { + prevHead := head + head = atomic.LoadUint64(&q.head) + if head == prevHead { + return false + } + } + runtime.Gosched() + } +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { + tail := atomic.LoadUint64(&q.tail) + for { + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + var zeroedI I + item = slot.item + ok = true + slot.item = zeroedI + atomic.StoreUint64(&slot.turn, turn+1) + return + } + } else { + prevTail := tail + tail = atomic.LoadUint64(&q.tail) + if tail == prevTail { + return + } + } + runtime.Gosched() + } +} + +func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { + return i % q.cap +} + +func (q *MPMCQueueOf[I]) turn(i uint64) uint64 { + return i / q.cap +} diff --git a/mpmcqueueof_test.go b/mpmcqueueof_test.go new file mode 100644 index 0000000..437cd16 --- /dev/null +++ b/mpmcqueueof_test.go @@ -0,0 +1,326 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright notice. The following tests are partially based on +// the following file from the Go Programming Language core repo: +// https://github.com/golang/go/blob/831f9376d8d730b16fb33dfd775618dffe13ce7a/src/runtime/chan_test.go + +package xsync_test + +import ( + "runtime" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + . "github.com/puzpuzpuz/xsync/v2" +) + +func TestQueueOf_InvalidSize(t *testing.T) { + defer func() { recover() }() + NewMPMCQueueOf[int](0) + t.Fatal("no panic detected") +} + +func TestQueueOfEnqueueDequeueInt(t *testing.T) { + q := NewMPMCQueueOf[int](10) + for i := 0; i < 10; i++ { + q.Enqueue(i) + } + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got != i { + t.Fatalf("got %v, want %d", got, i) + } + } +} + +func TestQueueOfEnqueueDequeueString(t *testing.T) { + q := NewMPMCQueueOf[string](10) + for i := 0; i < 10; i++ { + q.Enqueue(strconv.Itoa(i)) + } + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got != strconv.Itoa(i) { + t.Fatalf("got %v, want %d", got, i) + } + } +} + +func TestQueueOfEnqueueDequeueStruct(t *testing.T) { + type foo struct { + bar int + baz int + } + q := NewMPMCQueueOf[foo](10) + for i := 0; i < 10; i++ { + q.Enqueue(foo{i, i}) + } + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got.bar != i || got.baz != i { + t.Fatalf("got %v, want %d", got, i) + } + } +} + +func TestQueueOfEnqueueDequeueStructRef(t *testing.T) { + type foo struct { + bar int + baz int + } + q := NewMPMCQueueOf[*foo](11) + for i := 0; i < 10; i++ { + q.Enqueue(&foo{i, i}) + } + q.Enqueue(nil) + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got.bar != i || got.baz != i { + t.Fatalf("got %v, want %d", got, i) + } + } + if last := q.Dequeue(); last != nil { + t.Fatalf("got %v, want nil", last) + } +} + +func TestQueueOfEnqueueBlocksOnFull(t *testing.T) { + q := NewMPMCQueueOf[string](1) + q.Enqueue("foo") + cdone := make(chan bool) + flag := int32(0) + go func() { + q.Enqueue("bar") + if atomic.LoadInt32(&flag) == 0 { + t.Error("enqueue on full queue didn't wait for dequeue") + } + cdone <- true + }() + time.Sleep(50 * time.Millisecond) + atomic.StoreInt32(&flag, 1) + if got := q.Dequeue(); got != "foo" { + t.Fatalf("got %v, want foo", got) + } + <-cdone +} + +func TestQueueOfDequeueBlocksOnEmpty(t *testing.T) { + q := NewMPMCQueueOf[string](2) + cdone := make(chan bool) + flag := int32(0) + go func() { + q.Dequeue() + if atomic.LoadInt32(&flag) == 0 { + t.Error("dequeue on empty queue didn't wait for enqueue") + } + cdone <- true + }() + time.Sleep(50 * time.Millisecond) + atomic.StoreInt32(&flag, 1) + q.Enqueue("foobar") + <-cdone +} + +func TestQueueOfTryEnqueueDequeue(t *testing.T) { + q := NewMPMCQueueOf[int](10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(i) { + t.Fatalf("failed to enqueue for %d", i) + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got != i { + t.Fatalf("got %v, want %d, for status %v", got, i, ok) + } + } +} + +func TestQueueOfTryEnqueueOnFull(t *testing.T) { + q := NewMPMCQueueOf[string](1) + if !q.TryEnqueue("foo") { + t.Error("failed to enqueue initial item") + } + if q.TryEnqueue("bar") { + t.Error("got success for enqueue on full queue") + } +} + +func TestQueueOfTryDequeueBlocksOnEmpty(t *testing.T) { + q := NewMPMCQueueOf[int](2) + if _, ok := q.TryDequeue(); ok { + t.Error("got success for enqueue on empty queue") + } +} + +func hammerQueueOfBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { + runtime.GOMAXPROCS(gomaxprocs) + q := NewMPMCQueueOf[int](numThreads) + startwg := sync.WaitGroup{} + startwg.Add(1) + csum := make(chan int, numThreads) + // Start producers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + for j := n; j < numOps; j += numThreads { + q.Enqueue(j) + } + }(i) + } + // Start consumers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + sum := 0 + for j := n; j < numOps; j += numThreads { + item := q.Dequeue() + sum += item + } + csum <- sum + }(i) + } + startwg.Done() + // Wait for all the sums from producers. + sum := 0 + for i := 0; i < numThreads; i++ { + s := <-csum + sum += s + } + // Assert the total sum. + expectedSum := numOps * (numOps - 1) / 2 + if sum != expectedSum { + t.Fatalf("sums don't match for %d num ops, %d num threads: got %d, want %d", + numOps, numThreads, sum, expectedSum) + } +} + +func TestQueueOfBlockingCalls(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) + n := 100 + if testing.Short() { + n = 10 + } + hammerQueueOfBlockingCalls(t, 1, 100*n, n) + hammerQueueOfBlockingCalls(t, 1, 1000*n, 10*n) + hammerQueueOfBlockingCalls(t, 4, 100*n, n) + hammerQueueOfBlockingCalls(t, 4, 1000*n, 10*n) + hammerQueueOfBlockingCalls(t, 8, 100*n, n) + hammerQueueOfBlockingCalls(t, 8, 1000*n, 10*n) +} + +func hammerQueueOfNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { + runtime.GOMAXPROCS(gomaxprocs) + q := NewMPMCQueueOf[int](numThreads) + startwg := sync.WaitGroup{} + startwg.Add(1) + csum := make(chan int, numThreads) + // Start producers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + for j := n; j < numOps; j += numThreads { + for !q.TryEnqueue(j) { + // busy spin until success + } + } + }(i) + } + // Start consumers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + sum := 0 + for j := n; j < numOps; j += numThreads { + var ( + item int + ok bool + ) + for { + // busy spin until success + if item, ok = q.TryDequeue(); ok { + sum += item + break + } + } + } + csum <- sum + }(i) + } + startwg.Done() + // Wait for all the sums from producers. + sum := 0 + for i := 0; i < numThreads; i++ { + s := <-csum + sum += s + } + // Assert the total sum. + expectedSum := numOps * (numOps - 1) / 2 + if sum != expectedSum { + t.Fatalf("sums don't match for %d num ops, %d num threads: got %d, want %d", + numOps, numThreads, sum, expectedSum) + } +} + +func TestQueueOfNonBlockingCalls(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) + n := 10 + if testing.Short() { + n = 1 + } + hammerQueueOfNonBlockingCalls(t, 1, n, n) + hammerQueueOfNonBlockingCalls(t, 2, 10*n, 2*n) + hammerQueueOfNonBlockingCalls(t, 4, 100*n, 4*n) +} + +func benchmarkQueueOfProdCons(b *testing.B, queueSize, localWork int) { + callsPerSched := queueSize + procs := runtime.GOMAXPROCS(-1) / 2 + if procs == 0 { + procs = 1 + } + N := int32(b.N / callsPerSched) + c := make(chan bool, 2*procs) + q := NewMPMCQueueOf[int](queueSize) + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < callsPerSched; g++ { + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + q.Enqueue(1) + } + } + q.Enqueue(0) + c <- foo == 42 + }() + go func() { + foo := 0 + for { + v := q.Dequeue() + if v == 0 { + break + } + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + <-c + } +} + +func BenchmarkQueueOfProdCons(b *testing.B) { + benchmarkQueueOfProdCons(b, 1000, 0) +} + +func BenchmarkOfQueueProdConsWork100(b *testing.B) { + benchmarkQueueOfProdCons(b, 1000, 100) +} From a0168dbc8fcd6da7309f108d896d308d182ac1e1 Mon Sep 17 00:00:00 2001 From: Andrey Pechkurov Date: Sun, 20 Aug 2023 21:20:39 +0300 Subject: [PATCH 2/5] Use proper type in the padding --- mpmcqueueof.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mpmcqueueof.go b/mpmcqueueof.go index 5266950..d480c11 100644 --- a/mpmcqueueof.go +++ b/mpmcqueueof.go @@ -31,7 +31,7 @@ type MPMCQueueOf[I any] struct { type slotOfPadded[I any] struct { slotOf[I] //lint:ignore U1000 prevents false sharing - pad [cacheLineSize - (unsafe.Sizeof(slot{}) % cacheLineSize)]byte + pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte } type slotOf[I any] struct { From c614e5792a46d9f43fa7c9f30e650234afe5e9c2 Mon Sep 17 00:00:00 2001 From: Andrey Pechkurov Date: Mon, 21 Aug 2023 10:42:38 +0300 Subject: [PATCH 3/5] Add a hack for slot padding --- mpmcqueueof.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/mpmcqueueof.go b/mpmcqueueof.go index d480c11..6aea83b 100644 --- a/mpmcqueueof.go +++ b/mpmcqueueof.go @@ -30,12 +30,20 @@ type MPMCQueueOf[I any] struct { type slotOfPadded[I any] struct { slotOf[I] + // Unfortunately, proper padding like the below one: + // + // pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte + // + // won't compile, so here we add a best-effort padding for items up to + // 56 bytes size. //lint:ignore U1000 prevents false sharing - pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte + pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte } type slotOf[I any] struct { - turn uint64 + // atomic.Uint64 is used here to get proper 8 byte alignment on + // 32-bit archs. + turn atomic.Uint64 item I } @@ -57,11 +65,11 @@ func (q *MPMCQueueOf[I]) Enqueue(item I) { head := atomic.AddUint64(&q.head, 1) - 1 slot := &q.slots[q.idx(head)] turn := q.turn(head) * 2 - for atomic.LoadUint64(&slot.turn) != turn { + for slot.turn.Load() != turn { runtime.Gosched() } slot.item = item - atomic.StoreUint64(&slot.turn, turn+1) + slot.turn.Store(turn + 1) } // Dequeue retrieves and removes the item from the head of the queue. @@ -71,12 +79,12 @@ func (q *MPMCQueueOf[I]) Dequeue() I { tail := atomic.AddUint64(&q.tail, 1) - 1 slot := &q.slots[q.idx(tail)] turn := q.turn(tail)*2 + 1 - for atomic.LoadUint64(&slot.turn) != turn { + for slot.turn.Load() != turn { runtime.Gosched() } item := slot.item slot.item = zeroedI - atomic.StoreUint64(&slot.turn, turn+1) + slot.turn.Store(turn + 1) return item } @@ -88,10 +96,10 @@ func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { for { slot := &q.slots[q.idx(head)] turn := q.turn(head) * 2 - if atomic.LoadUint64(&slot.turn) == turn { + if slot.turn.Load() == turn { if atomic.CompareAndSwapUint64(&q.head, head, head+1) { slot.item = item - atomic.StoreUint64(&slot.turn, turn+1) + slot.turn.Store(turn + 1) return true } } else { @@ -113,13 +121,13 @@ func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { for { slot := &q.slots[q.idx(tail)] turn := q.turn(tail)*2 + 1 - if atomic.LoadUint64(&slot.turn) == turn { + if slot.turn.Load() == turn { if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { var zeroedI I item = slot.item ok = true slot.item = zeroedI - atomic.StoreUint64(&slot.turn, turn+1) + slot.turn.Store(turn + 1) return } } else { From e0dd412ba9b825fb0baf87f5c7c095e88e653d99 Mon Sep 17 00:00:00 2001 From: Andrey Pechkurov Date: Mon, 21 Aug 2023 10:45:29 +0300 Subject: [PATCH 4/5] Fix MPMCQueueOf min Go version --- README.md | 2 +- mpmcqueueof.go | 4 ++-- mpmcqueueof_test.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 011ac05..07e5999 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ item := q.Dequeue() // interface{} pointing at a string item, ok := q.TryDequeue() ``` -`MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.18 or later. +`MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later. ```go q := xsync.NewMPMCQueueOf[string](1024) diff --git a/mpmcqueueof.go b/mpmcqueueof.go index 6aea83b..38a8fa3 100644 --- a/mpmcqueueof.go +++ b/mpmcqueueof.go @@ -1,5 +1,5 @@ -//go:build go1.18 -// +build go1.18 +//go:build go1.19 +// +build go1.19 package xsync diff --git a/mpmcqueueof_test.go b/mpmcqueueof_test.go index 437cd16..7f4e161 100644 --- a/mpmcqueueof_test.go +++ b/mpmcqueueof_test.go @@ -1,5 +1,5 @@ -//go:build go1.18 -// +build go1.18 +//go:build go1.19 +// +build go1.19 // Copyright notice. The following tests are partially based on // the following file from the Go Programming Language core repo: From 93ef071c7f998f5cb1a1fef407467c4271b6a158 Mon Sep 17 00:00:00 2001 From: Andrey Pechkurov Date: Mon, 21 Aug 2023 10:56:20 +0300 Subject: [PATCH 5/5] Improve readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 07e5999..94dca04 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ q.Enqueue("foo") // optimistic insertion attempt; doesn't block inserted := q.TryEnqueue("bar") // consumer obtains an item from the queue -item := q.Dequeue() // interface{} pointing at a string +item := q.Dequeue() // interface{} pointing to a string // optimistic obtain attempt; doesn't block item, ok := q.TryDequeue() ```