Implement the BatchingProcessor without polling #5107

Closed · wants to merge 16 commits
215 changes: 195 additions & 20 deletions sdk/log/batch.go
@@ -4,7 +4,11 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"container/ring"
"context"
"errors"
"sync"
"sync/atomic"
"time"
)

@@ -25,55 +29,226 @@ var _ Processor = (*BatchingProcessor)(nil)

// BatchingProcessor is a processor that exports batches of log records.
type BatchingProcessor struct {
exporter Exporter
// exporter is the bufferedExporter all batches are exported with.
exporter *bufferedExporter

maxQueueSize int
exportInterval time.Duration
exportTimeout time.Duration
exportMaxBatchSize int
// q is the active queue of records that have not yet been exported.
q *queue
// batchSize is the minimum number of Records needed before an export is
// triggered (unless the interval expires).
batchSize int

// pollTrigger triggers the poll goroutine to flush a batch from the queue.
// A send is made to this channel when it is known that the queue contains
// at least one complete batch.
//
// When a send is made to the channel, the poll loop will be reset after
// the flush. If there are still enough Records in the queue for another
// batch, the reset poll loop will automatically re-trigger itself. There
// is no need for the original sender to monitor and resend.
pollTrigger chan struct{}
// pollKill kills the poll goroutine. This is only expected to be closed
// once by the Shutdown method.
pollKill chan struct{}
// pollDone signals the poll goroutine has completed.
pollDone chan struct{}

// stopped holds the stopped state of the BatchingProcessor.
stopped atomic.Bool
}

// NewBatchingProcessor decorates the provided exporter
// so that the log records are batched before exporting.
//
// All of the exporter's methods are called synchronously.
func NewBatchingProcessor(exporter Exporter, opts ...BatchingOption) *BatchingProcessor {
cfg := newBatchingConfig(opts)

if exporter == nil {
// Do not panic on nil exporter.
exporter = defaultNoopExporter
}
cfg := newBatchingConfig(opts)
return &BatchingProcessor{
exporter: exporter,
// Order is important here. Wrap the timeoutExporter with the chunkExporter
// to ensure each chunked export completes within timeout (instead of the
// timeout applying to all chunked exports together).
exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value)
// Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched
// appropriately on export.
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)

b := &BatchingProcessor{
// TODO: explore making the size of this configurable.
exporter: newBufferedExporter(exporter, 1),

maxQueueSize: cfg.maxQSize.Value,
exportInterval: cfg.expInterval.Value,
exportTimeout: cfg.expTimeout.Value,
exportMaxBatchSize: cfg.expMaxBatchSize.Value,
q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}
b.pollDone = b.poll(cfg.expInterval.Value)
return b
}
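
For orientation, here is a minimal usage sketch of the new constructor. The option names (WithMaxQueueSize, WithExportInterval, WithExportMaxBatchSize) are assumed to mirror the batchingConfig fields used above, and printExporter is a hypothetical stand-in for a real Exporter:

    package main

    import (
        "context"
        "fmt"
        "time"

        sdklog "go.opentelemetry.io/otel/sdk/log"
    )

    // printExporter is a hypothetical stand-in; a real OTLP or stdout
    // exporter would be used in practice.
    type printExporter struct{}

    func (printExporter) Export(_ context.Context, recs []sdklog.Record) error {
        fmt.Println("exported", len(recs), "records")
        return nil
    }
    func (printExporter) Shutdown(context.Context) error   { return nil }
    func (printExporter) ForceFlush(context.Context) error { return nil }

    func main() {
        // Option names assumed; see batchingConfig below.
        p := sdklog.NewBatchingProcessor(printExporter{},
            sdklog.WithMaxQueueSize(2048),
            sdklog.WithExportInterval(10*time.Second),
            sdklog.WithExportMaxBatchSize(512),
        )
        defer func() { _ = p.Shutdown(context.Background()) }()
    }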

// poll spawns a goroutine to handle interval polling and batch exporting. The
// returned done chan is closed when the spawned goroutine completes.
func (b *BatchingProcessor) poll(interval time.Duration) (done chan struct{}) {
done = make(chan struct{})
go func() {
defer close(done)

for {
// TODO: add interval polling.
select {
case <-b.pollTrigger:
case <-b.pollKill:
return
}

// TODO: sync.Pool to hold these.
buf := make([]Record, b.batchSize)
qLen := b.q.TryFlush(buf, func(r []Record) bool {
return b.exporter.EnqueueExport(context.Background(), r)
})
if qLen >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
// Another flush signal already received.
}
}
}
}()
return done
}
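
The trigger channel relies on a standard Go coalescing idiom: with a buffer of one, any number of producers can request a flush, but at most one request is ever pending. A self-contained sketch of just that idiom (names are illustrative):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        trigger := make(chan struct{}, 1)
        kill := make(chan struct{})

        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-trigger:
                    fmt.Println("flush one batch")
                case <-kill:
                    return
                }
            }
        }()

        // Many sends coalesce into at most one pending trigger.
        for i := 0; i < 100; i++ {
            select {
            case trigger <- struct{}{}: // first signal wins
            default: // a flush is already pending; drop the duplicate
            }
        }

        close(kill)
        wg.Wait()
    }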

// OnEmit batches the provided log record.
func (b *BatchingProcessor) OnEmit(ctx context.Context, r Record) error {
// TODO (#5063): Implement.
func (b *BatchingProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
// Flush chan full. The poll goroutine will handle this by
// re-sending any trigger until the queue has fewer than batchSize
// records.
}
}
return nil
}

// Enabled returns true.
// Enabled returns whether b is enabled.
func (b *BatchingProcessor) Enabled(context.Context, Record) bool {
return true
return !b.stopped.Load()
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchingProcessor) Shutdown(ctx context.Context) error {
// TODO (#5063): Implement.
return nil
if b.stopped.Swap(true) {
return nil
}

// Stop the poll goroutine.
close(b.pollKill)
select {
case <-b.pollDone:
case <-ctx.Done():
// Out of time. Do not close b.exportCh, it is not certain whether the
// poll goroutine will still try to send to it.
return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
}

// Flush remaining queued records before the exporter is shut down.
//
// Given the poll goroutine has stopped, we know no more data will be
// queued. This ensures concurrent calls to ForceFlush do not panic by
// flushing to an already shut down exporter.
err := b.exporter.Export(ctx, b.q.Flush())
return errors.Join(err, b.exporter.Shutdown(ctx))
}
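
The bounded wait on pollDone is itself a reusable shutdown pattern: wait for a goroutine to finish, but give up when the caller's context expires. A minimal standalone sketch with hypothetical names:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // waitOrTimeout blocks until done is closed or ctx expires. On timeout
    // the goroutine owning done may still be running, so the caller must
    // not tear down state that goroutine uses.
    func waitOrTimeout(ctx context.Context, done <-chan struct{}) error {
        select {
        case <-done:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func main() {
        done := make(chan struct{})
        go func() {
            time.Sleep(50 * time.Millisecond)
            close(done)
        }()

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        fmt.Println(waitOrTimeout(ctx, done)) // <nil>
    }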

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
// TODO (#5063): Implement.
return nil
if b.stopped.Load() {
return nil
}
err := b.exporter.Export(ctx, b.q.Flush())
return errors.Join(err, b.exporter.ForceFlush(ctx))
}

// queue holds a queue of logging records.
type queue struct {
sync.Mutex

cap, len int
read, write *ring.Ring
}

func newQueue(size int) *queue {
r := ring.New(size)
return &queue{
cap: size,
read: r,
write: r,
}
}
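
The queue builds on container/ring from the standard library: a fixed-size circular list where advancing past the last slot wraps back to the first, so writes beyond capacity naturally land on the oldest entry. A tiny demonstration of that property:

    package main

    import (
        "container/ring"
        "fmt"
    )

    func main() {
        r := ring.New(3) // three-slot circular list
        for i := 1; i <= 5; i++ {
            r.Value = i // the 4th and 5th writes wrap and overwrite the oldest
            r = r.Next()
        }
        r.Do(func(v any) { fmt.Println(v) }) // prints 3, 4, 5
    }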

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
func (q *queue) Enqueue(r Record) int {
q.Lock()
defer q.Unlock()

q.write.Value = r
q.write = q.write.Next()

q.len++
if q.len > q.cap {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
}
return q.len
}
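
A hedged test-style sketch of the overflow behavior, written as if inside the sdk/log package (queue is unexported); Record's SetSeverityText and SeverityText accessors are assumed here purely to tell records apart:

    package log

    import "testing"

    func TestQueueOverflowDropsOldest(t *testing.T) {
        q := newQueue(2)

        var r1, r2, r3 Record
        r1.SetSeverityText("first") // assumed accessor, used only as a label
        r2.SetSeverityText("second")
        r3.SetSeverityText("third")

        q.Enqueue(r1)
        q.Enqueue(r2)
        if n := q.Enqueue(r3); n != 2 {
            t.Fatalf("queue grew past capacity: %d", n)
        }

        // The oldest record ("first") should have been overwritten.
        got := q.Flush()
        if got[0].SeverityText() != "second" || got[1].SeverityText() != "third" {
            t.Fatalf("unexpected records after overflow: %v", got)
        }
    }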

// TryFlush attempts to flush up to len(buf) Records. The available Records
// will be assigned into buf and passed to flush. If flush fails, returning
// false, the Records will not be removed from the queue. If flush succeeds,
// returning true, the flushed Records are removed from the queue. The
// number of Records remaining in the queue is returned.
func (q *queue) TryFlush(buf []Record, flush func([]Record) bool) int {
q.Lock()
defer q.Unlock()

origRead := q.read

n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
q.read = q.read.Next()
}

if flush(buf[:n]) {
q.len -= n
} else {
q.read = origRead
}
return q.len
}
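
A companion sketch of TryFlush's retain-on-failure contract: a flush callback returning false must leave the queue untouched, while a successful one removes exactly the flushed Records. Again written as if in-package:

    package log

    import "testing"

    func TestTryFlushRetainsOnFailure(t *testing.T) {
        q := newQueue(4)
        for i := 0; i < 3; i++ {
            q.Enqueue(Record{})
        }

        buf := make([]Record, 2)
        // A failed flush leaves all 3 records queued.
        if n := q.TryFlush(buf, func([]Record) bool { return false }); n != 3 {
            t.Fatalf("records dropped on failed flush: %d left", n)
        }
        // A successful flush removes the 2 flushed records, leaving 1.
        if n := q.TryFlush(buf, func([]Record) bool { return true }); n != 1 {
            t.Fatalf("want 1 record left, got %d", n)
        }
    }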

func (q *queue) Flush() []Record {
q.Lock()
defer q.Unlock()

out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
q.read = q.read.Next()
}
q.len = 0

return out
}

type batchingConfig struct {