diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml
new file mode 100755
index 00000000000..613cfbac5b2
--- /dev/null
+++ b/.chloggen/batch-exporter-helper.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add experimental batching capabilities to the exporter helper
+
+# One or more tracking issues or pull requests related to the change
+issues: [8122]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
\ No newline at end of file
diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go
new file mode 100644
index 00000000000..8315d70aab1
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender.go
@@ -0,0 +1,443 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
+
+import (
+	"context"
+	"errors"
+	"runtime"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+)
+
+// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
+// items.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type MergeBatcherConfig struct {
+	// Enabled indicates whether requests should be batched before they are sent to the consumerSender.
+	Enabled bool `mapstructure:"enabled"`
+
+	// Timeout sets the time after which a batch will be sent regardless of its size.
+	// When this is set to zero, batched data will be sent immediately.
+	// This is a recommended option, as it will ensure that the data is sent in a timely manner.
+	Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout?
+
+	// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
+	// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
+	// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will be ignored.
+	MinSizeItems int `mapstructure:"min_size_items"`
+}
+
+func (c MergeBatcherConfig) Validate() error {
+	if c.MinSizeItems < 0 {
+		return errors.New("min_size_items must be greater than or equal to zero")
+	}
+	if c.Timeout < 0 {
+		return errors.New("timeout must be greater than or equal to zero")
+	}
+	return nil
+}
+
+func NewDefaultMergeBatcherConfig() MergeBatcherConfig {
+	return MergeBatcherConfig{
+		Enabled:      true,
+		Timeout:      200 * time.Millisecond,
+		MinSizeItems: 8192,
+	}
+}
+
+// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
+// minimum and maximum number of items.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type MergeSplitBatcherConfig struct {
+	MergeBatcherConfig `mapstructure:",squash"`
+	// MaxSizeItems is the maximum number of batch items, i.e. spans, data points or log records for OTLP,
+	// but can be anything else for other formats. If the batch size exceeds this value,
+	// it will be broken up into smaller batches if possible.
+	MaxSizeItems int `mapstructure:"max_size_items"`
+}
+
+func (c MergeSplitBatcherConfig) Validate() error {
+	if c.MaxSizeItems < 0 {
+		return errors.New("max_size_items must be greater than or equal to zero")
+	}
+	if c.MaxSizeItems < c.MinSizeItems {
+		return errors.New("max_size_items must be greater than or equal to min_size_items")
+	}
+	return nil
+}
+
+// BatchConfigBatchersLimit defines the part of the batching configuration that caps the number of batchers.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchConfigBatchersLimit struct {
+	// BatchersLimit is the maximum number of batchers that can be used for batching.
+	// Requests producing batch identifiers that exceed this limit will be dropped.
+	// If this value is zero, then there is no limit on the number of batchers.
+	BatchersLimit int `mapstructure:"batchers_limit"`
+}
+
+// BatchMergeFunc is a function that merges two requests into a single request.
+// Context will be propagated from the first request. If you want to separate batches based on the context,
+// use the WithRequestBatchIdentifier option.
+// Do not mutate the requests passed to the function if an error can be returned after mutation.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchMergeFunc func(context.Context, Request, Request) (Request, error)
+
+// BatchSplitFunc is a function that splits a request into multiple requests based on the provided maximum number of
+// items. All the returned requests must have a number of items that does not exceed the maximum number of items.
+// The size of the last returned request must not be larger than the size of any other returned request.
+// Do not mutate the request passed to the function if an error can be returned after mutation.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type BatchSplitFunc func(ctx context.Context, req Request, maxItems int) ([]Request, error)
+
+// IdentifyBatchFunc returns an identifier for a Request batch. This function can be used to separate particular
+// Requests into different batches. Batches with different identifiers will not be merged together.
+// The provided context can also be used to extract information and use it as part of the identifier.
+// This function is optional. If not provided, all Requests will be batched together.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type IdentifyBatchFunc func(ctx context.Context, r Request) string
+
+type shardGetter interface {
+	shard(*request) (*shard, error)
+}
+
+type Batcher struct {
+	cfg       MergeSplitBatcherConfig
+	mergeFunc BatchMergeFunc
+	splitFunc BatchSplitFunc
+
+	shardGetter
+
+	shutdownCh chan struct{}
+	goroutines sync.WaitGroup
+	logger     *zap.Logger
+
+	export func(*request) error
+}
+
+// shutdown is invoked during service shutdown.
+func (b *Batcher) shutdown() {
+	close(b.shutdownCh)
+
+	// Wait until all goroutines are done.
+	b.goroutines.Wait()
+}
+
+// newShard creates a new shard and starts its processing goroutine.
+func (b *Batcher) newShard() *shard {
+	s := &shard{
+		batcher:    b,
+		newRequest: make(chan *request, runtime.NumCPU()),
+	}
+	b.goroutines.Add(1)
+	go s.start()
+	return s
+}
+
+func (b *Batcher) mergeRequests(req1 *request, req2 *request) (*request, error) {
+	r, err := b.mergeFunc(req1.Context(), req1.Request, req2.Request)
+	if err != nil {
+		return nil, err
+	}
+	return &request{
+		baseRequest: baseRequest{ctx: req1.Context()},
+		Request:     r,
+	}, nil
+}
+
+func (b *Batcher) splitRequest(req *request, maxItems int) ([]*request, error) {
+	rs, err := b.splitFunc(req.Context(), req.Request, maxItems)
+	if err != nil {
+		return nil, err
+	}
+	reqs := make([]*request, 0, len(rs))
+	for _, r := range rs {
+		reqs = append(reqs, &request{
+			baseRequest: baseRequest{ctx: req.Context()},
+			Request:     r,
+		})
+	}
+	return reqs, nil
+}
+
+// NewMergeBatcher creates a Batcher that merges requests into batches. It returns nil if batching is disabled.
+func NewMergeBatcher(cfg MergeBatcherConfig, mf BatchMergeFunc, opts ...BatcherOption) *Batcher {
+	if !cfg.Enabled {
+		return nil
+	}
+
+	b := &Batcher{
+		cfg: MergeSplitBatcherConfig{
+			MergeBatcherConfig: cfg,
+		},
+		mergeFunc:  mf,
+		shutdownCh: make(chan struct{}),
+	}
+	for _, op := range opts {
+		op(b)
+	}
+	if b.shardGetter == nil {
+		b.shardGetter = &singleShardGetter{singleShard: b.newShard()}
+	}
+	return b
+}
+
+// NewMergeSplitBatcher creates a Batcher that merges requests into batches and splits them if they exceed MaxSizeItems.
+func NewMergeSplitBatcher(cfg MergeSplitBatcherConfig, mf BatchMergeFunc, sf BatchSplitFunc, opts ...BatcherOption) *Batcher {
+	b := NewMergeBatcher(cfg.MergeBatcherConfig, mf, opts...)
+	if b != nil {
+		b.cfg.MaxSizeItems = cfg.MaxSizeItems
+		b.splitFunc = sf
+	}
+	return b
+}
+
+// BatcherOption applies a change to the Batcher.
+type BatcherOption func(*Batcher)
+
+// WithRequestBatchIdentifier sets the IdentifyBatchFunc and the batchers limit for the Batcher.
+func WithRequestBatchIdentifier(f IdentifyBatchFunc, bl BatchConfigBatchersLimit) BatcherOption {
+	return func(b *Batcher) {
+		b.shardGetter = &multiShardGetter{
+			identifyBatchFunc: f,
+			batchersLimit:     bl.BatchersLimit,
+			shardFactory:      b.newShard,
+		}
+	}
+}
+
+// errTooManyBatchers is returned when the BatchersLimit has been reached.
+var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batch identifier combinations"))
+
+// batchSender is a component that places incoming requests into batches before passing them to the downstream senders.
+//
+// Batches are sent out with any of the following conditions:
+//   - the batch size reaches cfg.MinSizeItems
+//   - cfg.Timeout has elapsed since the timestamp when the previous batch was sent out.
+type batchSender struct {
+	baseRequestSender
+
+	// batcher's shardGetter is either a *singleShardGetter or a *multiShardGetter.
+	batcher *Batcher
+}
+
+// newBatchSender returns a new batch sender component.
+func newBatchSender(set exporter.CreateSettings, b *Batcher) requestSender {
+	if b == nil {
+		return &baseRequestSender{}
+	}
+	b.logger = set.Logger
+	return &batchSender{
+		batcher: b,
+	}
+}
+
+// start is invoked during service startup.
+func (bs *batchSender) start(_ context.Context, _ component.Host, _ exporter.CreateSettings) error {
+	bs.batcher.export = func(req *request) error {
+		return bs.nextSender.send(req)
+	}
+	return nil
+}
+
+func (bs *batchSender) send(req internal.Request) error {
+	s, err := bs.batcher.shard(req.(*request))
+	if err != nil {
+		return err
+	}
+
+	// For now the batcher can work asynchronously only. Potentially we can
+	// add a sync mode later.
+	s.newRequest <- req.(*request)
+	return nil
+}
+
+func (bs *batchSender) shutdown() {
+	bs.batcher.shutdown()
+}
+
+// shard is a single instance of the batch logic. When batch identifiers
+// are in use, one of these is created per distinct identifier.
+type shard struct {
+	batcher *Batcher
+
+	// timer informs the shard to send a batch.
+	timer *time.Timer
+
+	// newRequest is used to receive requests from producers.
+	newRequest chan *request
+
+	// batch is the in-flight request that accumulates incoming requests until it is exported.
+	batch *request
+}
+
+func (s *shard) start() {
+	defer s.batcher.goroutines.Done()
+
+	var timerCh <-chan time.Time
+	if s.batcher.cfg.Timeout != 0 {
+		s.timer = time.NewTimer(s.batcher.cfg.Timeout)
+		timerCh = s.timer.C
+	}
+
+	for {
+		select {
+		case <-s.batcher.shutdownCh:
+		DONE:
+			for {
+				select {
+				case req := <-s.newRequest:
+					s.processRequest(req)
+				default:
+					break DONE
+				}
+			}
+			// Flush the pending batch before shutting down.
+			if s.batch != nil && s.batch.Count() > 0 {
+				s.export(s.batch)
+			}
+			return
+		case req := <-s.newRequest:
+			s.processRequest(req)
+		case <-timerCh:
+			if s.batch != nil && s.batch.Count() > 0 {
+				s.export(s.batch)
+			}
+			s.resetTimer()
+		}
+	}
+}
+
+func (s *shard) processRequest(req *request) {
+	s.appendToBatch(req)
+	sent := s.exportBatchOverflow()
+
+	if s.batch != nil && (s.batch.Count() >= s.batcher.cfg.MinSizeItems || !s.hasTimer()) {
+		s.export(s.batch)
+		s.batch = nil
+		sent = true
+	}
+
+	if sent {
+		s.stopTimer()
+		s.resetTimer()
+	}
+}
+
+// exportBatchOverflow checks if the current batch exceeds the maximum size limit and sends split requests if needed.
+// Returns true if at least one request was sent.
+func (s *shard) exportBatchOverflow() bool {
+	if s.batcher.cfg.MaxSizeItems == 0 || s.batch.Count() <= s.batcher.cfg.MaxSizeItems {
+		return false
+	}
+	reqs, err := s.batcher.splitRequest(s.batch, s.batcher.cfg.MaxSizeItems)
+	if err != nil {
+		s.batcher.logger.Error("failed to split request", zap.Error(err))
+		// Erase the batch that cannot be split.
+		s.batch = nil
+		return false
+	}
+	sent := false
+	for _, r := range reqs[:len(reqs)-1] {
+		s.export(r)
+		sent = true
+	}
+	s.batch = reqs[len(reqs)-1]
+	return sent
+}
+
+// appendToBatch merges the request into the current batch, or starts a new batch if there is none.
+func (s *shard) appendToBatch(req *request) {
+	if s.batch == nil {
+		s.batch = req
+		return
+	}
+	mergedReq, err := s.batcher.mergeRequests(s.batch, req)
+	if err != nil {
+		s.batcher.logger.Error("Failed to merge request", zap.Error(err))
+		return
+	}
+	s.batch = mergedReq
+}
+
+func (s *shard) export(req *request) {
+	err := s.batcher.export(req)
+	// TODO: Replace with metrics and logging.
+	if err != nil {
+		s.batcher.logger.Error("Failed to send batch", zap.Error(err))
+	}
+}
+
+func (s *shard) hasTimer() bool {
+	return s.timer != nil
+}
+
+func (s *shard) stopTimer() {
+	if s.hasTimer() && !s.timer.Stop() {
+		<-s.timer.C
+	}
+}
+
+func (s *shard) resetTimer() {
+	if s.hasTimer() {
+		s.timer.Reset(s.batcher.cfg.Timeout)
+	}
+}
+
+// singleShardGetter is used when no IdentifyBatchFunc is provided, to avoid the
+// additional lock and map operations used in multiShardGetter.
+type singleShardGetter struct {
+	singleShard *shard
+}
+
+func (sb *singleShardGetter) shard(_ *request) (*shard, error) {
+	return sb.singleShard, nil
+}
+
+// multiShardGetter is used when an IdentifyBatchFunc is provided.
+type multiShardGetter struct {
+	identifyBatchFunc IdentifyBatchFunc
+	batchersLimit     int
+	batchers          sync.Map
+	shardFactory      func() *shard
+
+	// Guards size and the storing logic to ensure no more than batchersLimit batchers are stored.
+	// If we are willing to allow "some" extra batchers over the limit, this can be removed and size can be made atomic.
+	lock sync.Mutex
+	size int
+}
+
+func (mb *multiShardGetter) shard(req *request) (*shard, error) {
+	id := mb.identifyBatchFunc(req.Context(), req.Request)
+	s, ok := mb.batchers.Load(id)
+	if ok {
+		return s.(*shard), nil
+	}
+
+	mb.lock.Lock()
+	defer mb.lock.Unlock()
+
+	if mb.batchersLimit != 0 && mb.size >= mb.batchersLimit {
+		return nil, errTooManyBatchers
+	}
+
+	s, loaded := mb.batchers.LoadOrStore(id, mb.shardFactory())
+	if !loaded {
+		mb.size++
+	}
+	return s.(*shard), nil
+}
diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go
new file mode 100644
index 00000000000..77725110ae6
--- /dev/null
+++ b/exporter/exporterhelper/batch_sender_test.go
@@ -0,0 +1,287 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+)
+
+func TestMergeBatcherConfigValidate(t *testing.T) {
+	cfg := NewDefaultMergeBatcherConfig()
+	assert.NoError(t, cfg.Validate())
+
+	cfg.MinSizeItems = -1
+	assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")
+
+	cfg = NewDefaultMergeBatcherConfig()
+	cfg.Timeout = -1
+	assert.EqualError(t, cfg.Validate(), "timeout must be greater than or equal to zero")
+}
+
+func TestMergeSplitBatcherConfigValidate(t *testing.T) {
+	cfg := MergeSplitBatcherConfig{
+		MergeBatcherConfig: NewDefaultMergeBatcherConfig(),
+		MaxSizeItems:       20000,
+	}
+	assert.NoError(t, cfg.Validate())
+
+	cfg.MaxSizeItems = -1
+	assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")
+
+	cfg.MaxSizeItems = 1000
+	assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
+}
+
+func TestBatcherSingleShardMerge(t *testing.T) {
+	mergeCfg := NewDefaultMergeBatcherConfig()
+	mergeCfg.MinSizeItems = 10
+
mergeCfg.Timeout = 100 * time.Millisecond + + tests := []struct { + name string + b *Batcher + }{ + { + name: "merge_only_batcher", + b: NewMergeBatcher(mergeCfg, fakeBatchMergeFunc), + }, + { + name: "merge_or_split_batcher", + b: NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 0}, + fakeBatchMergeFunc, fakeBatchSplitFunc), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(tt.b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + mergeErr: errors.New("merge error")}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 1, sink: sink}))) + + // the first two requests should be merged into one and sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 + }, 50*time.Millisecond, 10*time.Millisecond) + + // the third and fifth requests should be sent by reaching the timeout + // the fourth request should be ignored because of the merge error + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 + }, 150*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatcherMultiShardMerge(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 10 + mergeCfg.Timeout = 100 * time.Millisecond + batchIdentifierOpt := WithRequestBatchIdentifier(func(_ context.Context, req Request) string { + return req.(*fakeRequest).batchID + }, BatchConfigBatchersLimit{BatchersLimit: 2}) + + tests := []struct { + name string + b *Batcher + }{ + { + name: "merge_only_batcher", + b: NewMergeBatcher(mergeCfg, fakeBatchMergeFunc, batchIdentifierOpt), + }, + { + name: "merge_or_split_batcher", + b: NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 0}, + fakeBatchMergeFunc, fakeBatchSplitFunc, batchIdentifierOpt), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(tt.b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink, + batchID: "1"}))) // batch 1 + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + batchID: "2"}))) // batch 2 + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + batchID: "1"}))) // batch 1 + + // batch 1 should be sent by reaching the minimum items size + assert.Eventually(t, func() bool { + 
fmt.Println(sink.requestsCount.Load(), sink.itemsCount.Load()) + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 + }, 50*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 9, sink: sink, + batchID: "2"}))) // batch 2 + require.Error(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + batchID: "3"}))) // batchers limit reached + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 2, sink: sink, + batchID: "2"}))) // batch 3 + + // batch 2 should be sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 23 + }, 50*time.Millisecond, 10*time.Millisecond) + + // batch 3 should be sent by reaching the timeout + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 + }, 150*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSenderShutdown(t *testing.T) { + batchCfg := NewDefaultMergeBatcherConfig() + batchCfg.MinSizeItems = 10 + b := NewMergeBatcher(batchCfg, fakeBatchMergeFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + + require.NoError(t, be.Shutdown(context.Background())) + + // shutdown should force sending the batch + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(3), sink.itemsCount.Load()) +} + +func TestBatcherSingleShardMergeOrSplit(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 5 + mergeCfg.Timeout = 100 * time.Millisecond + b := NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 10}, + fakeBatchMergeFunc, fakeBatchSplitFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + // should be sent right away by reaching the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink}))) + + // should be left in the queue because it doesn't reach the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + + assert.Eventually(t, func() bool { + fmt.Println(sink.requestsCount.Load(), sink.itemsCount.Load()) + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be merged with the second request and broken down into two requests + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 17, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 28 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be broken down into two requests, both are sent right away by reaching the minimum items size. 
+ require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 17, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 45 + }, 50*time.Millisecond, 10*time.Millisecond) + + // request that cannot be split should be dropped. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 11, sink: sink, + splitErr: errors.New("split error")}))) + + // big request should be broken down into two requests, only the first one is sent right away by reaching the minimum items size. + // the second one should be sent by reaching the timeout. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 13, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 6 && sink.itemsCount.Load() == 55 + }, 50*time.Millisecond, 10*time.Millisecond) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 7 && sink.itemsCount.Load() == 58 + }, 150*time.Millisecond, 10*time.Millisecond) +} + +func TestBatcherNoTimeout(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 5 + mergeCfg.Timeout = 0 + b := NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 10}, + fakeBatchMergeFunc, fakeBatchSplitFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // should be sent almost right away as two requests even it doesn't reach the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 12, sink: sink}))) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 12 + }, 10*time.Millisecond, time.Millisecond) +} + +func TestBatcherDisabled(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.Enabled = false + b := NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 10}, + fakeBatchMergeFunc, fakeBatchSplitFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // should be sent right away because batching is disabled. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink}))) + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(8), sink.itemsCount.Load()) +} diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 8a68efcc617..ff8b5e04151 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -132,6 +132,15 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } +func WithBatcher(batcher *Batcher) Option { + return func(o *baseExporter) { + if !o.requestExporter { + panic("batching is only available for the new request exporters") + } + o.batchSender = newBatchSender(o.set, batcher) + } +} + // baseExporter contains common fields between different exporter types. 
type baseExporter struct { component.StartFunc @@ -148,6 +157,7 @@ type baseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. + batchSender requestSender queueSender requestSender obsrepSender requestSender retrySender requestSender @@ -174,6 +184,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req unmarshaler: unmarshaler, signal: signal, + batchSender: &baseRequestSender{}, queueSender: &baseRequestSender{}, obsrepSender: osf(obsrep), retrySender: &baseRequestSender{}, @@ -193,11 +204,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req // send sends the request using the first sender in the chain. func (be *baseExporter) send(req internal.Request) error { - return be.queueSender.send(req) + return be.batchSender.send(req) } // connectSenders connects the senders in the predefined order. func (be *baseExporter) connectSenders() { + be.batchSender.setNextSender(be.queueSender) be.queueSender.setNextSender(be.obsrepSender) be.obsrepSender.setNextSender(be.retrySender) be.retrySender.setNextSender(be.timeoutSender) @@ -210,11 +222,19 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { } // If no error then start the queueSender. - return be.queueSender.start(ctx, host, be.set) + if err := be.queueSender.start(ctx, host, be.set); err != nil { + return err + } + + // Last start the batch sender. + return be.batchSender.start(ctx, host, be.set) } func (be *baseExporter) Shutdown(ctx context.Context) error { - // First shutdown the retry sender, so it can push any pending requests to back the queue. + // First shutdown the batch sender + be.batchSender.shutdown() + + // Then shutdown the retry sender, so it can push any pending requests to back the queue. be.retrySender.shutdown() // Then shutdown the queue sender. diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b098e722921..682d42a5072 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -98,6 +98,7 @@ func NewLogsExporter( lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ctx, ld, pusher) serr := be.send(req) + // TODO: Check for the queue overflow before converting the data. if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e3f09f361c7..fee7326b9e1 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -150,6 +150,7 @@ func NewMetricsRequestExporter( } r := newRequest(ctx, req) sErr := be.send(r) + // TODO: Check for the queue overflow before converting the data. 
if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) } diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 6dd3f67800a..f924b0fc258 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,25 +5,80 @@ package exporterhelper import ( "context" + "sync/atomic" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) +type fakeRequestSink struct { + requestsCount *atomic.Int64 + itemsCount *atomic.Int64 +} + +func newFakeRequestSink() *fakeRequestSink { + return &fakeRequestSink{ + requestsCount: &atomic.Int64{}, + itemsCount: &atomic.Int64{}, + } +} + type fakeRequest struct { - items int - err error + items int + exportErr error + mergeErr error + splitErr error + batchID string + sink *fakeRequestSink } -func (r fakeRequest) Export(_ context.Context) error { - return r.err +func (r *fakeRequest) Export(_ context.Context) error { + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(int64(r.items)) + } + return nil } -func (r fakeRequest) ItemsCount() int { +func (r *fakeRequest) ItemsCount() int { return r.items } +func fakeBatchMergeFunc(_ context.Context, r1 Request, r2 Request) (Request, error) { + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{items: fr1.items + fr2.items, sink: fr1.sink}, nil +} + +func fakeBatchSplitFunc(_ context.Context, req Request, maxItems int) ([]Request, error) { + if maxItems == 0 { + return []Request{req}, nil + } + + fr := req.(*fakeRequest) + if fr.splitErr != nil { + return nil, fr.splitErr + } + + res := []Request{} + for fr.items > maxItems { + res = append(res, &fakeRequest{items: maxItems, sink: fr.sink}) + fr.items -= maxItems + } + res = append(res, &fakeRequest{items: fr.items, sink: fr.sink}) + return res, nil +} + type fakeRequestConverter struct { metricsError error tracesError error @@ -32,13 +87,13 @@ type fakeRequestConverter struct { } func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError + return &fakeRequest{items: md.DataPointCount(), exportErr: c.requestError}, c.metricsError } func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { - return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError + return &fakeRequest{items: td.SpanCount(), exportErr: c.requestError}, c.tracesError } func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { - return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError + return &fakeRequest{items: ld.LogRecordCount(), exportErr: c.requestError}, c.logsError } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 4b9e397ec43..b6439a2dd9f 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -150,6 +150,7 @@ func NewTracesRequestExporter( } r := newRequest(ctx, req) sErr := be.send(r) + // TODO: Check for the queue overflow before converting the data. 
if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count())) } diff --git a/internal/tools/go.mod b/internal/tools/go.mod index 38386fff81b..e19ac5198d3 100644 --- a/internal/tools/go.mod +++ b/internal/tools/go.mod @@ -1,6 +1,8 @@ module go.opentelemetry.io/collector/internal/tools -go 1.20 +go 1.21 + +toolchain go1.21.0 require ( github.com/a8m/envsubst v1.4.2
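For reviewers, a minimal usage sketch of the new batching option added by this diff. The exporterhelper identifiers (NewDefaultMergeBatcherConfig, NewMergeBatcher, WithBatcher, Request) come from the APIs introduced above; the myRequest type, its merge logic, and the chosen config values are hypothetical and only illustrate how an exporter author might wire a request-based exporter to the batcher.

// Package myexporter is a hypothetical request-based exporter using the new batching option.
package myexporter

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// myRequest is an illustrative Request implementation that carries a slice of items.
type myRequest struct {
	items []string
}

// Export would send the accumulated items to the backend; omitted in this sketch.
func (r *myRequest) Export(_ context.Context) error { return nil }

// ItemsCount lets the batcher enforce MinSizeItems and MaxSizeItems.
func (r *myRequest) ItemsCount() int { return len(r.items) }

// mergeRequests combines two requests into one batch; both inputs are *myRequest.
func mergeRequests(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) {
	m1, m2 := r1.(*myRequest), r2.(*myRequest)
	merged := make([]string, 0, len(m1.items)+len(m2.items))
	merged = append(merged, m1.items...)
	merged = append(merged, m2.items...)
	return &myRequest{items: merged}, nil
}

// newBatcherOption builds the exporterhelper option that enables batching.
// The returned option would be passed to a request-based exporter constructor.
func newBatcherOption() exporterhelper.Option {
	cfg := exporterhelper.NewDefaultMergeBatcherConfig()
	cfg.MinSizeItems = 1024              // flush once 1024 items have accumulated
	cfg.Timeout = 500 * time.Millisecond // or after 500ms, whichever comes first

	return exporterhelper.WithBatcher(exporterhelper.NewMergeBatcher(cfg, mergeRequests))
}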