Skip to content

Commit

Permalink
[exporterhelper] Introduce batching functionality (#8685)
Browse files Browse the repository at this point in the history
This change introduces new experimental batching functionality to the
exporter helper. The batch sender is fully concurrent and synchronous.
It's set after the queue sender, which, if enabled, introduces the
asynchronous behavior and ensures no data loss with the permanent queue.

Follow-up TODO list:
- Add pre-built merge funcs for pdata
- Handle partial errors
- A missing part compared to the batch processor is the ability to shard
the batches by context value.

Updates
#8122
  • Loading branch information
dmitryax authored Mar 8, 2024
1 parent 2fe7ed8 commit 2413346
Show file tree
Hide file tree
Showing 9 changed files with 930 additions and 11 deletions.
25 changes: 25 additions & 0 deletions .chloggen/batch-exporter-helper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add experimental batching capabilities to the exporter helper

# One or more tracking issues or pull requests related to the change
issues: [8122]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 24 additions & 0 deletions exporter/exporterbatcher/batch_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

import "context"

// BatchMergeFunc is a function that merges two requests into a single request.
// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
// marked as not mutable.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc[T any] func(context.Context, T, T) (T, error)

// BatchMergeSplitFunc is a function that merge and/or splits one or two requests into multiple requests based on the
// configured limit provided in MaxSizeConfig.
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
// Size of the last returned request MUST be less or equal than the size of any other returned request.
// The original request MUST not be mutated if error is returned after mutation or if the exporter is
// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil,
// make sure to check it before using.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error)
70 changes: 70 additions & 0 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

import (
"errors"
"time"
)

// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
// MaxSizeItems defines batch splitting functionality if it's more than zero.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type Config struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`

// FlushTimeout sets the time after which a batch will be sent regardless of its size.
FlushTimeout time.Duration `mapstructure:"flush_timeout"`

MinSizeConfig `mapstructure:",squash"`
MaxSizeConfig `mapstructure:",squash"`
}

// MinSizeConfig defines the configuration for the minimum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MinSizeConfig struct {
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MaxSizeConfig struct {
// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP.
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
}

func (c Config) Validate() error {
if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
return nil
}

func NewDefaultConfig() Config {
return Config{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
MinSizeConfig: MinSizeConfig{
MinSizeItems: 8192,
},
}
}
30 changes: 30 additions & 0 deletions exporter/exporterbatcher/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig_Validate(t *testing.T) {
cfg := NewDefaultConfig()
assert.NoError(t, cfg.Validate())

cfg.MinSizeItems = -1
assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.FlushTimeout = 0
assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero")

cfg.MaxSizeItems = -1
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.MaxSizeItems = 20000
cfg.MinSizeItems = 20001
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
}
218 changes: 218 additions & 0 deletions exporter/exporterhelper/batch_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

// batchSender is a component that places requests into batches before passing them to the downstream senders.
// Batches are sent out with any of the following conditions:
// - batch size reaches cfg.SendBatchSize
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type batchSender struct {
baseRequestSender
cfg exporterbatcher.Config
mergeFunc exporterbatcher.BatchMergeFunc[Request]
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]

// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
// Populated from the number of queue consumers if queue is enabled.
concurrencyLimit uint64
activeRequests atomic.Uint64

resetTimerCh chan struct{}

mu sync.Mutex
activeBatch *batch

logger *zap.Logger

shutdownCh chan struct{}
stopped *atomic.Bool
}

// newBatchSender returns a new batch consumer component.
func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender {
bs := &batchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
logger: set.Logger,
shutdownCh: make(chan struct{}),
stopped: &atomic.Bool{},
resetTimerCh: make(chan struct{}),
}
return bs
}

func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
timer := time.NewTimer(bs.cfg.FlushTimeout)
go func() {
for {
select {
case <-bs.shutdownCh:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
timer.Reset(bs.cfg.FlushTimeout)
case <-bs.resetTimerCh:
if !timer.Stop() {
<-timer.C
}
timer.Reset(bs.cfg.FlushTimeout)
}
}
}()

return nil
}

type batch struct {
ctx context.Context
request Request
done chan struct{}
err error
}

func newEmptyBatch() *batch {
return &batch{
ctx: context.Background(),
done: make(chan struct{}),
}
}

// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
go func(b *batch) {
b.err = b.request.Export(b.ctx)
close(b.done)
}(bs.activeBatch)
bs.activeBatch = newEmptyBatch()
}

// isActiveBatchReady returns true if the active batch is ready to be exported.
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
// Caller must hold the lock.
func (bs *batchSender) isActiveBatchReady() bool {
return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems ||
(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
}

func (bs *batchSender) send(ctx context.Context, req Request) error {
// Stopped batch sender should act as pass-through to allow the queue to be drained.
if bs.stopped.Load() {
return bs.nextSender.send(ctx, req)
}

bs.activeRequests.Add(1)
defer bs.activeRequests.Add(^uint64(0))

if bs.cfg.MaxSizeItems > 0 {
return bs.sendMergeSplitBatch(ctx, req)
}
return bs.sendMergeBatch(ctx, req)
}

// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
}
if len(reqs) == 1 || bs.activeBatch.request != nil {
bs.updateActiveBatch(ctx, reqs[0])
batch := bs.activeBatch
if bs.isActiveBatchReady() || len(reqs) > 1 {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
}
bs.mu.Unlock()
<-batch.done
if batch.err != nil {
return batch.err
}
reqs = reqs[1:]
} else {
bs.mu.Unlock()
}

// Intentionally do not put the last request in the active batch to not block it.
// TODO: Consider including the partial request in the error to avoid double publishing.
for _, r := range reqs {
if err := r.Export(ctx); err != nil {
return err
}
}
return nil
}

// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
bs.mu.Lock()
if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
if err != nil {
bs.mu.Unlock()
return err
}
}
bs.updateActiveBatch(ctx, req)
batch := bs.activeBatch
if bs.isActiveBatchReady() {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
}
bs.mu.Unlock()
<-batch.done
return batch.err
}

// updateActiveBatch update the active batch to the new merged request and context.
// The context is only set once and is not updated after the first call.
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
if bs.activeBatch.request == nil {
bs.activeBatch.ctx = ctx
}
bs.activeBatch.request = req
}

func (bs *batchSender) Shutdown(context.Context) error {
bs.stopped.Store(true)
close(bs.shutdownCh)
// Wait for the active requests to finish.
for bs.activeRequests.Load() > 0 {
time.Sleep(10 * time.Millisecond)
}
return nil
}
Loading

0 comments on commit 2413346

Please sign in to comment.