Skip to content

Commit

Permalink
fix(kuma-cp) hijacker merge labels (#2476)
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Lobkov <ilya.lobkov@konghq.com>
(cherry picked from commit 49e24fe)
  • Loading branch information
lobkovilya authored and jpeach committed Aug 11, 2021
1 parent ad0f92a commit e40a9d3
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 116 deletions.
132 changes: 73 additions & 59 deletions app/kuma-dp/pkg/dataplane/metrics/merge.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package metrics

import (
"fmt"
"io"
"regexp"
"sort"
"strconv"
"strings"

"github.com/pkg/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
Expand All @@ -30,13 +33,45 @@ func MergeClusters(in io.Reader, out io.Writer) error {
continue
}

// metricsByClusterNames returns the data in the following format:
// 'cluster_name' ->
// - metric1{envoy_cluster_name="cluster_name-_0_",label1="value1"} 10
// - metric1{envoy_cluster_name="cluster_name-_1_",label1="value1"} 20
// - metric1{envoy_cluster_name="cluster_name-_2_",label1="value1"} 30
// 'another_cluster_name' ->
// - metric1{envoy_cluster_name="another_cluster_name-_0_",response_code="200"} 10
// - metric1{envoy_cluster_name="another_cluster_name-_0_",response_code="401"} 20
// - metric1{envoy_cluster_name="another_cluster_name-_1_",response_code="200"} 30
// - metric1{envoy_cluster_name="another_cluster_name-_2_",response_code="503"} 40
metricsByClusterName, err := metricsByClusterNames(metricFamily.Metric)
if err != nil {
return err
}

// renameCluster changes the value of 'envoy_cluster_name' label for every metric.
// So the data will look like:
// 'cluster_name' ->
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 10
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 20
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 30
// 'another_cluster_name' ->
// - metric1{envoy_cluster_name="another_cluster_name",response_code="200"} 10
// - metric1{envoy_cluster_name="another_cluster_name",response_code="401"} 20
// - metric1{envoy_cluster_name="another_cluster_name",response_code="200"} 30
// - metric1{envoy_cluster_name="another_cluster_name",response_code="503"} 40
for clusterName, metrics := range metricsByClusterName {
metricsByClusterName[clusterName] = merge(metricFamily.Type, clusterName, metrics)
renameCluster(clusterName, metrics)
}

// after the previous step we've got duplicates in the metrics, merge them during this step:
// 'cluster_name' ->
// - metric1{envoy_cluster_name="cluster_name",label1="value1"} 60
// 'another_cluster_name' ->
// - metric1{envoy_cluster_name="another_cluster_name",response_code="200"} 40
// - metric1{envoy_cluster_name="another_cluster_name",response_code="401"} 20
// - metric1{envoy_cluster_name="another_cluster_name",response_code="503"} 40
for clusterName, metrics := range metricsByClusterName {
metricsByClusterName[clusterName] = mergeDuplicates(metricFamily.Type, metrics)
}

metricFamily.Metric = nil
Expand All @@ -55,71 +90,52 @@ func MergeClusters(in io.Reader, out io.Writer) error {
return nil
}

func merge(typ *io_prometheus_client.MetricType, clusterName string, metrics []*io_prometheus_client.Metric) []*io_prometheus_client.Metric {
if len(metrics) == 1 {
return metrics
// renameCluster rewrites, in place, the value of every label named
// EnvoyClusterLabelName on each of the given metrics so that it equals
// clusterName. Labels with any other name are left untouched and nothing is
// returned; callers observe the change through the shared label pointers.
//
// Every rewritten label ends up pointing at the same string (the address of
// the clusterName parameter).
func renameCluster(clusterName string, metrics []*io_prometheus_client.Metric) {
	for i := range metrics {
		pairs := metrics[i].GetLabel()
		for j := range pairs {
			if pairs[j].GetName() != EnvoyClusterLabelName {
				continue
			}
			pairs[j].Value = &clusterName
		}
	}
}

labels, err := mergeLabels(clusterName, metrics)
if err != nil {
logger.Error(err, "unable to merge labels, falling back to unmerged state")
return metrics
// mergeDuplicates collapses metrics that carry an identical label set into a
// single metric.
//
// Metrics are grouped by the key returned by hash (built from their label
// names and values). For each group one new Metric is created that reuses the
// label slice of the group's first element and whose value is produced by the
// merge helper matching *typ (mergeCounter, mergeGauge, mergeSummary,
// mergeUntyped or mergeHistogram). A type not listed in the switch yields a
// metric that has labels but no value set.
//
// The groups are visited by ranging over a map, so the order of the returned
// slice is unspecified.
//
// NOTE(review): in this view the body is interleaved with leftover lines of the
// former merge function (the top-level `merged := ...{Label: labels}`, the
// top-level switch with returns, and `return nil`); the description above
// covers the `for _, dups := range hashes` path only — confirm against the
// real file.
func mergeDuplicates(typ *io_prometheus_client.MetricType, metrics []*io_prometheus_client.Metric) []*io_prometheus_client.Metric {
// Bucket the metrics by their label-set key.
hashes := map[string][]*io_prometheus_client.Metric{}
for _, metric := range metrics {
hashes[hash(metric)] = append(hashes[hash(metric)], metric)
}

merged := &io_prometheus_client.Metric{
Label: labels,
}
var result []*io_prometheus_client.Metric

switch *typ {
case io_prometheus_client.MetricType_COUNTER:
merged.Counter = mergeCounter(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_GAUGE:
merged.Gauge = mergeGauge(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_SUMMARY:
merged.Summary = mergeSummary(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_UNTYPED:
merged.Untyped = mergeUntyped(metrics)
return []*io_prometheus_client.Metric{merged}
case io_prometheus_client.MetricType_HISTOGRAM:
merged.Histogram = mergeHistogram(metrics)
return []*io_prometheus_client.Metric{merged}
// One output metric per bucket; all members of a bucket share the same
// label set, so the first member's labels are representative.
for _, dups := range hashes {
merged := &io_prometheus_client.Metric{
Label: dups[0].GetLabel(),
}
switch *typ {
case io_prometheus_client.MetricType_COUNTER:
merged.Counter = mergeCounter(dups)
case io_prometheus_client.MetricType_GAUGE:
merged.Gauge = mergeGauge(dups)
case io_prometheus_client.MetricType_SUMMARY:
merged.Summary = mergeSummary(dups)
case io_prometheus_client.MetricType_UNTYPED:
merged.Untyped = mergeUntyped(dups)
case io_prometheus_client.MetricType_HISTOGRAM:
merged.Histogram = mergeHistogram(dups)
}
result = append(result, merged)
}

return nil
return result
}

func mergeLabels(clusterName string, metrics []*io_prometheus_client.Metric) ([]*io_prometheus_client.LabelPair, error) {
labels := map[string]string{}
for _, m := range metrics {
for _, l := range m.Label {
if l.GetName() == EnvoyClusterLabelName {
continue
}
value, ok := labels[l.GetName()]
if ok && value != l.GetValue() {
return nil, errors.Errorf("failed to merge label '%s', values are not equal: '%s' != '%s'", l.GetName(), value, l.GetValue())
}
if !ok {
labels[l.GetName()] = l.GetValue()
}
}
// hash builds a string key that identifies a metric by its label set: every
// label is rendered as "name=value", the rendered pairs are sorted, and the
// result is joined with ";". Because of the sort, the key does not depend on
// the order in which the labels appear on the metric.
//
// NOTE(review): names and values are written unescaped, so a label value that
// itself contains ';' or '=' could make two different label sets produce the
// same key (e.g. a="1;b=2" versus a="1",b="2") — consider quoting the value.
//
// NOTE(review): in this view the body is interleaved with leftover lines of the
// former mergeLabels function (the second `pairs := ...` declaration, the
// `range labels` loop and `return pairs, nil`); the description above covers
// the string-building path only — confirm against the real file.
func hash(metric *io_prometheus_client.Metric) string {
pairs := []string{}
for _, l := range metric.GetLabel() {
pairs = append(pairs, fmt.Sprintf("%s=%s", l.GetName(), l.GetValue()))
}
pairs := []*io_prometheus_client.LabelPair{{
Name: strptr(EnvoyClusterLabelName),
Value: &clusterName,
}}
for name, value := range labels {
n := name
v := value
pairs = append(pairs, &io_prometheus_client.LabelPair{
Name: &n,
Value: &v,
})
}
return pairs, nil
// Sort so that label order on the metric does not influence the key.
sort.Strings(pairs)
return strings.Join(pairs, ";")
}

func mergeCounter(metrics []*io_prometheus_client.Metric) *io_prometheus_client.Counter {
Expand Down Expand Up @@ -259,5 +275,3 @@ func isMergeableClusterName(clusterName string) (prefix string, n int, ok bool)
}
return matches[prefixIndex], num, true
}

// strptr returns a pointer to a fresh copy of s, which is handy for filling
// optional (pointer-typed) string fields from a value or literal. Each call
// yields a distinct pointer, and writing through it never affects the
// caller's original string variable.
func strptr(s string) *string {
	p := new(string)
	*p = s
	return p
}
8 changes: 4 additions & 4 deletions app/kuma-dp/pkg/dataplane/metrics/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ var _ = Describe("Merge", func() {
input: "./testdata/counter-with-labels.in",
expected: "./testdata/counter-with-labels.out",
}),
Entry("should not merge unmergeable clusters for Counter with labels", testCase{
input: "./testdata/counter-unmergeable.in",
expected: "./testdata/counter-unmergeable.out",
}),
Entry("should merge clusters for Counter", testCase{
input: "./testdata/counter-sparse.in",
expected: "./testdata/counter-sparse.out",
}),
Entry("should merge clusters for Counter with status codes label", testCase{
input: "./testdata/counter-status-codes.in",
expected: "./testdata/counter-status-codes.out",
}),
)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# TYPE envoy_cluster_external_upstream_rq counter
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="backend_kuma-demo_svc_3001-_0_"} 14
envoy_cluster_external_upstream_rq{envoy_response_code="404",envoy_cluster_name="backend_kuma-demo_svc_3001-_0_"} 2
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="backend_kuma-demo_svc_3001-_1_"} 24
envoy_cluster_external_upstream_rq{envoy_response_code="404",envoy_cluster_name="backend_kuma-demo_svc_3001-_1_"} 12
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="kuma_envoy_admin"} 255
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# TYPE envoy_cluster_external_upstream_rq counter
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="backend_kuma-demo_svc_3001"} 38
envoy_cluster_external_upstream_rq{envoy_response_code="404",envoy_cluster_name="backend_kuma-demo_svc_3001"} 14
envoy_cluster_external_upstream_rq{envoy_response_code="200",envoy_cluster_name="kuma_envoy_admin"} 255

27 changes: 0 additions & 27 deletions app/kuma-dp/pkg/dataplane/metrics/testdata/counter-unmergeable.in

This file was deleted.

26 changes: 0 additions & 26 deletions app/kuma-dp/pkg/dataplane/metrics/testdata/counter-unmergeable.out

This file was deleted.

20 changes: 20 additions & 0 deletions app/kuma-dp/pkg/dataplane/metrics/testdata/counter-with-labels.in
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,23 @@ envoy_cluster_assignment_timeout_received{envoy_cluster_name="kuma_envoy_admin",
envoy_cluster_assignment_timeout_received{envoy_cluster_name="localhost_3000",custom_label="value2"} 7
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv4",custom_label="value2"} 8
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv6",custom_label="value2"} 9

# TYPE response_codes counter
response_codes{response_code="200",envoy_cluster_name="backend-_0_"} 1
response_codes{response_code="404",envoy_cluster_name="backend-_0_"} 2
response_codes{response_code="500",envoy_cluster_name="backend-_0_"} 3
response_codes{response_code="503",envoy_cluster_name="backend-_0_"} 4
response_codes{response_code="200",envoy_cluster_name="backend-_1_"} 10
response_codes{response_code="404",envoy_cluster_name="backend-_1_"} 20
response_codes{response_code="500",envoy_cluster_name="backend-_1_"} 30
response_codes{response_code="401",envoy_cluster_name="backend-_1_"} 40
response_codes{response_code="200",envoy_cluster_name="frontend-_1_"} 300
response_codes{response_code="401",envoy_cluster_name="frontend-_2_"} 400

# TYPE several_labels counter
several_labels{envoy_cluster_name="backend-_0_",l1="v11",l2="v12",l3="v13"} 1
several_labels{envoy_cluster_name="backend-_0_",l1="v21",l2="v22",l3="v23"} 2
several_labels{envoy_cluster_name="backend-_0_",l1="v31",l2="v32",l3="v33"} 3
several_labels{envoy_cluster_name="backend-_1_",l1="v11",l2="v12",l3="v13"} 10
several_labels{envoy_cluster_name="backend-_1_",l1="v21",l2="v22",l3="v23"} 20
several_labels{envoy_cluster_name="backend-_1_",l1="v31",l2="v32",l3="v33"} 30
14 changes: 14 additions & 0 deletions app/kuma-dp/pkg/dataplane/metrics/testdata/counter-with-labels.out
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,17 @@ envoy_cluster_assignment_timeout_received{envoy_cluster_name="localhost_3000",cu
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv4",custom_label="value2"} 8
envoy_cluster_assignment_timeout_received{envoy_cluster_name="outbound_passthrough_ipv6",custom_label="value2"} 9

# TYPE response_codes counter
response_codes{response_code="200",envoy_cluster_name="backend"} 11
response_codes{response_code="404",envoy_cluster_name="backend"} 22
response_codes{response_code="500",envoy_cluster_name="backend"} 33
response_codes{response_code="503",envoy_cluster_name="backend"} 4
response_codes{response_code="401",envoy_cluster_name="backend"} 40
response_codes{response_code="200",envoy_cluster_name="frontend"} 300
response_codes{response_code="401",envoy_cluster_name="frontend"} 400

# TYPE several_labels counter
several_labels{envoy_cluster_name="backend",l1="v11",l2="v12",l3="v13"} 11
several_labels{envoy_cluster_name="backend",l1="v21",l2="v22",l3="v23"} 22
several_labels{envoy_cluster_name="backend",l1="v31",l2="v32",l3="v33"} 33

0 comments on commit e40a9d3

Please sign in to comment.