From caa86cf65f53e30c5a855690f4e72362a352a93f Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 11 Sep 2024 00:52:51 -0700 Subject: [PATCH] [otelarrowexporter] Support the new experimental batcher (#34802) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** Add exportbatcher.BatcherConfig to OTel-Arrow exporter. Follows https://github.com/open-telemetry/opentelemetry-collector/pull/10846 as we intend to maintain parity between OTLP/gRPC and OTel-Arrow exporters. **Link to tracking Issue:** **Testing:** ✅ **Documentation:** README updated with reference to new batcher and [concurrent batch processor](https://github.com/open-telemetry/otel-arrow/tree/main/collector/processor/concurrentbatchprocessor) in the otel-arrow repo. --------- Co-authored-by: Matthew Wear --- .chloggen/otelarrow-batcher.yaml | 27 ++++++++++++ exporter/otelarrowexporter/README.md | 44 +++++++++++++++++++ exporter/otelarrowexporter/config.go | 5 +++ exporter/otelarrowexporter/config_test.go | 11 +++++ exporter/otelarrowexporter/factory.go | 6 +++ .../otelarrowexporter/testdata/config.yaml | 5 +++ 6 files changed, 98 insertions(+) create mode 100644 .chloggen/otelarrow-batcher.yaml diff --git a/.chloggen/otelarrow-batcher.yaml b/.chloggen/otelarrow-batcher.yaml new file mode 100644 index 000000000000..e4e5161069da --- /dev/null +++ b/.chloggen/otelarrow-batcher.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add BatcherConfig field following similar in OTLP exporter. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34802] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/otelarrowexporter/README.md b/exporter/otelarrowexporter/README.md index 7fa7cdd3b904..4b3bcb0e4fd7 100644 --- a/exporter/otelarrowexporter/README.md +++ b/exporter/otelarrowexporter/README.md @@ -241,3 +241,47 @@ exporters: zstd: level: 1 # 1 is the "fastest" compression level ``` + +### Batching Configuration + +This exporter includes a new, experimental `batcher` configuration for +batching in the `exporterhelper` module, but this mode is disabled by +default. This batching support works when combined with +`queue_sender` functionality. + +``` +exporters: + otelarrow: + batcher: + enabled: true + sending_queue: + enabled: true + storage: file_storage/otc +extensions: + file_storage/otc: + directory: /var/lib/storage/otc +``` + +The built-in batcher is only recommended with a persistent queue, +otherwise it cannot provide back-pressure to the caller. If building +a custom build of the OpenTelemetry Collector, we recommend using the +[Concurrent Batch +Processor](https://github.com/open-telemetry/otel-arrow/blob/main/collector/processor/concurrentbatchprocessor/README.md) +to provide simultaneous back-pressure, concurrency, and batching +functionality. See [more discussion on this +issue](https://github.com/open-telemetry/opentelemetry-collector/issues/10368). + +``` +exporters: + otelarrow: + batcher: + enabled: false + sending_queue: + enabled: false +processors: + concurrentbatch: + send_batch_max_size: 1500 + send_batch_size: 1000 + timeout: 1s + max_in_flight_size_mib: 128 +``` diff --git a/exporter/otelarrowexporter/config.go b/exporter/otelarrowexporter/config.go index cddb25d7b777..29332dbe43d5 100644 --- a/exporter/otelarrowexporter/config.go +++ b/exporter/otelarrowexporter/config.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" "google.golang.org/grpc" @@ -32,6 +33,10 @@ type Config struct { configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + // Experimental: This configuration is at the early stage of development and may change without backward compatibility + // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved + BatcherConfig exporterbatcher.Config `mapstructure:"batcher"` + // Arrow includes settings specific to OTel Arrow. Arrow ArrowConfig `mapstructure:"arrow"` diff --git a/exporter/otelarrowexporter/config_test.go b/exporter/otelarrowexporter/config_test.go index 9bd63f546147..9903534f13a1 100644 --- a/exporter/otelarrowexporter/config_test.go +++ b/exporter/otelarrowexporter/config_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow" @@ -84,6 +85,16 @@ func TestUnmarshalConfig(t *testing.T) { BalancerName: "experimental", Auth: &configauth.Authentication{AuthenticatorID: component.NewID(component.MustNewType("nop"))}, }, + BatcherConfig: exporterbatcher.Config{ + Enabled: true, + FlushTimeout: 200 * time.Millisecond, + MinSizeConfig: exporterbatcher.MinSizeConfig{ + MinSizeItems: 1000, + }, + MaxSizeConfig: exporterbatcher.MaxSizeConfig{ + MaxSizeItems: 10000, + }, + }, Arrow: ArrowConfig{ NumStreams: 2, MaxStreamLifetime: 2 * time.Hour, diff --git a/exporter/otelarrowexporter/factory.go b/exporter/otelarrowexporter/factory.go index 974d9c544007..15df576f21ec 100644 --- a/exporter/otelarrowexporter/factory.go +++ b/exporter/otelarrowexporter/factory.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" "google.golang.org/grpc" @@ -37,10 +38,14 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + batcherCfg := exporterbatcher.NewDefaultConfig() + batcherCfg.Enabled = false + return &Config{ TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), RetryConfig: configretry.NewDefaultBackOffConfig(), QueueSettings: exporterhelper.NewDefaultQueueSettings(), + BatcherConfig: batcherCfg, ClientConfig: configgrpc.ClientConfig{ Headers: map[string]configopaque.String{}, // Default to zstd compression @@ -74,6 +79,7 @@ func (exp *baseExporter) helperOptions() []exporterhelper.Option { exporterhelper.WithRetry(exp.config.RetryConfig), exporterhelper.WithQueue(exp.config.QueueSettings), exporterhelper.WithStart(exp.start), + exporterhelper.WithBatcher(exp.config.BatcherConfig), exporterhelper.WithShutdown(exp.shutdown), } } diff --git a/exporter/otelarrowexporter/testdata/config.yaml b/exporter/otelarrowexporter/testdata/config.yaml index de3d86626582..76cc6df400e7 100644 --- a/exporter/otelarrowexporter/testdata/config.yaml +++ b/exporter/otelarrowexporter/testdata/config.yaml @@ -25,6 +25,11 @@ keepalive: timeout: 30s permit_without_stream: true balancer_name: "experimental" +batcher: + enabled: true + flush_timeout: 200ms + min_size_items: 1000 + max_size_items: 10000 arrow: num_streams: 2 disabled: false