diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff52375529..399ae020199 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Move BigEndian helper functions in `tracetranslator` to an internal package.(#3298) - Rename `configtest.LoadConfigFile` to `configtest.LoadConfigAndValidate` (#3306) - Replace `ExtensionCreateParams` with `ExtensionCreateSettings` (#3294) +- Change `batchprocessor` logic to limit data points rather than metrics (#3141) ## 💡 Enhancements 💡 diff --git a/processor/batchprocessor/README.md b/processor/batchprocessor/README.md index 03de2797dea..5b69aa0166f 100644 --- a/processor/batchprocessor/README.md +++ b/processor/batchprocessor/README.md @@ -15,8 +15,8 @@ any data drops such as sampling. Please refer to [config.go](./config.go) for the config spec. The following configuration options can be modified: -- `send_batch_size` (default = 8192): Number of spans or metrics after which a -batch will be sent regardless of the timeout. +- `send_batch_size` (default = 8192): Number of spans, metric data points, or log +records after which a batch will be sent regardless of the timeout. - `timeout` (default = 200ms): Time duration after which a batch will be sent regardless of size. - `send_batch_max_size` (default = 0): The upper limit of the batch size. diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 91d40f6e83b..0d134026727 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -263,9 +263,9 @@ func (bt *batchTraces) size() int { } type batchMetrics struct { - nextConsumer consumer.Metrics - metricData pdata.Metrics - metricCount int + nextConsumer consumer.Metrics + metricData pdata.Metrics + dataPointCount int } func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { @@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error { var req pdata.Metrics - if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize { + if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { req = splitMetrics(sendBatchMaxSize, bm.metricData) - bm.metricCount -= sendBatchMaxSize + bm.dataPointCount -= sendBatchMaxSize } else { req = bm.metricData bm.metricData = pdata.NewMetrics() - bm.metricCount = 0 + bm.dataPointCount = 0 } return bm.nextConsumer.ConsumeMetrics(ctx, req) } func (bm *batchMetrics) itemCount() int { - return bm.metricCount + return bm.dataPointCount } func (bm *batchMetrics) size() int { @@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int { func (bm *batchMetrics) add(item interface{}) { md := item.(pdata.Metrics) - newMetricsCount := md.MetricCount() - if newMetricsCount == 0 { + _, newDataPointCount := md.MetricAndDataPointCount() + if newDataPointCount == 0 { return } - bm.metricCount += newMetricsCount + bm.dataPointCount += newDataPointCount md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics()) } diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index c6a4cf8f6d2..b7ee6fc4070 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -17,6 +17,7 @@ package batchprocessor import ( "context" "fmt" + "sync" "testing" "time" @@ -29,6 +30,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/testdata" @@ -320,6 +322,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { requestCount := 100 metricsPerRequest := 5 + dataPointsPerMetric := 2 // Since the int counter uses two datapoints. + dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric sink := new(consumertest.MetricsSink) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} @@ -339,8 +343,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { elapsed := time.Since(start) require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize) - expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest + expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize) + expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount()) receivedMds := sink.AllMetrics() @@ -357,7 +361,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { assert.Equal(t, 1, len(viewData)) distData := viewData[0].Data.(*view.DistributionData) assert.Equal(t, int64(expectedBatchesNum), distData.Count) - assert.Equal(t, sink.MetricsCount(), int(distData.Sum())) + assert.Equal(t, sink.MetricsCount()*dataPointsPerMetric, int(distData.Sum())) assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min)) assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max)) @@ -369,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { assert.Equal(t, size, int(distData.Sum())) } +func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { + ctx := context.Background() + sink := new(metricsSink) + metricsCount := 50 + dataPointsPerMetric := 2 + sendBatchMaxSize := 99 + + batchMetrics := newBatchMetrics(sink) + md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount) + + batchMetrics.add(md) + require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) + require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize)) + remainingDataPointsCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize + require.Equal(t, remainingDataPointsCount, batchMetrics.dataPointCount) +} + func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), @@ -502,6 +523,55 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) { } } +func BenchmarkBatchMetricProcessor(b *testing.B) { + b.StopTimer() + cfg := Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)), + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + } + ctx := context.Background() + sink := new(metricsSink) + createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} + metricsPerRequest := 1000 + + batcher, err := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) + require.NoError(b, err) + require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost())) + + mds := make([]pdata.Metrics, 0, b.N) + for n := 0; n < b.N; n++ { + mds = append(mds, + testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest), + ) + } + b.StartTimer() + for n := 0; n < b.N; n++ { + batcher.ConsumeMetrics(ctx, mds[n]) + } + b.StopTimer() + require.NoError(b, batcher.Shutdown(ctx)) + require.Equal(b, b.N*metricsPerRequest, sink.metricsCount) +} + +type metricsSink struct { + mu sync.Mutex + metricsCount int +} + +func (sme *metricsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: false, + } +} + +func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { + sme.mu.Lock() + defer sme.mu.Unlock() + sme.metricsCount += md.MetricCount() + return nil +} + func TestBatchLogProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. diff --git a/processor/batchprocessor/splitmetrics.go b/processor/batchprocessor/splitmetrics.go index f78d07f5ce1..9fbacb77e4e 100644 --- a/processor/batchprocessor/splitmetrics.go +++ b/processor/batchprocessor/splitmetrics.go @@ -20,15 +20,16 @@ import ( // splitMetrics removes metrics from the input data and returns a new data of the specified size. func splitMetrics(size int, src pdata.Metrics) pdata.Metrics { - if src.MetricCount() <= size { + _, dataPoints := src.MetricAndDataPointCount() + if dataPoints <= size { return src } - totalCopiedMetrics := 0 + totalCopiedDataPoints := 0 dest := pdata.NewMetrics() src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool { // If we are done skip everything else. - if totalCopiedMetrics == size { + if totalCopiedDataPoints == size { return false } @@ -37,7 +38,7 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics { srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool { // If we are done skip everything else. - if totalCopiedMetrics == size { + if totalCopiedDataPoints == size { return false } @@ -45,21 +46,23 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics { srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary()) // If possible to move all metrics do that. - srcMetricsLen := srcIlm.Metrics().Len() - if size-totalCopiedMetrics >= srcMetricsLen { - totalCopiedMetrics += srcMetricsLen + srcDataPointCount := metricSliceDataPointCount(srcIlm.Metrics()) + if size-totalCopiedDataPoints >= srcDataPointCount { + totalCopiedDataPoints += srcDataPointCount srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics()) return true } srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool { // If we are done skip everything else. - if totalCopiedMetrics == size { + if totalCopiedDataPoints == size { return false } - srcMetric.CopyTo(destIlm.Metrics().AppendEmpty()) - totalCopiedMetrics++ - return true + // If the metric has more data points than free slots we should split it. + newMetric, remove := splitMetric(srcMetric, size-totalCopiedDataPoints) + newMetric.CopyTo(destIlm.Metrics().AppendEmpty()) + totalCopiedDataPoints += metricDataPointCount(newMetric) + return remove }) return false }) @@ -68,3 +71,83 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics { return dest } + +// metricSliceDataPointCount calculates the total number of data points. +func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) { + for k := 0; k < ms.Len(); k++ { + dataPointCount += metricDataPointCount(ms.At(k)) + } + return +} + +// metricDataPointCount calculates the total number of data points. +func metricDataPointCount(ms pdata.Metric) (dataPointCount int) { + switch ms.DataType() { + case pdata.MetricDataTypeIntGauge: + dataPointCount = ms.IntGauge().DataPoints().Len() + case pdata.MetricDataTypeDoubleGauge: + dataPointCount = ms.DoubleGauge().DataPoints().Len() + case pdata.MetricDataTypeIntSum: + dataPointCount = ms.IntSum().DataPoints().Len() + case pdata.MetricDataTypeDoubleSum: + dataPointCount = ms.DoubleSum().DataPoints().Len() + case pdata.MetricDataTypeIntHistogram: + dataPointCount = ms.IntHistogram().DataPoints().Len() + case pdata.MetricDataTypeHistogram: + dataPointCount = ms.Histogram().DataPoints().Len() + case pdata.MetricDataTypeSummary: + dataPointCount = ms.Summary().DataPoints().Len() + } + return +} + +// splitMetric removes metric points from the input data and returns new data of the specified size +// and boolean describing, whether the metric should be removed from original slice. +func splitMetric(ms pdata.Metric, size int) (pdata.Metric, bool) { + if metricDataPointCount(ms) <= size { + return ms, true + } + + result := pdata.NewMetric() + ms.CopyTo(result) + msSize, i := metricDataPointCount(ms)-size, 0 + filterDataPoints := func() bool { i++; return i <= msSize } + switch ms.DataType() { + case pdata.MetricDataTypeIntGauge: + result.IntGauge().DataPoints().Resize(size) + ms.IntGauge().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool { + return filterDataPoints() + }) + case pdata.MetricDataTypeDoubleGauge: + result.DoubleGauge().DataPoints().Resize(size) + ms.DoubleGauge().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool { + return filterDataPoints() + }) + case pdata.MetricDataTypeIntSum: + result.IntSum().DataPoints().Resize(size) + ms.IntSum().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool { + return filterDataPoints() + }) + case pdata.MetricDataTypeDoubleSum: + result.DoubleSum().DataPoints().Resize(size) + ms.DoubleSum().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool { + return filterDataPoints() + }) + case pdata.MetricDataTypeIntHistogram: + result.IntHistogram().DataPoints().Resize(size) + ms.IntHistogram().DataPoints().RemoveIf(func(_ pdata.IntHistogramDataPoint) bool { + return filterDataPoints() + }) + case pdata.MetricDataTypeHistogram: + result.Histogram().DataPoints().Resize(size) + ms.Histogram().DataPoints().RemoveIf(func(_ pdata.HistogramDataPoint) bool { + return filterDataPoints() + }) + case pdata.MetricDataTypeSummary: + result.Summary().DataPoints().Resize(size) + ms.Summary().DataPoints().RemoveIf(func(_ pdata.SummaryDataPoint) bool { + return filterDataPoints() + }) + } + return result, false +} diff --git a/processor/batchprocessor/splitmetrics_test.go b/processor/batchprocessor/splitmetrics_test.go index 8d7e1c8743b..8d8780ffa1d 100644 --- a/processor/batchprocessor/splitmetrics_test.go +++ b/processor/batchprocessor/splitmetrics_test.go @@ -36,8 +36,10 @@ func TestSplitMetrics_noop(t *testing.T) { func TestSplitMetrics(t *testing.T) { md := testdata.GenerateMetricsManyMetricsSameResource(20) metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() + dataPointCount := metricDataPointCount(metrics.At(0)) for i := 0; i < metrics.Len(); i++ { metrics.At(i).SetName(getTestMetricName(0, i)) + assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i))) } cp := pdata.NewMetrics() cpMetrics := cp.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics() @@ -52,9 +54,10 @@ func TestSplitMetrics(t *testing.T) { metrics.At(3).CopyTo(cpMetrics.At(3)) metrics.At(4).CopyTo(cpMetrics.At(4)) - splitSize := 5 + splitMetricCount := 5 + splitSize := splitMetricCount * dataPointCount split := splitMetrics(splitSize, md) - assert.Equal(t, splitSize, split.MetricCount()) + assert.Equal(t, splitMetricCount, split.MetricCount()) assert.Equal(t, cp, split) assert.Equal(t, 15, md.MetricCount()) assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) @@ -79,8 +82,10 @@ func TestSplitMetrics(t *testing.T) { func TestSplitMetricsMultipleResourceSpans(t *testing.T) { md := testdata.GenerateMetricsManyMetricsSameResource(20) metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() + dataPointCount := metricDataPointCount(metrics.At(0)) for i := 0; i < metrics.Len(); i++ { metrics.At(i).SetName(getTestMetricName(0, i)) + assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i))) } // add second index to resource metrics testdata.GenerateMetricsManyMetricsSameResource(20). @@ -90,9 +95,10 @@ func TestSplitMetricsMultipleResourceSpans(t *testing.T) { metrics.At(i).SetName(getTestMetricName(1, i)) } - splitSize := 5 + splitMetricCount := 5 + splitSize := splitMetricCount * dataPointCount split := splitMetrics(splitSize, md) - assert.Equal(t, splitSize, split.MetricCount()) + assert.Equal(t, splitMetricCount, split.MetricCount()) assert.Equal(t, 35, md.MetricCount()) assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) @@ -101,8 +107,10 @@ func TestSplitMetricsMultipleResourceSpans(t *testing.T) { func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *testing.T) { td := testdata.GenerateMetricsManyMetricsSameResource(20) metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() + dataPointCount := metricDataPointCount(metrics.At(0)) for i := 0; i < metrics.Len(); i++ { metrics.At(i).SetName(getTestMetricName(0, i)) + assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i))) } td.ResourceMetrics().Resize(2) // add second index to resource metrics @@ -113,10 +121,11 @@ func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *tes metrics.At(i).SetName(getTestMetricName(1, i)) } - splitSize := 25 + splitMetricCount := 25 + splitSize := splitMetricCount * dataPointCount split := splitMetrics(splitSize, td) - assert.Equal(t, splitSize, split.MetricCount()) - assert.Equal(t, 40-splitSize, td.MetricCount()) + assert.Equal(t, splitMetricCount, split.MetricCount()) + assert.Equal(t, 40-splitMetricCount, td.MetricCount()) assert.Equal(t, 1, td.ResourceMetrics().Len()) assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name()) @@ -166,3 +175,61 @@ func BenchmarkCloneMetrics(b *testing.B) { } } } + +func TestSplitMetricsUneven(t *testing.T) { + md := testdata.GenerateMetricsManyMetricsSameResource(10) + metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() + dataPointCount := 2 + for i := 0; i < metrics.Len(); i++ { + metrics.At(i).SetName(getTestMetricName(0, i)) + assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i))) + } + + splitSize := 9 + split := splitMetrics(splitSize, md) + assert.Equal(t, 5, split.MetricCount()) + assert.Equal(t, 6, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 5, split.MetricCount()) + assert.Equal(t, 1, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + assert.Equal(t, "test-metric-int-0-8", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 1, split.MetricCount()) + assert.Equal(t, "test-metric-int-0-9", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) +} + +func TestSplitMetricsBatchSizeSmallerThanDataPointCount(t *testing.T) { + md := testdata.GenerateMetricsManyMetricsSameResource(2) + metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() + dataPointCount := 2 + for i := 0; i < metrics.Len(); i++ { + metrics.At(i).SetName(getTestMetricName(0, i)) + assert.Equal(t, dataPointCount, metricDataPointCount(metrics.At(i))) + } + + splitSize := 1 + split := splitMetrics(splitSize, md) + assert.Equal(t, 1, split.MetricCount()) + assert.Equal(t, 2, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 1, split.MetricCount()) + assert.Equal(t, 1, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 1, split.MetricCount()) + assert.Equal(t, 1, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-1", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) + + split = splitMetrics(splitSize, md) + assert.Equal(t, 1, split.MetricCount()) + assert.Equal(t, 1, md.MetricCount()) + assert.Equal(t, "test-metric-int-0-1", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name()) +}