Skip to content

Commit

Permalink
[chore] [exporterhelper] Integrate capacity limiting into a helper queue
Browse files Browse the repository at this point in the history
Integrate capacity limiting into internal channels used by both memory and persistent queues. Otherwise, with the independent capacity limiter, it's hard to ensure that queue size is always accurate going forward.
  • Loading branch information
dmitryax committed Feb 23, 2024
1 parent 31ccef9 commit 308ee38
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 263 deletions.
21 changes: 8 additions & 13 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
*queueCapacityLimiter[T]
items chan queueRequest[T]
*sizedElementsChannel[memQueueEl[T]]
sizer Sizer[T]
}

// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
Expand All @@ -30,41 +30,36 @@ type MemoryQueueSettings[T any] struct {
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
items: make(chan queueRequest[T], set.Capacity),
sizedElementsChannel: newSizedElementsChannel[memQueueEl[T]](set.Capacity),
sizer: set.Sizer,
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
if !q.queueCapacityLimiter.claim(req) {
return ErrQueueIsFull
}
q.items <- queueRequest[T]{ctx: ctx, req: req}
return nil
return q.sizedElementsChannel.enqueue(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := <-q.items
item, ok := q.sizedElementsChannel.dequeue(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
if !ok {
return false
}
q.queueCapacityLimiter.release(item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
close(q.items)
q.sizedElementsChannel.shutdown()
return nil
}

type queueRequest[T any] struct {
type memQueueEl[T any] struct {
req T
ctx context.Context
}
8 changes: 8 additions & 0 deletions exporter/internal/queue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,11 @@ func TestZeroSizeNoConsumers(t *testing.T) {

assert.NoError(t, q.Shutdown(context.Background()))
}

type fakeReq struct {
itemsCount int
}

func (r fakeReq) ItemsCount() int {
return r.itemsCount
}
203 changes: 94 additions & 109 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"

"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -44,7 +43,10 @@ import (
// index index x
// xxxx deleted
type persistentQueue[T any] struct {
*queueCapacityLimiter[T]
// sizedElementsChannel is used by the persistent queue for two purposes:
// 1. a communication channel notifying the consumer that a new item is available.
// 2. capacity control based on the size of the items.
*sizedElementsChannel[permanentQueueEl]

set PersistentQueueSettings[T]
logger *zap.Logger
Expand All @@ -53,14 +55,10 @@ type persistentQueue[T any] struct {
// isRequestSized indicates whether the queue is sized by the number of requests.
isRequestSized bool

putChan chan struct{}

// mu guards everything declared below.
mu sync.Mutex
readIndex uint64
writeIndex uint64
initIndexSize uint64
initQueueSize *atomic.Uint64
currentlyDispatchedItems []uint64
refClient int64
stopped bool
Expand Down Expand Up @@ -98,12 +96,9 @@ type PersistentQueueSettings[T any] struct {
func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] {
_, isRequestSized := set.Sizer.(*RequestSizer[T])
return &persistentQueue[T]{
queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
set: set,
logger: set.ExporterSettings.Logger,
initQueueSize: &atomic.Uint64{},
isRequestSized: isRequestSized,
putChan: make(chan struct{}, set.Capacity),
set: set,
logger: set.ExporterSettings.Logger,
isRequestSized: isRequestSized,
}
}

Expand Down Expand Up @@ -148,86 +143,100 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
pq.readIndex = 0
pq.writeIndex = 0
}
pq.initIndexSize = pq.writeIndex - pq.readIndex

// Ensure the communication channel has the same size as the queue
for i := 0; i < int(pq.initIndexSize); i++ {
pq.putChan <- struct{}{}
}
initIndexSize := pq.writeIndex - pq.readIndex

// Read snapshot of the queue size from storage. It's not a problem if the value cannot be fetched,
// or it's not accurate. The queue size will be corrected once the recovered queue is drained.
if pq.initIndexSize > 0 {
var secOpts []sizedElementsChannelOption[permanentQueueEl]

// Pre-allocate the communication channel with the size of the restored queue.
if initIndexSize > 0 {
initQueueSize := initIndexSize
// If the queue is sized by the number of requests, no need to read the queue size from storage.
if pq.isRequestSized {
pq.initQueueSize.Store(pq.initIndexSize)
return
if !pq.isRequestSized {
if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil {
initQueueSize = restoredQueueSize
}
}

res, err := pq.client.Get(ctx, queueSizeKey)
if err == nil {
var restoredQueueSize uint64
restoredQueueSize, err = bytesToItemIndex(res)
pq.initQueueSize.Store(restoredQueueSize)
}
if err != nil {
if errors.Is(err, errValueNotSet) {
pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained. "+
"It's expected when the items sized queue enabled for the first time", zap.Error(err))
} else {
pq.logger.Error("Failed to read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
}
// Ensure the communication channel filled with evenly sized elements up to the total restored queue size.
secOpts = append(secOpts, withPreloadElements[permanentQueueEl](make([]permanentQueueEl, initIndexSize), int64(initQueueSize)))

}
pq.sizedElementsChannel = newSizedElementsChannel[permanentQueueEl](pq.set.Capacity, secOpts...)
}

// permanentQueueEl is the type of the elements passed to the sizedElementsChannel by the persistentQueue.
type permanentQueueEl struct{}

// restoreQueueSizeFromStorage restores the queue size from storage.
func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) {
var queueSize uint64
val, err := pq.client.Get(ctx, queueSizeKey)
if err == nil {
queueSize, err = bytesToItemIndex(val)
}
if err != nil {
if errors.Is(err, errValueNotSet) {
pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained. "+
"It's expected when the items sized queue enabled for the first time", zap.Error(err))
} else {
pq.logger.Error("Failed to read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))

Check warning on line 185 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L184-L185

Added lines #L184 - L185 were not covered by tests
}
return 0, err
}
return queueSize, nil
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
req T
onProcessingFinished func(error)
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := <-pq.putChan
_, ok := pq.sizedElementsChannel.dequeue(func(permanentQueueEl) int64 {
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
return pq.set.Sizer.Sizeof(req)
})
if !ok {
return false
}

req, onProcessingFinished, consumed := pq.getNextItem(context.Background())
if consumed {
onProcessingFinished(consumeFunc(context.Background(), req))
return true
}
}
}

// Size returns the current size of the queue.
func (pq *persistentQueue[T]) Size() int {
return int(pq.initQueueSize.Load()) + pq.queueCapacityLimiter.Size()
}

func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
close(pq.putChan)
// If the queue is not initialized, there is nothing to shut down.
if pq.client == nil {
return nil
}

pq.mu.Lock()
defer pq.mu.Unlock()
backupErr := pq.backupQueueSize(ctx)
pq.sizedElementsChannel.shutdown()
// Mark this queue as stopped, so consumer don't start any more work.
pq.stopped = true
return multierr.Combine(
pq.backupQueueSize(ctx),
pq.unrefClient(ctx),
)
return multierr.Combine(backupErr, pq.unrefClient(ctx))
}

// backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
// in case if the collector is killed.
func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error {
// Client can be nil if the queue is not initialized yet.
if pq.client == nil {
return nil
}

// No need to write the queue size if the queue is sized by the number of requests.
// That information is already stored as difference between read and write indexes.
if pq.isRequestSized {
Expand Down Expand Up @@ -258,34 +267,31 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {

// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
if !pq.queueCapacityLimiter.claim(req) {
pq.logger.Warn("Maximum queue capacity reached")
return ErrQueueIsFull
}
err := pq.sizedElementsChannel.enqueue(permanentQueueEl{}, pq.set.Sizer.Sizeof(req), func() error {
itemKey := getItemKey(pq.writeIndex)
newIndex := pq.writeIndex + 1

reqBuf, err := pq.set.Marshaler(req)
if err != nil {
return err
}

Check warning on line 277 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L276-L277

Added lines #L276 - L277 were not covered by tests

itemKey := getItemKey(pq.writeIndex)
newIndex := pq.writeIndex + 1
// Carry out a transaction where we both add the item and update the write index
ops := []storage.Operation{
storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
storage.SetOperation(itemKey, reqBuf),
}
if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
return storageErr
}

reqBuf, err := pq.set.Marshaler(req)
pq.writeIndex = newIndex
return nil
})
if err != nil {
pq.queueCapacityLimiter.release(req)
return err
}

// Carry out a transaction where we both add the item and update the write index
ops := []storage.Operation{
storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
storage.SetOperation(itemKey, reqBuf),
}
if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
pq.queueCapacityLimiter.release(req)
return storageErr
}

pq.writeIndex = newIndex
// Inform the loop that there's some data to process
pq.putChan <- struct{}{}

// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 5 {
Expand Down Expand Up @@ -337,16 +343,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
return request, nil, false
}

pq.releaseCapacity(request)

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++
Expand All @@ -371,29 +367,18 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

}, true
}

// releaseCapacity releases the capacity of the queue. The caller must hold the mutex.
func (pq *persistentQueue[T]) releaseCapacity(req T) {
// If the recovered queue size is not emptied yet, decrease it first.
if pq.initIndexSize > 0 {
pq.initIndexSize--
if pq.initIndexSize == 0 {
pq.initQueueSize.Store(0)
return
}
reqSize := pq.queueCapacityLimiter.sizeOf(req)
if pq.initQueueSize.Load() < reqSize {
pq.initQueueSize.Store(0)
return
// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}

Check warning on line 375 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L374-L375

Added lines #L374 - L375 were not covered by tests
}
pq.initQueueSize.Add(^(reqSize - 1))
return
}

// Otherwise, decrease the current queue size.
pq.queueCapacityLimiter.release(req)
// Ensure the used size and the channel size are in sync.
pq.sizedElementsChannel.syncSize()

}, true
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
Expand Down
Loading

0 comments on commit 308ee38

Please sign in to comment.