diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml new file mode 100755 index 00000000000..613cfbac5b2 --- /dev/null +++ b/.chloggen/batch-exporter-helper.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add experimental batching capabilities to the exporter helper + +# One or more tracking issues or pull requests related to the change +issues: [8122] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] \ No newline at end of file diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go new file mode 100644 index 00000000000..476f91887fb --- /dev/null +++ b/exporter/exporterhelper/batch_sender.go @@ -0,0 +1,461 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "runtime" + "sync" + "time" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of +// items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type MergeBatcherConfig struct { + // Enabled indicates whether to not enqueue batches before sending to the consumerSender. + Enabled bool `mapstructure:"enabled"` + + // Timeout sets the time after which a batch will be sent regardless of its size. + // When this is set to zero, data will be sent immediately. + // This is a recommended option, as it will ensure that the data is sent in a timely manner. + Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout? + + // MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be + // sent regardless of the timeout. There is no guarantee that the batch size always greater than this value. + // This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored. + MinSizeItems int `mapstructure:"min_size_items"` +} + +func (c MergeBatcherConfig) Validate() error { + if c.MinSizeItems < 0 { + return errors.New("min_size_items must be greater than or equal to zero") + } + if c.Timeout < 0 { + return errors.New("timeout must be greater than or equal to zero") + } + return nil +} + +func NewDefaultMergeBatcherConfig() MergeBatcherConfig { + return MergeBatcherConfig{ + Enabled: true, + Timeout: 200 * time.Millisecond, + MinSizeItems: 8192, + } +} + +// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and +// minimum and maximum number of items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type MergeSplitBatcherConfig struct { + MergeBatcherConfig `mapstructure:",squash"` + // MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP, + // but can be anything else for other formats. If the batch size exceeds this value, + // it will be broken up into smaller batches if possible. + // Setting this value to zero disables the maximum size limit. + MaxSizeItems int `mapstructure:"max_size_items"` +} + +func (c MergeSplitBatcherConfig) Validate() error { + if c.MaxSizeItems < 0 { + return errors.New("max_size_items must be greater than or equal to zero") + } + if c.MaxSizeItems < c.MinSizeItems { + return errors.New("max_size_items must be greater than or equal to min_size_items") + } + return nil +} + +// BatchConfigBatchersLimit defines batching configuration part for setting a maximum number of batchers. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchConfigBatchersLimit struct { + // BatchersLimit is the maximum number of batchers that can be used for batching. + // Requests producing batch identifiers that exceed this limit will be dropped. + // If this value is zero, then there is no limit on the number of batchers. + BatchersLimit int `mapstructure:"batchers_limit"` +} + +// BatchMergeFunc is a function that merges two requests into a single request. +// Context will be propagated from the first request. If you want to separate batches based on the context, +// use WithRequestBatchIdentifier option. Context will be propagated from the first request. +// Do not mutate the requests passed to the function if error can be returned after mutation. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchMergeFunc func(context.Context, Request, Request) (Request, error) + +// BatchMergeSplitFunc is a function that merge and/or splits a request into multiple requests based on the provided +// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the +// maximum number of items. Size of the last returned request MUST be less or equal than the size of any other returned +// request. The original request MUST not be mutated if error is returned. The length of the returned slice MUST not +// be 0. The optionalReq argument can be nil, make sure to check it before using. maxItems argument is guaranteed to be +// greater than 0. Context will be propagated from the original request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error) + +// IdentifyBatchFunc returns an identifier for a Request batch. This function can be used to separate particular +// Requests into different batches. Batches with different identifiers will not be merged together. +// Provided context can be used to extract information from the context and use it as a part of the identifier as well. +// This function is optional. If not provided, all Requests will be batched together. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type IdentifyBatchFunc func(ctx context.Context, r Request) string + +type shardGetter interface { + shard(*request) (*shard, error) +} + +type Batcher struct { + cfg MergeSplitBatcherConfig + mergeFunc BatchMergeFunc + mergeSplitFunc BatchMergeSplitFunc + + shardGetter + + shutdownCh chan struct{} + goroutines sync.WaitGroup + logger *zap.Logger + + export func(*request) error +} + +// shutdown is invoked during service shutdown. +func (b *Batcher) shutdown() { + close(b.shutdownCh) + + // Wait until all goroutines are done. + b.goroutines.Wait() +} + +// newShard gets or creates a batcher corresponding with attrs. +func (b *Batcher) newShard() *shard { + s := &shard{ + batcher: b, + newRequest: make(chan *request, runtime.NumCPU()), + } + b.goroutines.Add(1) + go s.start() + return s +} + +func (b *Batcher) mergeRequests(req1 *request, req2 *request) (*request, error) { + r, err := b.mergeFunc(req1.Context(), req1.Request, req2.Request) + if err != nil { + return nil, err + } + return &request{ + baseRequest: baseRequest{ctx: req1.Context()}, + Request: r, + }, nil +} + +func (b *Batcher) splitMergeRequest(optReq *request, req *request, maxItems int) ([]*request, error) { + var optionalReq Request + if optReq != nil { + optionalReq = optReq.Request + } + rs, err := b.mergeSplitFunc(req.Context(), optionalReq, req.Request, maxItems) + if err != nil { + return nil, err + } + if len(rs) == 0 { + return nil, errors.New("empty requests slice returned") + + } + + reqs := make([]*request, 0, len(rs)) + for _, r := range rs { + reqs = append(reqs, &request{ + baseRequest: baseRequest{ctx: req.Context()}, + Request: r, + }) + } + return reqs, nil +} + +// NewMergeBatcher creates a new Batcher that merges requests. +func NewMergeBatcher(cfg MergeBatcherConfig, mf BatchMergeFunc, opts ...BatcherOption) *Batcher { + return newBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: cfg}, mf, nil, opts...) +} + +// NewMergeSplitBatcher creates a new Batcher that merges and splits requests. +// It requires both BatchMergeFunc and BatchMergeSplitFunc to be provided. BatchMergeFunc will be used when splitting +// is not needed based on the provided configuration. BatchMergeSplitFunc will be used when splitting is needed. +func NewMergeSplitBatcher(cfg MergeSplitBatcherConfig, mf BatchMergeFunc, msf BatchMergeSplitFunc, opts ...BatcherOption) *Batcher { + return newBatcher(cfg, mf, msf, opts...) +} + +func newBatcher(cfg MergeSplitBatcherConfig, mf BatchMergeFunc, msf BatchMergeSplitFunc, opts ...BatcherOption) *Batcher { + if !cfg.Enabled { + return nil + } + + b := &Batcher{ + cfg: cfg, + mergeFunc: mf, + mergeSplitFunc: msf, + shutdownCh: make(chan struct{}), + } + for _, op := range opts { + op(b) + } + if b.shardGetter == nil { + b.shardGetter = &singleShardGetter{singleShard: b.newShard()} + } + return b +} + +type BatcherOption func(*Batcher) + +func WithRequestBatchIdentifier(f IdentifyBatchFunc, bl BatchConfigBatchersLimit) BatcherOption { + return func(b *Batcher) { + b.shardGetter = &multiShardGetter{ + identifyBatchFunc: f, + batchersLimit: bl.BatchersLimit, + shardFactory: b.newShard, + } + } +} + +// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. +var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batch identifier combinations")) + +// batchSender is a component that accepts places requests into batches before passing them to the downstream senders. +// +// batch_processor implements consumer.Traces and consumer.Metrics +// +// Batches are sent out with any of the following conditions: +// - batch size reaches cfg.SendBatchSize +// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out. +type batchSender struct { + baseRequestSender + + // batcher will be either *singletonBatcher or *multiBatcher + batcher *Batcher +} + +// newBatchSender returns a new batch consumer component. +func newBatchSender(set exporter.CreateSettings, b *Batcher) requestSender { + if b == nil { + return &baseRequestSender{} + } + b.logger = set.Logger + return &batchSender{ + batcher: b, + } +} + +// start is invoked during service startup. +func (bs *batchSender) start(_ context.Context, _ component.Host, _ exporter.CreateSettings) error { + bs.batcher.export = func(req *request) error { + return bs.nextSender.send(req) + } + return nil +} + +func (bs *batchSender) send(req internal.Request) error { + s, err := bs.batcher.shard(req.(*request)) + if err != nil { + return err + } + + // For now the batcher can work asyncronously only. Potentially we can + // add a sync mode later. + s.newRequest <- req.(*request) + return nil +} + +func (bs *batchSender) shutdown() { + bs.batcher.shutdown() +} + +// shard is a single instance of the batch logic. When metadata +// keys are in use, one of these is created per distinct combination +// of values. +type shard struct { + batcher *Batcher + + // timer informs the shard send a batch. + timer *time.Timer + + // newRequest is used to receive batches from producers. + newRequest chan *request + + // batch is an in-flight data item containing one of the + // underlying data types. + batch *request + + processRequest func(req *request) +} + +func (s *shard) start() { + defer s.batcher.goroutines.Done() + + var timerCh <-chan time.Time + if s.batcher.cfg.Timeout != 0 { + s.timer = time.NewTimer(s.batcher.cfg.Timeout) + timerCh = s.timer.C + } + + s.processRequest = s.processRequestFunc() + + for { + select { + case <-s.batcher.shutdownCh: + DONE: + for { + select { + case req := <-s.newRequest: + s.processRequest(req) + default: + break DONE + } + } + // This is the close of the channel + if s.batch != nil && s.batch.Count() > 0 { + s.export(s.batch) + } + return + case req := <-s.newRequest: + s.processRequest(req) + case <-timerCh: + if s.batch != nil && s.batch.Count() > 0 { + s.export(s.batch) + } + s.resetTimer() + } + } +} + +// processRequestFunc return a function to used for requests processing based on the batcher configuration. +func (s *shard) processRequestFunc() func(req *request) { + if s.batcher.cfg.MaxSizeItems == 0 { + if s.batcher.cfg.MinSizeItems == 0 || !s.hasTimer() { + return s.processRequestPassThrough + } + return s.processRequestMerge + } + return s.processRequestMergeSplit +} + +func (s *shard) processRequestPassThrough(req *request) { + s.export(req) +} + +func (s *shard) processRequestMerge(req *request) { + if s.batch == nil { + s.batch = req + } else { + mergedReq, err := s.batcher.mergeRequests(s.batch, req) + if err != nil { + s.batcher.logger.Error("failed to merge requests", zap.Error(err)) + return + } + s.batch = mergedReq + } + s.exporterBatchIfMinSizeReached() +} + +func (s *shard) processRequestMergeSplit(req *request) { + reqs, err := s.batcher.splitMergeRequest(s.batch, req, s.batcher.cfg.MaxSizeItems) + if err != nil { + s.batcher.logger.Error("failed to split request", zap.Error(err)) + return + } + + for _, r := range reqs[:len(reqs)-1] { + s.export(r) + } + s.batch = reqs[len(reqs)-1] + s.exporterBatchIfMinSizeReached() +} + +func (s *shard) exporterBatchIfMinSizeReached() { + if s.batch.Count() >= s.batcher.cfg.MinSizeItems || !s.hasTimer() { + s.export(s.batch) + s.batch = nil + s.stopTimer() + s.resetTimer() + } +} + +func (s *shard) export(req *request) { + err := s.batcher.export(req) + // TODO: Replace with metrics and logging. + if err != nil { + s.batcher.logger.Error("Failed to send batch", zap.Error(err)) + } +} + +func (s *shard) hasTimer() bool { + return s.timer != nil +} + +func (s *shard) stopTimer() { + if s.hasTimer() && !s.timer.Stop() { + <-s.timer.C + } +} + +func (s *shard) resetTimer() { + if s.hasTimer() { + s.timer.Reset(s.batcher.cfg.Timeout) + } +} + +// singleShardGetter is used when metadataKeys is empty, to avoid the +// additional lock and map operations used in multiBatcher. +type singleShardGetter struct { + singleShard *shard +} + +func (sb *singleShardGetter) shard(_ *request) (*shard, error) { + return sb.singleShard, nil +} + +// multiBatcher is used when metadataKeys is not empty. +type multiShardGetter struct { + identifyBatchFunc IdentifyBatchFunc + batchersLimit int + batchers sync.Map + shardFactory func() *shard + + // Guards the size and the storing logic to ensure no more than limit items are stored. + // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. + lock sync.Mutex + size int +} + +func (mb *multiShardGetter) shard(req *request) (*shard, error) { + id := mb.identifyBatchFunc(req.Context(), req.Request) + s, ok := mb.batchers.Load(id) + if ok { + return s.(*shard), nil + } + + mb.lock.Lock() + defer mb.lock.Unlock() + + if mb.batchersLimit != 0 && mb.size >= mb.batchersLimit { + return nil, errTooManyBatchers + } + + s, loaded := mb.batchers.LoadOrStore(id, mb.shardFactory()) + if !loaded { + mb.size++ + } + return s.(*shard), nil +} diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go new file mode 100644 index 00000000000..1c8ab546b75 --- /dev/null +++ b/exporter/exporterhelper/batch_sender_test.go @@ -0,0 +1,336 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestMergeBatcherConfigValidate(t *testing.T) { + cfg := NewDefaultMergeBatcherConfig() + assert.NoError(t, cfg.Validate()) + + cfg.MinSizeItems = -1 + assert.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero") + + cfg = NewDefaultMergeBatcherConfig() + cfg.Timeout = -1 + assert.EqualError(t, cfg.Validate(), "timeout must be greater than or equal to zero") +} + +func TestMergeSplitBatcherConfigValidate(t *testing.T) { + cfg := MergeSplitBatcherConfig{ + MergeBatcherConfig: NewDefaultMergeBatcherConfig(), + MaxSizeItems: 20000, + } + assert.NoError(t, cfg.Validate()) + + cfg.MaxSizeItems = -1 + assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero") + + cfg.MaxSizeItems = 1000 + assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items") +} + +func TestBatcherSingleShardMerge(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 10 + mergeCfg.Timeout = 100 * time.Millisecond + + tests := []struct { + name string + b *Batcher + }{ + { + name: "merge_only_batcher", + b: NewMergeBatcher(mergeCfg, fakeBatchMergeFunc), + }, + { + name: "merge_or_split_batcher", + b: NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 0}, + fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(tt.b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + mergeErr: errors.New("merge error")}))) + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 1, sink: sink}))) + + // the first two requests should be merged into one and sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 + }, 50*time.Millisecond, 10*time.Millisecond) + + // the third and fifth requests should be sent by reaching the timeout + // the fourth request should be ignored because of the merge error + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 + }, 150*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatcherMultiShardMerge(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 10 + mergeCfg.Timeout = 100 * time.Millisecond + batchIdentifierOpt := WithRequestBatchIdentifier(func(_ context.Context, req Request) string { + return req.(*fakeRequest).batchID + }, BatchConfigBatchersLimit{BatchersLimit: 2}) + + tests := []struct { + name string + b *Batcher + }{ + { + name: "merge_only_batcher", + b: NewMergeBatcher(mergeCfg, fakeBatchMergeFunc, batchIdentifierOpt), + }, + { + name: "merge_or_split_batcher", + b: NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 0}, + fakeBatchMergeFunc, fakeBatchMergeSplitFunc, batchIdentifierOpt), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(tt.b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink, + batchID: "1"}))) // batch 1 + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + batchID: "2"}))) // batch 2 + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + batchID: "1"}))) // batch 1 + + // batch 1 should be sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 + }, 50*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 9, sink: sink, + batchID: "2"}))) // batch 2 + require.Error(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink, + batchID: "3"}))) // batchers limit reached + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 2, sink: sink, + batchID: "2"}))) // batch 3 + + // batch 2 should be sent by reaching the minimum items size + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 23 + }, 50*time.Millisecond, 10*time.Millisecond) + + // batch 3 should be sent by reaching the timeout + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 + }, 150*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatchSenderShutdown(t *testing.T) { + batchCfg := NewDefaultMergeBatcherConfig() + batchCfg.MinSizeItems = 10 + b := NewMergeBatcher(batchCfg, fakeBatchMergeFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + + require.NoError(t, be.Shutdown(context.Background())) + + // shutdown should force sending the batch + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(3), sink.itemsCount.Load()) +} + +func TestBatcherSingleShardMergeOrSplit(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 5 + mergeCfg.Timeout = 100 * time.Millisecond + b := NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 10}, + fakeBatchMergeFunc, fakeBatchMergeSplitFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + // should be sent right away by reaching the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink}))) + + // should be left in the queue because it doesn't reach the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 3, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be merged with the second request and broken down into two requests + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 17, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 28 + }, 50*time.Millisecond, 10*time.Millisecond) + + // big request should be broken down into two requests, both are sent right away by reaching the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 17, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 45 + }, 50*time.Millisecond, 10*time.Millisecond) + + // request that cannot be split should be dropped. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 11, sink: sink, + splitErr: errors.New("split error")}))) + + // big request should be broken down into two requests, only the first one is sent right away by reaching the minimum items size. + // the second one should be sent by reaching the timeout. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 13, sink: sink}))) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 6 && sink.itemsCount.Load() == 55 + }, 50*time.Millisecond, 10*time.Millisecond) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 7 && sink.itemsCount.Load() == 58 + }, 150*time.Millisecond, 10*time.Millisecond) +} + +func TestBatcherNoTimeout(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.MinSizeItems = 5 + mergeCfg.Timeout = 0 + tests := []struct { + name string + b *Batcher + expectedBatches int64 + }{ + { + name: "with_split", + b: NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 10}, + fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + expectedBatches: 2, + }, + { + name: "no_split", + b: NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 0}, + fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + expectedBatches: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(tt.b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // should be sent almost right away as two requests even it doesn't reach the minimum items size. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 12, sink: sink}))) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == tt.expectedBatches && sink.itemsCount.Load() == 12 + }, 50*time.Millisecond, 10*time.Millisecond) + }) + } +} + +func TestBatcherDisabled(t *testing.T) { + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.Enabled = false + b := NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 10}, + fakeBatchMergeFunc, fakeBatchMergeSplitFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // should be sent right away because batching is disabled. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 8, sink: sink}))) + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(8), sink.itemsCount.Load()) +} + +func TestBatcherInvalidMergeSplitFunc(t *testing.T) { + invalidMergeSplitFunc := func(_ context.Context, _ Request, req2 Request, _ int) ([]Request, error) { + // reply with invalid 0 length slice if req2 is more than 20 items + if req2.(*fakeRequest).items > 20 { + return []Request{}, nil + } + // otherwise reply with a single request. + return []Request{req2}, nil + } + mergeCfg := NewDefaultMergeBatcherConfig() + mergeCfg.Timeout = 50 * time.Millisecond + b := NewMergeSplitBatcher(MergeSplitBatcherConfig{MergeBatcherConfig: mergeCfg, MaxSizeItems: 20}, + fakeBatchMergeFunc, invalidMergeSplitFunc) + be, err := newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender, WithBatcher(b)) + require.NotNil(t, be) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + // first request should be ignored due to invalid merge/split function. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 30, sink: sink}))) + // second request should be sent after reaching the timeout. + require.NoError(t, be.send(newRequest(context.Background(), &fakeRequest{items: 15, sink: sink}))) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 + }, 100*time.Millisecond, 10*time.Millisecond) +} diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index cc97cbc3db1..d506992f4e4 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -132,6 +132,15 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } +func WithBatcher(batcher *Batcher) Option { + return func(o *baseExporter) { + if !o.requestExporter { + panic("batching is only available for the new request exporters") + } + o.batchSender = newBatchSender(o.set, batcher) + } +} + // baseExporter contains common fields between different exporter types. type baseExporter struct { component.StartFunc @@ -148,6 +157,7 @@ type baseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. + batchSender requestSender queueSender requestSender obsrepSender requestSender retrySender requestSender @@ -174,6 +184,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req unmarshaler: unmarshaler, signal: signal, + batchSender: &baseRequestSender{}, queueSender: &baseRequestSender{}, obsrepSender: osf(obsReport), retrySender: &baseRequestSender{}, @@ -193,11 +204,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req // send sends the request using the first sender in the chain. func (be *baseExporter) send(req internal.Request) error { - return be.queueSender.send(req) + return be.batchSender.send(req) } // connectSenders connects the senders in the predefined order. func (be *baseExporter) connectSenders() { + be.batchSender.setNextSender(be.queueSender) be.queueSender.setNextSender(be.obsrepSender) be.obsrepSender.setNextSender(be.retrySender) be.retrySender.setNextSender(be.timeoutSender) @@ -210,11 +222,19 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { } // If no error then start the queueSender. - return be.queueSender.start(ctx, host, be.set) + if err := be.queueSender.start(ctx, host, be.set); err != nil { + return err + } + + // Last start the batch sender. + return be.batchSender.start(ctx, host, be.set) } func (be *baseExporter) Shutdown(ctx context.Context) error { - // First shutdown the retry sender, so it can push any pending requests to back the queue. + // First shutdown the batch sender + be.batchSender.shutdown() + + // Then shutdown the retry sender, so it can push any pending requests to back the queue. be.retrySender.shutdown() // Then shutdown the queue sender. diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index d3ea32cea90..10597a72fec 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -98,6 +98,7 @@ func NewLogsExporter( lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ctx, ld, pusher) serr := be.send(req) + // TODO: Check for the queue overflow before converting the data. if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeLogs, int64(req.Count())) } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 4edd3997bb2..3bcb53f0281 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -150,6 +150,7 @@ func NewMetricsRequestExporter( } r := newRequest(ctx, req) sErr := be.send(r) + // TODO: Check for the queue overflow before converting the data. if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeMetrics, int64(r.Count())) } diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 6dd3f67800a..cb5fbfd37fb 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,25 +5,80 @@ package exporterhelper import ( "context" + "sync/atomic" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) +type fakeRequestSink struct { + requestsCount *atomic.Int64 + itemsCount *atomic.Int64 +} + +func newFakeRequestSink() *fakeRequestSink { + return &fakeRequestSink{ + requestsCount: &atomic.Int64{}, + itemsCount: &atomic.Int64{}, + } +} + type fakeRequest struct { - items int - err error + items int + exportErr error + mergeErr error + splitErr error + batchID string + sink *fakeRequestSink } -func (r fakeRequest) Export(_ context.Context) error { - return r.err +func (r *fakeRequest) Export(_ context.Context) error { + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(int64(r.items)) + } + return nil } -func (r fakeRequest) ItemsCount() int { +func (r *fakeRequest) ItemsCount() int { return r.items } +func fakeBatchMergeFunc(_ context.Context, r1 Request, r2 Request) (Request, error) { + if r1 == nil { + return r2, nil + } + fr1 := r1.(*fakeRequest) + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{items: fr1.items + fr2.items, sink: fr1.sink}, nil +} + +func fakeBatchMergeSplitFunc(_ context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error) { + fr := req.(*fakeRequest) + if fr.splitErr != nil { + return nil, fr.splitErr + } + + if optionalReq != nil { + fr.items += optionalReq.(*fakeRequest).items + } + + res := []Request{} + for fr.items > maxItems { + res = append(res, &fakeRequest{items: maxItems, sink: fr.sink}) + fr.items -= maxItems + } + res = append(res, &fakeRequest{items: fr.items, sink: fr.sink}) + return res, nil +} + type fakeRequestConverter struct { metricsError error tracesError error @@ -32,13 +87,13 @@ type fakeRequestConverter struct { } func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError + return &fakeRequest{items: md.DataPointCount(), exportErr: c.requestError}, c.metricsError } func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { - return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError + return &fakeRequest{items: td.SpanCount(), exportErr: c.requestError}, c.tracesError } func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { - return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError + return &fakeRequest{items: ld.LogRecordCount(), exportErr: c.requestError}, c.logsError } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index bd9b6eb6ac8..f3d4038fea6 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -150,6 +150,7 @@ func NewTracesRequestExporter( } r := newRequest(ctx, req) sErr := be.send(r) + // TODO: Check for the queue overflow before converting the data. if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeTraces, int64(r.Count())) } diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 64af2a951cb..fccbe2d974e 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.110.2 h1:sdFPBr6xG9/wkBbfhmUz/JmZC7X6LavQgcrVINrKiVA= cloud.google.com/go v0.110.2/go.mod h1:k04UEeEtb6ZBRTv3dZz4CeJC3jKGxyhl0sAiVVquxiw= +cloud.google.com/go v0.110.9/go.mod h1:rpxevX/0Lqvlbc88b7Sc1SPNdyK1riNBTUU6JXhYNpM= cloud.google.com/go/accessapproval v1.4.0/go.mod h1:zybIuC3KpDOvotz59lFe5qxRZx6C75OtwbisN56xYB4= cloud.google.com/go/accessapproval v1.5.0/go.mod h1:HFy3tuiGvMdcd/u+Cu5b9NkO1pEICJ46IR82PoUdplw= cloud.google.com/go/accessapproval v1.6.0/go.mod h1:R0EiYnwV5fsRFiKZkPHr6mwyk2wxUJ30nL4j2pcFY2E= @@ -130,6 +131,8 @@ cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvj cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63rR+SXhcpA= cloud.google.com/go/compute v1.18.0/go.mod h1:1X7yHxec2Ga+Ss6jPyjxRxpu2uu7PLgsOVXvgU0yacs= cloud.google.com/go/compute v1.19.0/go.mod h1:rikpw2y+UMidAe9tISo04EHNOIf42RLYF/q8Bs93scU= +cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IKd5/kvShxE= +cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI= cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY= cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= @@ -476,6 +479,7 @@ cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeL cloud.google.com/go/storage v1.27.0/go.mod h1:x9DOL8TK/ygDUMieqwfhdpQryTeEkhGKMi80i/iqR2s= cloud.google.com/go/storage v1.28.1/go.mod h1:Qnisd4CqDdo6BGs2AD5LLnEsmSQ80wQ5ogcBBKhU86Y= cloud.google.com/go/storage v1.29.0/go.mod h1:4puEjyTKnku6gfKoTfNOU/W+a9JyuVNxjpS5GBrB8h4= +cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7BiccwkR7+P7gN8E= cloud.google.com/go/storagetransfer v1.5.0/go.mod h1:dxNzUopWy7RQevYFHewchb29POFv3/AaBgnhqzqiK0w= cloud.google.com/go/storagetransfer v1.6.0/go.mod h1:y77xm4CQV/ZhFZH75PLEXY0ROiS7Gh6pSKrM8dJyg6I= cloud.google.com/go/storagetransfer v1.7.0/go.mod h1:8Giuj1QNb1kfLAiWM1bN6dHzfdlDAVC9rv9abHot2W4= @@ -586,6 +590,7 @@ github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -603,10 +608,12 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go. github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/go-control-plane v0.10.3/go.mod h1:fJJn/j26vwOu972OllsvAgJJM//w9BV6Fxbg2LuVd34= github.com/envoyproxy/go-control-plane v0.11.0/go.mod h1:VnHyVMpzcLvCFt9yUz1UnCwHLhwx1WguiVDV7pTG/tI= +github.com/envoyproxy/go-control-plane v0.11.1-0.20230524094728-9239064ad72f/go.mod h1:sfYdkwUW4BA3PbKjySwjJy+O4Pu0h62rlqCMHNk+K+Q= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= +github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= @@ -684,6 +691,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/s2a-go v0.1.0/go.mod h1:OJpEgntRZo8ugHpF9hkoLJbS5dSI20XZeXJ9JVywLlM= +github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= +github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= @@ -691,6 +700,7 @@ github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= +github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM= @@ -702,6 +712,8 @@ github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMd github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8= github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI= github.com/googleapis/gax-go/v2 v2.8.0/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI= +github.com/googleapis/gax-go/v2 v2.10.0/go.mod h1:4UOEnMCrxsSqQ940WnTiD6qJ63le2ev3xfyagutxiPw= +github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= @@ -804,6 +816,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -849,8 +862,11 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -910,6 +926,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -930,6 +947,7 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -953,6 +971,7 @@ golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= +golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -968,6 +987,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1019,6 +1039,7 @@ golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1030,6 +1051,8 @@ golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -1121,6 +1144,9 @@ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60c google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= google.golang.org/api v0.118.0/go.mod h1:76TtD3vkgmZ66zZzp72bUUklpmQmKlhh6sYtIjYK+5E= +google.golang.org/api v0.122.0/go.mod h1:gcitW0lvnyWjSp9nKxAbdHKIZ6vF4aajGueeslZOyms= +google.golang.org/api v0.124.0/go.mod h1:xu2HQurE5gi/3t1aFCvhPD781p0a3p11sdunTJ2BlP4= +google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1214,6 +1240,17 @@ google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea/go.mod h1:UUQDJDOl google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/genproto v0.0.0-20230525234025-438c736192d0/go.mod h1:9ExIQyXL5hZrHzQceCwuSYwZZ5QZBazOcprJ5rgs3lY= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234020-1aefcd67740a/go.mod h1:ts19tUU+Z0ZShN1y3aPyq2+O3d5FUNNgT6FtOzmrNn8= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/bytestream v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234015-3fc162c6f38a/go.mod h1:xURIpW9ES5+/GZhnV6beoEtxQrnkRGIfP5VQG2tCBLc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1244,9 +1281,11 @@ google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= +google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= +google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=