Skip to content

Commit

Permalink
pkg/notifier: implement notifier (pingcap#397)
Browse files Browse the repository at this point in the history
* pkg/notifier: implement notifier
  • Loading branch information
liuzix authored May 19, 2022
1 parent fb4028a commit 18a651e
Show file tree
Hide file tree
Showing 6 changed files with 585 additions and 61 deletions.
60 changes: 60 additions & 0 deletions pkg/containers/chunked_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package containers

import (
"sync"

"github.com/edwingeng/deque"
)

// Deque implements Queue with edwingeng/deque
//nolint:structcheck
type Deque[T any] struct {
// mu protects deque, because it is not thread-safe.
mu sync.RWMutex
deque deque.Deque
}

// NewDeque creates a new Deque instance
func NewDeque[T any]() *Deque[T] {
return &Deque[T]{
deque: deque.NewDeque(),
}
}

func (d *Deque[T]) Push(elem T) {
d.mu.Lock()
defer d.mu.Unlock()

d.deque.PushBack(elem)
}

func (d *Deque[T]) Pop() (T, bool) {
d.mu.Lock()
defer d.mu.Unlock()

if d.deque.Empty() {
var noVal T
return noVal, false
}

return d.deque.PopFront().(T), true
}

func (d *Deque[T]) Peek() (T, bool) {
d.mu.RLock()
defer d.mu.RUnlock()

if d.deque.Empty() {
var noVal T
return noVal, false
}

return d.deque.Front().(T), true
}

func (d *Deque[T]) Size() int {
d.mu.RLock()
defer d.mu.RUnlock()

return d.deque.Len()
}
63 changes: 2 additions & 61 deletions pkg/containers/queue.go
Original file line number Diff line number Diff line change
@@ -1,68 +1,9 @@
package containers

import (
"sync"

"github.com/edwingeng/deque"
)

// Queue abstracts a generics double-ended queue, which is thread-safe
// Queue abstracts a generics FIFO queue, which is thread-safe
type Queue[T any] interface {
Add(elem T)
Push(elem T)
Pop() (T, bool)
Peek() (T, bool)
Size() int
}

// Deque implements Queue with edwingeng/deque
//nolint:structcheck
type Deque[T any] struct {
// mu protects deque, because it is not thread-safe.
mu sync.RWMutex
deque deque.Deque
}

// NewDeque creates a new Deque instance
func NewDeque[T any]() *Deque[T] {
return &Deque[T]{
deque: deque.NewDeque(),
}
}

func (d *Deque[T]) Add(elem T) {
d.mu.Lock()
defer d.mu.Unlock()

d.deque.PushBack(elem)
}

func (d *Deque[T]) Pop() (T, bool) {
d.mu.Lock()
defer d.mu.Unlock()

if d.deque.Empty() {
var noVal T
return noVal, false
}

return d.deque.PopFront().(T), true
}

func (d *Deque[T]) Peek() (T, bool) {
d.mu.RLock()
defer d.mu.RUnlock()

if d.deque.Empty() {
var noVal T
return noVal, false
}

return d.deque.Front().(T), true
}

func (d *Deque[T]) Size() int {
d.mu.RLock()
defer d.mu.RUnlock()

return d.deque.Len()
}
113 changes: 113 additions & 0 deletions pkg/containers/slice_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package containers

import "sync"

// SliceQueue is a FIFO queue implemented
// by a Go slice.
type SliceQueue[T any] struct {
mu sync.Mutex
elems []T

// C is a signal for non-empty queue.
// A consumer can select for C and then Pop
// as many elements as possible in a for-select
// loop.
// Refer to an example in TestSliceQueueConcurrentWriteAndRead.
C chan struct{}

pool *sync.Pool
}

// NewSliceQueue creates a new SliceQueue.
func NewSliceQueue[T any]() *SliceQueue[T] {
return &SliceQueue[T]{
C: make(chan struct{}, 1),
pool: &sync.Pool{},
}
}

func (q *SliceQueue[T]) Push(elem T) {
q.mu.Lock()

signal := false
if len(q.elems) == 0 {
signal = true
if q.elems == nil {
q.elems = q.allocateSlice()
q.elems = q.elems[:0]
}
}

q.elems = append(q.elems, elem)
q.mu.Unlock()

if signal {
select {
case q.C <- struct{}{}:
default:
}
}
}

func (q *SliceQueue[T]) Pop() (T, bool) {
q.mu.Lock()

var zero T
if len(q.elems) == 0 {
q.mu.Unlock()
return zero, false
}

ret := q.elems[0]
q.elems[0] = zero
q.elems = q.elems[1:]

if len(q.elems) == 0 {
q.freeSlice(q.elems)
q.elems = nil
} else {
// non empty queue
select {
case q.C <- struct{}{}:
default:
}
}

q.mu.Unlock()
return ret, true
}

func (q *SliceQueue[T]) Peek() (retVal T, ok bool) {
q.mu.Lock()
defer q.mu.Unlock()

if len(q.elems) == 0 {
ok = false
return
}

return q.elems[0], true
}

func (q *SliceQueue[T]) Size() int {
q.mu.Lock()
defer q.mu.Unlock()

return len(q.elems)
}

func (q *SliceQueue[T]) allocateSlice() []T {
ptr := q.pool.Get()
if ptr == nil {
return make([]T, 0, 16)
}

return *(ptr.(*[]T))
}

func (q *SliceQueue[T]) freeSlice(s []T) {
if len(s) != 0 {
panic("only empty slice allowed")
}
q.pool.Put(&s)
}
136 changes: 136 additions & 0 deletions pkg/containers/slice_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package containers

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestSliceQueueBasics(t *testing.T) {
q := NewSliceQueue[int]()

require.False(t, checkSignal(q.C))
require.Equal(t, 0, q.Size())
q.Push(1)
require.Equal(t, 1, q.Size())
q.Push(2)
require.Equal(t, 2, q.Size())
q.Push(3)
require.Equal(t, 3, q.Size())

val, ok := q.Peek()
require.True(t, ok)
require.Equal(t, 1, val)

require.True(t, checkSignal(q.C))
val, ok = q.Pop()
require.True(t, ok)
require.Equal(t, 1, val)

val, ok = q.Peek()
require.True(t, ok)
require.Equal(t, 2, val)

require.True(t, checkSignal(q.C))
val, ok = q.Pop()
require.True(t, ok)
require.Equal(t, 2, val)

val, ok = q.Peek()
require.True(t, ok)
require.Equal(t, 3, val)

require.True(t, checkSignal(q.C))
val, ok = q.Pop()
require.True(t, ok)
require.Equal(t, 3, val)

require.False(t, checkSignal(q.C))
_, ok = q.Pop()
require.False(t, ok)

_, ok = q.Peek()
require.False(t, ok)
}

func TestSliceQueueManyElements(t *testing.T) {
const numElems = 10000

q := NewSliceQueue[int]()
for i := 0; i < numElems; i++ {
q.Push(i)
}
require.Equal(t, numElems, q.Size())

for i := 0; i < numElems; i++ {
val, ok := q.Pop()
require.True(t, ok)
require.Equal(t, i, val)
}
require.Equal(t, 0, q.Size())

// Repeat the test
for i := 0; i < numElems; i++ {
q.Push(i)
}
require.Equal(t, numElems, q.Size())

for i := 0; i < numElems; i++ {
val, ok := q.Pop()
require.True(t, ok)
require.Equal(t, i, val)
}
require.Equal(t, 0, q.Size())
}

func TestSliceQueueConcurrentWriteAndRead(t *testing.T) {
const numElems = 1000000

q := NewSliceQueue[int]()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < numElems; i++ {
q.Push(i)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

counter := 0
for {
select {
case <-q.C:
}

for {
val, ok := q.Pop()
if !ok {
break
}
require.Equal(t, counter, val)
counter++
if counter == numElems {
return
}
}
}
}()

wg.Wait()
require.Equal(t, 0, q.Size())
}

func checkSignal(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}
Loading

0 comments on commit 18a651e

Please sign in to comment.