Skip to content

Commit

Permalink
Fix an issue with batchMaxSize
Browse files Browse the repository at this point in the history
  • Loading branch information
kisieland committed May 20, 2021
1 parent 125c8bf commit 7f680d9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
3 changes: 2 additions & 1 deletion processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error
var req pdata.Metrics
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.dataPointCount -= sendBatchMaxSize
_, reqDataPointCount := req.MetricAndDataPointCount()
bm.dataPointCount -= reqDataPointCount
} else {
req = bm.metricData
bm.metricData = pdata.NewMetrics()
Expand Down
17 changes: 17 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, size, int(distData.Sum()))
}

func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
ctx := context.Background()
sink := new(metricsSink)
metricsCount := 50
dataPointsPerMetric := 2
sendBatchMaxSize := 99

batchMetrics := newBatchMetrics(sink)
md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount)

batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
remainingMetricsCount := (metricsCount - sendBatchMaxSize/dataPointsPerMetric)
require.Equal(t, remainingMetricsCount*dataPointsPerMetric, batchMetrics.dataPointCount)
}

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
Expand Down

0 comments on commit 7f680d9

Please sign in to comment.