Implement the BatchingProcessor without polling #5107

Status: Closed (16 commits)

sdk/log/batch.go: 129 changes (122 additions, 7 deletions)
@@ -5,6 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"
"errors"
"slices"
"sync"
"sync/atomic"
"time"
)

@@ -31,6 +35,18 @@ type BatchingProcessor struct {
exportInterval time.Duration
exportTimeout time.Duration
exportMaxBatchSize int

// batch is the active batch of records that have not yet been exported.
batch *batch

// ctx is the parent context for the BatchingProcessor asynchronous
// operations. When this context is canceled exports are canceled and
// polling is stopped.
ctx context.Context
cancel context.CancelFunc

// stopped holds the stopped state of the BatchingProcessor.
stopped atomic.Bool
}

// NewBatchingProcessor decorates the provided exporter
@@ -43,39 +59,138 @@ func NewBatchingProcessor(exporter Exporter, opts ...BatchingOption) *BatchingProcessor {
exporter = defaultNoopExporter
}
cfg := newBatchingConfig(opts)

// TODO: Add polling to the BatchingProcessor.

ctx, cancel := context.WithCancel(context.Background())
return &BatchingProcessor{
exporter: exporter,

maxQueueSize: cfg.maxQSize.Value,
exportInterval: cfg.expInterval.Value,
exportTimeout: cfg.expTimeout.Value,
exportMaxBatchSize: cfg.expMaxBatchSize.Value,

batch: newBatch(cfg.maxQSize.Value),
ctx: ctx,
cancel: cancel,
}
}
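
As a usage sketch, construction with the options that feed batchingConfig might look like the following. The With* constructor names here (WithMaxQueueSize, WithExportInterval, WithExportTimeout, WithExportMaxBatchSize) are assumed from the config fields above and are not confirmed by this diff:

    // Sketch only: assumes the With* option constructors named above.
    func newConfiguredProcessor(exp Exporter) *BatchingProcessor {
        return NewBatchingProcessor(exp,
            WithMaxQueueSize(2048),
            WithExportInterval(time.Second),
            WithExportTimeout(30*time.Second),
            WithExportMaxBatchSize(512),
        )
    }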

// enqueueFunc is used for testing until #5105 is merged and integrated.
var enqueueFunc = func(_ context.Context, _ []Record, ch chan error) {
if ch != nil {
ch <- nil
}
}

// enqueue attempts to enqueue an export. If the exportCh is full, the export
// will be dropped and an error logged.
func (b *BatchingProcessor) enqueue(ctx context.Context, r []Record, rCh chan error) {
// TODO (#5105): Implement this. Until this is implemented call enqueueFunc
// so the rest of the BatchingProcessor can be tested.
enqueueFunc(ctx, r, rCh)
}
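
Until #5105 lands, enqueue simply forwards to the package-level enqueueFunc, so an in-package test can swap that variable to capture what would be exported. A hypothetical test sketch (the test name and the WithMaxQueueSize option are illustrative assumptions, not part of this change):

    // Sketch for a _test.go file in this package.
    func TestOnEmitFlushesWhenFull(t *testing.T) {
        var flushed [][]Record
        orig := enqueueFunc
        enqueueFunc = func(_ context.Context, r []Record, ch chan error) {
            flushed = append(flushed, r)
            if ch != nil {
                ch <- nil
            }
        }
        t.Cleanup(func() { enqueueFunc = orig })

        // The active batch is sized by the max queue size, so a second
        // record fills it and triggers a flush.
        p := NewBatchingProcessor(defaultNoopExporter, WithMaxQueueSize(2))
        var r Record
        _ = p.OnEmit(context.Background(), r)
        _ = p.OnEmit(context.Background(), r)
        if len(flushed) != 1 || len(flushed[0]) != 2 {
            t.Fatalf("want one flushed batch of 2 records, got %d batches", len(flushed))
        }
    }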

// OnEmit batches provided log record.
func (b *BatchingProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
return nil
}
if flushed := b.batch.Append(r); flushed != nil {
b.enqueue(b.ctx, flushed, nil)
}
return nil
}

// Enabled returns if b is enabled.
func (b *BatchingProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load()
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchingProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) {
return nil
}

resp := make(chan error, 1)
b.enqueue(ctx, b.batch.Flush(), resp)

// Cancel all exports and polling.
b.cancel()
Review comment from @pellared (Member), Mar 26, 2024:

    I am not sure why we should cancel pending exports. I think we should
    still give them a chance to complete until the export timeout is
    reached. Couldn't this lead to missing log records?

Reply from the PR author (Contributor):

    This cancels the context for the polling. It is not the context used
    in the last flush.

Suggested change from @pellared (Member):

    -// Cancel all exports and polling.
    -b.cancel()
    +// Cancel all subsequent exports and polling.
    +b.cancel()


// Wait for response before closing exporter.
var err error
select {
case err = <-resp:
close(resp)
case <-ctx.Done():
// Out of time. Ignore flush response.
return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
}
return errors.Join(err, b.exporter.Shutdown(ctx))
}
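
Note the split between the two contexts in Shutdown: the final flush and the exporter shutdown run on the caller's ctx, while b.cancel() tears down only the processor-owned b.ctx, and the select bounds the wait by the caller's deadline. A standalone sketch of the same pattern, with illustrative names (jobs, cleanup) standing in for the real queue and exporter:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // shutdown enqueues one final job, stops background work, and waits
    // for the job's reply or the caller's deadline, whichever comes first.
    func shutdown(ctx context.Context, jobs chan<- chan error,
        stopBackground context.CancelFunc, cleanup func() error) error {

        resp := make(chan error, 1)
        jobs <- resp     // the final flush, bounded by the caller's ctx
        stopBackground() // cancels only the processor-owned context

        select {
        case err := <-resp:
            return errors.Join(err, cleanup())
        case <-ctx.Done():
            // Out of time: report the deadline alongside any cleanup error.
            return errors.Join(ctx.Err(), cleanup())
        }
    }

    func main() {
        _, stopBackground := context.WithCancel(context.Background())

        jobs := make(chan chan error, 1)
        go func() {
            resp := <-jobs
            resp <- nil // pretend the final export succeeded
        }()

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        fmt.Println(shutdown(ctx, jobs, stopBackground, func() error { return nil }))
    }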

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
if b.stopped.Load() {
return nil
}
resp := make(chan error, 1)
b.enqueue(ctx, b.batch.Flush(), resp)

var err error
select {
case err = <-resp:
close(resp)
case <-ctx.Done():
return ctx.Err()
}
return errors.Join(err, b.exporter.ForceFlush(ctx))
}

// batch holds a batch of logging records.
type batch struct {
sync.Mutex

data []Record
}

func newBatch(n int) *batch {
return &batch{data: make([]Record, 0, n)}
}

// Append adds r to the batch. If adding r fills the batch, the batch is
// flushed and its contents returned.
func (b *batch) Append(r Record) []Record {
b.Lock()
defer b.Unlock()

b.data = append(b.data, r)
if len(b.data) == cap(b.data) {
return b.flush()
}
return nil
}

// Flush returns and clears the contents of the batch.
func (b *batch) Flush() []Record {
b.Lock()
defer b.Unlock()

return b.flush()
}

// flush returns and clears the contents of the batch.
//
// This assumes b.Lock is held.
func (b *batch) flush() []Record {
clone := slices.Clone(b.data)
b.data = b.data[:0]
return clone
}
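
The slices.Clone in flush is load-bearing: b.data = b.data[:0] keeps the backing array for reuse, so returning b.data directly would let the next Append overwrite records the exporter is still reading. A minimal standalone demonstration of that aliasing hazard (int stands in for Record):

    package main

    import (
        "fmt"
        "slices"
    )

    // flushAlias returns the contents without copying; the result aliases
    // the reused backing array.
    func flushAlias(data *[]int) []int {
        out := *data
        *data = (*data)[:0]
        return out
    }

    // flushClone copies the contents before the backing array is reused.
    func flushClone(data *[]int) []int {
        out := slices.Clone(*data)
        *data = (*data)[:0]
        return out
    }

    func main() {
        a := make([]int, 0, 2)
        a = append(a, 1, 2)
        got := flushAlias(&a)
        a = append(a, 9)
        fmt.Println(got) // [9 2]: the next append overwrote the "flushed" batch

        b := make([]int, 0, 2)
        b = append(b, 1, 2)
        got = flushClone(&b)
        b = append(b, 9)
        fmt.Println(got) // [1 2]: the clone is unaffected
    }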

type batchingConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]