Follow-up on review comments for the bounded queue (#2008)
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
jpkrohling authored Jan 9, 2020
1 parent 08a0a4b commit eb00999
Showing 3 changed files with 31 additions and 26 deletions.
cmd/collector/app/span_processor_test.go (2 additions, 1 deletion)
@@ -205,7 +205,7 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S
 
 func TestSpanProcessor(t *testing.T) {
     w := &fakeSpanWriter{}
-    p := NewSpanProcessor(w).(*spanProcessor)
+    p := NewSpanProcessor(w, Options.QueueSize(1)).(*spanProcessor)
     defer p.Stop()
 
     res, err := p.ProcessSpans([]*model.Span{
@@ -229,6 +229,7 @@ func TestSpanProcessorErrors(t *testing.T) {
     p := NewSpanProcessor(w,
         Options.Logger(logger),
         Options.ServiceMetrics(serviceMetrics),
+        Options.QueueSize(1),
     ).(*spanProcessor)
 
     res, err := p.ProcessSpans([]*model.Span{
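The extra Options.QueueSize(1) in both tests ties in with the queue change below: after this commit a bounded queue built with capacity 0 drops every item, and these tests construct the processor directly rather than through the flag defaults, so they now request a small but nonzero queue explicitly. A minimal illustrative sketch of the resulting pattern, reusing only the in-package helpers visible in this diff (fakeSpanWriter, Options, NewSpanProcessor); it is not part of the commit:

func TestSpanProcessorUsesExplicitQueueSize(t *testing.T) {
    w := &fakeSpanWriter{}
    // QueueSize(1) gives the internal bounded queue a usable capacity;
    // with capacity 0, every submitted span would now be dropped.
    p := NewSpanProcessor(w, Options.QueueSize(1)).(*spanProcessor)
    defer p.Stop()
}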
pkg/queue/bounded_queue.go (26 additions, 18 deletions)
@@ -22,6 +22,7 @@ import (
     "unsafe"
 
     "github.com/uber/jaeger-lib/metrics"
+    uatomic "go.uber.org/atomic"
 )
 
 // BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue,
@@ -30,14 +31,15 @@ import (
 // channels, with a special Reaper goroutine that wakes up when the queue is full and consumers
 // the items from the top of the queue until its size drops back to maxSize
 type BoundedQueue struct {
-    size          int32
-    onDroppedItem func(item interface{})
-    items         *chan interface{}
-    stopCh        chan struct{}
-    workers       int
     stopWG        sync.WaitGroup
-    stopped       int32
+    size          *uatomic.Uint32
+    capacity      *uatomic.Uint32
+    stopped       *uatomic.Uint32
+    items         *chan interface{}
+    onDroppedItem func(item interface{})
     consumer      func(item interface{})
+    workers       int
+    stopCh        chan struct{}
 }
 
 // NewBoundedQueue constructs the new queue of specified capacity, and with an optional
@@ -48,6 +50,9 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde
         onDroppedItem: onDroppedItem,
         items:         &queue,
         stopCh:        make(chan struct{}),
+        capacity:      uatomic.NewUint32(uint32(capacity)),
+        stopped:       uatomic.NewUint32(0),
+        size:          uatomic.NewUint32(0),
     }
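The switch from sync/atomic operations on plain integer fields to go.uber.org/atomic wrappers is the core of this hunk and the ones below. A small self-contained sketch (not from the commit) of the Uint32 operations the queue now relies on:

package main

import (
    "fmt"

    uatomic "go.uber.org/atomic"
)

func main() {
    size := uatomic.NewUint32(0) // replaces a bare int32 touched via sync/atomic

    size.Add(1)              // what Produce does after a successful send
    size.Sub(1)              // what a consumer worker does after receiving an item
    size.Store(0)            // overwrite the value, as Stop does with the stopped flag
    fmt.Println(size.Load()) // prints 0
}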
}

@@ -60,14 +65,15 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
     for i := 0; i < q.workers; i++ {
         q.stopWG.Add(1)
         startWG.Add(1)
-        go func(queue chan interface{}) {
+        go func() {
             startWG.Done()
             defer q.stopWG.Done()
+            queue := *q.items
             for {
                 select {
                 case item, ok := <-queue:
                     if ok {
-                        atomic.AddInt32(&q.size, -1)
+                        q.size.Sub(1)
                         q.consumer(item)
                     } else {
                         // channel closed, finish worker
@@ -78,31 +84,30 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
                         return
                     }
                 }
-            }(*q.items)
+            }()
     }
     startWG.Wait()
 }
 
 // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
 func (q *BoundedQueue) Produce(item interface{}) bool {
-    if atomic.LoadInt32(&q.stopped) != 0 {
+    if q.stopped.Load() != 0 {
         q.onDroppedItem(item)
         return false
     }
 
     // we might have two concurrent backing queues at the moment
     // their combined size is stored in q.size, and their combined capacity
     // should match the capacity of the new queue
-    if q.Size() >= q.Capacity() && q.Capacity() > 0 {
-        // current consumers of the queue (like tests) expect the queue capacity = 0 to work
-        // so, we don't drop items when the capacity is 0
+    if q.Size() >= q.Capacity() {
+        // note that all items will be dropped if the capacity is 0
         q.onDroppedItem(item)
         return false
     }
 
     select {
     case *q.items <- item:
-        atomic.AddInt32(&q.size, 1)
+        q.size.Add(1)
         return true
     default:
         // should not happen, as overflows should have been captured earlier
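With the simplified condition above, Produce drops an item as soon as Size() reaches Capacity(). A self-contained sketch of that contract, assuming this package is imported as github.com/jaegertracing/jaeger/pkg/queue (the import path is an assumption, not part of the diff):

package main

import (
    "fmt"

    "github.com/jaegertracing/jaeger/pkg/queue"
)

func main() {
    dropped := 0
    q := queue.NewBoundedQueue(1, func(item interface{}) { dropped++ })

    fmt.Println(q.Produce("a")) // true: size 0 < capacity 1, item is buffered
    fmt.Println(q.Produce("b")) // false: size 1 >= capacity 1, item goes to the drop callback
    fmt.Println(dropped)        // 1

    q.Stop()
}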
@@ -116,20 +121,20 @@ func (q *BoundedQueue) Produce(item interface{}) bool {
 // Stop stops all consumers, as well as the length reporter if started,
 // and releases the items channel. It blocks until all consumers have stopped.
 func (q *BoundedQueue) Stop() {
-    atomic.StoreInt32(&q.stopped, 1) // disable producer
+    q.stopped.Store(1) // disable producer
     close(q.stopCh)
     q.stopWG.Wait()
     close(*q.items)
 }
 
 // Size returns the current size of the queue
 func (q *BoundedQueue) Size() int {
-    return int(atomic.LoadInt32(&q.size))
+    return int(q.size.Load())
 }
 
 // Capacity returns capacity of the queue
 func (q *BoundedQueue) Capacity() int {
-    return cap(*q.items)
+    return int(q.capacity.Load())
 }
 
 // StartLengthReporting starts a timer-based goroutine that periodically reports
@@ -152,7 +157,7 @@ func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge me
 
 // Resize changes the capacity of the queue, returning whether the action was successful
 func (q *BoundedQueue) Resize(capacity int) bool {
-    if capacity == cap(*q.items) {
+    if capacity == q.Capacity() {
         // noop
         return false
     }
@@ -169,6 +174,9 @@ func (q *BoundedQueue) Resize(capacity int) bool {
 
         // gracefully drain the existing queue
         close(previous)
+
+        // update the capacity
+        q.capacity.Store(uint32(capacity))
     }
 
     return swapped
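Taken together, the changes in this file keep the queue's public lifecycle the same while moving size and capacity into atomic fields, so that Resize can swap the backing channel without relying on cap() of a channel that may change underneath. A hedged end-to-end sketch under the same import-path assumption as above (illustrative only, not code from the repository):

package main

import (
    "fmt"

    "github.com/jaegertracing/jaeger/pkg/queue"
)

func main() {
    q := queue.NewBoundedQueue(2, func(item interface{}) {
        fmt.Println("dropped:", item) // called for items rejected by Produce
    })

    // Four workers drain the queue; each item is handed to this callback.
    q.StartConsumers(4, func(item interface{}) {
        fmt.Println("consumed:", item)
    })

    for i := 0; i < 10; i++ {
        q.Produce(i) // returns false whenever the 2-slot queue is momentarily full
    }

    // Resize swaps in a larger backing channel and, with this commit,
    // records the new value in the atomic capacity field.
    q.Resize(10)
    fmt.Println(q.Capacity()) // 10 after a successful resize

    q.Stop() // disables Produce, then blocks until all workers have exited
}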
pkg/queue/bounded_queue_test.go (3 additions, 7 deletions)
@@ -81,6 +81,8 @@ func TestBoundedQueue(t *testing.T) {
         _, g := mFact.Snapshot()
         if g["size"] == 0 {
             time.Sleep(time.Millisecond)
+        } else {
+            break
         }
 

@@ -313,16 +315,10 @@ func TestZeroSize(t *testing.T) {
     q := NewBoundedQueue(0, func(item interface{}) {
     })
 
-    var wg sync.WaitGroup
-    wg.Add(1)
     q.StartConsumers(1, func(item interface{}) {
-        wg.Done()
     })
 
-    assert.True(t, q.Produce("a")) // in process
-    wg.Wait()
-
-    // if we didn't finish with a timeout, then we are good
+    assert.False(t, q.Produce("a")) // in process
 }
 
 func BenchmarkBoundedQueue(b *testing.B) {
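The rewritten TestZeroSize captures the new zero-capacity behaviour: Produce on a queue created with capacity 0 now always reports a drop, even with consumers running. A minimal sketch of the same behaviour outside the test suite (same import-path assumption as above):

package main

import (
    "fmt"

    "github.com/jaegertracing/jaeger/pkg/queue"
)

func main() {
    q := queue.NewBoundedQueue(0, func(item interface{}) {})
    q.StartConsumers(1, func(item interface{}) {})

    // Size() >= Capacity() holds immediately when the capacity is 0,
    // so every item is dropped and Produce returns false.
    fmt.Println(q.Produce("a")) // false
    q.Stop()
}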
