From 52938f92f4d2c41769e1ab66843d0a0b5287e7a2 Mon Sep 17 00:00:00 2001 From: b97tsk Date: Fri, 9 Aug 2024 08:00:00 +0800 Subject: [PATCH] update(Unicast): preserve buffers for later reuse --- internal/queue/queue.go | 33 ++++++++++++++++++++------------- internal/queue/queue_test.go | 6 ++++-- unicast.go | 17 ++++++++++++++--- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index ae4804e4..c7591992 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -13,6 +13,14 @@ func (q *Queue[E]) Init() { q.head, q.tail = nil, nil } +// Clear clears q, preserving existing buffer. +func (q *Queue[E]) Clear() { + clear(q.head) + clear(q.tail) + s := q.tail[:0] + q.head, q.tail = s, s +} + // Cap returns the capacity of the internal buffer. If Cap() equals to Len(), // new Push(x) causes the internal buffer to grow. func (q *Queue[E]) Cap() int { @@ -27,9 +35,8 @@ func (q *Queue[E]) Len() int { // Push inserts an element at the end of q. func (q *Queue[E]) Push(x E) { if q.Len() == q.Cap() { // Grow if full. - buf := append(append(q.head, q.tail...), x) - q.head, q.tail = buf, buf[:0] - + s := append(append(q.head, q.tail...), x) + q.head, q.tail = s, s[:0] return } @@ -42,7 +49,7 @@ func (q *Queue[E]) Push(x E) { // Pop removes and returns the first element. It panics if q is empty. func (q *Queue[E]) Pop() E { - if len(q.head) > 0 { + if len(q.head) != 0 { x := q.head[0] var zero E @@ -55,8 +62,8 @@ func (q *Queue[E]) Pop() E { } if n, m := q.Len(), q.Cap(); n == m>>2 && m > smallSize { // Shrink if sparse. - buf := make([]E, n<<1) - q.head, q.tail = buf[:q.CopyTo(buf)], buf[:0] + s := make([]E, n<<1) + q.head, q.tail = s[:q.CopyTo(s)], s[:0] } return x @@ -86,7 +93,7 @@ func (q *Queue[E]) At(i int) E { // Front returns the first element. It panics if q is empty. func (q *Queue[E]) Front() E { - if len(q.head) > 0 { + if len(q.head) != 0 { return q.head[0] } @@ -95,11 +102,11 @@ func (q *Queue[E]) Front() E { // Back returns the last element. It panics if q is empty. func (q *Queue[E]) Back() E { - if n := len(q.tail); n > 0 { + if n := len(q.tail); n != 0 { return q.tail[n-1] } - if n := len(q.head); n > 0 { + if n := len(q.head); n != 0 { return q.head[n-1] } @@ -116,12 +123,12 @@ func (q *Queue[E]) CopyTo(dst []E) int { // Clone clones q. func (q *Queue[E]) Clone() Queue[E] { - var buf []E + var s []E if q.head != nil { - buf = make([]E, q.Len(), q.Cap()) - q.CopyTo(buf) + s = make([]E, q.Len(), q.Cap()) + q.CopyTo(s) } - return Queue[E]{buf, buf[:0]} + return Queue[E]{s, s[:0]} } diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 300a0a35..22e731cc 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -11,6 +11,8 @@ func TestQueue(t *testing.T) { var q queue.Queue[string] + q.Init() + t.Logf("Len=%v Cap=%v", q.Len(), q.Cap()) if q.Len() != 0 { @@ -67,10 +69,10 @@ func TestQueue(t *testing.T) { t.FailNow() } - cloned.Init() + cloned.Clear() if cloned.Len() != 0 { - t.Logf("Init(): Len=%v Cap=%v", cloned.Len(), cloned.Cap()) + t.Logf("Clear(): Len=%v Cap=%v", cloned.Len(), cloned.Cap()) t.FailNow() } diff --git a/unicast.go b/unicast.go index 9f6845a8..67dc24de 100644 --- a/unicast.go +++ b/unicast.go @@ -52,12 +52,14 @@ type unicast[T any] struct { waiters int emitting bool lastn Notification[struct{}] - buf queue.Queue[T] + buf, alt queue.Queue[T] context context.Context done <-chan struct{} observer Observer[T] } +const bufLimit = 32 + func (u *unicast[T]) Emit(n Notification[T]) { u.mu.Lock() @@ -96,7 +98,6 @@ func (u *unicast[T]) Emit(n Notification[T]) { } if n.Kind == KindNext { - const bufLimit = 32 // Only up to this number of values can fill into u.Buf. Again: for u.buf.Len() >= max(u.buf.Cap(), bufLimit) { if n.Kind == 0 && u.waiters != 0 { @@ -162,6 +163,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) { u.emitting = false u.lastn = Stop[struct{}](err) u.buf.Init() + u.alt.Init() u.mu.Unlock() u.co.Broadcast() u.observer.Stop(err) @@ -176,7 +178,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) { for { var buf queue.Queue[T] - u.buf, buf = buf, u.buf + buf, u.buf, u.alt = u.buf, u.alt, buf u.mu.Unlock() u.co.Broadcast() @@ -212,6 +214,15 @@ func (u *unicast[T]) startEmitting(n Notification[T]) { u.mu.Lock() + if n := buf.Cap(); n != 0 && n <= bufLimit { + buf.Clear() + if u.buf.Cap() == 0 { + u.buf = buf + } else { + u.alt = buf + } + } + if u.buf.Len() == 0 { lastn := u.lastn