From 7f680d9e471f4b67fd099d09461d6359a2e18022 Mon Sep 17 00:00:00 2001 From: Daniel Gutowski Date: Thu, 20 May 2021 08:50:30 +0000 Subject: [PATCH] Fix an issue with batchMaxSize --- processor/batchprocessor/batch_processor.go | 3 ++- .../batchprocessor/batch_processor_test.go | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 0d134026727..cd747f88799 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -276,7 +276,8 @@ func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error var req pdata.Metrics if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { req = splitMetrics(sendBatchMaxSize, bm.metricData) - bm.dataPointCount -= sendBatchMaxSize + _, reqDataPointCount := req.MetricAndDataPointCount() + bm.dataPointCount -= reqDataPointCount } else { req = bm.metricData bm.metricData = pdata.NewMetrics() diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index ce93cda1e5d..ec252378332 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -373,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { assert.Equal(t, size, int(distData.Sum())) } +func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { + ctx := context.Background() + sink := new(metricsSink) + metricsCount := 50 + dataPointsPerMetric := 2 + sendBatchMaxSize := 99 + + batchMetrics := newBatchMetrics(sink) + md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount) + + batchMetrics.add(md) + require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) + require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize)) + remainingMetricsCount := (metricsCount - sendBatchMaxSize/dataPointsPerMetric) + require.Equal(t, remainingMetricsCount*dataPointsPerMetric, batchMetrics.dataPointCount) +} + func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),