Skip to content

Commit

Permalink
[exporter/loadbalacing] Refactor how metrics are split and then re-jo…
Browse files Browse the repository at this point in the history
…ined after load-balacing (open-telemetry#33293)

**Description:** The previous code splits an incoming `pmetric.Metrics`
into individual `pmetric.Metrics` instances, at the granularity of
`pmetric.Metric`. Then afterwards, it used the various routingID
functions to create a map of booleans, in order to define how the
metrics should be routed. Finally, it merged the metrics by routing key,
and exported them by concatenating them all together. While this worked,
it's somewhat hard to follow, and inefficient for most of the
routingIDs. In a future PR, we'd like to add a new routingID, which
would require splitting at the datapoint level. This would add a ton of
extra work for the other routingIDs, which don't care about specific
datapoints.

Therefore, the new code has dedicated splitting functions for each
routingID. These functions directly return a
`map[string]pmetric.Metrics` instance. IE, a map of routing keys to its
metrics. These functions can be unit tested directly, and makes the
logic in `ConsumeMetrics()` very easy to follow. Lastly, when combining
metrics for routing, the new code utilizes the `MergeMetrics()` helper
function from `internal/exp/metrics`. This merges the metrics and
removes duplicate ResourceMetrics / ScopeMetrics instances. Which saves
compute and bandwidth for serialization downstream.


**Link to tracking Issue:** 32513

**Testing:** I created a full suite of tests for each routingID enum.
For both single endpoint, as well an multi-endpoint loadbalancing

**Documentation:** The code is documented in comments. I added a
changelog entry to explain changes for users

---------

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
  • Loading branch information
RichieSams and mx-psi authored Jun 19, 2024
1 parent 2fd817f commit d6eaca8
Show file tree
Hide file tree
Showing 34 changed files with 2,185 additions and 441 deletions.
27 changes: 27 additions & 0 deletions .chloggen/loadbalancer_exporter_refactor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancerexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Refactors how the load balancing exporter splits metrics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32513]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: All splitting is *behaviorally*, the same. However, the `resource` routingID now uses the `internal/exp/metrics/identity` package to generate the load balancing key, instead of bespoke code. This means that when upgrading to this version your routes for specific metric groupings could change. However, this will be stable and all future metrics will follow the new routing

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 7 additions & 0 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ const (
resourceRouting
)

const (
svcRoutingStr = "service"
traceIDRoutingStr = "traceID"
metricNameRoutingStr = "metric"
resourceRoutingStr = "resource"
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Expand Down
16 changes: 14 additions & 2 deletions exporter/loadbalancingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.16
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.29.10
github.com/aws/smithy-go v1.20.2
github.com/json-iterator/go v1.1.12
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.103.0
github.com/stretchr/testify v1.9.0
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.103.0
Expand All @@ -22,6 +26,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
k8s.io/client-go v0.29.3
Expand Down Expand Up @@ -69,7 +74,6 @@ require (
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand All @@ -82,6 +86,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.103.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down Expand Up @@ -149,7 +154,6 @@ require (
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
Expand All @@ -168,3 +172,11 @@ retract (

// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules
replace cloud.google.com/go v0.65.0 => cloud.google.com/go v0.110.10

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics
7 changes: 0 additions & 7 deletions exporter/loadbalancingexporter/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"

import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand All @@ -13,9 +12,3 @@ func mergeTraces(t1 ptrace.Traces, t2 ptrace.Traces) ptrace.Traces {
t2.ResourceSpans().MoveAndAppendTo(t1.ResourceSpans())
return t1
}

// mergeMetrics concatenates two pmetric.Metrics into a single pmetric.Metrics.
func mergeMetrics(m1 pmetric.Metrics, m2 pmetric.Metrics) pmetric.Metrics {
m2.ResourceMetrics().MoveAndAppendTo(m1.ResourceMetrics())
return m1
}
82 changes: 0 additions & 82 deletions exporter/loadbalancingexporter/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
)
Expand Down Expand Up @@ -66,60 +65,6 @@ func TestMergeTraces(t *testing.T) {
require.Equal(t, expectedTraces, mergedTraces)
}

func TestMergeMetricsTwoEmpty(t *testing.T) {
expectedEmpty := pmetric.NewMetrics()
metric1 := pmetric.NewMetrics()
metric2 := pmetric.NewMetrics()

mergedMetrics := mergeMetrics(metric1, metric2)

require.Equal(t, expectedEmpty, mergedMetrics)
}

func TestMergeMetricsSingleEmpty(t *testing.T) {
expectedMetrics := simpleMetricsWithResource()

metric1 := pmetric.NewMetrics()
metric2 := simpleMetricsWithResource()

mergedMetrics := mergeMetrics(metric1, metric2)

require.Equal(t, expectedMetrics, mergedMetrics)
}

func TestMergeMetrics(t *testing.T) {
expectedMetrics := pmetric.NewMetrics()
expectedMetrics.ResourceMetrics().EnsureCapacity(3)
ametrics := expectedMetrics.ResourceMetrics().AppendEmpty()
ametrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1")
ametrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1")
bmetrics := expectedMetrics.ResourceMetrics().AppendEmpty()
bmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2")
bmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1")
cmetrics := expectedMetrics.ResourceMetrics().AppendEmpty()
cmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3")
cmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m2")

metric1 := pmetric.NewMetrics()
metric1.ResourceMetrics().EnsureCapacity(2)
m1ametrics := metric1.ResourceMetrics().AppendEmpty()
m1ametrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-1")
m1ametrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1")
m1bmetrics := metric1.ResourceMetrics().AppendEmpty()
m1bmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-2")
m1bmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m1")

metric2 := pmetric.NewMetrics()
metric2.ResourceMetrics().EnsureCapacity(1)
m2cmetrics := metric2.ResourceMetrics().AppendEmpty()
m2cmetrics.Resource().Attributes().PutStr(conventions.AttributeServiceName, "service-name-3")
m2cmetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("m2")

mergedMetrics := mergeMetrics(metric1, metric2)

require.Equal(t, expectedMetrics, mergedMetrics)
}

func benchMergeTraces(b *testing.B, tracesCount int) {
traces1 := ptrace.NewTraces()
traces2 := ptrace.NewTraces()
Expand All @@ -146,30 +91,3 @@ func BenchmarkMergeTraces_X500(b *testing.B) {
func BenchmarkMergeTraces_X1000(b *testing.B) {
benchMergeTraces(b, 1000)
}

func benchMergeMetrics(b *testing.B, metricsCount int) {
metrics1 := pmetric.NewMetrics()
metrics2 := pmetric.NewMetrics()

for i := 0; i < metricsCount; i++ {
appendSimpleMetricWithID(metrics2.ResourceMetrics().AppendEmpty(), "metrics-2")
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
mergeMetrics(metrics1, metrics2)
}
}

func BenchmarkMergeMetrics_X100(b *testing.B) {
benchMergeMetrics(b, 100)
}

func BenchmarkMergeMetrics_X500(b *testing.B) {
benchMergeMetrics(b, 500)
}

func BenchmarkMergeMetrics_X1000(b *testing.B) {
benchMergeMetrics(b, 1000)
}
Loading

0 comments on commit d6eaca8

Please sign in to comment.