Make the batch processor limit data points rather than metrics. #3141

Merged

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
- Replace `ProcessorCreateParams` with `ProcessorCreateSettings`. (#3181)
- Replace `ExporterCreateParams` with `ExporterCreateSettings` (#3164)
- Replace `ReceiverCreateParams` with `ReceiverCreateSettings`. (#3167)
- Change `batchprocessor` logic to limit data points rather than metrics (#3141)

## 💡 Enhancements 💡

4 changes: 2 additions & 2 deletions processor/batchprocessor/README.md
@@ -15,8 +15,8 @@ any data drops such as sampling.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options can be modified:
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
batch will be sent regardless of the timeout.
- `send_batch_size` (default = 8192): Number of spans, metric data points, or log
records after which a batch will be sent regardless of the timeout.
- `timeout` (default = 200ms): Time duration after which a batch will be sent
regardless of size.
- `send_batch_max_size` (default = 0): The upper limit of the batch size.
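
For orientation, a minimal sketch of the corresponding Go settings, using the `Config` fields exercised by the tests in this PR (the `SendBatchMaxSize` field name and the example values are assumptions for illustration, not part of this diff):

```go
// Sketch only: mirrors the YAML options above. With this PR, the batch
// thresholds count metric data points (or spans / log records), not metrics.
cfg := Config{
	ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
	Timeout:           200 * time.Millisecond, // flush on timer even if the batch is small
	SendBatchSize:     8192,                   // flush once this many data points are buffered
	SendBatchMaxSize:  10000,                  // assumed field for send_batch_max_size; 0 = no cap
}
```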
20 changes: 10 additions & 10 deletions processor/batchprocessor/batch_processor.go
@@ -263,9 +263,9 @@ func (bt *batchTraces) size() int {
}

type batchMetrics struct {
nextConsumer consumer.Metrics
metricData pdata.Metrics
metricCount int
nextConsumer consumer.Metrics
metricData pdata.Metrics
dataPointCount int
}

func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
@@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Metrics
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.metricCount -= sendBatchMaxSize
bm.dataPointCount -= sendBatchMaxSize
} else {
req = bm.metricData
bm.metricData = pdata.NewMetrics()
bm.metricCount = 0
bm.dataPointCount = 0
}
return bm.nextConsumer.ConsumeMetrics(ctx, req)
}

func (bm *batchMetrics) itemCount() int {
return bm.metricCount
return bm.dataPointCount
}

func (bm *batchMetrics) size() int {
@@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int {
func (bm *batchMetrics) add(item interface{}) {
md := item.(pdata.Metrics)

newMetricsCount := md.MetricCount()
if newMetricsCount == 0 {
_, newDataPointCount := md.MetricAndDataPointCount()
if newDataPointCount == 0 {
return
}
bm.metricCount += newMetricsCount
bm.dataPointCount += newDataPointCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}
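
With this change, `add` accumulates the count returned by `MetricAndDataPointCount()` instead of `MetricCount()`, and `export` splits on the same unit. A small sketch, using only helpers that appear elsewhere in this diff (the testdata generator emits two data points per metric, as the tests below assume):

```go
// Sketch: the batcher now tracks data points, not metrics.
sink := new(consumertest.MetricsSink)
bm := newBatchMetrics(sink)
md := testdata.GenerateMetricsManyMetricsSameResource(3) // 3 metrics, 2 data points each
bm.add(md)
fmt.Println(bm.itemCount()) // 6 (data points), where the old logic would report 3 (metrics)
```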

76 changes: 73 additions & 3 deletions processor/batchprocessor/batch_processor_test.go
@@ -17,6 +17,7 @@ package batchprocessor
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -29,6 +30,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
@@ -320,6 +322,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {

requestCount := 100
metricsPerRequest := 5
dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
sink := new(consumertest.MetricsSink)

creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
@@ -339,8 +343,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest
expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest

require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount())
receivedMds := sink.AllMetrics()
@@ -357,7 +361,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, 1, len(viewData))
distData := viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, sink.MetricsCount(), int(distData.Sum()))
assert.Equal(t, sink.MetricsCount()*dataPointsPerMetric, int(distData.Sum()))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))

@@ -369,6 +373,23 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, size, int(distData.Sum()))
}

func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {
ctx := context.Background()
sink := new(metricsSink)
metricsCount := 50
dataPointsPerMetric := 2
sendBatchMaxSize := 99

batchMetrics := newBatchMetrics(sink)
md := testdata.GenerateMetricsManyMetricsSameResource(metricsCount)

batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
remainingDataPointsCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
require.Equal(t, remainingDataPointsCount, batchMetrics.dataPointCount)
}

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
@@ -502,6 +523,55 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
}
}

func BenchmarkBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
}
ctx := context.Background()
sink := new(metricsSink)
creationSet := component.ProcessorCreateSettings{Logger: zap.NewNop()}
metricsPerRequest := 1000

batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(b, err)
require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost()))

mds := make([]pdata.Metrics, 0, b.N)
for n := 0; n < b.N; n++ {
mds = append(mds,
testdata.GenerateMetricsManyMetricsSameResource(metricsPerRequest),
)
}
b.StartTimer()
for n := 0; n < b.N; n++ {
batcher.ConsumeMetrics(ctx, mds[n])
}
b.StopTimer()
require.NoError(b, batcher.Shutdown(ctx))
require.Equal(b, b.N*metricsPerRequest, sink.metricsCount)
}

type metricsSink struct {
mu sync.Mutex
metricsCount int
}

func (sme *metricsSink) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: false,
}
}

func (sme *metricsSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
sme.mu.Lock()
defer sme.mu.Unlock()
sme.metricsCount += md.MetricCount()
return nil
}

func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
103 changes: 92 additions & 11 deletions processor/batchprocessor/splitmetrics.go
@@ -20,15 +20,16 @@ import (

// splitMetrics removes metrics from the input data and returns new data of the specified size (in data points).
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
if src.MetricCount() <= size {
_, dataPoints := src.MetricAndDataPointCount()
if dataPoints <= size {
return src
}
totalCopiedMetrics := 0
totalCopiedDataPoints := 0
dest := pdata.NewMetrics()

src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}

@@ -37,29 +38,30 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {

srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}

destIlm := destRs.InstrumentationLibraryMetrics().AppendEmpty()
srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary())

// If possible to move all metrics do that.
srcMetricsLen := srcIlm.Metrics().Len()
if size-totalCopiedMetrics >= srcMetricsLen {
totalCopiedMetrics += srcMetricsLen
srcDataPointCount := metricSliceDataPointCount(srcIlm.Metrics())
if size-totalCopiedDataPoints >= srcDataPointCount {
totalCopiedDataPoints += srcDataPointCount
srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics())
return true
}

srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if totalCopiedDataPoints == size {
return false
}
srcMetric.CopyTo(destIlm.Metrics().AppendEmpty())
totalCopiedMetrics++
return true
// If the metric has more data points than free slots we should split it.
copiedDataPoints, remove := splitMetric(srcMetric, destIlm.Metrics().AppendEmpty(), size-totalCopiedDataPoints)
totalCopiedDataPoints += copiedDataPoints
return remove
})
return false
})
@@ -68,3 +70,82 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {

return dest
}

// metricSliceDataPointCount calculates the total number of data points.
func metricSliceDataPointCount(ms pdata.MetricSlice) (dataPointCount int) {
for k := 0; k < ms.Len(); k++ {
dataPointCount += metricDataPointCount(ms.At(k))
}
return
}

// metricDataPointCount calculates the number of data points in a single metric.
func metricDataPointCount(ms pdata.Metric) (dataPointCount int) {
switch ms.DataType() {
case pdata.MetricDataTypeIntGauge:
dataPointCount = ms.IntGauge().DataPoints().Len()
case pdata.MetricDataTypeDoubleGauge:
dataPointCount = ms.DoubleGauge().DataPoints().Len()
case pdata.MetricDataTypeIntSum:
dataPointCount = ms.IntSum().DataPoints().Len()
case pdata.MetricDataTypeDoubleSum:
dataPointCount = ms.DoubleSum().DataPoints().Len()
case pdata.MetricDataTypeIntHistogram:
dataPointCount = ms.IntHistogram().DataPoints().Len()
case pdata.MetricDataTypeHistogram:
dataPointCount = ms.Histogram().DataPoints().Len()
case pdata.MetricDataTypeSummary:
dataPointCount = ms.Summary().DataPoints().Len()
}
return
}

// splitMetric removes data points from the input metric and moves data of the specified size to dest.
// Returns the size of the moved data and a boolean indicating whether the metric should be removed from the original slice.
func splitMetric(ms, dest pdata.Metric, size int) (int, bool) {
ms.CopyTo(dest)
if metricDataPointCount(ms) <= size {
return metricDataPointCount(ms), true
}

msSize, i := metricDataPointCount(ms)-size, 0
// RemoveIf drops a point when the closure returns true: the closures below remove the
// first msSize points from ms, while each dest slice is resized to keep its first size points.
filterDataPoints := func() bool { i++; return i <= msSize }
switch ms.DataType() {
case pdata.MetricDataTypeIntGauge:
dest.IntGauge().DataPoints().Resize(size)
ms.IntGauge().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
return filterDataPoints()
})
case pdata.MetricDataTypeDoubleGauge:
dest.DoubleGauge().DataPoints().Resize(size)
ms.DoubleGauge().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
return filterDataPoints()
})
case pdata.MetricDataTypeIntSum:
dest.IntSum().DataPoints().Resize(size)
ms.IntSum().DataPoints().RemoveIf(func(_ pdata.IntDataPoint) bool {
return filterDataPoints()
})
case pdata.MetricDataTypeDoubleSum:
dest.DoubleSum().DataPoints().Resize(size)
ms.DoubleSum().DataPoints().RemoveIf(func(_ pdata.DoubleDataPoint) bool {
return filterDataPoints()
})
case pdata.MetricDataTypeIntHistogram:
dest.IntHistogram().DataPoints().Resize(size)
ms.IntHistogram().DataPoints().RemoveIf(func(_ pdata.IntHistogramDataPoint) bool {
return filterDataPoints()
})
case pdata.MetricDataTypeHistogram:
dest.Histogram().DataPoints().Resize(size)
ms.Histogram().DataPoints().RemoveIf(func(_ pdata.HistogramDataPoint) bool {
return filterDataPoints()
})
case pdata.MetricDataTypeSummary:
dest.Summary().DataPoints().Resize(size)
ms.Summary().DataPoints().RemoveIf(func(_ pdata.SummaryDataPoint) bool {
return filterDataPoints()
})
}
return size, false
}
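
End to end, the split now lands on data-point boundaries even when a metric has to be cut in half. A hedged sketch of the semantics, with the same numbers as `TestBatchMetrics_UnevenBatchMaxSize` above (the generator yields two data points per metric):

```go
// Sketch: splitting 100 data points with a 99-point budget.
src := testdata.GenerateMetricsManyMetricsSameResource(50) // 50 metrics, 100 data points
dest := splitMetrics(99, src)                              // 49 whole metrics plus half of the 50th
_, destPoints := dest.MetricAndDataPointCount()
_, srcPoints := src.MetricAndDataPointCount()
fmt.Println(destPoints, srcPoints) // 99 1
```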