Skip to content

Commit

Permalink
[exporterhelper] Introduce batching functionality
Browse files Browse the repository at this point in the history
This change introduces new experimental batching functionality to the exporter helper
  • Loading branch information
dmitryax committed Mar 7, 2024
1 parent f471413 commit c9e1c14
Show file tree
Hide file tree
Showing 9 changed files with 929 additions and 11 deletions.
25 changes: 25 additions & 0 deletions .chloggen/batch-exporter-helper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add experimental batching capabilities to the exporter helper

# One or more tracking issues or pull requests related to the change
issues: [8122]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 24 additions & 0 deletions exporter/exporterbatcher/batch_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

import "context"

// BatchMergeFunc is a function that merges two requests into a single request.
// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
// marked as not mutable.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc[T any] func(context.Context, T, T) (T, error)

// BatchMergeSplitFunc is a function that merge and/or splits one or two requests into multiple requests based on the
// configured limit provided in MaxSizeConfig.
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
// Size of the last returned request MUST be less or equal than the size of any other returned request.
// The original request MUST not be mutated if error is returned after mutation or if the exporter is
// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil,
// make sure to check it before using.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error)
70 changes: 70 additions & 0 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

import (
"errors"
"time"
)

// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
// MaxSizeItems defines batch splitting functionality if it's more than zero.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type Config struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`

// FlushTimeout sets the time after which a batch will be sent regardless of its size.
FlushTimeout time.Duration `mapstructure:"flush_timeout"`

MinSizeConfig `mapstructure:",squash"`
MaxSizeConfig `mapstructure:",squash"`
}

// MinSizeConfig defines the configuration for the minimum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MinSizeConfig struct {
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MaxSizeConfig struct {
// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP.
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
}

func (c Config) Validate() error {
if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
return nil
}

func NewDefaultConfig() Config {
return Config{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
MinSizeConfig: MinSizeConfig{
MinSizeItems: 8192,
},
}
}
30 changes: 30 additions & 0 deletions exporter/exporterbatcher/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig_Validate(t *testing.T) {
cfg := NewDefaultConfig()
assert.NoError(t, cfg.Validate())

cfg.MinSizeItems = -1
assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.FlushTimeout = 0
assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero")

cfg.MaxSizeItems = -1
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")

cfg = NewDefaultConfig()
cfg.MaxSizeItems = 20000
cfg.MinSizeItems = 20001
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
}
218 changes: 218 additions & 0 deletions exporter/exporterhelper/batch_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

// batchSender is a component that places requests into batches before passing them to the downstream senders.
// Batches are sent out with any of the following conditions:
// - batch size reaches cfg.SendBatchSize
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type batchSender struct {
baseRequestSender
cfg exporterbatcher.Config
mergeFunc exporterbatcher.BatchMergeFunc[Request]
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]

// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
// Populated from the number of queue consumers if queue is enabled.
concurrencyLimit uint64
activeRequests atomic.Uint64

resetTimerCh chan struct{}

mu sync.Mutex
activeBatch *batch

logger *zap.Logger

shutdownCh chan struct{}
stopped *atomic.Bool
}

// newBatchSender returns a new batch consumer component.
func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender {
bs := &batchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
logger: set.Logger,
shutdownCh: make(chan struct{}),
stopped: &atomic.Bool{},
resetTimerCh: make(chan struct{}),
}
return bs
}

func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
timer := time.NewTimer(bs.cfg.FlushTimeout)
go func() {
for {
select {
case <-bs.shutdownCh:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
if !timer.Stop() {
<-timer.C
}

Check warning on line 73 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L72-L73

Added lines #L72 - L73 were not covered by tests
return
case <-timer.C:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
timer.Reset(bs.cfg.FlushTimeout)
case <-bs.resetTimerCh:
if !timer.Stop() {
<-timer.C
}

Check warning on line 85 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L84-L85

Added lines #L84 - L85 were not covered by tests
timer.Reset(bs.cfg.FlushTimeout)
}
}
}()

return nil
}

type batch struct {
ctx context.Context
request Request
done chan struct{}
err error
}

func newEmptyBatch() *batch {
return &batch{
ctx: context.Background(),
done: make(chan struct{}),
}
}

// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
go func(b *batch) {
b.err = b.request.Export(b.ctx)
close(b.done)
}(bs.activeBatch)
bs.activeBatch = newEmptyBatch()
}

// isActiveBatchReady returns true if the active batch is ready to be exported.
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
// Caller must hold the lock.
func (bs *batchSender) isActiveBatchReady() bool {
return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems ||
(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
}

func (bs *batchSender) send(ctx context.Context, req Request) error {
// Stopped batch sender should act as pass-through to allow the queue to be drained.
if bs.stopped.Load() {
return bs.nextSender.send(ctx, req)
}

bs.activeRequests.Add(1)
defer bs.activeRequests.Add(^uint64(0))

if bs.cfg.MaxSizeItems > 0 {
return bs.sendMergeSplitBatch(ctx, req)
}
return bs.sendMergeBatch(ctx, req)
}

// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
}
if len(reqs) == 1 || bs.activeBatch.request != nil {
bs.updateActiveBatch(ctx, reqs[0])
batch := bs.activeBatch
if bs.isActiveBatchReady() || len(reqs) > 1 {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
}
bs.mu.Unlock()
<-batch.done
if batch.err != nil {
return batch.err
}
reqs = reqs[1:]
} else {
bs.mu.Unlock()
}

// Intentionally do not put the last request in the active batch to not block it.
// TODO: Consider including the partial request in the error to avoid double publishing.
for _, r := range reqs {
if err := r.Export(ctx); err != nil {
return err
}
}
return nil
}

// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
bs.mu.Lock()
if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
if err != nil {
bs.mu.Unlock()
return err
}
}
bs.updateActiveBatch(ctx, req)
batch := bs.activeBatch
if bs.isActiveBatchReady() {
bs.exportActiveBatch()
bs.resetTimerCh <- struct{}{}
}
bs.mu.Unlock()
<-batch.done
return batch.err
}

// updateActiveBatch update the active batch to the new merged request and context.
// The context is only set once and is not updated after the first call.
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
if bs.activeBatch.request == nil {
bs.activeBatch.ctx = ctx
}
bs.activeBatch.request = req
}

func (bs *batchSender) Shutdown(context.Context) error {
bs.stopped.Store(true)
close(bs.shutdownCh)
// Wait for the active requests to finish.
for bs.activeRequests.Load() > 0 {
time.Sleep(10 * time.Millisecond)
}
return nil
}
Loading

0 comments on commit c9e1c14

Please sign in to comment.