diff --git a/pkg/containers/chunked_queue.go b/pkg/containers/chunked_queue.go
new file mode 100644
index 00000000000..2de7159e77d
--- /dev/null
+++ b/pkg/containers/chunked_queue.go
@@ -0,0 +1,64 @@
+package containers
+
+import (
+	"sync"
+
+	"github.com/edwingeng/deque"
+)
+
+// Deque implements Queue with edwingeng/deque
+//nolint:structcheck
+type Deque[T any] struct {
+	// mu protects deque, because it is not thread-safe.
+	mu    sync.RWMutex
+	deque deque.Deque
+}
+
+// NewDeque creates a new Deque instance
+func NewDeque[T any]() *Deque[T] {
+	return &Deque[T]{
+		deque: deque.NewDeque(),
+	}
+}
+
+// Push adds an element to the back of the queue.
+func (d *Deque[T]) Push(elem T) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	d.deque.PushBack(elem)
+}
+
+// Pop removes and returns the front element, or false if the queue is empty.
+func (d *Deque[T]) Pop() (T, bool) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if d.deque.Empty() {
+		var noVal T
+		return noVal, false
+	}
+
+	return d.deque.PopFront().(T), true
+}
+
+// Peek returns the front element without removing it, or false if the queue is empty.
+func (d *Deque[T]) Peek() (T, bool) {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+
+	if d.deque.Empty() {
+		var noVal T
+		return noVal, false
+	}
+
+	return d.deque.Front().(T), true
+}
+
+// Size returns the number of elements in the queue.
+func (d *Deque[T]) Size() int {
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+
+	return d.deque.Len()
+}
diff --git a/pkg/containers/queue.go b/pkg/containers/queue.go
index bf1c449fe72..7743b3a2a0a 100644
--- a/pkg/containers/queue.go
+++ b/pkg/containers/queue.go
@@ -1,68 +1,9 @@
 package containers
 
-import (
-	"sync"
-
-	"github.com/edwingeng/deque"
-)
-
-// Queue abstracts a generics double-ended queue, which is thread-safe
+// Queue abstracts a generic FIFO queue, which is thread-safe
 type Queue[T any] interface {
-	Add(elem T)
+	Push(elem T)
 	Pop() (T, bool)
 	Peek() (T, bool)
 	Size() int
 }
-
-// Deque implements Queue with edwingeng/deque
-//nolint:structcheck
-type Deque[T any] struct {
-	// mu protects deque, because it is not thread-safe.
-	mu    sync.RWMutex
-	deque deque.Deque
-}
-
-// NewDeque creates a new Deque instance
-func NewDeque[T any]() *Deque[T] {
-	return &Deque[T]{
-		deque: deque.NewDeque(),
-	}
-}
-
-func (d *Deque[T]) Add(elem T) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
-	d.deque.PushBack(elem)
-}
-
-func (d *Deque[T]) Pop() (T, bool) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
-	if d.deque.Empty() {
-		var noVal T
-		return noVal, false
-	}
-
-	return d.deque.PopFront().(T), true
-}
-
-func (d *Deque[T]) Peek() (T, bool) {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
-	if d.deque.Empty() {
-		var noVal T
-		return noVal, false
-	}
-
-	return d.deque.Front().(T), true
-}
-
-func (d *Deque[T]) Size() int {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
-	return d.deque.Len()
-}
diff --git a/pkg/containers/slice_queue.go b/pkg/containers/slice_queue.go
new file mode 100644
index 00000000000..e57762ea309
--- /dev/null
+++ b/pkg/containers/slice_queue.go
@@ -0,0 +1,117 @@
+package containers
+
+import "sync"
+
+// SliceQueue is a FIFO queue implemented
+// by a Go slice.
+type SliceQueue[T any] struct {
+	mu    sync.Mutex
+	elems []T
+
+	// C is a signal for non-empty queue.
+	// A consumer can select for C and then Pop
+	// as many elements as possible in a for-select
+	// loop.
+	// Refer to an example in TestSliceQueueConcurrentWriteAndRead.
+	C chan struct{}
+
+	pool *sync.Pool
+}
+
+// NewSliceQueue creates a new SliceQueue.
+func NewSliceQueue[T any]() *SliceQueue[T] {
+	return &SliceQueue[T]{
+		C:    make(chan struct{}, 1),
+		pool: &sync.Pool{},
+	}
+}
+
+// Push adds elem to the back of the queue, signaling C if the queue was empty.
+func (q *SliceQueue[T]) Push(elem T) {
+	q.mu.Lock()
+
+	signal := false
+	if len(q.elems) == 0 {
+		signal = true
+		if q.elems == nil {
+			q.elems = q.allocateSlice()
+			q.elems = q.elems[:0]
+		}
+	}
+
+	q.elems = append(q.elems, elem)
+	q.mu.Unlock()
+
+	if signal {
+		select {
+		case q.C <- struct{}{}:
+		default:
+		}
+	}
+}
+
+// Pop removes and returns the front element, or false if the queue is empty.
+func (q *SliceQueue[T]) Pop() (T, bool) {
+	q.mu.Lock()
+
+	var zero T
+	if len(q.elems) == 0 {
+		q.mu.Unlock()
+		return zero, false
+	}
+
+	ret := q.elems[0]
+	q.elems[0] = zero
+	q.elems = q.elems[1:]
+
+	if len(q.elems) == 0 {
+		q.freeSlice(q.elems)
+		q.elems = nil
+	} else {
+		// The queue is still non-empty, so re-arm the signal.
+		select {
+		case q.C <- struct{}{}:
+		default:
+		}
+	}
+
+	q.mu.Unlock()
+	return ret, true
+}
+
+// Peek returns the front element without removing it, or false if the queue is empty.
+func (q *SliceQueue[T]) Peek() (retVal T, ok bool) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	if len(q.elems) == 0 {
+		ok = false
+		return
+	}
+
+	return q.elems[0], true
+}
+
+// Size returns the number of elements currently in the queue.
+func (q *SliceQueue[T]) Size() int {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	return len(q.elems)
+}
+
+func (q *SliceQueue[T]) allocateSlice() []T {
+	ptr := q.pool.Get()
+	if ptr == nil {
+		return make([]T, 0, 16)
+	}
+
+	return *(ptr.(*[]T))
+}
+
+func (q *SliceQueue[T]) freeSlice(s []T) {
+	if len(s) != 0 {
+		panic("only empty slice allowed")
+	}
+	q.pool.Put(&s)
+}
diff --git a/pkg/containers/slice_queue_test.go b/pkg/containers/slice_queue_test.go
new file mode 100644
index 00000000000..c73736f1d79
--- /dev/null
+++ b/pkg/containers/slice_queue_test.go
@@ -0,0 +1,134 @@
+package containers
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSliceQueueBasics(t *testing.T) {
+	q := NewSliceQueue[int]()
+
+	require.False(t, checkSignal(q.C))
+	require.Equal(t, 0, q.Size())
+	q.Push(1)
+	require.Equal(t, 1, q.Size())
+	q.Push(2)
+	require.Equal(t, 2, q.Size())
+	q.Push(3)
+	require.Equal(t, 3, q.Size())
+
+	val, ok := q.Peek()
+	require.True(t, ok)
+	require.Equal(t, 1, val)
+
+	require.True(t, checkSignal(q.C))
+	val, ok = q.Pop()
+	require.True(t, ok)
+	require.Equal(t, 1, val)
+
+	val, ok = q.Peek()
+	require.True(t, ok)
+	require.Equal(t, 2, val)
+
+	require.True(t, checkSignal(q.C))
+	val, ok = q.Pop()
+	require.True(t, ok)
+	require.Equal(t, 2, val)
+
+	val, ok = q.Peek()
+	require.True(t, ok)
+	require.Equal(t, 3, val)
+
+	require.True(t, checkSignal(q.C))
+	val, ok = q.Pop()
+	require.True(t, ok)
+	require.Equal(t, 3, val)
+
+	require.False(t, checkSignal(q.C))
+	_, ok = q.Pop()
+	require.False(t, ok)
+
+	_, ok = q.Peek()
+	require.False(t, ok)
+}
+
+func TestSliceQueueManyElements(t *testing.T) {
+	const numElems = 10000
+
+	q := NewSliceQueue[int]()
+	for i := 0; i < numElems; i++ {
+		q.Push(i)
+	}
+	require.Equal(t, numElems, q.Size())
+
+	for i := 0; i < numElems; i++ {
+		val, ok := q.Pop()
+		require.True(t, ok)
+		require.Equal(t, i, val)
+	}
+	require.Equal(t, 0, q.Size())
+
+	// Repeat the test to exercise reuse of the pooled slice.
+	for i := 0; i < numElems; i++ {
+		q.Push(i)
+	}
+	require.Equal(t, numElems, q.Size())
+
+	for i := 0; i < numElems; i++ {
+		val, ok := q.Pop()
+		require.True(t, ok)
+		require.Equal(t, i, val)
+	}
+	require.Equal(t, 0, q.Size())
+}
+
+func TestSliceQueueConcurrentWriteAndRead(t *testing.T) {
+	const numElems = 1000000
+
+	q := NewSliceQueue[int]()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		for i := 0; i < numElems; i++ {
+			q.Push(i)
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		counter := 0
+		for {
+			<-q.C // wait for the non-empty signal
+
+			for {
+				val, ok := q.Pop()
+				if !ok {
+					break
+				}
+				require.Equal(t, counter, val)
+				counter++
+				if counter == numElems {
+					return
+				}
+			}
+		}
+	}()
+
+	wg.Wait()
+	require.Equal(t, 0, q.Size())
+}
+
+func checkSignal(ch <-chan struct{}) bool {
+	select {
+	case <-ch:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/pkg/notifier/notifier.go b/pkg/notifier/notifier.go
new file mode 100644
index 00000000000..a6047326ff9
--- /dev/null
+++ b/pkg/notifier/notifier.go
@@ -0,0 +1,183 @@
+package notifier
+
+import (
+	"context"
+	"sync"
+
+	"github.com/pingcap/errors"
+	"go.uber.org/atomic"
+
+	"github.com/hanfei1991/microcosm/pkg/containers"
+)
+
+type receiverID = int64
+
+// Notifier is the sending endpoint of an event
+// notification mechanism. It broadcasts a stream of
+// events to a number of receivers.
+type Notifier[T any] struct {
+	receivers sync.Map // receiverID -> *Receiver[T]
+	nextID    atomic.Int64
+
+	// queue is unbounded.
+	queue *containers.SliceQueue[T]
+
+	closed        atomic.Bool
+	closeCh       chan struct{}
+	synchronizeCh chan struct{}
+
+	wg sync.WaitGroup
+}
+
+// Receiver is the receiving endpoint of a single-producer-multiple-consumer
+// notification mechanism.
+type Receiver[T any] struct {
+	// C is a channel to read the events from.
+	// Note that it is part of the public interface of this package.
+	C chan T
+
+	id receiverID
+
+	closeOnce sync.Once
+
+	// closed MUST be set to true before closing `C`.
+	closed atomic.Bool
+
+	notifier *Notifier[T]
+}
+
+// Close closes the receiver.
+func (r *Receiver[T]) Close() {
+	r.closeOnce.Do(
+		func() {
+			r.closed.Store(true)
+			// Waits for the synchronization barrier, which
+			// means that run() has finished the last iteration.
+			// Since we have set `closed` to true, the `C` channel
+			// will not be written to anymore, so it is safe to close it now.
+			<-r.notifier.synchronizeCh
+			close(r.C)
+			r.notifier.receivers.Delete(r.id)
+		})
+}
+
+// NewNotifier creates a new Notifier.
+func NewNotifier[T any]() *Notifier[T] {
+	ret := &Notifier[T]{
+		receivers:     sync.Map{},
+		queue:         containers.NewSliceQueue[T](),
+		closeCh:       make(chan struct{}),
+		synchronizeCh: make(chan struct{}),
+	}
+
+	ret.wg.Add(1)
+	go func() {
+		defer ret.wg.Done()
+		ret.run()
+	}()
+	return ret
+}
+
+// NewReceiver creates a new Receiver associated with
+// the given Notifier.
+func (n *Notifier[T]) NewReceiver() *Receiver[T] {
+	ch := make(chan T, 16)
+	receiver := &Receiver[T]{
+		id:       n.nextID.Add(1),
+		C:        ch,
+		notifier: n,
+	}
+
+	n.receivers.Store(receiver.id, receiver)
+	return receiver
+}
+
+// Notify sends a new notification event.
+func (n *Notifier[T]) Notify(event T) {
+	n.queue.Push(event)
+}
+
+// Close closes the notifier.
+func (n *Notifier[T]) Close() {
+	if n.closed.Swap(true) {
+		// Ensures that closing more than once is a no-op.
+		return
+	}
+
+	close(n.closeCh)
+	n.wg.Wait()
+
+	n.receivers.Range(func(_, value any) bool {
+		receiver := value.(*Receiver[T])
+		receiver.Close()
+		return true
+	})
+}
+
+// Flush flushes all pending notifications.
+// Note that for Flush to work as expected, a
+// quiescent period is required, i.e. you should
+// not send more events until Flush returns.
+func (n *Notifier[T]) Flush(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.Trace(ctx.Err())
+		case <-n.synchronizeCh:
+			// Checks the queue size after each iteration
+			// of run().
+		}
+
+		if n.queue.Size() == 0 {
+			return nil
+		}
+	}
+}
+
+func (n *Notifier[T]) run() {
+	defer func() {
+		close(n.synchronizeCh)
+	}()
+
+	for {
+		select {
+		case <-n.closeCh:
+			return
+		case n.synchronizeCh <- struct{}{}:
+			// No-op: this case is just a synchronization barrier.
+		case <-n.queue.C:
+		Inner:
+			for {
+				event, ok := n.queue.Pop()
+				if !ok {
+					break Inner
+				}
+
+				// TODO: in the current implementation, congestion
+				// in one receiver will prevent all other receivers
+				// from receiving events.
+				n.receivers.Range(func(_, value any) bool {
+					receiver := value.(*Receiver[T])
+
+					if receiver.closed.Load() {
+						return true
+					}
+
+					select {
+					case <-n.closeCh:
+						return false
+					case receiver.C <- event:
+						// Send the event to the receiver.
+					}
+					return true
+				})
+
+				select {
+				case <-n.closeCh:
+					return
+				default:
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/notifier/notifier_test.go b/pkg/notifier/notifier_test.go
new file mode 100644
index 00000000000..a4d8e598797
--- /dev/null
+++ b/pkg/notifier/notifier_test.go
@@ -0,0 +1,87 @@
+package notifier
+
+import (
+	"context"
+	"math"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestNotifierBasics(t *testing.T) {
+	n := NewNotifier[int]()
+	defer n.Close()
+
+	const (
+		numReceivers = 10
+		numEvents    = 10000
+		finEv        = math.MaxInt
+	)
+	var wg sync.WaitGroup
+
+	for i := 0; i < numReceivers; i++ {
+		// Create the receiver before notifications start,
+		// so that no receiver joins after finEv is sent.
+		r := n.NewReceiver()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			defer r.Close()
+
+			var ev, lastEv int
+			for {
+				ev = <-r.C
+
+				if ev == finEv {
+					return
+				}
+
+				if lastEv != 0 {
+					require.Equal(t, lastEv+1, ev)
+				}
+				lastEv = ev
+			}
+		}()
+	}
+
+	for i := 1; i <= numEvents; i++ {
+		n.Notify(i)
+	}
+
+	n.Notify(finEv)
+	err := n.Flush(context.Background())
+	require.NoError(t, err)
+
+	wg.Wait()
+}
+
+func TestNotifierClose(t *testing.T) {
+	n := NewNotifier[int]()
+	defer n.Close()
+
+	const (
+		numReceivers = 1000
+	)
+	var wg sync.WaitGroup
+
+	for i := 0; i < numReceivers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			r := n.NewReceiver()
+			defer r.Close()
+
+			_, ok := <-r.C
+			require.False(t, ok)
+		}()
+	}
+
+	// Give every goroutine time to create its receiver before closing.
+	time.Sleep(1 * time.Second)
+	n.Close()
+
+	wg.Wait()
+}
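A note for reviewers on the intended consumption pattern: `SliceQueue.C` is a single-slot, coalesced signal, so one signal may cover many elements, and a consumer must drain the queue completely after each wakeup. Below is a minimal sketch of that pattern (the same one exercised by `TestSliceQueueConcurrentWriteAndRead`); the `drain` helper, its `stop` channel, and the `process` callback are illustrative and not part of this PR:

```go
package main

import "github.com/hanfei1991/microcosm/pkg/containers"

// drain consumes q until stop is closed. Push only signals C on the
// empty-to-non-empty transition and Pop re-arms C while elements remain,
// so a consumer that fully drains after each wakeup never misses data.
func drain[T any](q *containers.SliceQueue[T], stop <-chan struct{}, process func(T)) {
	for {
		select {
		case <-stop:
			return
		case <-q.C: // the queue has become (or is still) non-empty
		}

		for {
			elem, ok := q.Pop()
			if !ok {
				break // drained; block on C until the next Push
			}
			process(elem)
		}
	}
}
```

Because the send into `C` is non-blocking and the channel has capacity 1, `Push` never stalls on a slow consumer; the trade-off is that a signal is only a hint that the queue is non-empty, not a per-element event.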
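Similarly, a minimal usage sketch for the `Notifier`/`Receiver` pair added in `pkg/notifier` (the `string` event type and the printing are illustrative only):

```go
package main

import (
	"context"
	"fmt"

	"github.com/hanfei1991/microcosm/pkg/notifier"
)

func main() {
	n := notifier.NewNotifier[string]()
	r := n.NewReceiver()

	done := make(chan struct{})
	go func() {
		defer close(done)
		// r.C is closed by n.Close() below, which ends this loop.
		for ev := range r.C {
			fmt.Println("event:", ev)
		}
	}()

	n.Notify("hello")
	n.Notify("world")

	// Flush assumes a quiescent period: no concurrent Notify calls.
	if err := n.Flush(context.Background()); err != nil {
		panic(err)
	}

	n.Close() // stops run() and closes every receiver's C
	<-done
}
```

Two caveats carried over from the implementation: `Flush` only guarantees delivery if no new events are produced concurrently, and, per the TODO in `run()`, a congested receiver currently blocks delivery to all other receivers.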