From 241334609fc47927b4a8533dfca28e0f65dad9fe Mon Sep 17 00:00:00 2001
From: Dmitrii Anoshin
Date: Thu, 7 Mar 2024 16:42:01 -0800
Subject: [PATCH] [exporterhelper] Introduce batching functionality (#8685)

This change introduces new experimental batching functionality to the
exporter helper. The batch sender is fully concurrent and synchronous:
requests are batched from concurrent callers, and each caller blocks
until its batch is exported. It is placed after the queue sender, which,
if enabled, introduces the asynchronous behavior and ensures no data
loss with the persistent queue.

Follow-up TODO list:
- Add pre-built merge funcs for pdata
- Handle partial errors
- A missing part compared to the batch processor is the ability to
  shard the batches by context value.

Updates https://github.com/open-telemetry/opentelemetry-collector/issues/8122

---
 .chloggen/batch-exporter-helper.yaml         |  25 ++
 exporter/exporterbatcher/batch_func.go       |  24 ++
 exporter/exporterbatcher/config.go           |  70 ++++
 exporter/exporterbatcher/config_test.go      |  30 ++
 exporter/exporterhelper/batch_sender.go      | 218 ++++++++++
 exporter/exporterhelper/batch_sender_test.go | 407 +++++++++++++++++++
 exporter/exporterhelper/common.go            |  53 ++-
 exporter/exporterhelper/queue_sender.go      |   6 +-
 exporter/exporterhelper/request_test.go      | 108 ++++-
 9 files changed, 930 insertions(+), 11 deletions(-)
 create mode 100755 .chloggen/batch-exporter-helper.yaml
 create mode 100644 exporter/exporterbatcher/batch_func.go
 create mode 100644 exporter/exporterbatcher/config.go
 create mode 100644 exporter/exporterbatcher/config_test.go
 create mode 100644 exporter/exporterhelper/batch_sender.go
 create mode 100644 exporter/exporterhelper/batch_sender_test.go

diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml
new file mode 100755
index 00000000000..613cfbac5b2
--- /dev/null
+++ b/.chloggen/batch-exporter-helper.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add experimental batching capabilities to the exporter helper
+
+# One or more tracking issues or pull requests related to the change
+issues: [8122]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
\ No newline at end of file
diff --git a/exporter/exporterbatcher/batch_func.go b/exporter/exporterbatcher/batch_func.go
new file mode 100644
index 00000000000..0298276ba7b
--- /dev/null
+++ b/exporter/exporterbatcher/batch_func.go
@@ -0,0 +1,24 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"
+
+import "context"
+
+// BatchMergeFunc is a function that merges two requests into a single request.
+// Do not mutate the requests passed to the function if an error can be returned after mutation or if the exporter is
+// marked as not mutable.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchMergeFunc[T any] func(context.Context, T, T) (T, error)
+
+// BatchMergeSplitFunc is a function that merges and/or splits one or two requests into multiple requests based on the
+// configured limit provided in MaxSizeConfig.
+// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
+// The size of the last returned request MUST be less than or equal to the size of any other returned request.
+// The original request MUST not be mutated if an error is returned after mutation or if the exporter is
+// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil;
+// make sure to check it before use.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error)
diff --git a/exporter/exporterbatcher/config.go b/exporter/exporterbatcher/config.go
new file mode 100644
index 00000000000..239dc2dd4fe
--- /dev/null
+++ b/exporter/exporterbatcher/config.go
@@ -0,0 +1,70 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"
+
+import (
+	"errors"
+	"time"
+)
+
+// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
+// Batch splitting is enabled when MaxSizeItems is greater than zero.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type Config struct {
+	// Enabled indicates whether batching is enabled.
+	Enabled bool `mapstructure:"enabled"`
+
+	// FlushTimeout sets the time after which a batch will be sent regardless of its size.
+	FlushTimeout time.Duration `mapstructure:"flush_timeout"`
+
+	MinSizeConfig `mapstructure:",squash"`
+	MaxSizeConfig `mapstructure:",squash"`
+}
+
+// MinSizeConfig defines the configuration for the minimum number of items in a batch.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type MinSizeConfig struct {
+	// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
+	// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
+	// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will be ignored.
+	MinSizeItems int `mapstructure:"min_size_items"`
+}
+
+// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type MaxSizeConfig struct {
+	// MaxSizeItems is the maximum number of batch items, i.e. spans, data points or log records for OTLP.
+	// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
+	// Setting this value to zero disables the maximum size limit.
+	MaxSizeItems int `mapstructure:"max_size_items"`
+}
+
+func (c Config) Validate() error {
+	if c.MinSizeItems < 0 {
+		return errors.New("min_size_items must be greater than or equal to zero")
+	}
+	if c.MaxSizeItems < 0 {
+		return errors.New("max_size_items must be greater than or equal to zero")
+	}
+	if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
+		return errors.New("max_size_items must be greater than or equal to min_size_items")
+	}
+	if c.FlushTimeout <= 0 {
+		return errors.New("timeout must be greater than zero")
+	}
+	return nil
+}
+
+// NewDefaultConfig returns a default Config with batching enabled.
+func NewDefaultConfig() Config {
+	return Config{
+		Enabled:      true,
+		FlushTimeout: 200 * time.Millisecond,
+		MinSizeConfig: MinSizeConfig{
+			MinSizeItems: 8192,
+		},
+	}
+}
diff --git a/exporter/exporterbatcher/config_test.go b/exporter/exporterbatcher/config_test.go
new file mode 100644
index 00000000000..337f83ce318
--- /dev/null
+++ b/exporter/exporterbatcher/config_test.go
@@ -0,0 +1,30 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterbatcher
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestConfig_Validate(t *testing.T) {
+	cfg := NewDefaultConfig()
+	assert.NoError(t, cfg.Validate())
+
+	cfg.MinSizeItems = -1
+	assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")
+
+	cfg = NewDefaultConfig()
+	cfg.FlushTimeout = 0
+	assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero")
+
+	cfg.MaxSizeItems = -1
+	assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")
+
+	cfg = NewDefaultConfig()
+	cfg.MaxSizeItems = 20000
+	cfg.MinSizeItems = 20001
+	assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
+}
diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go
new file mode 100644
index 00000000000..1a6f08ced3f
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender.go
@@ -0,0 +1,218 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+)
+
+// batchSender is a component that places requests into batches before passing them to the downstream senders.
+// Batches are sent out with any of the following conditions:
+// - batch size reaches cfg.MinSizeItems
+// - cfg.FlushTimeout has elapsed since the timestamp when the previous batch was sent out.
+// - concurrencyLimit is reached.
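+// The batch sender is synchronous: a send() call blocks until the batch containing its request is exported,
+// and the export result is propagated back to every caller that contributed to that batch.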
+type batchSender struct {
+	baseRequestSender
+	cfg            exporterbatcher.Config
+	mergeFunc      exporterbatcher.BatchMergeFunc[Request]
+	mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]
+
+	// concurrencyLimit is the maximum number of requests that the batch sender handles concurrently.
+	// If this limit is reached, the active batch is sent out right away, regardless of its size.
+	// Populated from the number of queue consumers if the queue is enabled.
+	concurrencyLimit uint64
+	activeRequests   atomic.Uint64
+
+	resetTimerCh chan struct{}
+
+	mu          sync.Mutex
+	activeBatch *batch
+
+	logger *zap.Logger
+
+	shutdownCh chan struct{}
+	stopped    *atomic.Bool
+}
+
+// newBatchSender returns a new batch sender component.
+func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender {
+	bs := &batchSender{
+		activeBatch:  newEmptyBatch(),
+		cfg:          cfg,
+		logger:       set.Logger,
+		shutdownCh:   make(chan struct{}),
+		stopped:      &atomic.Bool{},
+		resetTimerCh: make(chan struct{}),
+	}
+	return bs
+}
+
+func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
+	timer := time.NewTimer(bs.cfg.FlushTimeout)
+	go func() {
+		for {
+			select {
+			case <-bs.shutdownCh:
+				bs.mu.Lock()
+				if bs.activeBatch.request != nil {
+					bs.exportActiveBatch()
+				}
+				bs.mu.Unlock()
+				if !timer.Stop() {
+					<-timer.C
+				}
+				return
+			case <-timer.C:
+				bs.mu.Lock()
+				if bs.activeBatch.request != nil {
+					bs.exportActiveBatch()
+				}
+				bs.mu.Unlock()
+				timer.Reset(bs.cfg.FlushTimeout)
+			case <-bs.resetTimerCh:
+				if !timer.Stop() {
+					<-timer.C
+				}
+				timer.Reset(bs.cfg.FlushTimeout)
+			}
+		}
+	}()
+
+	return nil
+}
+
+type batch struct {
+	ctx     context.Context
+	request Request
+	done    chan struct{}
+	err     error
+}
+
+func newEmptyBatch() *batch {
+	return &batch{
+		ctx:  context.Background(),
+		done: make(chan struct{}),
+	}
+}
+
+// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
+// Caller must hold the lock.
+func (bs *batchSender) exportActiveBatch() {
+	go func(b *batch) {
+		b.err = b.request.Export(b.ctx)
+		close(b.done)
+	}(bs.activeBatch)
+	bs.activeBatch = newEmptyBatch()
+}
+
+// isActiveBatchReady returns true if the active batch is ready to be exported.
+// The batch is ready if it has reached the minimum size or if the concurrency limit is reached.
+// Caller must hold the lock.
+func (bs *batchSender) isActiveBatchReady() bool {
+	return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems ||
+		(bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit)
+}
+
+func (bs *batchSender) send(ctx context.Context, req Request) error {
+	// A stopped batch sender should act as a pass-through to allow the queue to be drained.
+	if bs.stopped.Load() {
+		return bs.nextSender.send(ctx, req)
+	}
+
+	bs.activeRequests.Add(1)
+	defer bs.activeRequests.Add(^uint64(0)) // decrement by one
+
+	if bs.cfg.MaxSizeItems > 0 {
+		return bs.sendMergeSplitBatch(ctx, req)
+	}
+	return bs.sendMergeBatch(ctx, req)
+}
+
+// sendMergeSplitBatch sends the request to the batch, which may be split into multiple requests.
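+// The first resulting request is merged into the active batch and the caller blocks on that batch's export;
+// any remaining requests produced by the split are exported directly by the caller, bypassing the active batch.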
+func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
+	bs.mu.Lock()
+
+	reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
+	if err != nil || len(reqs) == 0 {
+		bs.mu.Unlock()
+		return err
+	}
+	if len(reqs) == 1 || bs.activeBatch.request != nil {
+		bs.updateActiveBatch(ctx, reqs[0])
+		batch := bs.activeBatch
+		if bs.isActiveBatchReady() || len(reqs) > 1 {
+			bs.exportActiveBatch()
+			bs.resetTimerCh <- struct{}{}
+		}
+		bs.mu.Unlock()
+		<-batch.done
+		if batch.err != nil {
+			return batch.err
+		}
+		reqs = reqs[1:]
+	} else {
+		bs.mu.Unlock()
+	}
+
+	// Intentionally do not put the last request in the active batch, so as not to block it.
+	// TODO: Consider including the partial request in the error to avoid double publishing.
+	for _, r := range reqs {
+		if err := r.Export(ctx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
+func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
+	bs.mu.Lock()
+	if bs.activeBatch.request != nil {
+		var err error
+		req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
+		if err != nil {
+			bs.mu.Unlock()
+			return err
+		}
+	}
+	bs.updateActiveBatch(ctx, req)
+	batch := bs.activeBatch
+	if bs.isActiveBatchReady() {
+		bs.exportActiveBatch()
+		bs.resetTimerCh <- struct{}{}
+	}
+	bs.mu.Unlock()
+	<-batch.done
+	return batch.err
+}
+
+// updateActiveBatch updates the active batch to the new merged request and context.
+// The context is only set once and is not updated after the first call.
+// Merging the contexts would be complex and require an additional goroutine to handle the context cancellation.
+// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
+func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
+	if bs.activeBatch.request == nil {
+		bs.activeBatch.ctx = ctx
+	}
+	bs.activeBatch.request = req
+}
+
+func (bs *batchSender) Shutdown(context.Context) error {
+	bs.stopped.Store(true)
+	close(bs.shutdownCh)
+	// Wait for the active requests to finish.
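+	// Polling keeps the shutdown path simple: all in-flight send() calls complete before Shutdown returns.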
+	for bs.activeRequests.Load() > 0 {
+		time.Sleep(10 * time.Millisecond)
+	}
+	return nil
+}
diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go
new file mode 100644
index 00000000000..237aa58a70a
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender_test.go
@@ -0,0 +1,407 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/exporterqueue"
+)
+
+func TestBatchSender_Merge(t *testing.T) {
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.MinSizeItems = 10
+	cfg.FlushTimeout = 100 * time.Millisecond
+
+	tests := []struct {
+		name          string
+		batcherOption Option
+	}{
+		{
+			name:          "split_disabled",
+			batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
+		},
+		{
+			name: "split_high_limit",
+			batcherOption: func() Option {
+				c := cfg
+				c.MaxSizeItems = 1000
+				return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
+			}(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			be := queueBatchExporter(t, tt.batcherOption)
+
+			require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+			t.Cleanup(func() {
+				require.NoError(t, be.Shutdown(context.Background()))
+			})
+
+			sink := newFakeRequestSink()
+
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink}))
+
+			// the first two requests should be merged into one and sent by reaching the minimum items size
+			assert.Eventually(t, func() bool {
+				return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11
+			}, 50*time.Millisecond, 10*time.Millisecond)
+
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink}))
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink}))
+
+			// the third and fourth requests should be sent together by reaching the flush timeout;
+			// the fifth request below should be ignored because of the merge error.
+			time.Sleep(50 * time.Millisecond)
+
+			// should be ignored because of the merge error.
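+			// (the merge error is returned to the queue consumer rather than to send(), so send() itself succeeds)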
+ require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, + mergeErr: errors.New("merge error")})) + + assert.Equal(t, uint64(1), sink.requestsCount.Load()) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSender_BatchExportError(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.MinSizeItems = 10 + tests := []struct { + name string + batcherOption Option + expectedRequests uint64 + expectedItems uint64 + }{ + { + name: "merge_only", + batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + }, + { + name: "merge_without_split_triggered", + batcherOption: func() Option { + c := cfg + c.MaxSizeItems = 200 + return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + }(), + }, + { + name: "merge_with_split_triggered", + batcherOption: func() Option { + c := cfg + c.MaxSizeItems = 20 + return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + }(), + expectedRequests: 1, + expectedItems: 20, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be := queueBatchExporter(t, tt.batcherOption) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + + // the first two requests should be blocked by the batchSender. + time.Sleep(50 * time.Millisecond) + assert.Equal(t, uint64(0), sink.requestsCount.Load()) + + // the third request should trigger the export and cause an error. + errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} + require.NoError(t, be.send(context.Background(), errReq)) + + // the batch should be dropped since the queue doesn't have requeuing enabled. + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == tt.expectedRequests && + sink.itemsCount.Load() == tt.expectedItems && + be.batchSender.(*batchSender).activeRequests.Load() == uint64(0) && + be.queueSender.(*queueSender).queue.Size() == 0 + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSender_MergeOrSplit(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.MinSizeItems = 5 + cfg.MaxSizeItems = 10 + cfg.FlushTimeout = 100 * time.Millisecond + be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + // should be sent right away by reaching the minimum items size. + require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be broken down into two requests, both are sent right away. 
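+	// (17 items with MaxSizeItems=10 splits into 10+7; the active batch is empty, so both halves bypass it)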
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25
+	}, 50*time.Millisecond, 10*time.Millisecond)
+
+	// request that cannot be split should be dropped.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink,
+		mergeErr: errors.New("split error")}))
+
+	// big request should be broken down into two requests, both are sent right away.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38
+	}, 50*time.Millisecond, 10*time.Millisecond)
+}
+
+func TestBatchSender_Shutdown(t *testing.T) {
+	batchCfg := exporterbatcher.NewDefaultConfig()
+	batchCfg.MinSizeItems = 10
+	be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink}))
+
+	// Make sure the request reaches the batchSender before shutdown.
+	time.Sleep(50 * time.Millisecond)
+
+	require.NoError(t, be.Shutdown(context.Background()))
+
+	// shutdown should force sending the batch
+	assert.Equal(t, uint64(1), sink.requestsCount.Load())
+	assert.Equal(t, uint64(3), sink.itemsCount.Load())
+}
+
+func TestBatchSender_Disabled(t *testing.T) {
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.Enabled = false
+	cfg.MaxSizeItems = 10
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender,
+		WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+	// should be sent right away because batching is disabled.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+	assert.Equal(t, uint64(1), sink.requestsCount.Load())
+	assert.Equal(t, uint64(8), sink.itemsCount.Load())
+}
+
+func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) {
+	invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request,
+		error) {
+		// reply with an invalid zero-length slice if req2 has more than 20 items
+		if req2.(*fakeRequest).items > 20 {
+			return []Request{}, nil
+		}
+		// otherwise reply with a single request.
+		return []Request{req2}, nil
+	}
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.FlushTimeout = 50 * time.Millisecond
+	cfg.MaxSizeItems = 20
+	be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)))
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+	// first request should be ignored due to the invalid merge/split function.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink}))
+	// second request should be sent after reaching the timeout.
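+	// (15 items is under MaxSizeItems=20, so the function returns the request unchanged; it is below the
+	// default MinSizeItems, so it is flushed only by the 50ms FlushTimeout)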
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15
+	}, 100*time.Millisecond, 10*time.Millisecond)
+}
+
+func TestBatchSender_PostShutdown(t *testing.T) {
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender,
+		WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc,
+			fakeBatchMergeSplitFunc)))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	assert.NoError(t, be.Shutdown(context.Background()))
+
+	// A closed batch sender should act as a pass-through so it does not block queue draining.
+	sink := newFakeRequestSink()
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+	assert.Equal(t, uint64(1), sink.requestsCount.Load())
+	assert.Equal(t, uint64(8), sink.itemsCount.Load())
+}
+
+func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
+	qCfg := exporterqueue.NewDefaultConfig()
+	qCfg.NumConsumers = 2
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender,
+		WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
+		WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]()))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		assert.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+	assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+
+	time.Sleep(50 * time.Millisecond)
+	// the first request should still be in-flight.
+	assert.Equal(t, uint64(0), sink.requestsCount.Load())
+
+	// the second request should trigger the batch to be sent by reaching the max concurrency limit.
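+	// (NumConsumers=2 sets concurrencyLimit=2: with two requests in flight, the active batch is exported
+	// immediately, regardless of MinSizeItems)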
+	assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 16
+	}, 100*time.Millisecond, 10*time.Millisecond)
+}
+
+func TestBatchSender_BatchBlocking(t *testing.T) {
+	bCfg := exporterbatcher.NewDefaultConfig()
+	bCfg.MinSizeItems = 3
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender,
+		WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+
+	// send 6 blocking requests
+	wg := sync.WaitGroup{}
+	for i := 0; i < 6; i++ {
+		wg.Add(1)
+		go func() {
+			assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond}))
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+
+	// should be sent in two batches since the batch size is 3
+	assert.Equal(t, uint64(2), sink.requestsCount.Load())
+	assert.Equal(t, uint64(6), sink.itemsCount.Load())
+
+	require.NoError(t, be.Shutdown(context.Background()))
+}
+
+// Validate that the batch is cancelled once the first request in the batch is cancelled
+func TestBatchSender_BatchCancelled(t *testing.T) {
+	bCfg := exporterbatcher.NewDefaultConfig()
+	bCfg.MinSizeItems = 2
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender,
+		WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+
+	// send 2 blocking requests
+	wg := sync.WaitGroup{}
+	ctx, cancel := context.WithCancel(context.Background())
+	wg.Add(1)
+	go func() {
+		assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled)
+		wg.Done()
+	}()
+	wg.Add(1)
+	go func() {
+		time.Sleep(20 * time.Millisecond) // ensure this call is the second
+		assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled)
+		wg.Done()
+	}()
+	cancel() // cancelling the first request should cancel the whole batch
+	wg.Wait()
+
+	// nothing should be delivered
+	assert.Equal(t, uint64(0), sink.requestsCount.Load())
+	assert.Equal(t, uint64(0), sink.itemsCount.Load())
+
+	require.NoError(t, be.Shutdown(context.Background()))
+}
+
+func TestBatchSender_DrainActiveRequests(t *testing.T) {
+	bCfg := exporterbatcher.NewDefaultConfig()
+	bCfg.MinSizeItems = 2
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender,
+		WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+
+	// send 3 blocking requests with a delay
+	go func() {
+		assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond}))
+	}()
+	go func() {
+		assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond}))
+	}()
+	go func() {
+		assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond}))
+	}()
+
+	// give time for the first two requests to be batched
+	time.Sleep(20 * time.Millisecond)
+
+	// Shutdown should force the active batch to be dispatched and wait for all batches to be delivered.
+	// It should take 120 milliseconds to complete.
+	require.NoError(t, be.Shutdown(context.Background()))
+
+	assert.Equal(t, uint64(2), sink.requestsCount.Load())
+	assert.Equal(t, uint64(3), sink.itemsCount.Load())
+}
+
+func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
+	be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, batchOption,
+		WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	return be
+}
diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index b67dbacc449..b81bfdc4efa 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -14,6 +14,7 @@ import (
 	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterqueue"
 )
@@ -146,6 +147,37 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
 	}
 }
+// BatcherOption applies changes to the batcher sender.
+type BatcherOption func(*batchSender)
+
+// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
+func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption {
+	return func(bs *batchSender) {
+		bs.mergeFunc = mf
+		bs.mergeSplitFunc = msf
+	}
+}
+
+// WithBatcher enables batching for an exporter based on custom request types.
+// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
+// WithRequestBatchFuncs provided.
+// TODO: Add OTLP-based batch functions applied by default so it can be used with New[Traces|Metrics|Logs]Exporter exporter helpers.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
+	return func(o *baseExporter) error {
+		bs := newBatchSender(cfg, o.set)
+		for _, opt := range opts {
+			opt(bs)
+		}
+		if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
+			return fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to request-based exporters")
+		}
+		o.batchSender = bs
+		return nil
+	}
+}
+
 // withMarshaler is used to set the request marshaler for the new exporter helper.
 // It must be provided as the first option when creating a new exporter helper.
 func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option {
@@ -182,6 +214,7 @@ type baseExporter struct {
 	// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
 	// The data is handled by each sender in the respective order starting from the queueSender.
 	// Most of the senders are optional, and initialized with a no-op pass-through sender.
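+	// With batching enabled, the chain is: queueSender -> batchSender -> obsrepSender -> retrySender -> timeoutSender.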
+	batchSender   requestSender
 	queueSender   requestSender
 	obsrepSender  requestSender
 	retrySender   requestSender
@@ -199,6 +232,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf
 	be := &baseExporter{
 		signal: signal,
+		batchSender:  &baseRequestSender{},
 		queueSender:  &baseRequestSender{},
 		obsrepSender: osf(obsReport),
 		retrySender:  &baseRequestSender{},
@@ -217,6 +251,13 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf
 	be.connectSenders()
+	// If the queue sender is enabled, cap the batch sender's concurrency at the number of queue consumers.
+	if qs, ok := be.queueSender.(*queueSender); ok {
+		if bs, ok := be.batchSender.(*batchSender); ok {
+			bs.concurrencyLimit = uint64(qs.numConsumers)
+		}
+	}
+
 	return be, nil
 }
@@ -232,7 +273,8 @@ func (be *baseExporter) send(ctx context.Context, req Request) error {
 // connectSenders connects the senders in the predefined order.
 func (be *baseExporter) connectSenders() {
-	be.queueSender.setNextSender(be.obsrepSender)
+	be.queueSender.setNextSender(be.batchSender)
+	be.batchSender.setNextSender(be.obsrepSender)
 	be.obsrepSender.setNextSender(be.retrySender)
 	be.retrySender.setNextSender(be.timeoutSender)
 }
@@ -243,7 +285,12 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
 		return err
 	}
-	// If no error then start the queueSender.
+	// If no error then start the batchSender.
+	if err := be.batchSender.Start(ctx, host); err != nil {
+		return err
+	}
+
+	// Last, start the queueSender.
 	return be.queueSender.Start(ctx, host)
 }
@@ -251,6 +298,8 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
 	return multierr.Combine(
 		// First shutdown the retry sender, so the queue sender can flush the queue without retries.
 		be.retrySender.Shutdown(ctx),
+		// Then shutdown the batch sender.
+		be.batchSender.Shutdown(ctx),
 		// Then shutdown the queue sender.
 		be.queueSender.Shutdown(ctx),
 		// Last shutdown the wrapped exporter itself.
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index 3092c60c98f..3e539d7f9a0 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -31,7 +31,9 @@ var (
 type QueueSettings struct {
 	// Enabled indicates whether to enqueue batches before sending to the consumerSender.
 	Enabled bool `mapstructure:"enabled"`
-	// NumConsumers is the number of consumers from the queue.
+	// NumConsumers is the number of consumers from the queue. Defaults to 10.
+	// If batching is enabled, a combined batch cannot contain more requests than the number of consumers.
+	// It's therefore recommended to set a higher number of consumers when batching is enabled.
 	NumConsumers int `mapstructure:"num_consumers"`
 	// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"` @@ -73,6 +75,7 @@ type queueSender struct { baseRequestSender fullName string queue exporterqueue.Queue[Request] + numConsumers int traceAttribute attribute.KeyValue logger *zap.Logger meter otelmetric.Meter @@ -87,6 +90,7 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.CreateSettings, qs := &queueSender{ fullName: set.ID.String(), queue: q, + numConsumers: numConsumers, traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 18e8228f946..fe373c67e12 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,25 +5,117 @@ package exporterhelper import ( "context" + "sync/atomic" + "time" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) +type fakeRequestSink struct { + requestsCount *atomic.Uint64 + itemsCount *atomic.Uint64 +} + +func newFakeRequestSink() *fakeRequestSink { + return &fakeRequestSink{ + requestsCount: &atomic.Uint64{}, + itemsCount: &atomic.Uint64{}, + } +} + type fakeRequest struct { - items int - err error + items int + exportErr error + mergeErr error + delay time.Duration + sink *fakeRequestSink } -func (r fakeRequest) Export(context.Context) error { - return r.err +func (r *fakeRequest) Export(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.delay): + } + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(uint64(r.items)) + } + return nil } -func (r fakeRequest) ItemsCount() int { +func (r *fakeRequest) ItemsCount() int { return r.items } +func fakeBatchMergeFunc(_ context.Context, r1 Request, r2 Request) (Request, error) { + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: fr1.items + fr2.items, + sink: fr1.sink, + exportErr: fr2.exportErr, + delay: fr1.delay + fr2.delay, + }, nil +} + +func fakeBatchMergeSplitFunc(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { + maxItems := cfg.MaxSizeItems + if maxItems == 0 { + r, err := fakeBatchMergeFunc(ctx, r1, r2) + return []Request{r}, err + } + + if r2.(*fakeRequest).mergeErr != nil { + return nil, r2.(*fakeRequest).mergeErr + } + + fr2 := r2.(*fakeRequest) + fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} + var res []Request + + // fill fr1 to maxItems if it's not nil + if r1 != nil { + fr1 := r1.(*fakeRequest) + fr1 = &fakeRequest{items: fr1.items, sink: fr1.sink, exportErr: fr1.exportErr, delay: fr1.delay} + if fr2.items <= maxItems-fr1.items { + fr1.items += fr2.items + if fr2.exportErr != nil { + fr1.exportErr = fr2.exportErr + } + return []Request{fr1}, nil + } + // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases + fr2.items -= maxItems - fr1.items + fr1.items = maxItems + res = append(res, fr1) + } + + // split fr2 to maxItems + for { + if fr2.items <= maxItems { + res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: 
fr2.delay}) + break + } + res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + fr2.items -= maxItems + } + + return res, nil +} + type fakeRequestConverter struct { metricsError error tracesError error @@ -32,13 +124,13 @@ type fakeRequestConverter struct { } func (frc *fakeRequestConverter) requestFromMetricsFunc(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: frc.requestError}, frc.metricsError + return &fakeRequest{items: md.DataPointCount(), exportErr: frc.requestError}, frc.metricsError } func (frc *fakeRequestConverter) requestFromTracesFunc(_ context.Context, md ptrace.Traces) (Request, error) { - return fakeRequest{items: md.SpanCount(), err: frc.requestError}, frc.tracesError + return &fakeRequest{items: md.SpanCount(), exportErr: frc.requestError}, frc.tracesError } func (frc *fakeRequestConverter) requestFromLogsFunc(_ context.Context, md plog.Logs) (Request, error) { - return fakeRequest{items: md.LogRecordCount(), err: frc.requestError}, frc.logsError + return &fakeRequest{items: md.LogRecordCount(), exportErr: frc.requestError}, frc.logsError }
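
Usage sketch: how the new options might be wired into a request-based exporter. The
myRequest type, the merge/split helpers, and the package name below are hypothetical
illustrations; only WithBatcher, WithRequestBatchFuncs, and the exporterbatcher types
come from this patch.

package myexporter

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// myRequest is a hypothetical custom request type holding opaque items.
type myRequest struct {
	items []string
}

// Export would send the items to the backend; it is a stub here.
func (r *myRequest) Export(_ context.Context) error { return nil }

// ItemsCount lets the batcher compare batch sizes against MinSizeItems/MaxSizeItems.
func (r *myRequest) ItemsCount() int { return len(r.items) }

// merge combines two requests into one; it never returns an error, so mutation is safe.
func merge(_ context.Context, r1, r2 exporterhelper.Request) (exporterhelper.Request, error) {
	return &myRequest{items: append(r1.(*myRequest).items, r2.(*myRequest).items...)}, nil
}

// mergeSplit merges the optional active batch (may be nil) with the incoming request and
// re-slices the result into chunks of at most cfg.MaxSizeItems. The batch sender calls it
// only when MaxSizeItems > 0, so the loop below terminates.
func mergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
	var items []string
	if r1 != nil {
		items = append(items, r1.(*myRequest).items...)
	}
	items = append(items, r2.(*myRequest).items...)

	var out []exporterhelper.Request
	for len(items) > cfg.MaxSizeItems {
		out = append(out, &myRequest{items: items[:cfg.MaxSizeItems]})
		items = items[cfg.MaxSizeItems:]
	}
	// The last chunk is the smallest, satisfying the BatchMergeSplitFunc contract,
	// and the returned slice is never empty.
	return append(out, &myRequest{items: items}), nil
}

// batcherOption builds the option to pass to the New[Traces|Metrics|Logs]RequestExporter helpers.
func batcherOption() exporterhelper.Option {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.MinSizeItems = 1000
	cfg.MaxSizeItems = 10000
	cfg.FlushTimeout = 200 * time.Millisecond
	return exporterhelper.WithBatcher(cfg, exporterhelper.WithRequestBatchFuncs(merge, mergeSplit))
}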