diff --git a/CHANGELOG.md b/CHANGELOG.md index f69029dca3f..63237296913 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747, #5862) +- Add `WithExportBufferSize` option to log batch processor.(#5877) ### Changed diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 197fcbad43d..28c969262b4 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -19,6 +19,7 @@ const ( dfltExpInterval = time.Second dfltExpTimeout = 30 * time.Second dfltExpMaxBatchSize = 512 + dfltExpBufferSize = 1 envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE" envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY" @@ -119,8 +120,7 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) b := &BatchProcessor{ - // TODO: explore making the size of this configurable. - exporter: newBufferExporter(exporter, 1), + exporter: newBufferExporter(exporter, cfg.expBufferSize.Value), q: newQueue(cfg.maxQSize.Value), batchSize: cfg.expMaxBatchSize.Value, @@ -349,6 +349,7 @@ type batchConfig struct { expInterval setting[time.Duration] expTimeout setting[time.Duration] expMaxBatchSize setting[int] + expBufferSize setting[int] } func newBatchConfig(options []BatchProcessorOption) batchConfig { @@ -382,6 +383,10 @@ func newBatchConfig(options []BatchProcessorOption) batchConfig { clampMax[int](c.maxQSize.Value), fallback[int](dfltExpMaxBatchSize), ) + c.expBufferSize = c.expBufferSize.Resolve( + clearLessThanOne[int](), + fallback[int](dfltExpBufferSize), + ) return c } @@ -458,3 +463,15 @@ func WithExportMaxBatchSize(size int) BatchProcessorOption { return cfg }) } + +// WithExportBufferSize sets the batch buffer size. +// Batches will be temporarily kept in a memory buffer until they are exported. +// +// By default, a value of 1 will be used. +// The default value is also used when the provided value is less than one. +func WithExportBufferSize(size int) BatchProcessorOption { + return batchOptionFunc(func(cfg batchConfig) batchConfig { + cfg.expBufferSize = newSetting(size) + return cfg + }) +} diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 05cdf0a13f6..b2e993a5bfa 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -70,6 +70,7 @@ func TestNewBatchConfig(t *testing.T) { expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), expMaxBatchSize: newSetting(dfltExpMaxBatchSize), + expBufferSize: newSetting(dfltExpBufferSize), }, }, { @@ -79,12 +80,14 @@ func TestNewBatchConfig(t *testing.T) { WithExportInterval(time.Microsecond), WithExportTimeout(time.Hour), WithExportMaxBatchSize(2), + WithExportBufferSize(3), }, want: batchConfig{ maxQSize: newSetting(10), expInterval: newSetting(time.Microsecond), expTimeout: newSetting(time.Hour), expMaxBatchSize: newSetting(2), + expBufferSize: newSetting(3), }, }, { @@ -100,6 +103,7 @@ func TestNewBatchConfig(t *testing.T) { expInterval: newSetting(100 * time.Millisecond), expTimeout: newSetting(1000 * time.Millisecond), expMaxBatchSize: newSetting(1), + expBufferSize: newSetting(dfltExpBufferSize), }, }, { @@ -109,12 +113,14 @@ func TestNewBatchConfig(t *testing.T) { WithExportInterval(-1 * time.Microsecond), WithExportTimeout(-1 * time.Hour), WithExportMaxBatchSize(-2), + WithExportBufferSize(-2), }, want: batchConfig{ maxQSize: newSetting(dfltMaxQSize), expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), expMaxBatchSize: newSetting(dfltExpMaxBatchSize), + expBufferSize: newSetting(dfltExpBufferSize), }, }, { @@ -130,6 +136,7 @@ func TestNewBatchConfig(t *testing.T) { expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), expMaxBatchSize: newSetting(dfltExpMaxBatchSize), + expBufferSize: newSetting(dfltExpBufferSize), }, }, { @@ -146,12 +153,14 @@ func TestNewBatchConfig(t *testing.T) { WithExportInterval(time.Microsecond), WithExportTimeout(time.Hour), WithExportMaxBatchSize(2), + WithExportBufferSize(2), }, want: batchConfig{ maxQSize: newSetting(3), expInterval: newSetting(time.Microsecond), expTimeout: newSetting(time.Hour), expMaxBatchSize: newSetting(2), + expBufferSize: newSetting(2), }, }, { @@ -159,12 +168,14 @@ func TestNewBatchConfig(t *testing.T) { options: []BatchProcessorOption{ WithMaxQueueSize(1), WithExportMaxBatchSize(10), + WithExportBufferSize(3), }, want: batchConfig{ maxQSize: newSetting(1), expInterval: newSetting(dfltExpInterval), expTimeout: newSetting(dfltExpTimeout), expMaxBatchSize: newSetting(1), + expBufferSize: newSetting(3), }, }, }