Skip to content

Commit

Permalink
Make the batch processor limit data points rather than metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
kisieland committed May 11, 2021
1 parent 51281a7 commit 9fd1d2e
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 51 deletions.
50 changes: 30 additions & 20 deletions consumer/pdata/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,31 +127,41 @@ func (md Metrics) MetricAndDataPointCount() (metricCount int, dataPointCount int
ilm := ilms.At(j)
metrics := ilm.Metrics()
metricCount += metrics.Len()
ms := ilm.Metrics()
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
switch m.DataType() {
case MetricDataTypeIntGauge:
dataPointCount += m.IntGauge().DataPoints().Len()
case MetricDataTypeDoubleGauge:
dataPointCount += m.DoubleGauge().DataPoints().Len()
case MetricDataTypeIntSum:
dataPointCount += m.IntSum().DataPoints().Len()
case MetricDataTypeDoubleSum:
dataPointCount += m.DoubleSum().DataPoints().Len()
case MetricDataTypeIntHistogram:
dataPointCount += m.IntHistogram().DataPoints().Len()
case MetricDataTypeHistogram:
dataPointCount += m.Histogram().DataPoints().Len()
case MetricDataTypeSummary:
dataPointCount += m.Summary().DataPoints().Len()
}
}
dataPointCount += ilm.Metrics().DataPointCount()
}
}
return
}

// DataPointCount calculates the total number of data points summed
// across every metric contained in the slice.
func (ms MetricSlice) DataPointCount() (dataPointCount int) {
	total := 0
	for i := 0; i < ms.Len(); i++ {
		total += ms.At(i).DataPointCount()
	}
	return total
}

// DataPointCount calculates the total number of data points carried by
// this metric, based on its concrete data type.
func (ms Metric) DataPointCount() (dataPointCount int) {
	switch ms.DataType() {
	case MetricDataTypeIntGauge:
		return ms.IntGauge().DataPoints().Len()
	case MetricDataTypeDoubleGauge:
		return ms.DoubleGauge().DataPoints().Len()
	case MetricDataTypeIntSum:
		return ms.IntSum().DataPoints().Len()
	case MetricDataTypeDoubleSum:
		return ms.DoubleSum().DataPoints().Len()
	case MetricDataTypeIntHistogram:
		return ms.IntHistogram().DataPoints().Len()
	case MetricDataTypeHistogram:
		return ms.Histogram().DataPoints().Len()
	case MetricDataTypeSummary:
		return ms.Summary().DataPoints().Len()
	}
	// Any other data type carries no data points.
	return 0
}

// MetricDataType specifies the type of data in a Metric.
type MetricDataType int32

Expand Down
4 changes: 2 additions & 2 deletions processor/batchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ any data drops such as sampling.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options can be modified:
- `send_batch_size` (default = 8192): Number of spans or metrics after which a
batch will be sent regardless of the timeout.
- `send_batch_size` (default = 8192): Number of spans or data points in metrics
after which a batch will be sent regardless of the timeout.
- `timeout` (default = 200ms): Time duration after which a batch will be sent
regardless of size.
- `send_batch_max_size` (default = 0): The upper limit of the batch size.
Expand Down
20 changes: 10 additions & 10 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func (bt *batchTraces) size() int {
}

type batchMetrics struct {
nextConsumer consumer.Metrics
metricData pdata.Metrics
metricCount int
nextConsumer consumer.Metrics
metricData pdata.Metrics
dataPointCount int
}

func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
Expand All @@ -274,19 +274,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
var req pdata.Metrics
if sendBatchMaxSize > 0 && bm.metricCount > sendBatchMaxSize {
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.metricCount -= sendBatchMaxSize
bm.dataPointCount -= sendBatchMaxSize
} else {
req = bm.metricData
bm.metricData = pdata.NewMetrics()
bm.metricCount = 0
bm.dataPointCount = 0
}
return bm.nextConsumer.ConsumeMetrics(ctx, req)
}

func (bm *batchMetrics) itemCount() int {
return bm.metricCount
return bm.dataPointCount
}

func (bm *batchMetrics) size() int {
Expand All @@ -296,11 +296,11 @@ func (bm *batchMetrics) size() int {
func (bm *batchMetrics) add(item interface{}) {
md := item.(pdata.Metrics)

newMetricsCount := md.MetricCount()
if newMetricsCount == 0 {
_, newDataPointCount := md.MetricAndDataPointCount()
if newDataPointCount == 0 {
return
}
bm.metricCount += newMetricsCount
bm.dataPointCount += newDataPointCount
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}

Expand Down
8 changes: 5 additions & 3 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {

requestCount := 100
metricsPerRequest := 5
dataPointsPreMetric := 2 // Since the int counter uses two datapoints.
dataPointsPerRequest := metricsPerRequest * dataPointsPreMetric
sink := new(consumertest.MetricsSink)

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
Expand All @@ -339,8 +341,8 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * metricsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / metricsPerRequest
expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest

require.Equal(t, requestCount*metricsPerRequest, sink.MetricsCount())
receivedMds := sink.AllMetrics()
Expand All @@ -357,7 +359,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
assert.Equal(t, 1, len(viewData))
distData := viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, sink.MetricsCount(), int(distData.Sum()))
assert.Equal(t, sink.MetricsCount()*dataPointsPreMetric, int(distData.Sum()))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))

Expand Down
21 changes: 12 additions & 9 deletions processor/batchprocessor/splitmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (

// splitMetrics removes metrics from the input data and returns a new data of the specified size.
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
if src.MetricCount() <= size {
_, dataPoints := src.MetricAndDataPointCount()
if dataPoints <= size {
return src
}
totalCopiedMetrics := 0
shouldSend := false
totalCopiedDataPoints := 0
dest := pdata.NewMetrics()

src.ResourceMetrics().RemoveIf(func(srcRs pdata.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if shouldSend {
return false
}

Expand All @@ -37,28 +39,29 @@ func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {

srcRs.InstrumentationLibraryMetrics().RemoveIf(func(srcIlm pdata.InstrumentationLibraryMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if shouldSend {
return false
}

destIlm := destRs.InstrumentationLibraryMetrics().AppendEmpty()
srcIlm.InstrumentationLibrary().CopyTo(destIlm.InstrumentationLibrary())

// If possible to move all metrics do that.
srcMetricsLen := srcIlm.Metrics().Len()
if size-totalCopiedMetrics >= srcMetricsLen {
totalCopiedMetrics += srcMetricsLen
srcDataPointCount := srcIlm.Metrics().DataPointCount()
if size-totalCopiedDataPoints >= srcDataPointCount {
totalCopiedDataPoints += srcDataPointCount
srcIlm.Metrics().MoveAndAppendTo(destIlm.Metrics())
return true
}

srcIlm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
if srcMetric.DataPointCount()+totalCopiedDataPoints > size {
shouldSend = true
return false
}
srcMetric.CopyTo(destIlm.Metrics().AppendEmpty())
totalCopiedMetrics++
totalCopiedDataPoints += srcMetric.DataPointCount()
return true
})
return false
Expand Down
51 changes: 44 additions & 7 deletions processor/batchprocessor/splitmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ func TestSplitMetrics_noop(t *testing.T) {
func TestSplitMetrics(t *testing.T) {
md := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
dataPointCount := metrics.At(0).DataPointCount()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
assert.Equal(t, dataPointCount, metrics.At(i).DataPointCount())
}
cp := pdata.NewMetrics()
cpMetrics := cp.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics()
Expand All @@ -52,9 +54,10 @@ func TestSplitMetrics(t *testing.T) {
metrics.At(3).CopyTo(cpMetrics.At(3))
metrics.At(4).CopyTo(cpMetrics.At(4))

splitSize := 5
splitMetricCount := 5
splitSize := splitMetricCount * dataPointCount
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, splitMetricCount, split.MetricCount())
assert.Equal(t, cp, split)
assert.Equal(t, 15, md.MetricCount())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
Expand All @@ -79,8 +82,10 @@ func TestSplitMetrics(t *testing.T) {
func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
md := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
dataPointCount := metrics.At(0).DataPointCount()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
assert.Equal(t, dataPointCount, metrics.At(i).DataPointCount())
}
// add second index to resource metrics
testdata.GenerateMetricsManyMetricsSameResource(20).
Expand All @@ -90,9 +95,10 @@ func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
metrics.At(i).SetName(getTestMetricName(1, i))
}

splitSize := 5
splitMetricCount := 5
splitSize := splitMetricCount * dataPointCount
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, splitMetricCount, split.MetricCount())
assert.Equal(t, 35, md.MetricCount())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
Expand All @@ -101,8 +107,10 @@ func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *testing.T) {
td := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
dataPointCount := metrics.At(0).DataPointCount()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
assert.Equal(t, dataPointCount, metrics.At(i).DataPointCount())
}
td.ResourceMetrics().Resize(2)
// add second index to resource metrics
Expand All @@ -113,10 +121,11 @@ func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *tes
metrics.At(i).SetName(getTestMetricName(1, i))
}

splitSize := 25
splitMetricCount := 25
splitSize := splitMetricCount * dataPointCount
split := splitMetrics(splitSize, td)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, 40-splitSize, td.MetricCount())
assert.Equal(t, splitMetricCount, split.MetricCount())
assert.Equal(t, 40-splitMetricCount, td.MetricCount())
assert.Equal(t, 1, td.ResourceMetrics().Len())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name())
Expand Down Expand Up @@ -166,3 +175,31 @@ func BenchmarkCloneMetrics(b *testing.B) {
}
}
}

// TestSplitMetricsNotFull verifies splitMetrics when the data-point budget
// does not divide evenly: a whole metric that would overflow the budget is
// left behind for the next split rather than being partially copied.
func TestSplitMetricsNotFull(t *testing.T) {
	// 10 generated metrics, each asserted below to carry 2 data points,
	// for 20 data points total.
	md := testdata.GenerateMetricsManyMetricsSameResource(10)
	metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
	dataPointCount := 2
	for i := 0; i < metrics.Len(); i++ {
		metrics.At(i).SetName(getTestMetricName(0, i))
		assert.Equal(t, dataPointCount, metrics.At(i).DataPointCount())
	}

	// Budget of 9 data points fits only 4 whole metrics (8 data points);
	// the 5th metric would raise the total to 10, so it stays in md.
	splitSize := 9
	split := splitMetrics(splitSize, md)
	assert.Equal(t, 4, split.MetricCount())
	assert.Equal(t, 6, md.MetricCount())
	assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
	assert.Equal(t, "test-metric-int-0-3", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(3).Name())

	// Second split takes the next 4 metrics (8 data points), leaving 2.
	split = splitMetrics(splitSize, md)
	assert.Equal(t, 4, split.MetricCount())
	assert.Equal(t, 2, md.MetricCount())
	assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
	assert.Equal(t, "test-metric-int-0-7", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(3).Name())

	// Final split drains the remaining 2 metrics (4 data points, under budget).
	split = splitMetrics(splitSize, md)
	assert.Equal(t, 2, split.MetricCount())
	assert.Equal(t, "test-metric-int-0-8", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
	assert.Equal(t, "test-metric-int-0-9", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(1).Name())
}

0 comments on commit 9fd1d2e

Please sign in to comment.