Skip to content

Commit

Permalink
[otelarrowexporter] Support the new experimental batcher (#34802)
Browse files Browse the repository at this point in the history
**Description:** Add exportbatcher.BatcherConfig to OTel-Arrow exporter.
Follows
open-telemetry/opentelemetry-collector#10846 as
we intend to maintain parity between OTLP/gRPC and OTel-Arrow exporters.

**Link to tracking Issue:** 

**Testing:** ✅ 

**Documentation:** README updated with reference to new batcher and
[concurrent batch
processor](https://github.com/open-telemetry/otel-arrow/tree/main/collector/processor/concurrentbatchprocessor)
in the otel-arrow repo.

---------

Co-authored-by: Matthew Wear <matthew.wear@gmail.com>
  • Loading branch information
jmacd and mwear authored Sep 11, 2024
1 parent cb7f220 commit caa86cf
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-batcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a BatcherConfig field, following the similar addition to the OTLP exporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34802]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
44 changes: 44 additions & 0 deletions exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,47 @@ exporters:
zstd:
level: 1 # 1 is the "fastest" compression level
```

### Batching Configuration

This exporter includes a new, experimental `batcher` configuration for
batching in the `exporterhelper` module, but this mode is disabled by
default. This batching support works when combined with
`queue_sender` functionality.

```yaml
exporters:
  otelarrow:
    batcher:
      enabled: true
    sending_queue:
      enabled: true
      storage: file_storage/otc
extensions:
  file_storage/otc:
    directory: /var/lib/storage/otc
```

The built-in batcher is only recommended with a persistent queue;
otherwise, it cannot provide back-pressure to the caller. If you are
building a custom distribution of the OpenTelemetry Collector, we
recommend using the [Concurrent Batch
Processor](https://github.com/open-telemetry/otel-arrow/blob/main/collector/processor/concurrentbatchprocessor/README.md)
to provide simultaneous back-pressure, concurrency, and batching
functionality. See [more discussion on this
issue](https://github.com/open-telemetry/opentelemetry-collector/issues/10368).

```yaml
exporters:
  otelarrow:
    batcher:
      enabled: false
    sending_queue:
      enabled: false
processors:
  concurrentbatch:
    send_batch_max_size: 1500
    send_batch_size: 1000
    timeout: 1s
    max_in_flight_size_mib: 128
```
5 changes: 5 additions & 0 deletions exporter/otelarrowexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"google.golang.org/grpc"

Expand All @@ -32,6 +33,10 @@ type Config struct {

configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// Experimental: This configuration is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved
BatcherConfig exporterbatcher.Config `mapstructure:"batcher"`

// Arrow includes settings specific to OTel Arrow.
Arrow ArrowConfig `mapstructure:"arrow"`

Expand Down
11 changes: 11 additions & 0 deletions exporter/otelarrowexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow"
Expand Down Expand Up @@ -84,6 +85,16 @@ func TestUnmarshalConfig(t *testing.T) {
BalancerName: "experimental",
Auth: &configauth.Authentication{AuthenticatorID: component.NewID(component.MustNewType("nop"))},
},
BatcherConfig: exporterbatcher.Config{
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
MinSizeConfig: exporterbatcher.MinSizeConfig{
MinSizeItems: 1000,
},
MaxSizeConfig: exporterbatcher.MaxSizeConfig{
MaxSizeItems: 10000,
},
},
Arrow: ArrowConfig{
NumStreams: 2,
MaxStreamLifetime: 2 * time.Hour,
Expand Down
6 changes: 6 additions & 0 deletions exporter/otelarrowexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"google.golang.org/grpc"

Expand All @@ -37,10 +38,14 @@ func NewFactory() exporter.Factory {
}

func createDefaultConfig() component.Config {
batcherCfg := exporterbatcher.NewDefaultConfig()
batcherCfg.Enabled = false

return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BatcherConfig: batcherCfg,
ClientConfig: configgrpc.ClientConfig{
Headers: map[string]configopaque.String{},
// Default to zstd compression
Expand Down Expand Up @@ -74,6 +79,7 @@ func (exp *baseExporter) helperOptions() []exporterhelper.Option {
exporterhelper.WithRetry(exp.config.RetryConfig),
exporterhelper.WithQueue(exp.config.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithBatcher(exp.config.BatcherConfig),
exporterhelper.WithShutdown(exp.shutdown),
}
}
Expand Down
5 changes: 5 additions & 0 deletions exporter/otelarrowexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ keepalive:
timeout: 30s
permit_without_stream: true
balancer_name: "experimental"
batcher:
enabled: true
flush_timeout: 200ms
min_size_items: 1000
max_size_items: 10000
arrow:
num_streams: 2
disabled: false
Expand Down

0 comments on commit caa86cf

Please sign in to comment.