Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): metrics hijacker incorrect fallback during merge #2476

Merged
merged 1 commit into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 73 additions & 59 deletions app/kuma-dp/pkg/dataplane/metrics/merge.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package metrics

import (
"fmt"
"io"
"regexp"
"sort"
"strconv"
"strings"

"github.com/pkg/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
Expand All @@ -30,13 +33,45 @@ func MergeClusters(in io.Reader, out io.Writer) error {
continue
}

// metricsByClusterNames returns the data in the following format:
// 'cluster_name' ->
// - metric1{envoy_cluster_name="cluster_name-_0_",label1="value1"} 10
// - metric1{envoy_cluster_name="cluster_name-_1_",label1="value1"} 20
// - metric1{envoy_cluster_name="cluster_name-_2_",label1="value1"} 30
// 'another_cluster_name' ->
// - metric1{envoy_cluster_name="another_cluster_name-_0_",response_code="200"} 10
// - metric1{envoy_cluster_name="another_cluster_name-_0_",response_code="401"} 20
// - metric1{envoy_cluster_name="another_cluster_name-_1_",response_code="200"} 30
// - metric1{envoy_cluster_name="another_cluster_name-_2_",response_code="503"} 40
metricsByClusterName, err := metricsByClusterNames(metricFamily.Metric)
if err != nil {
return err
}

// renameCluster changes the value of 'envoy_cluster_name' label for every metric.
// So the data will look like:
// 'cluster_name' ->
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 10
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 20
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 30
// 'another_cluster_name' ->
// - metric1{envoy_cluster_name="another_cluster_name",response_code="200"} 10
// - metric1{envoy_cluster_name="another_cluster_name",response_code="401"} 20
// - metric1{envoy_cluster_name="another_cluster_name",response_code="200"} 30
// - metric1{envoy_cluster_name="another_cluster_name",response_code="503"} 40
for clusterName, metrics := range metricsByClusterName {
metricsByClusterName[clusterName] = merge(metricFamily.Type, clusterName, metrics)
renameCluster(clusterName, metrics)
}

// after the previous step we've got duplicates in the metrics, merge them during this step:
// 'cluster_name' ->
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 60
// 'another_cluster_name' ->
// - metric1{envoy_cluster_name="another_cluster_name",response_code="200"} 40
// - metric1{envoy_cluster_name="another_cluster_name",response_code="401"} 20
// - metric1{envoy_cluster_name="another_cluster_name",response_code="503"} 40
for clusterName, metrics := range metricsByClusterName {
metricsByClusterName[clusterName] = mergeDuplicates(metricFamily.Type, metrics)
}

metricFamily.Metric = nil
Expand All @@ -55,71 +90,52 @@ func MergeClusters(in io.Reader, out io.Writer) error {
return nil
}

func merge(typ *io_prometheus_client.MetricType, clusterName string, metrics []*io_prometheus_client.Metric) []*io_prometheus_client.Metric {
if len(metrics) == 1 {
return metrics
func renameCluster(clusterName string, metrics []*io_prometheus_client.Metric) {
for _, metric := range metrics {
for _, label := range metric.GetLabel() {
if label.GetName() == EnvoyClusterLabelName {
label.Value = &clusterName
}
}
}
}

labels, err := mergeLabels(clusterName, metrics)
if err != nil {
logger.Error(err, "unable to merge labels, falling back to unmerged state")
return metrics
func mergeDuplicates(typ *io_prometheus_client.MetricType, metrics []*io_prometheus_client.Metric) []*io_prometheus_client.Metric {
hashes := map[string][]*io_prometheus_client.Metric{}
for _, metric := range metrics {
hashes[hash(metric)] = append(hashes[hash(metric)], metric)
}

merged := &io_prometheus_client.Metric{
Label: labels,
}
var result []*io_prometheus_client.Metric

switch *typ {
case io_prometheus_client.MetricType_COUNTER:
merged.Counter = mergeCounter(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_GAUGE:
merged.Gauge = mergeGauge(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_SUMMARY:
merged.Summary = mergeSummary(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_UNTYPED:
merged.Untyped = mergeUntyped(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_HISTOGRAM:
merged.Histogram = mergeHistogram(metrics)
return []*io_prometheus_client.Metric{merged}
for _, dups := range hashes {
merged := &io_prometheus_client.Metric{
Label: dups[0].GetLabel(),
}
switch *typ {
case io_prometheus_client.MetricType_COUNTER:
merged.Counter = mergeCounter(dups)
case io_prometheus_client.MetricType_GAUGE:
merged.Gauge = mergeGauge(dups)
case io_prometheus_client.MetricType_SUMMARY:
merged.Summary = mergeSummary(dups)
case io_prometheus_client.MetricType_UNTYPED:
merged.Untyped = mergeUntyped(dups)
case io_prometheus_client.MetricType_HISTOGRAM:
merged.Histogram = mergeHistogram(dups)
}
result = append(result, merged)
}

return nil
return result
}

func mergeLabels(clusterName string, metrics []*io_prometheus_client.Metric) ([]*io_prometheus_client.LabelPair, error) {
labels := map[string]string{}
for _, m := range metrics {
for _, l := range m.Label {
if l.GetName() == EnvoyClusterLabelName {
continue
}
value, ok := labels[l.GetName()]
if ok && value != l.GetValue() {
return nil, errors.Errorf("failed to merge label '%s', values are not equal: '%s' != '%s'", l.GetName(), value, l.GetValue())
}
if !ok {
labels[l.GetName()] = l.GetValue()
}
}
func hash(metric *io_prometheus_client.Metric) string {
pairs := []string{}
for _, l := range metric.GetLabel() {
pairs = append(pairs, fmt.Sprintf("%s=%s", l.GetName(), l.GetValue()))
}
pairs := []*io_prometheus_client.LabelPair{{
Name: strptr(EnvoyClusterLabelName),
Value: &clusterName,
}}
for name, value := range labels {
n := name
v := value
pairs = append(pairs, &io_prometheus_client.LabelPair{
Name: &n,
Value: &v,
})
}
return pairs, nil
sort.Strings(pairs)
return strings.Join(pairs, ";")
}

func mergeCounter(metrics []*io_prometheus_client.Metric) *io_prometheus_client.Counter {
Expand Down Expand Up @@ -259,5 +275,3 @@ func isMergeableClusterName(clusterName string) (prefix string, n int, ok bool)
}
return matches[prefixIndex], num, true
}

// strptr returns a pointer to the given string value.
func strptr(s string) *string {
	v := s
	return &v
}
8 changes: 4 additions & 4 deletions app/kuma-dp/pkg/dataplane/metrics/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ var _ = Describe("Merge", func() {
input: "./testdata/counter-with-labels.in",
expected: "./testdata/counter-with-labels.out",
}),
Entry("should not merge unmergeable clusters for Counter with labels", testCase{
input: "./testdata/counter-unmergeable.in",
expected: "./testdata/counter-unmergeable.out",
}),
Entry("should merge clusters for Counter", testCase{
input: "./testdata/counter-sparse.in",
expected: "./testdata/counter-sparse.out",
}),
Entry("should merge clusters for Counter with status codes label", testCase{
input: "./testdata/counter-status-codes.in",
expected: "./testdata/counter-status-codes.out",
}),
)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# TYPE envoy_cluster_external_upstream_rq counter
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="backend_kuma-demo_svc_3001-_0_"} 14
envoy_cluster_external_upstream_rq{envoy_response_code="404",envoy_cluster_name="backend_kuma-demo_svc_3001-_0_"} 2
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="backend_kuma-demo_svc_3001-_1_"} 24
envoy_cluster_external_upstream_rq{envoy_response_code="404",envoy_cluster_name="backend_kuma-demo_svc_3001-_1_"} 12
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="kuma_envoy_admin"} 255
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TYPE envoy_cluster_external_upstream_rq counter
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="backend_kuma-demo_svc_3001"} 38
envoy_cluster_external_upstream_rq{envoy_response_code="404",envoy_cluster_name="backend_kuma-demo_svc_3001"} 14
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="kuma_envoy_admin"} 255

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,23 @@ envoy_cluster_assignment_timeout_received{envoy_cluster_name="kuma_envoy_admin",
envoy_cluster_assignment_timeout_received{envoy_cluster_name="localhost_3000",custom_label="value2"} 7
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv4",custom_label="value2"} 8
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv6",custom_label="value2"} 9

# TYPE response_codes counter
response_codes{response_code="200",envoy_cluster_name="backend-_0_"} 1
response_codes{response_code="404",envoy_cluster_name="backend-_0_"} 2
response_codes{response_code="500",envoy_cluster_name="backend-_0_"} 3
response_codes{response_code="503",envoy_cluster_name="backend-_0_"} 4
response_codes{response_code="200",envoy_cluster_name="backend-_1_"} 10
response_codes{response_code="404",envoy_cluster_name="backend-_1_"} 20
response_codes{response_code="500",envoy_cluster_name="backend-_1_"} 30
response_codes{response_code="401",envoy_cluster_name="backend-_1_"} 40
response_codes{response_code="200",envoy_cluster_name="frontend-_1_"} 300
response_codes{response_code="401",envoy_cluster_name="frontend-_2_"} 400

# TYPE several_labels counter
several_labels{envoy_cluster_name="backend-_0_",l1="v11",l2="v12",l3="v13"} 1
several_labels{envoy_cluster_name="backend-_0_",l1="v21",l2="v22",l3="v23"} 2
several_labels{envoy_cluster_name="backend-_0_",l1="v31",l2="v32",l3="v33"} 3
several_labels{envoy_cluster_name="backend-_1_",l1="v11",l2="v12",l3="v13"} 10
several_labels{envoy_cluster_name="backend-_1_",l1="v21",l2="v22",l3="v23"} 20
several_labels{envoy_cluster_name="backend-_1_",l1="v31",l2="v32",l3="v33"} 30
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,17 @@ envoy_cluster_assignment_timeout_received{envoy_cluster_name="localhost_3000",cu
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv4",custom_label="value2"} 8
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv6",custom_label="value2"} 9

# TYPE response_codes counter
response_codes{response_code="200",envoy_cluster_name="backend"} 11
response_codes{response_code="404",envoy_cluster_name="backend"} 22
response_codes{response_code="500",envoy_cluster_name="backend"} 33
response_codes{response_code="503",envoy_cluster_name="backend"} 4
response_codes{response_code="401",envoy_cluster_name="backend"} 40
response_codes{response_code="200",envoy_cluster_name="frontend"} 300
response_codes{response_code="401",envoy_cluster_name="frontend"} 400

# TYPE several_labels counter
several_labels{envoy_cluster_name="backend",l1="v11",l2="v12",l3="v13"} 11
several_labels{envoy_cluster_name="backend",l1="v21",l2="v22",l3="v23"} 22
several_labels{envoy_cluster_name="backend",l1="v31",l2="v32",l3="v33"} 33