[chore] [exporterhelper] Integrate capacity limiting into the communication channel (open-telemetry#9232)

Integrate capacity limiting into the internal channels used by both the memory
and persistent queues. With a capacity limiter that is independent of the
channel, it is hard to guarantee that the reported queue size stays accurate
going forward.

Benchmarks before:
```
goos: darwin
goarch: arm64
Benchmark_QueueUsage_1000_requests-10      	    3252	    325010 ns/op	  246059 B/op	      10 allocs/op
Benchmark_QueueUsage_100000_requests-10    	      39	  29811116 ns/op	24002870 B/op	      10 allocs/op
Benchmark_QueueUsage_10000_items-10        	    3404	    349753 ns/op	  246052 B/op	      10 allocs/op
Benchmark_QueueUsage_1M_items-10           	      40	  29415583 ns/op	24002858 B/op	      10 allocs/op
BenchmarkPersistentQueue_TraceSpans
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_1-10         	  338180	      3836 ns/op	    2851 B/op	      78 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_10-10        	   81369	     15822 ns/op	   14598 B/op	     289 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_10_#spansPerTrace:_10-10       	   13066	     90155 ns/op	  130087 B/op	    2417 allocs/op
```

Benchmarks after:
```
Benchmark_QueueUsage_1000_requests-10      	    4210	    278175 ns/op	  246055 B/op	      10 allocs/op
Benchmark_QueueUsage_100000_requests-10    	      42	  25835945 ns/op	24002968 B/op	      10 allocs/op
Benchmark_QueueUsage_10000_items-10        	    4376	    279571 ns/op	  246056 B/op	      10 allocs/op
Benchmark_QueueUsage_1M_items-10           	      42	  26483907 ns/op	24002995 B/op	      10 allocs/op
BenchmarkPersistentQueue_TraceSpans
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_1-10         	  328268	      4251 ns/op	    2854 B/op	      78 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_1_#spansPerTrace:_10-10        	  101683	     12238 ns/op	   14582 B/op	     289 allocs/op
BenchmarkPersistentQueue_TraceSpans/#traces:_10_#spansPerTrace:_10-10       	   13382	     86464 ns/op	  130154 B/op	    2417 allocs/op
```
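The core idea is to fold the capacity check into the same structure that owns the channel, so claiming space and enqueuing can never drift apart. Below is a minimal, self-contained sketch of that pattern; the names and details are illustrative only, not the collector's actual `sizedChannel` implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errQueueIsFull = errors.New("sending queue is full")

// sizedChan couples a buffered channel with a size counter so that the
// capacity check and the enqueue live in one place and cannot get out of sync.
type sizedChan[T any] struct {
	used     atomic.Int64
	capacity int64
	ch       chan T
}

// newSizedChan assumes every element has size >= 1, so a channel buffer of
// `capacity` elements can never fill up before the size cap is reached.
func newSizedChan[T any](capacity int64) *sizedChan[T] {
	return &sizedChan[T]{capacity: capacity, ch: make(chan T, capacity)}
}

// push claims `size` units and enqueues; on overflow it rolls the claim back
// and reports a full queue, keeping the counter consistent with the channel.
func (c *sizedChan[T]) push(el T, size int64) error {
	if c.used.Add(size) > c.capacity {
		c.used.Add(-size)
		return errQueueIsFull
	}
	c.ch <- el
	return nil
}

// pop blocks for the next element and releases its size in the same step.
// ok is false once the channel has been closed and drained.
func (c *sizedChan[T]) pop(sizeOf func(T) int64) (el T, ok bool) {
	el, ok = <-c.ch
	if ok {
		c.used.Add(-sizeOf(el))
	}
	return el, ok
}

func (c *sizedChan[T]) shutdown() { close(c.ch) }

func main() {
	q := newSizedChan[string](2)
	fmt.Println(q.push("a", 1)) // <nil>
	fmt.Println(q.push("b", 1)) // <nil>
	fmt.Println(q.push("c", 1)) // sending queue is full
	el, ok := q.pop(func(string) int64 { return 1 })
	fmt.Println(el, ok) // a true
	q.shutdown()
}
```

With this shape, the queue size reported to callers is derived from the same counter that gates admission, which is what the benchmarks above exercise.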
dmitryax authored and andrzej-stencel committed May 27, 2024
1 parent 9a7fd3f commit af64f29
Showing 10 changed files with 395 additions and 273 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterqueue/queue.go
```diff
@@ -95,7 +95,7 @@ func sizerFromConfig[T itemsCounter](Config) queue.Sizer[T] {
 	return &queue.RequestSizer[T]{}
 }
 
-func capacityFromConfig(cfg Config) int {
+func capacityFromConfig(cfg Config) int64 {
 	// TODO: Handle other ways to measure the queue size once they are added.
-	return cfg.QueueSize
+	return int64(cfg.QueueSize)
 }
```
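For context, the user-facing queue size setting stays a plain int; only the internal capacity type widens to int64 to match the sized channel's counter. A small sketch of that wiring, using a hypothetical Config stand-in rather than the exporter's real config type:

```go
package main

import "fmt"

// Config is a hypothetical stand-in for the exporter queue config;
// the real one lives in exporter/exporterqueue and has more fields.
type Config struct {
	QueueSize int
}

// capacityFromConfig mirrors the change in this commit: the user-facing
// setting remains an int, while the queue internals count capacity in int64.
func capacityFromConfig(cfg Config) int64 {
	return int64(cfg.QueueSize)
}

func main() {
	fmt.Println(capacityFromConfig(Config{QueueSize: 1000})) // 1000
}
```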
23 changes: 9 additions & 14 deletions exporter/internal/queue/bounded_memory_queue.go
```diff
@@ -16,55 +16,50 @@ import (
 // the producer are dropped.
 type boundedMemoryQueue[T any] struct {
 	component.StartFunc
-	*queueCapacityLimiter[T]
-	items chan queueRequest[T]
+	*sizedChannel[memQueueEl[T]]
+	sizer Sizer[T]
 }
 
 // MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
 type MemoryQueueSettings[T any] struct {
 	Sizer    Sizer[T]
-	Capacity int
+	Capacity int64
 }
 
 // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
 // callback for dropped items (e.g. useful to emit metrics).
 func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] {
 	return &boundedMemoryQueue[T]{
-		queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
-		items:                make(chan queueRequest[T], set.Capacity),
+		sizedChannel: newSizedChannel[memQueueEl[T]](set.Capacity, nil, 0),
+		sizer:        set.Sizer,
 	}
 }
 
 // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
 func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
-	if !q.queueCapacityLimiter.claim(req) {
-		return ErrQueueIsFull
-	}
-	q.items <- queueRequest[T]{ctx: ctx, req: req}
-	return nil
+	return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
 }
 
 // Consume applies the provided function on the head of queue.
 // The call blocks until there is an item available or the queue is stopped.
 // The function returns true when an item is consumed or false if the queue is stopped and emptied.
 func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
-	item, ok := <-q.items
+	item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
 	if !ok {
 		return false
 	}
-	q.queueCapacityLimiter.release(item.req)
 	// the memory queue doesn't handle consume errors
 	_ = consumeFunc(item.ctx, item.req)
 	return true
 }
 
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
-	close(q.items)
+	q.sizedChannel.shutdown()
 	return nil
 }
 
-type queueRequest[T any] struct {
+type memQueueEl[T any] struct {
 	req T
 	ctx context.Context
 }
```
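The Consume contract described above (block until an item is available, return false only once the queue is stopped and drained) is what queue consumers loop on. A hypothetical, self-contained sketch of such a loop; the types here are stand-ins, not the collector's NewQueueConsumers helper:

```go
package main

import (
	"context"
	"fmt"
)

// consumable captures just the Consume behaviour described in the doc comment:
// it returns false only once the queue is stopped and emptied.
type consumable[T any] interface {
	Consume(func(context.Context, T) error) bool
}

// drainLoop keeps consuming until the queue reports it is stopped and drained.
func drainLoop[T any](q consumable[T], handle func(context.Context, T) error) {
	for q.Consume(handle) {
	}
}

// fakeQueue is a tiny in-memory stand-in used only to make the sketch runnable.
type fakeQueue struct{ items []string }

func (f *fakeQueue) Consume(fn func(context.Context, string) error) bool {
	if len(f.items) == 0 {
		return false // stopped and drained
	}
	item := f.items[0]
	f.items = f.items[1:]
	_ = fn(context.Background(), item)
	return true
}

func main() {
	q := &fakeQueue{items: []string{"req-1", "req-2"}}
	drainLoop[string](q, func(_ context.Context, req string) error {
		fmt.Println("exported", req)
		return nil
	})
}
```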
10 changes: 9 additions & 1 deletion exporter/internal/queue/bounded_memory_queue_test.go
```diff
@@ -133,7 +133,7 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int)
 func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
 	var wg sync.WaitGroup
 	wg.Add(requestsCount)
-	q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount})
+	q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: int64(10 * requestsCount)})
 	consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error {
 		wg.Done()
 		return nil
@@ -156,3 +156,11 @@ func TestZeroSizeNoConsumers(t *testing.T) {
 
 	assert.NoError(t, q.Shutdown(context.Background()))
 }
+
+type fakeReq struct {
+	itemsCount int
+}
+
+func (r fakeReq) ItemsCount() int {
+	return r.itemsCount
+}
```