Skip to content

Commit

Permalink
Fix batch processor metrics reorder, improve performance (open-teleme…
Browse files Browse the repository at this point in the history
…try#3034)

Benchmarks Before:
```
goos: darwin
goarch: amd64
pkg: go.opentelemetry.io/collector/processor/batchprocessor
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
BenchmarkSplitMetrics
BenchmarkSplitMetrics-16    	    6660	    170653 ns/op	  182889 B/op	    3309 allocs/op
PASS

Process finished with the exit code 0
```

Benchmarks After:
```
goos: darwin
goarch: amd64
pkg: go.opentelemetry.io/collector/processor/batchprocessor
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
BenchmarkSplitMetrics
BenchmarkSplitMetrics-16    	    7858	    134259 ns/op	  141881 B/op	    2596 allocs/op
PASS

Process finished with the exit code 0
```

Benchmarks Reference Clone:
```
goos: darwin
goarch: amd64
pkg: go.opentelemetry.io/collector/processor/batchprocessor
cpu: Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
BenchmarkCloneMetrics
BenchmarkCloneMetrics-16    	    8726	    127948 ns/op	  137816 B/op	    2503 allocs/op
PASS

Process finished with the exit code 0
```

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored May 5, 2021
1 parent f71d14b commit 1b3ef42
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 67 deletions.
88 changes: 42 additions & 46 deletions processor/batchprocessor/splitmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,52 @@ import (
)

// splitMetrics removes metrics from the input data and returns a new data of the specified size.
func splitMetrics(size int, toSplit pdata.Metrics) pdata.Metrics {
if toSplit.MetricCount() <= size {
return toSplit
func splitMetrics(size int, src pdata.Metrics) pdata.Metrics {
if src.MetricCount() <= size {
return src
}
copiedMetrics := 0
result := pdata.NewMetrics()
result.ResourceMetrics().Resize(toSplit.ResourceMetrics().Len())
rms := toSplit.ResourceMetrics()
totalCopiedMetrics := 0
dest := pdata.NewMetrics()

rmsCount := 0
for i := rms.Len() - 1; i >= 0; i-- {
rmsCount++
rm := rms.At(i)
destRs := result.ResourceMetrics().At(result.ResourceMetrics().Len() - 1 - i)
rm.Resource().CopyTo(destRs.Resource())

destRs.InstrumentationLibraryMetrics().Resize(rm.InstrumentationLibraryMetrics().Len())
src.ResourceMetrics().RemoveIf(func(srcRm pdata.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
return false
}

ilmCount := 0
for j := rm.InstrumentationLibraryMetrics().Len() - 1; j >= 0; j-- {
ilmCount++
instMetrics := rm.InstrumentationLibraryMetrics().At(j)
destInstMetrics := destRs.InstrumentationLibraryMetrics().At(destRs.InstrumentationLibraryMetrics().Len() - 1 - j)
instMetrics.InstrumentationLibrary().CopyTo(destInstMetrics.InstrumentationLibrary())
destRs := dest.ResourceMetrics().AppendEmpty()
srcRm.Resource().CopyTo(destRs.Resource())

if size-copiedMetrics >= instMetrics.Metrics().Len() {
destInstMetrics.Metrics().Resize(instMetrics.Metrics().Len())
} else {
destInstMetrics.Metrics().Resize(size - copiedMetrics)
}
for k, destIdx := instMetrics.Metrics().Len()-1, 0; k >= 0 && copiedMetrics < size; k, destIdx = k-1, destIdx+1 {
metric := instMetrics.Metrics().At(k)
metric.CopyTo(destInstMetrics.Metrics().At(destIdx))
copiedMetrics++
// remove metric
instMetrics.Metrics().Resize(instMetrics.Metrics().Len() - 1)
srcRm.InstrumentationLibraryMetrics().RemoveIf(func(srcIm pdata.InstrumentationLibraryMetrics) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
return false
}
if instMetrics.Metrics().Len() == 0 {
rm.InstrumentationLibraryMetrics().Resize(rm.InstrumentationLibraryMetrics().Len() - 1)
}
if copiedMetrics == size {
result.ResourceMetrics().Resize(rmsCount)
return result

destIms := destRs.InstrumentationLibraryMetrics().AppendEmpty()
srcIm.InstrumentationLibrary().CopyTo(destIms.InstrumentationLibrary())

// If possible to move all metrics do that.
srcMetricsLen := srcIm.Metrics().Len()
if size-totalCopiedMetrics >= srcMetricsLen {
totalCopiedMetrics += srcMetricsLen
srcIm.Metrics().MoveAndAppendTo(destIms.Metrics())
return true
}
}
destRs.InstrumentationLibraryMetrics().Resize(ilmCount)
if rm.InstrumentationLibraryMetrics().Len() == 0 {
rms.Resize(rms.Len() - 1)
}
}
result.ResourceMetrics().Resize(rmsCount)
return result

srcIm.Metrics().RemoveIf(func(srcMetric pdata.Metric) bool {
// If we are done skip everything else.
if totalCopiedMetrics == size {
return false
}
srcMetric.CopyTo(destIms.Metrics().AppendEmpty())
totalCopiedMetrics++
return true
})
return false
})
return srcRm.InstrumentationLibraryMetrics().Len() == 0
})

return dest
}
99 changes: 78 additions & 21 deletions processor/batchprocessor/splitmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,45 +46,59 @@ func TestSplitMetrics(t *testing.T) {
cp.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).InstrumentationLibrary())
md.ResourceMetrics().At(0).Resource().CopyTo(
cp.ResourceMetrics().At(0).Resource())
metrics.At(19).CopyTo(cpMetrics.At(0))
metrics.At(18).CopyTo(cpMetrics.At(1))
metrics.At(17).CopyTo(cpMetrics.At(2))
metrics.At(16).CopyTo(cpMetrics.At(3))
metrics.At(15).CopyTo(cpMetrics.At(4))
metrics.At(0).CopyTo(cpMetrics.At(0))
metrics.At(1).CopyTo(cpMetrics.At(1))
metrics.At(2).CopyTo(cpMetrics.At(2))
metrics.At(3).CopyTo(cpMetrics.At(3))
metrics.At(4).CopyTo(cpMetrics.At(4))

splitSize := 5
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, cp, split)
assert.Equal(t, 15, md.MetricCount())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())

split = splitMetrics(splitSize, md)
assert.Equal(t, 10, md.MetricCount())
assert.Equal(t, "test-metric-int-0-5", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-9", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())

split = splitMetrics(splitSize, md)
assert.Equal(t, 5, md.MetricCount())
assert.Equal(t, "test-metric-int-0-10", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-14", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())

split = splitMetrics(splitSize, md)
assert.Equal(t, 5, md.MetricCount())
assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

func TestSplitMetricsMultipleResourceSpans(t *testing.T) {
td := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
md := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(0, i))
}
td.ResourceMetrics().Resize(2)
// add second index to resource metrics
testdata.GenerateMetricsManyMetricsSameResource(20).
ResourceMetrics().At(0).CopyTo(td.ResourceMetrics().At(1))
metrics = td.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics()
ResourceMetrics().At(0).CopyTo(md.ResourceMetrics().AppendEmpty())
metrics = md.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
metrics.At(i).SetName(getTestMetricName(1, i))
}

splitSize := 5
split := splitMetrics(splitSize, td)
split := splitMetrics(splitSize, md)
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, 35, td.MetricCount())
assert.Equal(t, "test-metric-int-1-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-1-15", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
assert.Equal(t, 35, md.MetricCount())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

func TestSplitMetricsMultipleResourceSpans_split_size_greater_than_metric_size(t *testing.T) {
func TestSplitMetricsMultipleResourceSpans_SplitSizeGreaterThanMetricSize(t *testing.T) {
td := testdata.GenerateMetricsManyMetricsSameResource(20)
metrics := td.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
Expand All @@ -104,8 +118,51 @@ func TestSplitMetricsMultipleResourceSpans_split_size_greater_than_metric_size(t
assert.Equal(t, splitSize, split.MetricCount())
assert.Equal(t, 40-splitSize, td.MetricCount())
assert.Equal(t, 1, td.ResourceMetrics().Len())
assert.Equal(t, "test-metric-int-1-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-1-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-15", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-19", split.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(19).Name())
assert.Equal(t, "test-metric-int-1-0", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-1-4", split.ResourceMetrics().At(1).InstrumentationLibraryMetrics().At(0).Metrics().At(4).Name())
}

func BenchmarkSplitMetrics(b *testing.B) {
md := pdata.NewMetrics()
rms := md.ResourceMetrics()
for i := 0; i < 20; i++ {
testdata.GenerateMetricsManyMetricsSameResource(20).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
ms := rms.At(rms.Len() - 1).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < ms.Len(); i++ {
ms.At(i).SetName(getTestMetricName(1, i))
}
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
cloneReq := md.Clone()
split := splitMetrics(128, cloneReq)
if split.MetricCount() != 128 || cloneReq.MetricCount() != 400-128 {
b.Fail()
}
}
}

func BenchmarkCloneMetrics(b *testing.B) {
md := pdata.NewMetrics()
rms := md.ResourceMetrics()
for i := 0; i < 20; i++ {
testdata.GenerateMetricsManyMetricsSameResource(20).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
ms := rms.At(rms.Len() - 1).InstrumentationLibraryMetrics().At(0).Metrics()
for i := 0; i < ms.Len(); i++ {
ms.At(i).SetName(getTestMetricName(1, i))
}
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
cloneReq := md.Clone()
if cloneReq.MetricCount() != 400 {
b.Fail()
}
}
}

0 comments on commit 1b3ef42

Please sign in to comment.