From bf4f4961eb1a423faf7664e87f604a4683917090 Mon Sep 17 00:00:00 2001
From: Dmitry Anoshin
Date: Thu, 30 Nov 2023 16:44:44 -0800
Subject: [PATCH] batcher

---
 .chloggen/batch-exporter-helper.yaml         |  25 ++
 exporter/exporterhelper/batch_sender.go      | 319 +++++++++++++++++++
 exporter/exporterhelper/batch_sender_test.go | 230 +++++++++++++
 exporter/exporterhelper/common.go            |  27 +-
 exporter/exporterhelper/common_test.go       |  11 -
 exporter/exporterhelper/queue_sender.go      |   6 +-
 exporter/exporterhelper/request_test.go      |  88 ++++-
 7 files changed, 684 insertions(+), 22 deletions(-)
 create mode 100755 .chloggen/batch-exporter-helper.yaml
 create mode 100644 exporter/exporterhelper/batch_sender.go
 create mode 100644 exporter/exporterhelper/batch_sender_test.go

diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml
new file mode 100755
index 00000000000..613cfbac5b2
--- /dev/null
+++ b/.chloggen/batch-exporter-helper.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add experimental batching capabilities to the exporter helper
+
+# One or more tracking issues or pull requests related to the change
+issues: [8122]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
\ No newline at end of file
diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go
new file mode 100644
index 00000000000..6636abb1089
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender.go
@@ -0,0 +1,319 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter"
+)
+
+// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
+// items.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type MergeBatcherConfig struct {
+	// Enabled indicates whether requests are batched before being sent to the consumerSender.
+	Enabled bool `mapstructure:"enabled"`
+
+	// Timeout sets the time after which a batch will be sent regardless of its size. It must be greater than zero.
+	// This is a recommended option, as it ensures that the data is sent in a timely manner.
+	Timeout time.Duration `mapstructure:"timeout"`
+
+	// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
+	// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
+	// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will be ignored.
+	MinSizeItems int `mapstructure:"min_size_items"`
+}
+
+func (c MergeBatcherConfig) Validate() error {
+	if c.MinSizeItems < 0 {
+		return errors.New("min_size_items must be greater than or equal to zero")
+	}
+	if c.Timeout <= 0 {
+		return errors.New("timeout must be greater than zero")
+	}
+	return nil
+}
+
+func NewDefaultMergeBatcherConfig() MergeBatcherConfig {
+	return MergeBatcherConfig{
+		Enabled:      true,
+		Timeout:      200 * time.Millisecond,
+		MinSizeItems: 8192,
+	}
+}
+
+// SplitBatcherConfig defines the configuration for splitting requests based on a maximum number of items.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type SplitBatcherConfig struct {
+	// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP,
+	// but can be anything else for other formats. If the batch size exceeds this value,
+	// it will be broken up into smaller batches if possible.
+	// Setting this value to zero disables the maximum size limit.
+	MaxSizeItems int `mapstructure:"max_size_items"`
+}
+
+func (c SplitBatcherConfig) Validate() error {
+	if c.MaxSizeItems < 0 {
+		return errors.New("max_size_items must be greater than or equal to zero")
+	}
+	return nil
+}
+
+// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
+// minimum and maximum number of items.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type MergeSplitBatcherConfig struct {
+	MergeBatcherConfig `mapstructure:",squash"`
+	SplitBatcherConfig `mapstructure:",squash"`
+}
+
+func (c MergeSplitBatcherConfig) Validate() error {
+	// MaxSizeItems == 0 means the maximum size limit is disabled.
+	if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
+		return errors.New("max_size_items must be greater than or equal to min_size_items")
+	}
+	return nil
+}
+
+// BatchConfigBatchersLimit defines the part of the batching configuration that caps the number of batchers.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchConfigBatchersLimit struct {
+	// BatchersLimit is the maximum number of batchers that can be used for batching.
+	// Requests producing batch identifiers that exceed this limit will be dropped.
+	// If this value is zero, then there is no limit on the number of batchers.
+	BatchersLimit int `mapstructure:"batchers_limit"`
+}
+
+// BatchMergeFunc is a function that merges two requests into a single request.
+// Context will be propagated from the first request. If you want to separate batches based on the context,
+// use the WithRequestBatchIdentifier option.
+// Do not mutate the requests passed to the function if an error can be returned after mutation.
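+//
+// For illustration only (a sketch, not prescribed by this patch), a merge function for a
+// hypothetical countRequest type that only tracks an item count could look like:
+//
+//	func merge(_ context.Context, r1, r2 Request) (Request, error) {
+//		// Build a new request instead of mutating either input.
+//		return &countRequest{items: r1.(*countRequest).items + r2.(*countRequest).items}, nil
+//	}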
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchMergeFunc func(context.Context, Request, Request) (Request, error)
+
+// BatchMergeSplitFunc is a function that merges and/or splits a request into multiple requests based on the provided
+// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the
+// maximum number of items. The size of the last returned request MUST be less than or equal to the size of any other
+// returned request. The original request MUST NOT be mutated if an error is returned. The length of the returned
+// slice MUST NOT be 0. The optionalReq argument can be nil; make sure to check it before use. The maxItems argument
+// is guaranteed to be greater than 0. Context will be propagated from the original request.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error)
+
+type BatcherOption func(*batchSender)
+
+// WithSplitBatcher sets the maximum batch size and the function used to split requests that exceed it.
+func WithSplitBatcher(cfg SplitBatcherConfig, msf BatchMergeSplitFunc) BatcherOption {
+	return func(b *batchSender) {
+		if cfg.MaxSizeItems != 0 {
+			b.splitCfg = cfg
+			b.mergeSplitFunc = msf
+		}
+	}
+}
+
+// batchSender is a component that places requests into batches before passing them to the downstream senders.
+//
+// Batches are sent out with any of the following conditions:
+//   - the batch size reaches cfg.MinSizeItems
+//   - cfg.Timeout has elapsed since the timestamp when the previous batch was sent out.
+type batchSender struct {
+	baseRequestSender
+	mergeCfg       MergeBatcherConfig
+	splitCfg       SplitBatcherConfig
+	mergeFunc      BatchMergeFunc
+	mergeSplitFunc BatchMergeSplitFunc
+
+	// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
+	// If this number is reached and all the goroutines are busy, the batch will be sent right away.
+	// Populated from the number of queue consumers if the queue is enabled.
+	concurrencyLimit int
+	activeRequestsWG sync.WaitGroup
+
+	resetTimerCh chan struct{}
+
+	mu          sync.Mutex
+	activeBatch *batch
+
+	logger *zap.Logger
+
+	shutdownCh chan struct{}
+	stopped    *atomic.Bool
+}
+
+// newBatchSender returns a new batchSender that batches requests before passing them to the next sender.
+func newBatchSender(cfg MergeBatcherConfig, set exporter.CreateSettings, mf BatchMergeFunc, opts ...BatcherOption) requestSender {
+	bs := &batchSender{
+		activeBatch:  newEmptyBatch(),
+		mergeCfg:     cfg,
+		mergeFunc:    mf,
+		logger:       set.Logger,
+		shutdownCh:   make(chan struct{}),
+		stopped:      &atomic.Bool{},
+		resetTimerCh: make(chan struct{}),
+	}
+
+	for _, op := range opts {
+		op(bs)
+	}
+	return bs
+}
+
+func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
+	timer := time.NewTimer(bs.mergeCfg.Timeout)
+	go func() {
+		for {
+			select {
+			case <-bs.shutdownCh:
+				bs.mu.Lock()
+				if bs.activeBatch.request != nil {
+					bs.exportActiveBatch()
+				}
+				bs.mu.Unlock()
+				return
+			case <-timer.C:
+				bs.mu.Lock()
+				if bs.activeBatch.request != nil {
+					bs.exportActiveBatch()
+				}
+				bs.mu.Unlock()
+				timer.Reset(bs.mergeCfg.Timeout)
+			case <-bs.resetTimerCh:
+				if !timer.Stop() {
+					<-timer.C
+				}
+				timer.Reset(bs.mergeCfg.Timeout)
+			}
+		}
+	}()
+
+	return nil
+}
+
+type batch struct {
+	ctx     context.Context
+	request Request
+	done    chan struct{}
+	err     error
+}
+
+func newEmptyBatch() *batch {
+	return &batch{
+		ctx:  context.Background(),
+		done: make(chan struct{}),
+	}
+}
+
+// Caller must hold the lock.
+func (bs *batchSender) exportActiveBatch() {
+	go func(b *batch) {
+		b.err = b.request.Export(b.ctx)
+		close(b.done)
+	}(bs.activeBatch)
+	bs.activeBatch = newEmptyBatch()
+}
+
+func (bs *batchSender) send(ctx context.Context, req Request) error {
+	if bs.stopped.Load() {
+		return errors.New("batchSender is stopped")
+	}
+
+	bs.activeRequestsWG.Add(1)
+	defer bs.activeRequestsWG.Done()
+
+	if bs.mergeSplitFunc != nil {
+		return bs.sendMergeSplitBatch(ctx, req)
+	}
+	return bs.sendMergeBatch(ctx, req)
+}
+
+func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
+	bs.mu.Lock()
+
+	reqs, err := bs.mergeSplitFunc(ctx, bs.activeBatch.request, req, bs.splitCfg.MaxSizeItems)
+	if err != nil || len(reqs) == 0 {
+		bs.mu.Unlock()
+		return err
+	}
+	if len(reqs) == 1 || bs.activeBatch.request != nil {
+		bs.activeBatch.request = reqs[0]
+		bs.activeBatch.ctx = ctx
+		batch := bs.activeBatch
+		var sent bool
+		if batch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems || len(reqs) > 1 {
+			bs.exportActiveBatch()
+			sent = true
+		}
+		bs.mu.Unlock()
+		if sent {
+			bs.resetTimerCh <- struct{}{}
+		}
+		<-batch.done
+		if batch.err != nil {
+			return batch.err
+		}
+		reqs = reqs[1:]
+	}	else {
+		bs.mu.Unlock()
+	}
+
+	// Intentionally do not put the last request in the active batch so that it is not blocked waiting for more data.
+	// TODO: Consider including the partial request in the error to avoid double publishing.
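+	// The remaining requests are exported synchronously in the caller's goroutine;
+	// the first export error aborts the loop and drops the requests that were not sent yet.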
+	for _, r := range reqs {
+		if err := r.Export(ctx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
+	bs.mu.Lock()
+
+	if bs.activeBatch.request != nil {
+		var err error
+		req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
+		if err != nil {
+			bs.mu.Unlock()
+			return err
+		}
+	}
+	bs.activeBatch.request = req
+	bs.activeBatch.ctx = ctx
+	batch := bs.activeBatch
+	var sent bool
+	if bs.activeBatch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems {
+		bs.exportActiveBatch()
+		sent = true
+	}
+	bs.mu.Unlock()
+	if sent {
+		bs.resetTimerCh <- struct{}{}
+	}
+	<-batch.done
+	return batch.err
+}
+
+func (bs *batchSender) Shutdown(context.Context) error {
+	bs.stopped.Store(true)
+	close(bs.shutdownCh)
+	bs.activeRequestsWG.Wait()
+	return nil
+}
diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go
new file mode 100644
index 00000000000..95aab44463e
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender_test.go
@@ -0,0 +1,230 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+)
+
+func TestBatchSender_MergeBatcherConfig_Validate(t *testing.T) {
+	cfg := NewDefaultMergeBatcherConfig()
+	assert.NoError(t, cfg.Validate())
+
+	cfg.MinSizeItems = -1
+	assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")
+
+	cfg = NewDefaultMergeBatcherConfig()
+	cfg.Timeout = 0
+	assert.EqualError(t, cfg.Validate(), "timeout must be greater than zero")
+}
+
+func TestBatchSender_SplitBatcherConfig_Validate(t *testing.T) {
+	cfg := SplitBatcherConfig{}
+	assert.NoError(t, cfg.Validate())
+
+	cfg.MaxSizeItems = -1
+	assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")
+}
+
+func TestBatchSender_MergeSplitBatcherConfig_Validate(t *testing.T) {
+	cfg := MergeSplitBatcherConfig{
+		MergeBatcherConfig: NewDefaultMergeBatcherConfig(),
+		SplitBatcherConfig: SplitBatcherConfig{
+			MaxSizeItems: 20000,
+		},
+	}
+	assert.NoError(t, cfg.Validate())
+
+	cfg.MinSizeItems = 20001
+	assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
+}
+
+func TestBatchSender_Merge(t *testing.T) {
+	mergeCfg := NewDefaultMergeBatcherConfig()
+	mergeCfg.MinSizeItems = 10
+	mergeCfg.Timeout = 100 * time.Millisecond
+
+	tests := []struct {
+		name          string
+		batcherOption Option
+	}{
+		{
+			name:          "merge_only",
+			batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc),
+		},
+		{
+			name: "split_disabled",
+			batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc,
+				WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 0}, fakeBatchMergeSplitFunc)),
+		},
+		{
+			name: "split_high_limit",
+			batcherOption: WithBatcher(mergeCfg, fakeBatchMergeFunc,
+				WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 1000}, fakeBatchMergeSplitFunc)),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			be := queueBatchExporter(t, tt.batcherOption)
+
+			require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+			t.Cleanup(func() {
+				require.NoError(t, be.Shutdown(context.Background()))
+			})
+
+			sink := newFakeRequestSink()
+
+			require.NoError(t,
be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink}))
+
+			// the first two requests should be merged into one and sent by reaching the minimum items size
+			assert.Eventually(t, func() bool {
+				return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11
+			}, 50*time.Millisecond, 10*time.Millisecond)
+
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink}))
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink}))
+
+			// the third and fourth requests should be merged into one and sent by reaching the timeout
+			time.Sleep(50 * time.Millisecond)
+
+			// the fifth request should be ignored because of the merge error.
+			require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink,
+				mergeErr: errors.New("merge error")}))
+
+			assert.Equal(t, uint64(1), sink.requestsCount.Load())
+			assert.Eventually(t, func() bool {
+				return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15
+			}, 100*time.Millisecond, 10*time.Millisecond)
+		})
+	}
+}
+
+func TestBatchSender_MergeOrSplit(t *testing.T) {
+	mergeCfg := NewDefaultMergeBatcherConfig()
+	mergeCfg.MinSizeItems = 5
+	mergeCfg.Timeout = 100 * time.Millisecond
+	be := queueBatchExporter(t, WithBatcher(mergeCfg, fakeBatchMergeFunc,
+		WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 10}, fakeBatchMergeSplitFunc)))
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+
+	// should be sent right away by reaching the minimum items size.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
+	}, 50*time.Millisecond, 10*time.Millisecond)
+
+	// big request should be broken down into two requests, both are sent right away.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25
+	}, 50*time.Millisecond, 10*time.Millisecond)
+
+	// request that cannot be split should be dropped.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink,
+		mergeErr: errors.New("split error")}))
+
+	// big request should be broken down into two requests, both are sent right away.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38
+	}, 50*time.Millisecond, 10*time.Millisecond)
+}
+
+func TestBatchSender_Shutdown(t *testing.T) {
+	batchCfg := NewDefaultMergeBatcherConfig()
+	batchCfg.MinSizeItems = 10
+	be := queueBatchExporter(t, WithBatcher(batchCfg, fakeBatchMergeFunc))
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+	sink := newFakeRequestSink()
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink}))
+
+	// To make sure the request reaches the batchSender before shutdown.
+	time.Sleep(50 * time.Millisecond)
+
+	require.NoError(t, be.Shutdown(context.Background()))
+
+	// shutdown should force sending the batch
+	assert.Equal(t, uint64(1), sink.requestsCount.Load())
+	assert.Equal(t, uint64(3), sink.itemsCount.Load())
+}
+
+func TestBatcherDisabled(t *testing.T) {
+	mergeCfg := NewDefaultMergeBatcherConfig()
+	mergeCfg.Enabled = false
+	be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender,
+		WithBatcher(mergeCfg, fakeBatchMergeFunc, WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 10}, fakeBatchMergeSplitFunc)))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+	// should be sent right away because batching is disabled.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink}))
+	assert.Equal(t, uint64(1), sink.requestsCount.Load())
+	assert.Equal(t, uint64(8), sink.itemsCount.Load())
+}
+
+func TestBatcherInvalidMergeSplitFunc(t *testing.T) {
+	invalidMergeSplitFunc := func(_ context.Context, _ Request, req2 Request, _ int) ([]Request, error) {
+		// reply with an invalid zero-length slice if req2 has more than 20 items
+		if req2.(*fakeRequest).items > 20 {
+			return []Request{}, nil
+		}
+		// otherwise reply with a single request.
+		return []Request{req2}, nil
+	}
+	mergeCfg := NewDefaultMergeBatcherConfig()
+	mergeCfg.Timeout = 50 * time.Millisecond
+	be := queueBatchExporter(t, WithBatcher(mergeCfg, fakeBatchMergeFunc,
+		WithSplitBatcher(SplitBatcherConfig{MaxSizeItems: 20}, invalidMergeSplitFunc)))
+
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+	// first request should be ignored due to the invalid merge/split function.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink}))
+	// second request should be sent after reaching the timeout.
+	require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15
+	}, 100*time.Millisecond, 10*time.Millisecond)
+}
+
+func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
+	be, err := newBaseExporter(defaultSettings, "", true, nil, nil,
+		newNoopObsrepSender, batchOption, WithQueue(NewDefaultQueueSettings()))
+	require.NotNil(t, be)
+	require.NoError(t, err)
+	return be
+}
diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 509e66bf7f9..c2c8a2f9b38 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -155,6 +155,12 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
 	}
 }
 
+// WithBatcher enables batching for the exporter using the provided merge function and options.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func WithBatcher(cfg MergeBatcherConfig, mf BatchMergeFunc, opts ...BatcherOption) Option {
+	return func(o *baseExporter) {
+		if !cfg.Enabled {
+			// Keep the default pass-through sender when batching is disabled.
+			return
+		}
+		o.batchSender = newBatchSender(cfg, o.set, mf, opts...)
+	}
+}
+
 // baseExporter contains common fields between different exporter types.
 type baseExporter struct {
 	component.StartFunc
@@ -171,6 +177,7 @@ type baseExporter struct {
 	// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
 	// The data is handled by each sender in the respective order starting from the queueSender.
 	// Most of the senders are optional, and initialized with a no-op path-through sender.
+	batchSender  requestSender
 	queueSender  requestSender
 	obsrepSender requestSender
 	retrySender  requestSender
@@ -194,6 +201,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 
 		unmarshaler: unmarshaler,
 		signal:      signal,
 
+		batchSender:  &baseRequestSender{},
 		queueSender:  &baseRequestSender{},
 		obsrepSender: osf(obsReport),
 		retrySender:  &baseRequestSender{},
@@ -208,6 +216,13 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
 	}
 	be.connectSenders()
 
+	// If the queue sender is enabled, assign the batch sender the same number of workers as there are queue consumers.
+	if qs, ok := be.queueSender.(*queueSender); ok {
+		if bs, ok := be.batchSender.(*batchSender); ok {
+			bs.concurrencyLimit = qs.consumersNum
+		}
+	}
+
 	return be, nil
 }
 
@@ -218,7 +233,8 @@ func (be *baseExporter) send(ctx context.Context, req Request) error {
 
 // connectSenders connects the senders in the predefined order.
 func (be *baseExporter) connectSenders() {
-	be.queueSender.setNextSender(be.obsrepSender)
+	be.queueSender.setNextSender(be.batchSender)
+	be.batchSender.setNextSender(be.obsrepSender)
 	be.obsrepSender.setNextSender(be.retrySender)
 	be.retrySender.setNextSender(be.timeoutSender)
 }
@@ -229,7 +245,12 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
 		return err
 	}
 
-	// If no error then start the queueSender.
+	// If no error then start the batchSender.
+	if err := be.batchSender.Start(ctx, host); err != nil {
+		return err
+	}
+
+	// Last, start the queueSender.
 	return be.queueSender.Start(ctx, host)
 }
 
@@ -239,6 +260,8 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
 		be.retrySender.Shutdown(ctx),
 		// Then shutdown the queue sender.
 		be.queueSender.Shutdown(ctx),
+		// Then shutdown the batch sender.
+		be.batchSender.Shutdown(ctx),
 		// Last shutdown the wrapped exporter itself.
 		be.ShutdownFunc.Shutdown(ctx))
 }
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go
index 0e74e095147..099d7b32c91 100644
--- a/exporter/exporterhelper/common_test.go
+++ b/exporter/exporterhelper/common_test.go
@@ -71,17 +71,6 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
 	}
 }
 
-func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
-	bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender,
-		WithRetry(NewDefaultRetrySettings()))
-	require.Nil(t, err)
-	require.True(t, bs.requestExporter)
-	require.Panics(t, func() {
-		_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender,
-			WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings()))
-	})
-}
-
 func TestBaseExporterLogging(t *testing.T) {
 	set := exportertest.NewNopCreateSettings()
 	logger, observed := observer.New(zap.DebugLevel)
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index b61ac2032ad..f0e4e5b74e3 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -34,7 +34,9 @@ var (
 type QueueSettings struct {
 	// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
 	Enabled bool `mapstructure:"enabled"`
-	// NumConsumers is the number of consumers from the queue.
+	// NumConsumers is the number of consumers from the queue. Defaults to 10.
+	// If batching is enabled, a combined batch cannot contain more requests than the number of consumers.
+	// So it's recommended to set a higher number of consumers if batching is enabled.
 	NumConsumers int `mapstructure:"num_consumers"`
 	// QueueSize is the maximum number of batches allowed in queue at a given time.
 	QueueSize int `mapstructure:"queue_size"`
@@ -87,6 +89,7 @@ type queueSender struct {
 	traceAttribute   attribute.KeyValue
 	logger           *zap.Logger
 	meter            otelmetric.Meter
+	consumersNum     int
 	requeuingEnabled bool
 
 	metricCapacity otelmetric.Int64ObservableGauge
@@ -101,6 +104,7 @@ func newQueueSender(queue Queue, queueLimiter internal.QueueCapacityLimiter[Requ
 		logger:           set.TelemetrySettings.Logger,
 		meter:            set.TelemetrySettings.MeterProvider.Meter(scopeName),
 		requeuingEnabled: requeueOnFailure,
+		consumersNum:     config.NumConsumers,
 	}
 	qs.queueController = internal.NewQueueController[Request](queue, queueLimiter, numConsumers, qs.consume)
 	return qs
diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go
index 593ee175319..7df6f9dd8c9 100644
--- a/exporter/exporterhelper/request_test.go
+++ b/exporter/exporterhelper/request_test.go
@@ -5,25 +5,97 @@ package exporterhelper
 
 import (
 	"context"
+	"sync/atomic"
 
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )
 
+type fakeRequestSink struct {
+	requestsCount *atomic.Uint64
+	itemsCount    *atomic.Uint64
+}
+
+func newFakeRequestSink() *fakeRequestSink {
+	return &fakeRequestSink{
+		requestsCount: &atomic.Uint64{},
+		itemsCount:    &atomic.Uint64{},
+	}
+}
+
 type fakeRequest struct {
-	items int
-	err   error
+	items     int
+	exportErr error
+	mergeErr  error
+	sink      *fakeRequestSink
 }
 
-func (r fakeRequest) Export(_ context.Context) error {
-	return r.err
+func (r *fakeRequest) Export(_ context.Context) error {
+	if r.exportErr != nil {
+		return r.exportErr
+	}
+	if r.sink != nil {
+		r.sink.requestsCount.Add(1)
+		r.sink.itemsCount.Add(uint64(r.items))
+	}
+	return nil
 }
 
-func (r fakeRequest) ItemsCount() int {
+func (r *fakeRequest) ItemsCount() int {
 	return r.items
 }
 
+func fakeBatchMergeFunc(_ context.Context, r1 Request, r2 Request) (Request, error) {
+	if r1 == nil {
+		return r2, nil
+	}
+	fr1 := r1.(*fakeRequest)
+	fr2 := r2.(*fakeRequest)
+	if fr2.mergeErr != nil {
+		return nil, fr2.mergeErr
+	}
+	return &fakeRequest{items: fr1.items + fr2.items, sink: fr1.sink}, nil
+}
+
+func fakeBatchMergeSplitFunc(ctx context.Context, r1 Request, r2 Request, maxItems int) ([]Request, error) {
+	if maxItems == 0 {
+		r, err := fakeBatchMergeFunc(ctx, r1, r2)
+		return []Request{r}, err
+	}
+
+	if r2.(*fakeRequest).mergeErr != nil {
+		return nil, r2.(*fakeRequest).mergeErr
+	}
+
+	fr2 := &fakeRequest{items: r2.(*fakeRequest).items, sink: r2.(*fakeRequest).sink}
+	var res []Request
+
+	// fill fr1 up to maxItems if it's not nil
+	if r1 != nil {
+		fr1 := &fakeRequest{items: r1.(*fakeRequest).items, sink: r1.(*fakeRequest).sink}
+		if fr2.items <= maxItems-fr1.items {
+			fr1.items += fr2.items
+			return []Request{fr1}, nil
+		}
+		fr2.items -= maxItems - fr1.items
+		fr1.items = maxItems
+		res = append(res, fr1)
+	}
+
+	// split fr2 into chunks of at most maxItems
+	for {
+		if fr2.items <= maxItems {
+			res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink})
+			break
+		}
+		res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink})
+		fr2.items -= maxItems
+	}
+
+	return res, nil
+}
+
 type fakeRequestConverter struct {
 	metricsError error
 	tracesError  error
@@ -32,13 +32,13 @@
type fakeRequestConverter struct { } func (frc *fakeRequestConverter) requestFromMetricsFunc(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: frc.requestError}, frc.metricsError + return &fakeRequest{items: md.DataPointCount(), exportErr: frc.requestError}, frc.metricsError } func (frc *fakeRequestConverter) requestFromTracesFunc(_ context.Context, md ptrace.Traces) (Request, error) { - return fakeRequest{items: md.SpanCount(), err: frc.requestError}, frc.tracesError + return &fakeRequest{items: md.SpanCount(), exportErr: frc.requestError}, frc.tracesError } func (frc *fakeRequestConverter) requestFromLogsFunc(_ context.Context, md plog.Logs) (Request, error) { - return fakeRequest{items: md.LogRecordCount(), err: frc.requestError}, frc.logsError + return &fakeRequest{items: md.LogRecordCount(), exportErr: frc.requestError}, frc.logsError }
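
For reviewers, a minimal usage sketch of the new API (illustrative only, not part of the patch). It assumes the WithBatcher/WithSplitBatcher options and config types added above, plus a hypothetical spanRequest type and package name; the merge logic mirrors the fake implementations in request_test.go.

package mybatchingexporter // hypothetical package, for illustration only

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// spanRequest is a hypothetical Request implementation that tracks a span count.
type spanRequest struct {
	items int
}

func (r *spanRequest) Export(context.Context) error { return nil } // send to the backend here
func (r *spanRequest) ItemsCount() int              { return r.items }

// merge combines two requests into one without mutating either input.
func merge(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) {
	return &spanRequest{items: r1.(*spanRequest).items + r2.(*spanRequest).items}, nil
}

// mergeSplit merges the optional active request with the incoming one and splits the result
// into chunks of at most maxItems, keeping the smallest chunk last as the contract requires.
func mergeSplit(_ context.Context, optionalReq exporterhelper.Request, req exporterhelper.Request,
	maxItems int) ([]exporterhelper.Request, error) {
	total := req.(*spanRequest).items
	if optionalReq != nil { // optionalReq can be nil; check before use
		total += optionalReq.(*spanRequest).items
	}
	var out []exporterhelper.Request
	for total > maxItems {
		out = append(out, &spanRequest{items: maxItems})
		total -= maxItems
	}
	return append(out, &spanRequest{items: total}), nil
}

// batcherOption builds the exporter helper option that enables batching.
func batcherOption() exporterhelper.Option {
	mergeCfg := exporterhelper.NewDefaultMergeBatcherConfig()
	mergeCfg.MinSizeItems = 4096 // flush once 4096 items are buffered
	return exporterhelper.WithBatcher(mergeCfg, merge,
		exporterhelper.WithSplitBatcher(exporterhelper.SplitBatcherConfig{MaxSizeItems: 16384}, mergeSplit))
}

A real exporter would merge by moving spans or data points between the pdata payloads wrapped by its Request type; the item-count version above only demonstrates the contracts the two callbacks must satisfy (no input mutation, chunks capped at maxItems, smallest chunk last, non-empty result).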