From 6b47a4d752350fa0b5774e6ac1eea0998dbb1dfd Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Sun, 7 Jan 2024 16:29:05 -0800 Subject: [PATCH] [chore] [exporterhelper] Integrate capacity limiting into a helper queue Integrate capacity limiting into internal channels used by both memory and persistent queues. Otherwise, with the independent capacity limiter, it's hard to ensure that queue size is always accurate going forward. --- .../internal/bounded_memory_queue.go | 21 +-- .../internal/bounded_memory_queue_test.go | 8 + .../internal/persistent_queue.go | 177 ++++++++---------- .../internal/persistent_queue_test.go | 68 ++++++- exporter/exporterhelper/internal/queue.go | 23 +++ .../exporterhelper/internal/queue_capacity.go | 74 -------- .../internal/queue_capacity_test.go | 58 ------ .../internal/sized_elements_channel.go | 101 ++++++++++ .../internal/sized_elements_channel_test.go | 44 +++++ 9 files changed, 323 insertions(+), 251 deletions(-) delete mode 100644 exporter/exporterhelper/internal/queue_capacity.go delete mode 100644 exporter/exporterhelper/internal/queue_capacity_test.go create mode 100644 exporter/exporterhelper/internal/sized_elements_channel.go create mode 100644 exporter/exporterhelper/internal/sized_elements_channel_test.go diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index 85435d2aa61..dff4792eeab 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -16,8 +16,8 @@ import ( // the producer are dropped. type boundedMemoryQueue[T any] struct { component.StartFunc - *queueCapacityLimiter[T] - items chan queueRequest[T] + *sizedElementsChannel[memQueueEl[T]] + sizer Sizer[T] } // MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. @@ -30,29 +30,24 @@ type MemoryQueueSettings[T any] struct { // callback for dropped items (e.g. useful to emit metrics). func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), - items: make(chan queueRequest[T], set.Capacity), + sizedElementsChannel: newSizedElementsChannel[memQueueEl[T]](set.Capacity), + sizer: set.Sizer, } } // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { - if !q.queueCapacityLimiter.claim(req) { - return ErrQueueIsFull - } - q.items <- queueRequest[T]{ctx: ctx, req: req} - return nil + return q.sizedElementsChannel.enqueue(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := <-q.items + item, ok := q.sizedElementsChannel.dequeue(func(el memQueueEl[T]) uint64 { return q.sizer.Sizeof(el.req) }) if !ok { return false } - q.queueCapacityLimiter.release(item.req) // the memory queue doesn't handle consume errors _ = consumeFunc(item.ctx, item.req) return true @@ -60,11 +55,11 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err // Shutdown closes the queue channel to initiate draining of the queue. func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { - close(q.items) + q.sizedElementsChannel.shutdown() return nil } -type queueRequest[T any] struct { +type memQueueEl[T any] struct { req T ctx context.Context } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index a26d32120cd..bcd7f58d5e3 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -156,3 +156,11 @@ func TestZeroSizeNoConsumers(t *testing.T) { assert.NoError(t, q.Shutdown(context.Background())) } + +type fakeReq struct { + itemsCount int +} + +func (r fakeReq) ItemsCount() int { + return r.itemsCount +} diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index 0a6629cb87e..0f8dadcc3b6 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -10,7 +10,6 @@ import ( "fmt" "strconv" "sync" - "sync/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -43,7 +42,10 @@ import ( // index index x // xxxx deleted type persistentQueue[T any] struct { - *queueCapacityLimiter[T] + // sizedElementsChannel is used by the persistent queue for two purposes: + // 1. a communication channel notifying the consumer that a new item is available. + // 2. capacity control based on the size of the items. + *sizedElementsChannel[permanentQueueEl] set PersistentQueueSettings[T] logger *zap.Logger @@ -52,14 +54,10 @@ type persistentQueue[T any] struct { // isRequestSized indicates whether the queue is sized by the number of requests. isRequestSized bool - putChan chan struct{} - // mu guards everything declared below. mu sync.Mutex readIndex uint64 writeIndex uint64 - initIndexSize uint64 - initQueueSize *atomic.Uint64 currentlyDispatchedItems []uint64 refClient int64 stopped bool @@ -97,12 +95,10 @@ type PersistentQueueSettings[T any] struct { func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] { _, isRequestSized := set.Sizer.(*RequestSizer[T]) return &persistentQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), + sizedElementsChannel: newSizedElementsChannel[permanentQueueEl](set.Capacity), set: set, logger: set.ExporterSettings.Logger, - initQueueSize: &atomic.Uint64{}, isRequestSized: isRequestSized, - putChan: make(chan struct{}, set.Capacity), } } @@ -147,39 +143,46 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex pq.readIndex = 0 pq.writeIndex = 0 } - pq.initIndexSize = pq.writeIndex - pq.readIndex - // Ensure the communication channel has the same size as the queue - for i := 0; i < int(pq.initIndexSize); i++ { - pq.putChan <- struct{}{} - } + initIndexSize := pq.writeIndex - pq.readIndex - // Read snapshot of the queue size from storage. It's not a problem if the value cannot be fetched, - // or it's not accurate. The queue size will be corrected once the recovered queue is drained. - if pq.initIndexSize > 0 { + // Pre-allocate the communication channel with the size of the restored queue. + if initIndexSize > 0 { + initQueueSize := initIndexSize // If the queue is sized by the number of requests, no need to read the queue size from storage. - if pq.isRequestSized { - pq.initQueueSize.Store(pq.initIndexSize) - return + if !pq.isRequestSized { + if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil { + initQueueSize = restoredQueueSize + } } - res, err := pq.client.Get(ctx, queueSizeKey) - if err == nil { - var restoredQueueSize uint64 - restoredQueueSize, err = bytesToItemIndex(res) - pq.initQueueSize.Store(restoredQueueSize) - } - if err != nil { - if errors.Is(err, errValueNotSet) { - pq.logger.Warn("Cannot read the queue size snapshot from storage. "+ - "The reported queue size will be inaccurate until the initial queue is drained. "+ - "It's expected when the items sized queue enabled for the first time", zap.Error(err)) - } else { - pq.logger.Error("Failed to read the queue size snapshot from storage. "+ - "The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err)) - } + // Ensure the communication channel filled with evenly sized elements up to the total restored queue size. + pq.sizedElementsChannel.initBulkEnqueue(make([]permanentQueueEl, initIndexSize), initQueueSize) + } +} + +// permanentQueueEl is the type of the elements passed to the sizedElementsChannel by the persistentQueue. +type permanentQueueEl struct{} + +// restoreQueueSizeFromStorage restores the queue size from storage. +func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) { + var queueSize uint64 + val, err := pq.client.Get(ctx, queueSizeKey) + if err == nil { + queueSize, err = bytesToItemIndex(val) + } + if err != nil { + if errors.Is(err, errValueNotSet) { + pq.logger.Warn("Cannot read the queue size snapshot from storage. "+ + "The reported queue size will be inaccurate until the initial queue is drained. "+ + "It's expected when the items sized queue enabled for the first time", zap.Error(err)) + } else { + pq.logger.Error("Failed to read the queue size snapshot from storage. "+ + "The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err)) } + return 0, err } + return queueSize, nil } // Consume applies the provided function on the head of queue. @@ -187,14 +190,24 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex // The function returns true when an item is consumed or false if the queue is stopped. func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { for { + var ( + req T + onProcessingFinished func(error) + consumed bool + ) + // If we are stopped we still process all the other events in the channel before, but we // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. - _, ok := <-pq.putChan + _, ok := pq.sizedElementsChannel.dequeue(func(permanentQueueEl) uint64 { + req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) + if !consumed { + return 0 + } + return pq.set.Sizer.Sizeof(req) + }) if !ok { return false } - - req, onProcessingFinished, consumed := pq.getNextItem(context.Background()) if consumed { onProcessingFinished(consumeFunc(context.Background(), req)) return true @@ -202,31 +215,24 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error } } -// Size returns the current size of the queue. -func (pq *persistentQueue[T]) Size() int { - return int(pq.initQueueSize.Load()) + pq.queueCapacityLimiter.Size() -} - func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { - close(pq.putChan) + // If the queue is not initialized, there is nothing to shut down. + if pq.client == nil { + return nil + } + pq.mu.Lock() defer pq.mu.Unlock() + backupErr := pq.backupQueueSize(ctx) + pq.sizedElementsChannel.shutdown() // Mark this queue as stopped, so consumer don't start any more work. pq.stopped = true - return multierr.Combine( - pq.backupQueueSize(ctx), - pq.unrefClient(ctx), - ) + return multierr.Combine(backupErr, pq.unrefClient(ctx)) } // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size // in case if the collector is killed. func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error { - // Client can be nil if the queue is not initialized yet. - if pq.client == nil { - return nil - } - // No need to write the queue size if the queue is sized by the number of requests. // That information is already stored as difference between read and write indexes. if pq.isRequestSized { @@ -257,34 +263,31 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { - if !pq.queueCapacityLimiter.claim(req) { - pq.logger.Warn("Maximum queue capacity reached") - return ErrQueueIsFull - } + err := pq.sizedElementsChannel.enqueue(permanentQueueEl{}, pq.set.Sizer.Sizeof(req), func() error { + itemKey := getItemKey(pq.writeIndex) + newIndex := pq.writeIndex + 1 - itemKey := getItemKey(pq.writeIndex) - newIndex := pq.writeIndex + 1 + reqBuf, err := pq.set.Marshaler(req) + if err != nil { + return err + } - reqBuf, err := pq.set.Marshaler(req) + // Carry out a transaction where we both add the item and update the write index + ops := []storage.Operation{ + storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)), + storage.SetOperation(itemKey, reqBuf), + } + if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil { + return storageErr + } + + pq.writeIndex = newIndex + return nil + }) if err != nil { - pq.queueCapacityLimiter.release(req) return err } - // Carry out a transaction where we both add the item and update the write index - ops := []storage.Operation{ - storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)), - storage.SetOperation(itemKey, reqBuf), - } - if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil { - pq.queueCapacityLimiter.release(req) - return storageErr - } - - pq.writeIndex = newIndex - // Inform the loop that there's some data to process - pq.putChan <- struct{}{} - // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. if (pq.writeIndex % 10) == 5 { @@ -336,8 +339,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), return request, nil, false } - pq.releaseCapacity(request) - // Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. if (pq.writeIndex % 10) == 0 { @@ -373,28 +374,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), }, true } -// releaseCapacity releases the capacity of the queue. The caller must hold the mutex. -func (pq *persistentQueue[T]) releaseCapacity(req T) { - // If the recovered queue size is not emptied yet, decrease it first. - if pq.initIndexSize > 0 { - pq.initIndexSize-- - if pq.initIndexSize == 0 { - pq.initQueueSize.Store(0) - return - } - reqSize := pq.queueCapacityLimiter.sizeOf(req) - if pq.initQueueSize.Load() < reqSize { - pq.initQueueSize.Store(0) - return - } - pq.initQueueSize.Add(^(reqSize - 1)) - return - } - - // Otherwise, decrease the current queue size. - pq.queueCapacityLimiter.release(req) -} - // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage // and moves the items at the back of the queue. func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) { diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 166449bdb25..4a879813588 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -517,8 +517,6 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.NoError(t, err) } - // get one item out, but don't mark it as processed - <-ps.putChan require.True(t, ps.Consume(func(ctx context.Context, traces tracesRequest) error { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) @@ -846,27 +844,29 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 100) - // The queue items size cannot be restored to the previous size. Falls back to 0. - assert.Equal(t, 0, newPQ.Size()) + // The queue items size cannot be restored, fall back to request-based size + assert.Equal(t, 2, newPQ.Size()) assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - // Only new items are reflected - assert.Equal(t, 10, newPQ.Size()) + // Only new items are correctly reflected + assert.Equal(t, 12, newPQ.Size()) - // Consuming old items should does not affect the size. + // Consuming a restored request should reduce the restored size by 2 - maximum size of the restored items. assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error { assert.Equal(t, 20, traces.traces.SpanCount()) return nil })) assert.Equal(t, 10, newPQ.Size()) + // Consuming another restored request should not affect the restored size since it's already dropped to 0. assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error { assert.Equal(t, 25, traces.traces.SpanCount()) return nil })) assert.Equal(t, 10, newPQ.Size()) + // Once the restored queue size is dropped to 0, consuming requests should be correctly reflected in the queue size. assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error { assert.Equal(t, 10, traces.traces.SpanCount()) return nil @@ -876,6 +876,60 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.NoError(t, newPQ.Shutdown(context.Background())) } +// This test covers the case when the queue is restarted with the less capacity than needed to restore the queued items. +// In that case, the queue has to be restored anyway even if it exceeds the capacity limit. +func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { + ext := NewMockStorageExtension(nil) + pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100) + + assert.Equal(t, 0, pq.Size()) + + assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) + assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) + assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) + assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(1, 5))) + + // Read the first request just to populate the read index in the storage. + // Otherwise, the write index won't be restored either. + assert.True(t, pq.Consume(func(ctx context.Context, traces tracesRequest) error { + assert.Equal(t, 40, traces.traces.SpanCount()) + return nil + })) + assert.Equal(t, 3, pq.Size()) + + assert.NoError(t, pq.Shutdown(context.Background())) + + // The queue is restarted with the less capacity than needed to restore the queued items, but with the same + // underlying storage. No need to drop requests that are over capacity since they are already in the storage. + newPQ := createTestPersistentQueueWithRequestsCapacity(t, ext, 2) + + // The queue items size cannot be restored, fall back to request-based size + assert.Equal(t, 3, newPQ.Size()) + + // Queue is full + assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + + assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error { + assert.Equal(t, 20, traces.traces.SpanCount()) + return nil + })) + assert.Equal(t, 2, newPQ.Size()) + + // Still full + assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + + assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error { + assert.Equal(t, 25, traces.traces.SpanCount()) + return nil + })) + assert.Equal(t, 1, newPQ.Size()) + + // Now it can accept new items + assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + + assert.NoError(t, newPQ.Shutdown(context.Background())) +} + func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[tracesRequest], compare []uint64) { pq.mu.Lock() defer pq.mu.Unlock() diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go index 8bd8879a940..f2f23af97c0 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/exporterhelper/internal/queue.go @@ -34,3 +34,26 @@ type Queue[T any] interface { // Capacity returns the capacity of the queue. Capacity() int } + +type itemsCounter interface { + ItemsCount() int +} + +// Sizer is an interface that returns the size of the given element. +type Sizer[T any] interface { + Sizeof(T) uint64 +} + +// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains. +type ItemsSizer[T itemsCounter] struct{} + +func (is *ItemsSizer[T]) Sizeof(el T) uint64 { + return uint64(el.ItemsCount()) +} + +// RequestSizer is a Sizer implementation that returns the size of a queue element as one request. +type RequestSizer[T any] struct{} + +func (rs *RequestSizer[T]) Sizeof(T) uint64 { + return 1 +} diff --git a/exporter/exporterhelper/internal/queue_capacity.go b/exporter/exporterhelper/internal/queue_capacity.go deleted file mode 100644 index 0466de0d8b2..00000000000 --- a/exporter/exporterhelper/internal/queue_capacity.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - -import ( - "sync/atomic" -) - -type itemsCounter interface { - ItemsCount() int -} - -// Sizer is an interface that returns the size of the given element. -type Sizer[T any] interface { - SizeOf(T) uint64 -} - -// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains. -type ItemsSizer[T itemsCounter] struct{} - -func (is *ItemsSizer[T]) SizeOf(el T) uint64 { - return uint64(el.ItemsCount()) -} - -// RequestSizer is a Sizer implementation that returns the size of a queue element as one request. -type RequestSizer[T any] struct{} - -func (rs *RequestSizer[T]) SizeOf(T) uint64 { - return 1 -} - -type queueCapacityLimiter[T any] struct { - used *atomic.Uint64 - cap uint64 - sz Sizer[T] -} - -func (bcl queueCapacityLimiter[T]) Capacity() int { - return int(bcl.cap) -} - -func (bcl queueCapacityLimiter[T]) Size() int { - return int(bcl.used.Load()) -} - -func (bcl queueCapacityLimiter[T]) claim(el T) bool { - size := bcl.sizeOf(el) - if bcl.used.Add(size) > bcl.cap { - bcl.releaseSize(size) - return false - } - return true -} - -func (bcl queueCapacityLimiter[T]) release(el T) { - bcl.releaseSize(bcl.sizeOf(el)) -} - -func (bcl queueCapacityLimiter[T]) releaseSize(size uint64) { - bcl.used.Add(^(size - 1)) -} - -func (bcl queueCapacityLimiter[T]) sizeOf(el T) uint64 { - return bcl.sz.SizeOf(el) -} - -func newQueueCapacityLimiter[T any](sizer Sizer[T], capacity int) *queueCapacityLimiter[T] { - return &queueCapacityLimiter[T]{ - used: &atomic.Uint64{}, - cap: uint64(capacity), - sz: sizer, - } -} diff --git a/exporter/exporterhelper/internal/queue_capacity_test.go b/exporter/exporterhelper/internal/queue_capacity_test.go deleted file mode 100644 index 7a3b3ad41f2..00000000000 --- a/exporter/exporterhelper/internal/queue_capacity_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRequestsCapacityLimiter(t *testing.T) { - rl := newQueueCapacityLimiter[fakeReq](&RequestSizer[fakeReq]{}, 2) - assert.Equal(t, 0, rl.Size()) - assert.Equal(t, 2, rl.Capacity()) - - req := fakeReq{itemsCount: 5} - - assert.True(t, rl.claim(req)) - assert.Equal(t, 1, rl.Size()) - - assert.True(t, rl.claim(req)) - assert.Equal(t, 2, rl.Size()) - - assert.False(t, rl.claim(req)) - assert.Equal(t, 2, rl.Size()) - - rl.release(req) - assert.Equal(t, 1, rl.Size()) -} - -func TestItemsCapacityLimiter(t *testing.T) { - rl := newQueueCapacityLimiter[fakeReq](&ItemsSizer[fakeReq]{}, 7) - assert.Equal(t, 0, rl.Size()) - assert.Equal(t, 7, rl.Capacity()) - - req := fakeReq{itemsCount: 3} - - assert.True(t, rl.claim(req)) - assert.Equal(t, 3, rl.Size()) - - assert.True(t, rl.claim(req)) - assert.Equal(t, 6, rl.Size()) - - assert.False(t, rl.claim(req)) - assert.Equal(t, 6, rl.Size()) - - rl.release(req) - assert.Equal(t, 3, rl.Size()) -} - -type fakeReq struct { - itemsCount int -} - -func (r fakeReq) ItemsCount() int { - return r.itemsCount -} diff --git a/exporter/exporterhelper/internal/sized_elements_channel.go b/exporter/exporterhelper/internal/sized_elements_channel.go new file mode 100644 index 00000000000..1e5abbd1f6c --- /dev/null +++ b/exporter/exporterhelper/internal/sized_elements_channel.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import "sync/atomic" + +// sizedElementsChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements. +// The channel will accept elements until the total size of the elements reaches the capacity. +type sizedElementsChannel[T any] struct { + used *atomic.Uint64 + cap uint64 + ch chan T + + // The following fields are used by the persistent queue to initialize the queue with the elements recovered from + // the disk. Synchronization is not required because the persistent queue doesn't access it concurrently. + // dequeue() will decreases the initElementsCount and initTotalSize before the used field. + initTotalSize uint64 + initElementsCount uint64 +} + +// newSizedElementsChannel creates a sized elements channel. Each element is assigned a size by the provided sizer. +// chanCapacity is the capacity of the underlying channel which usually should be equal to the capacity of the queue to +// avoid blocking the producer. +func newSizedElementsChannel[T any](capacity int) *sizedElementsChannel[T] { + return &sizedElementsChannel[T]{ + used: &atomic.Uint64{}, + cap: uint64(capacity), + ch: make(chan T, capacity), + } +} + +// enqueue puts the element into the queue with the given sized if there is enough capacity. +// Returns an error if the queue is full. The callback is called before the element is committed to the queue. +// If the callback returns an error, the element is not put into the queue and the error is returned. +func (vcq *sizedElementsChannel[T]) enqueue(el T, size uint64, callback func() error) error { + if vcq.initTotalSize+vcq.used.Add(size) > vcq.cap { + vcq.used.Add(^(size - 1)) + return ErrQueueIsFull + } + if callback != nil { + if err := callback(); err != nil { + vcq.used.Add(^(size - 1)) + return err + } + } + vcq.ch <- el + return nil +} + +// initBulkEnqueue puts the elements into the queue with the given size. It's used by the persistent queue to +// initialize the queue with the elements recovered from the disk. +func (vcq *sizedElementsChannel[T]) initBulkEnqueue(els []T, totalSize uint64) { + vcq.initElementsCount = uint64(len(els)) + vcq.initTotalSize = totalSize + if cap(vcq.ch) < len(els) { + vcq.ch = make(chan T, len(els)) + } + for _, el := range els { + vcq.ch <- el + } +} + +// dequeue removes the element from the queue and returns it. +// The call blocks until there is an item available or the queue is stopped. +// The function returns true when an item is consumed or false if the queue is stopped and emptied. +// The callback is called before the element is removed from the queue. It must return the size of the element. +func (vcq *sizedElementsChannel[T]) dequeue(callback func(T) (size uint64)) (T, bool) { + el, ok := <-vcq.ch + if !ok { + return el, false + } + + size := callback(el) + + if vcq.initElementsCount > 0 { + vcq.initElementsCount-- + if vcq.initElementsCount == 0 || vcq.initTotalSize < size { + vcq.initTotalSize = 0 + } else { + vcq.initTotalSize -= size + } + return el, true + } + + vcq.used.Add(^(size - 1)) + return el, true +} + +// shutdown closes the queue channel to initiate draining of the queue. +func (vcq *sizedElementsChannel[T]) shutdown() { + close(vcq.ch) +} + +func (vcq *sizedElementsChannel[T]) Size() int { + return int(vcq.initTotalSize + vcq.used.Load()) +} + +func (vcq *sizedElementsChannel[T]) Capacity() int { + return int(vcq.cap) +} diff --git a/exporter/exporterhelper/internal/sized_elements_channel_test.go b/exporter/exporterhelper/internal/sized_elements_channel_test.go new file mode 100644 index 00000000000..0862f77411b --- /dev/null +++ b/exporter/exporterhelper/internal/sized_elements_channel_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestVariedCapacityChannel(t *testing.T) { + q := newSizedElementsChannel[int](7) + assert.NoError(t, q.enqueue(1, 1, nil)) + assert.Equal(t, 1, q.Size()) + assert.Equal(t, 7, q.Capacity()) + + // failed callback should not allow the element to be added + assert.Error(t, q.enqueue(2, 2, func() error { return errors.New("failed") })) + assert.Equal(t, 1, q.Size()) + + assert.NoError(t, q.enqueue(3, 3, nil)) + assert.Equal(t, 4, q.Size()) + + // should not be able to send to the full queue + assert.Error(t, q.enqueue(4, 4, nil)) + assert.Equal(t, 4, q.Size()) + + el, ok := q.dequeue(func(el int) uint64 { return uint64(el) }) + assert.Equal(t, 1, el) + assert.True(t, ok) + assert.Equal(t, 3, q.Size()) + + el, ok = q.dequeue(func(el int) uint64 { return uint64(el) }) + assert.Equal(t, 3, el) + assert.True(t, ok) + assert.Equal(t, 0, q.Size()) + + q.shutdown() + el, ok = q.dequeue(func(el int) uint64 { return uint64(el) }) + assert.False(t, ok) + assert.Equal(t, 0, el) +}