From d5d78dfcc77dba5dda0f57a6fe97a405e35a6c5d Mon Sep 17 00:00:00 2001 From: xiami Date: Mon, 1 Feb 2021 23:28:16 -0800 Subject: [PATCH 1/4] Refactor EMFExporter and prepare for EMF Metrics Batching logic --- exporter/awsemfexporter/datapoint.go | 296 ++++++++ exporter/awsemfexporter/datapoint_test.go | 708 ++++++++++++++++++ exporter/awsemfexporter/emf_exporter_test.go | 10 +- exporter/awsemfexporter/metric_translator.go | 258 +------ .../awsemfexporter/metric_translator_test.go | 148 ++-- exporter/awsemfexporter/util.go | 74 ++ exporter/awsemfexporter/util_test.go | 206 +++++ 7 files changed, 1387 insertions(+), 313 deletions(-) create mode 100644 exporter/awsemfexporter/datapoint.go create mode 100644 exporter/awsemfexporter/datapoint_test.go diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go new file mode 100644 index 000000000000..33fd9d8d55f7 --- /dev/null +++ b/exporter/awsemfexporter/datapoint.go @@ -0,0 +1,296 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package awsemfexporter + +import ( + "time" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/mapwithexpiry" +) + +const ( + CleanInterval = 5 * time.Minute + MinTimeDiff = 50 * time.Millisecond // We assume 50 milli-seconds is the minimal gap between two collected data sample to be valid to calculate delta + + namespaceKey = "CloudWatchNamespace" + metricNameKey = "CloudWatchMetricName" + logGroupKey = "CloudWatchLogGroup" + logStreamKey = "CloudWatchLogStream" +) + +var currentState = mapwithexpiry.NewMapWithExpiry(CleanInterval) + +// DataPoint represents a processed metric data point +type DataPoint struct { + Value interface{} + Labels map[string]string + TimestampMs int64 +} + +// DataPoints is a wrapper interface for: +// - pdata.IntDataPointSlice +// - pdata.DoubleDataPointSlice +// - pdata.IntHistogramDataPointSlice +// - pdata.DoubleHistogramDataPointSlice +// - pdata.DoubleSummaryDataPointSlice +type DataPoints interface { + Len() int + // NOTE: At() is an expensive call as it calculates the metric's value + At(i int) DataPoint +} + +// rateCalculationMetadata contains the metadata required to perform rate calculation +type rateCalculationMetadata struct { + needsCalculateRate bool + rateKeyParams map[string]string + timestampMs int64 +} + +// rateState stores a metric's value +type rateState struct { + value float64 + timestampMs int64 +} + +// IntDataPointSlice is a wrapper for pdata.IntDataPointSlice +type IntDataPointSlice struct { + instrumentationLibraryName string + rateCalculationMetadata + pdata.IntDataPointSlice +} + +// DoubleDataPointSlice is a wrapper for pdata.DoubleDataPointSlice +type DoubleDataPointSlice struct { + instrumentationLibraryName string + rateCalculationMetadata + pdata.DoubleDataPointSlice +} + +// DoubleHistogramDataPointSlice is a wrapper for pdata.DoubleHistogramDataPointSlice +type DoubleHistogramDataPointSlice struct { + instrumentationLibraryName string + pdata.DoubleHistogramDataPointSlice +} + +// DoubleSummaryDataPointSlice is a wrapper for pdata.DoubleSummaryDataPointSlice +type DoubleSummaryDataPointSlice struct { + instrumentationLibraryName string + pdata.DoubleSummaryDataPointSlice +} + +// At retrieves the IntDataPoint at the given index and performs rate calculation if necessary. +func (dps IntDataPointSlice) At(i int) DataPoint { + metric := dps.IntDataPointSlice.At(i) + labels := createLabels(metric.LabelsMap()) + timestampMs := unixNanoToMilliseconds(metric.Timestamp()) + + var metricVal float64 + metricVal = float64(metric.Value()) + if dps.needsCalculateRate { + rateKey := createMetricKey(labels, dps.rateKeyParams) + rateTS := dps.timestampMs + if timestampMs > 0 { + // Use metric timestamp if available + rateTS = timestampMs + } + metricVal = calculateRate(rateKey, metricVal, rateTS) + } + + return DataPoint{ + Value: metricVal, + Labels: labels, + TimestampMs: timestampMs, + } +} + +// At retrieves the DoubleDataPoint at the given index and performs rate calculation if necessary. +func (dps DoubleDataPointSlice) At(i int) DataPoint { + metric := dps.DoubleDataPointSlice.At(i) + labels := createLabels(metric.LabelsMap()) + timestampMs := unixNanoToMilliseconds(metric.Timestamp()) + + var metricVal float64 + metricVal = metric.Value() + if dps.needsCalculateRate { + rateKey := createMetricKey(labels, dps.rateKeyParams) + rateTS := dps.timestampMs + if timestampMs > 0 { + // Use metric timestamp if available + rateTS = timestampMs + } + metricVal = calculateRate(rateKey, metricVal, rateTS) + } + + return DataPoint{ + Value: metricVal, + Labels: labels, + TimestampMs: timestampMs, + } +} + +// At retrieves the DoubleHistogramDataPoint at the given index. +func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint { + metric := dps.DoubleHistogramDataPointSlice.At(i) + labels := createLabels(metric.LabelsMap()) + timestamp := unixNanoToMilliseconds(metric.Timestamp()) + + return DataPoint{ + Value: &CWMetricStats{ + Count: metric.Count(), + Sum: metric.Sum(), + }, + Labels: labels, + TimestampMs: timestamp, + } +} + +// At retrieves the DoubleSummaryDataPoint at the given index. +func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint { + metric := dps.DoubleSummaryDataPointSlice.At(i) + labels := createLabels(metric.LabelsMap()) + timestampMs := unixNanoToMilliseconds(metric.Timestamp()) + + metricVal := &CWMetricStats{ + Count: metric.Count(), + Sum: metric.Sum(), + } + if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 { + metricVal.Min = quantileValues.At(0).Value() + metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value() + } + + return DataPoint{ + Value: metricVal, + Labels: labels, + TimestampMs: timestampMs, + } +} + +// createLabels converts OTel StringMap labels to a map and optionally adds in the +// OTel instrumentation library name +func createLabels(labelsMap pdata.StringMap) map[string]string { + labels := make(map[string]string, labelsMap.Len()+1) + labelsMap.ForEach(func(k, v string) { + labels[k] = v + }) + + return labels +} + +// calculateRate calculates the metric value's rate of change using valDelta / timeDelta. +func calculateRate(metricKey string, val float64, timestampMs int64) float64 { + var metricRate float64 + // get previous Metric content from map. Need to lock the map until set the new state + currentState.Lock() + if state, ok := currentState.Get(metricKey); ok { + prevStats := state.(*rateState) + deltaTime := timestampMs - prevStats.timestampMs + + deltaVal := val - prevStats.value + if deltaTime > MinTimeDiff.Milliseconds() && deltaVal >= 0 { + metricRate = deltaVal * 1e3 / float64(deltaTime) + } + } + content := &rateState{ + value: val, + timestampMs: timestampMs, + } + currentState.Set(metricKey, content) + currentState.Unlock() + return metricRate +} + +// getDataPoints retrieves data points from OT Metric. +func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Logger) (dps DataPoints) { + if pmd == nil { + return + } + + rateKeyParams := map[string]string{ + namespaceKey: metadata.Namespace, + metricNameKey: pmd.Name(), + logGroupKey: metadata.LogGroup, + logStreamKey: metadata.LogStream, + } + + switch pmd.DataType() { + case pdata.MetricDataTypeIntGauge: + metric := pmd.IntGauge() + dps = IntDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + false, + rateKeyParams, + metadata.TimestampMs, + }, + metric.DataPoints(), + } + case pdata.MetricDataTypeDoubleGauge: + metric := pmd.DoubleGauge() + dps = DoubleDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + false, + rateKeyParams, + metadata.TimestampMs, + }, + metric.DataPoints(), + } + case pdata.MetricDataTypeIntSum: + metric := pmd.IntSum() + dps = IntDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + metric.AggregationTemporality() == pdata.AggregationTemporalityCumulative, + rateKeyParams, + metadata.TimestampMs, + }, + metric.DataPoints(), + } + case pdata.MetricDataTypeDoubleSum: + metric := pmd.DoubleSum() + dps = DoubleDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + metric.AggregationTemporality() == pdata.AggregationTemporalityCumulative, + rateKeyParams, + metadata.TimestampMs, + }, + metric.DataPoints(), + } + case pdata.MetricDataTypeDoubleHistogram: + metric := pmd.DoubleHistogram() + dps = DoubleHistogramDataPointSlice{ + metadata.InstrumentationLibraryName, + metric.DataPoints(), + } + case pdata.MetricDataTypeDoubleSummary: + metric := pmd.DoubleSummary() + dps = DoubleSummaryDataPointSlice{ + metadata.InstrumentationLibraryName, + metric.DataPoints(), + } + default: + logger.Warn("Unhandled metric data type.", + zap.String("DataType", pmd.DataType().String()), + zap.String("Name", pmd.Name()), + zap.String("Unit", pmd.Unit()), + ) + } + return +} diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go new file mode 100644 index 000000000000..a9981e9a1518 --- /dev/null +++ b/exporter/awsemfexporter/datapoint_test.go @@ -0,0 +1,708 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package awsemfexporter + +import ( + "reflect" + "testing" + "time" + + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/golang/protobuf/ptypes/wrappers" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/internaldata" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" +) + +func generateTestIntGauge(name string) *metricspb.Metric { + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: name, + Type: metricspb.MetricDescriptor_GAUGE_INT64, + Unit: "Count", + LabelKeys: []*metricspb.LabelKey{ + {Key: "label1"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "value1", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Value: &metricspb.Point_Int64Value{ + Int64Value: 1, + }, + }, + }, + }, + }, + } +} + +func generateTestDoubleGauge(name string) *metricspb.Metric { + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: name, + Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, + Unit: "Count", + LabelKeys: []*metricspb.LabelKey{ + {Key: "label1"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "value1", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 0.1, + }, + }, + }, + }, + }, + } +} + +func generateTestIntSum(name string) *metricspb.Metric { + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: name, + Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, + Unit: "Count", + LabelKeys: []*metricspb.LabelKey{ + {Key: "label1"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "value1", HasValue: true}, + {Value: "value2", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Value: &metricspb.Point_Int64Value{ + Int64Value: 1, + }, + }, + }, + }, + }, + } +} + +func generateTestDoubleSum(name string) *metricspb.Metric { + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: name, + Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, + Unit: "Count", + LabelKeys: []*metricspb.LabelKey{ + {Key: "label1"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "value1", HasValue: true}, + {Value: "value2", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 0.1, + }, + }, + }, + }, + }, + } +} + +func generateTestDoubleHistogram(name string) *metricspb.Metric { + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: name, + Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, + Unit: "Seconds", + LabelKeys: []*metricspb.LabelKey{ + {Key: "label1"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "value1", HasValue: true}, + {Value: "value2", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Value: &metricspb.Point_DistributionValue{ + DistributionValue: &metricspb.DistributionValue{ + Sum: 35.0, + Count: 18, + BucketOptions: &metricspb.DistributionValue_BucketOptions{ + Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ + Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{ + Bounds: []float64{0, 10}, + }, + }, + }, + Buckets: []*metricspb.DistributionValue_Bucket{ + { + Count: 5, + }, + { + Count: 6, + }, + { + Count: 7, + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func generateTestSummary(name string) *metricspb.Metric { + return &metricspb.Metric{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: name, + Type: metricspb.MetricDescriptor_SUMMARY, + Unit: "Seconds", + LabelKeys: []*metricspb.LabelKey{ + {Key: "label1"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + {Value: "value1", HasValue: true}, + }, + Points: []*metricspb.Point{ + { + Value: &metricspb.Point_SummaryValue{ + SummaryValue: &metricspb.SummaryValue{ + Sum: &wrappers.DoubleValue{ + Value: 15.0, + }, + Count: &wrappers.Int64Value{ + Value: 5, + }, + Snapshot: &metricspb.SummaryValue_Snapshot{ + Count: &wrappers.Int64Value{ + Value: 5, + }, + Sum: &wrappers.DoubleValue{ + Value: 15.0, + }, + PercentileValues: []*metricspb.SummaryValue_Snapshot_ValueAtPercentile{ + { + Percentile: 0.0, + Value: 1, + }, + { + Percentile: 100.0, + Value: 5, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func TestIntDataPointSliceAt(t *testing.T) { + timestamp := time.Now().UnixNano() / int64(time.Millisecond) + instrLibName := "cloudwatch-otel" + labels := map[string]string{"label1": "value1"} + rateKeyParams := map[string]string{ + (namespaceKey): "namespace", + (metricNameKey): "foo", + (logGroupKey): "log-group", + (logStreamKey): "log-stream", + } + + testCases := []struct { + testName string + needsCalculateRate bool + value interface{} + }{ + { + "no rate calculation", + false, + float64(-17), + }, + { + "w/ rate calculation", + true, + float64(0), + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + testDPS := pdata.NewIntDataPointSlice() + testDPS.Resize(1) + testDP := testDPS.At(0) + testDP.SetValue(int64(-17)) + testDP.LabelsMap().InitFromMap(labels) + + dps := IntDataPointSlice{ + instrLibName, + rateCalculationMetadata{ + tc.needsCalculateRate, + rateKeyParams, + timestamp, + }, + testDPS, + } + + expectedDP := DataPoint{ + Value: tc.value, + Labels: map[string]string{ + "label1": "value1", + }, + } + + assert.Equal(t, 1, dps.Len()) + dp := dps.At(0) + assert.Equal(t, expectedDP, dp) + }) + } +} + +func TestDoubleDataPointSliceAt(t *testing.T) { + timestamp := time.Now().UnixNano() / int64(time.Millisecond) + instrLibName := "cloudwatch-otel" + labels := map[string]string{"label1": "value1"} + rateKeyParams := map[string]string{ + (namespaceKey): "namespace", + (metricNameKey): "foo", + (logGroupKey): "log-group", + (logStreamKey): "log-stream", + } + + testCases := []struct { + testName string + needsCalculateRate bool + value interface{} + }{ + { + "no rate calculation", + false, + float64(0.3), + }, + { + "w/ rate calculation", + true, + float64(0), + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + testDPS := pdata.NewDoubleDataPointSlice() + testDPS.Resize(1) + testDP := testDPS.At(0) + testDP.SetValue(float64(0.3)) + testDP.LabelsMap().InitFromMap(labels) + + dps := DoubleDataPointSlice{ + instrLibName, + rateCalculationMetadata{ + tc.needsCalculateRate, + rateKeyParams, + timestamp, + }, + testDPS, + } + + expectedDP := DataPoint{ + Value: tc.value, + Labels: map[string]string{ + "label1": "value1", + }, + } + + assert.Equal(t, 1, dps.Len()) + dp := dps.At(0) + assert.Equal(t, expectedDP, dp) + }) + } +} + +func TestDoubleHistogramDataPointSliceAt(t *testing.T) { + instrLibName := "cloudwatch-otel" + labels := map[string]string{"label1": "value1"} + + testDPS := pdata.NewDoubleHistogramDataPointSlice() + testDPS.Resize(1) + testDP := testDPS.At(0) + testDP.SetCount(uint64(17)) + testDP.SetSum(float64(17.13)) + testDP.SetBucketCounts([]uint64{1, 2, 3}) + testDP.SetExplicitBounds([]float64{1, 2, 3}) + testDP.LabelsMap().InitFromMap(labels) + + dps := DoubleHistogramDataPointSlice{ + instrLibName, + testDPS, + } + + expectedDP := DataPoint{ + Value: &CWMetricStats{ + Sum: 17.13, + Count: 17, + }, + Labels: map[string]string{ + "label1": "value1", + }, + } + + assert.Equal(t, 1, dps.Len()) + dp := dps.At(0) + assert.Equal(t, expectedDP, dp) +} + +func TestDoubleSummaryDataPointSliceAt(t *testing.T) { + instrLibName := "cloudwatch-otel" + labels := map[string]string{"label1": "value1"} + + testDPS := pdata.NewDoubleSummaryDataPointSlice() + testDPS.Resize(1) + testDP := testDPS.At(0) + testDP.SetCount(uint64(17)) + testDP.SetSum(float64(17.13)) + testDP.QuantileValues().Resize(2) + testQuantileValue := testDP.QuantileValues().At(0) + testQuantileValue.SetQuantile(0) + testQuantileValue.SetValue(float64(1)) + testQuantileValue = testDP.QuantileValues().At(1) + testQuantileValue.SetQuantile(100) + testQuantileValue.SetValue(float64(5)) + testDP.LabelsMap().InitFromMap(labels) + + dps := DoubleSummaryDataPointSlice{ + instrLibName, + testDPS, + } + + expectedDP := DataPoint{ + Value: &CWMetricStats{ + Max: 5, + Min: 1, + Count: 17, + Sum: 17.13, + }, + Labels: map[string]string{ + "label1": "value1", + }, + } + + assert.Equal(t, 1, dps.Len()) + dp := dps.At(0) + assert.Equal(t, expectedDP, dp) +} + +func TestCreateLabels(t *testing.T) { + expectedLabels := map[string]string{ + "a": "A", + "b": "B", + "c": "C", + } + labelsMap := pdata.NewStringMap().InitFromMap(expectedLabels) + + labels := createLabels(labelsMap) + assert.Equal(t, expectedLabels, labels) + + // With isntrumentation library name + labels = createLabels(labelsMap) + assert.Equal(t, expectedLabels, labels) +} + +func TestCalculateRate(t *testing.T) { + intRateKey := "foo" + doubleRateKey := "bar" + time1 := time.Now().UnixNano() / int64(time.Millisecond) + time2 := time.Unix(0, time1*int64(time.Millisecond)).Add(time.Second*10).UnixNano() / int64(time.Millisecond) + time3 := time.Unix(0, time2*int64(time.Millisecond)).Add(time.Second*10).UnixNano() / int64(time.Millisecond) + + intVal1 := float64(0) + intVal2 := float64(10) + intVal3 := float64(200) + doubleVal1 := 0.0 + doubleVal2 := 5.0 + doubleVal3 := 15.1 + + rate := calculateRate(intRateKey, intVal1, time1) + assert.Equal(t, float64(0), rate) + rate = calculateRate(doubleRateKey, doubleVal1, time1) + assert.Equal(t, float64(0), rate) + + rate = calculateRate(intRateKey, intVal2, time2) + assert.Equal(t, float64(1), rate) + rate = calculateRate(doubleRateKey, doubleVal2, time2) + assert.Equal(t, 0.5, rate) + + // Test change of data type + rate = calculateRate(intRateKey, doubleVal3, time3) + assert.Equal(t, float64(0.51), rate) + rate = calculateRate(doubleRateKey, intVal3, time3) + assert.Equal(t, float64(19.5), rate) +} + +func TestGetDataPoints(t *testing.T) { + metadata := CWMetricMetadata{ + Namespace: "Namespace", + TimestampMs: time.Now().UnixNano() / int64(time.Millisecond), + LogGroup: "log-group", + LogStream: "log-stream", + InstrumentationLibraryName: "cloudwatch-otel", + } + + testCases := []struct { + testName string + metric *metricspb.Metric + expectedDataPoints DataPoints + }{ + { + "Int gauge", + generateTestIntGauge("foo"), + IntDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + false, + map[string]string{ + (namespaceKey): metadata.Namespace, + (metricNameKey): "foo", + (logGroupKey): metadata.LogGroup, + (logStreamKey): metadata.LogStream, + }, + metadata.TimestampMs, + }, + pdata.IntDataPointSlice{}, + }, + }, + { + "Double gauge", + generateTestDoubleGauge("foo"), + DoubleDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + false, + map[string]string{ + (namespaceKey): metadata.Namespace, + (metricNameKey): "foo", + (logGroupKey): metadata.LogGroup, + (logStreamKey): metadata.LogStream, + }, + metadata.TimestampMs, + }, + pdata.DoubleDataPointSlice{}, + }, + }, + { + "Int sum", + generateTestIntSum("foo"), + IntDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + true, + map[string]string{ + (namespaceKey): metadata.Namespace, + (metricNameKey): "foo", + (logGroupKey): metadata.LogGroup, + (logStreamKey): metadata.LogStream, + }, + metadata.TimestampMs, + }, + pdata.IntDataPointSlice{}, + }, + }, + { + "Double sum", + generateTestDoubleSum("foo"), + DoubleDataPointSlice{ + metadata.InstrumentationLibraryName, + rateCalculationMetadata{ + true, + map[string]string{ + (namespaceKey): metadata.Namespace, + (metricNameKey): "foo", + (logGroupKey): metadata.LogGroup, + (logStreamKey): metadata.LogStream, + }, + metadata.TimestampMs, + }, + pdata.DoubleDataPointSlice{}, + }, + }, + { + "Double histogram", + generateTestDoubleHistogram("foo"), + DoubleHistogramDataPointSlice{ + metadata.InstrumentationLibraryName, + pdata.DoubleHistogramDataPointSlice{}, + }, + }, + { + "Summary", + generateTestSummary("foo"), + DoubleSummaryDataPointSlice{ + metadata.InstrumentationLibraryName, + pdata.DoubleSummaryDataPointSlice{}, + }, + }, + } + + for _, tc := range testCases { + oc := consumerdata.MetricsData{ + Metrics: []*metricspb.Metric{tc.metric}, + } + + // Retrieve *pdata.Metric + rm := internaldata.OCToMetrics(oc).ResourceMetrics().At(0) + metric := rm.InstrumentationLibraryMetrics().At(0).Metrics().At(0) + + logger := zap.NewNop() + + expectedLabels := pdata.NewStringMap().InitFromMap(map[string]string{"label1": "value1"}) + + t.Run(tc.testName, func(t *testing.T) { + dps := getDataPoints(&metric, metadata, logger) + assert.NotNil(t, dps) + assert.Equal(t, reflect.TypeOf(tc.expectedDataPoints), reflect.TypeOf(dps)) + switch convertedDPS := dps.(type) { + case IntDataPointSlice: + expectedDPS := tc.expectedDataPoints.(IntDataPointSlice) + assert.Equal(t, metadata.InstrumentationLibraryName, convertedDPS.instrumentationLibraryName) + assert.Equal(t, expectedDPS.rateCalculationMetadata, convertedDPS.rateCalculationMetadata) + assert.Equal(t, 1, convertedDPS.Len()) + dp := convertedDPS.IntDataPointSlice.At(0) + assert.Equal(t, int64(1), dp.Value()) + assert.Equal(t, expectedLabels, dp.LabelsMap()) + case DoubleDataPointSlice: + expectedDPS := tc.expectedDataPoints.(DoubleDataPointSlice) + assert.Equal(t, metadata.InstrumentationLibraryName, convertedDPS.instrumentationLibraryName) + assert.Equal(t, expectedDPS.rateCalculationMetadata, convertedDPS.rateCalculationMetadata) + assert.Equal(t, 1, convertedDPS.Len()) + dp := convertedDPS.DoubleDataPointSlice.At(0) + assert.Equal(t, 0.1, dp.Value()) + assert.Equal(t, expectedLabels, dp.LabelsMap()) + case DoubleHistogramDataPointSlice: + assert.Equal(t, metadata.InstrumentationLibraryName, convertedDPS.instrumentationLibraryName) + assert.Equal(t, 1, convertedDPS.Len()) + dp := convertedDPS.DoubleHistogramDataPointSlice.At(0) + assert.Equal(t, 35.0, dp.Sum()) + assert.Equal(t, uint64(18), dp.Count()) + assert.Equal(t, []float64{0, 10}, dp.ExplicitBounds()) + assert.Equal(t, expectedLabels, dp.LabelsMap()) + case DoubleSummaryDataPointSlice: + assert.Equal(t, metadata.InstrumentationLibraryName, convertedDPS.instrumentationLibraryName) + assert.Equal(t, 1, convertedDPS.Len()) + dp := convertedDPS.DoubleSummaryDataPointSlice.At(0) + assert.Equal(t, 15.0, dp.Sum()) + assert.Equal(t, uint64(5), dp.Count()) + assert.Equal(t, 2, dp.QuantileValues().Len()) + assert.Equal(t, float64(1), dp.QuantileValues().At(0).Value()) + assert.Equal(t, float64(5), dp.QuantileValues().At(1).Value()) + } + }) + } + + t.Run("Unhandled metric type", func(t *testing.T) { + metric := pdata.NewMetric() + metric.SetName("foo") + metric.SetUnit("Count") + metric.SetDataType(pdata.MetricDataTypeIntHistogram) + + obs, logs := observer.New(zap.WarnLevel) + logger := zap.New(obs) + + dps := getDataPoints(&metric, metadata, logger) + assert.Nil(t, dps) + + // Test output warning logs + expectedLogs := []observer.LoggedEntry{ + { + Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Unhandled metric data type."}, + Context: []zapcore.Field{ + zap.String("DataType", "IntHistogram"), + zap.String("Name", "foo"), + zap.String("Unit", "Count"), + }, + }, + } + assert.Equal(t, 1, logs.Len()) + assert.Equal(t, expectedLogs, logs.AllUntimed()) + }) + + t.Run("Nil metric", func(t *testing.T) { + dps := getDataPoints(nil, metadata, zap.NewNop()) + assert.Nil(t, dps) + }) +} + +func BenchmarkGetDataPoints(b *testing.B) { + oc := consumerdata.MetricsData{ + Metrics: []*metricspb.Metric{ + generateTestIntGauge("int-gauge"), + generateTestDoubleGauge("double-gauge"), + generateTestIntSum("int-sum"), + generateTestDoubleSum("double-sum"), + generateTestDoubleHistogram("double-histogram"), + generateTestSummary("summary"), + }, + } + rms := internaldata.OCToMetrics(oc).ResourceMetrics() + metrics := rms.At(0).InstrumentationLibraryMetrics().At(0).Metrics() + numMetrics := metrics.Len() + + metadata := CWMetricMetadata{ + Namespace: "Namespace", + TimestampMs: int64(1596151098037), + LogGroup: "log-group", + LogStream: "log-stream", + InstrumentationLibraryName: "cloudwatch-otel", + } + + logger := zap.NewNop() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < numMetrics; i++ { + metric := metrics.At(i) + getDataPoints(&metric, metadata, logger) + } + } +} diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index 9f34a7e4e0a9..3b6e6c757c99 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -105,7 +105,7 @@ func TestConsumeMetrics(t *testing.T) { Points: []*metricspb.Point{ { Timestamp: ×tamp.Timestamp{ - Seconds: 100, + Seconds: 0, }, Value: &metricspb.Point_Int64Value{ Int64Value: 1, @@ -166,7 +166,7 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { Points: []*metricspb.Point{ { Timestamp: ×tamp.Timestamp{ - Seconds: 100, + Seconds: 0, }, Value: &metricspb.Point_Int64Value{ Int64Value: 1, @@ -235,7 +235,7 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { Points: []*metricspb.Point{ { Timestamp: ×tamp.Timestamp{ - Seconds: 100, + Seconds: 0, }, Value: &metricspb.Point_Int64Value{ Int64Value: 1, @@ -304,7 +304,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { Points: []*metricspb.Point{ { Timestamp: ×tamp.Timestamp{ - Seconds: 100, + Seconds: 0, }, Value: &metricspb.Point_Int64Value{ Int64Value: 1, @@ -373,7 +373,7 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { Points: []*metricspb.Point{ { Timestamp: ×tamp.Timestamp{ - Seconds: 100, + Seconds: 0, }, Value: &metricspb.Point_Int64Value{ Int64Value: 1, diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index f521eec67c95..d17d7acb726a 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -15,27 +15,17 @@ package awsemfexporter import ( - "bytes" - "crypto/sha1" // #nosec "encoding/json" - "fmt" - "sort" "time" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/translator/conventions" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/mapwithexpiry" ) const ( - CleanInterval = 5 * time.Minute - MinTimeDiff = 50 * time.Millisecond // We assume 50 milli-seconds is the minimal gap between two collected data sample to be valid to calculate delta - // OTel instrumentation lib name as dimension OTellibDimensionKey = "OTelLib" - defaultNameSpace = "default" + defaultNamespace = "default" noInstrumentationLibraryName = "Undefined" // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html @@ -44,21 +34,12 @@ const ( // DimensionRollupOptions ZeroAndSingleDimensionRollup = "ZeroAndSingleDimensionRollup" SingleDimensionRollupOnly = "SingleDimensionRollupOnly" - - FakeMetricValue = 0 ) -var currentState = mapwithexpiry.NewMapWithExpiry(CleanInterval) - -type rateState struct { - value float64 - timestamp int64 -} - // CWMetrics defines type CWMetrics struct { Measurements []CwMeasurement - Timestamp int64 + TimestampMs int64 Fields map[string]interface{} } @@ -77,75 +58,23 @@ type CWMetricStats struct { Sum float64 } -// Wrapper interface for: -// - pdata.IntDataPointSlice -// - pdata.DoubleDataPointSlice -// - pdata.IntHistogramDataPointSlice -// - pdata.DoubleHistogramDataPointSlice -// - pdata.DoubleSummaryDataPointSlice -type DataPoints interface { - Len() int - At(int) DataPoint -} - -// DataPoint is a wrapper interface for: -// - pdata.IntDataPoint -// - pdata.DoubleDataPoint -// - pdata.IntHistogramDataPoint -// - pdata.DoubleHistogramDataPoint -// - pdata.DoubleSummaryDataPointSlice -type DataPoint interface { - LabelsMap() pdata.StringMap -} - -// Define wrapper interfaces such that At(i) returns a `DataPoint` -type IntDataPointSlice struct { - pdata.IntDataPointSlice -} -type DoubleDataPointSlice struct { - pdata.DoubleDataPointSlice -} -type DoubleHistogramDataPointSlice struct { - pdata.DoubleHistogramDataPointSlice -} -type DoubleSummaryDataPointSlice struct { - pdata.DoubleSummaryDataPointSlice -} - -func (dps IntDataPointSlice) At(i int) DataPoint { - return dps.IntDataPointSlice.At(i) -} -func (dps DoubleDataPointSlice) At(i int) DataPoint { - return dps.DoubleDataPointSlice.At(i) -} -func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint { - return dps.DoubleHistogramDataPointSlice.At(i) -} -func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint { - return dps.DoubleSummaryDataPointSlice.At(i) +// CWMetricMetadata represents the metadata associated with a given CloudWatch metric +type CWMetricMetadata struct { + Namespace string + TimestampMs int64 + LogGroup string + LogStream string + InstrumentationLibraryName string } // TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, config *Config) ([]*CWMetrics, int) { var cwMetricList []*CWMetrics - namespace := config.Namespace var instrumentationLibName string - if len(namespace) == 0 { - serviceName, svcNameOk := rm.Resource().Attributes().Get(conventions.AttributeServiceName) - serviceNamespace, svcNsOk := rm.Resource().Attributes().Get(conventions.AttributeServiceNamespace) - if svcNameOk && svcNsOk && serviceName.Type() == pdata.AttributeValueSTRING && serviceNamespace.Type() == pdata.AttributeValueSTRING { - namespace = fmt.Sprintf("%s/%s", serviceNamespace.StringVal(), serviceName.StringVal()) - } else if svcNameOk && serviceName.Type() == pdata.AttributeValueSTRING { - namespace = serviceName.StringVal() - } else if svcNsOk && serviceNamespace.Type() == pdata.AttributeValueSTRING { - namespace = serviceNamespace.StringVal() - } - } - - if len(namespace) == 0 { - namespace = defaultNameSpace - } + cWNamespace := getNamespace(rm, config.Namespace) + logGroup, logStream := getLogInfo(rm, cWNamespace, config) + timestampMs := time.Now().UnixNano() / int64(time.Millisecond) ilms := rm.InstrumentationLibraryMetrics() for j := 0; j < ilms.Len(); j++ { @@ -159,7 +88,14 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, config *Config) ([]*CWMetr metrics := ilm.Metrics() for k := 0; k < metrics.Len(); k++ { metric := metrics.At(k) - cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, config) + metadata := CWMetricMetadata{ + Namespace: cWNamespace, + TimestampMs: timestampMs, + LogGroup: logGroup, + LogStream: logStream, + InstrumentationLibraryName: instrumentationLibName, + } + cwMetrics := getCWMetrics(&metric, metadata, instrumentationLibName, config) cwMetricList = append(cwMetricList, cwMetrics...) } } @@ -177,7 +113,7 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics, logger *zap.Logger) []*L if len(met.Measurements) > 0 { // Create `_aws` section only if there are measurements cwmMap["CloudWatchMetrics"] = met.Measurements - cwmMap["Timestamp"] = met.Timestamp + cwmMap["Timestamp"] = met.TimestampMs fieldMap["_aws"] = cwmMap } else { str, _ := json.Marshal(fieldMap) @@ -188,7 +124,7 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics, logger *zap.Logger) []*L if err != nil { continue } - metricCreationTime := met.Timestamp + metricCreationTime := met.TimestampMs logEvent := NewLogEvent( metricCreationTime, @@ -201,11 +137,16 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics, logger *zap.Logger) []*L } // getCWMetrics translates OTLP Metric to a list of CW Metrics -func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName string, config *Config) (cwMetrics []*CWMetrics) { +func getCWMetrics(metric *pdata.Metric, metadata CWMetricMetadata, instrumentationLibName string, config *Config) (cwMetrics []*CWMetrics) { if metric == nil { return } + dps := getDataPoints(metric, metadata, config.logger) + if dps == nil || dps.Len() == 0 { + return + } + // metric measure data from OT metricMeasure := make(map[string]string) metricMeasure["Name"] = metric.Name() @@ -213,37 +154,9 @@ func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName // metric measure slice could include multiple metric measures metricSlice := []map[string]string{metricMeasure} - // Retrieve data points - var dps DataPoints - switch metric.DataType() { - case pdata.MetricDataTypeIntGauge: - dps = IntDataPointSlice{metric.IntGauge().DataPoints()} - case pdata.MetricDataTypeDoubleGauge: - dps = DoubleDataPointSlice{metric.DoubleGauge().DataPoints()} - case pdata.MetricDataTypeIntSum: - dps = IntDataPointSlice{metric.IntSum().DataPoints()} - case pdata.MetricDataTypeDoubleSum: - dps = DoubleDataPointSlice{metric.DoubleSum().DataPoints()} - case pdata.MetricDataTypeDoubleHistogram: - dps = DoubleHistogramDataPointSlice{metric.DoubleHistogram().DataPoints()} - case pdata.MetricDataTypeDoubleSummary: - dps = DoubleSummaryDataPointSlice{metric.DoubleSummary().DataPoints()} - default: - config.logger.Warn( - "Unhandled metric data type.", - zap.String("DataType", metric.DataType().String()), - zap.String("Name", metric.Name()), - zap.String("Unit", metric.Unit()), - ) - return - } - - if dps.Len() == 0 { - return - } for m := 0; m < dps.Len(); m++ { dp := dps.At(m) - cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, instrumentationLibName, config) + cwMetric := buildCWMetric(dp, metric, metadata.Namespace, metricSlice, instrumentationLibName, config) if cwMetric != nil { cwMetrics = append(cwMetrics, cwMetric) } @@ -256,19 +169,19 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic dimensionRollupOption := config.DimensionRollupOption metricDeclarations := config.MetricDeclarations - labelsMap := dp.LabelsMap() - labelsSlice := make([]string, labelsMap.Len(), labelsMap.Len()+1) + labelsMap := dp.Labels + labelsSlice := make([]string, len(labelsMap), len(labelsMap)+1) // `labels` contains label key/value pairs - labels := make(map[string]string, labelsMap.Len()+1) + labels := make(map[string]string, len(labelsMap)+1) // `fields` contains metric and dimensions key/value pairs - fields := make(map[string]interface{}, labelsMap.Len()+2) + fields := make(map[string]interface{}, len(labelsMap)+2) idx := 0 - labelsMap.ForEach(func(k, v string) { + for k, v := range labelsMap { fields[k] = v labels[k] = v labelsSlice[idx] = k idx++ - }) + } // Apply single/zero dimension rollup to labels rollupDimensionArray := dimensionRollup(dimensionRollupOption, labelsSlice, instrumentationLibName) @@ -319,42 +232,12 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic } } - timestamp := time.Now().UnixNano() / int64(time.Millisecond) - - // Extract metric - var metricVal interface{} - switch metric := dp.(type) { - case pdata.IntDataPoint: - // Put a fake but identical metric value here in order to add metric name into fields - // since calculateRate() needs metric name as one of metric identifiers - fields[pmd.Name()] = float64(FakeMetricValue) - metricVal = metric.Value() - if needsCalculateRate(pmd) { - metricVal = calculateRate(fields, float64(metric.Value()), timestamp) - } - case pdata.DoubleDataPoint: - fields[pmd.Name()] = float64(FakeMetricValue) - metricVal = metric.Value() - if needsCalculateRate(pmd) { - metricVal = calculateRate(fields, metric.Value(), timestamp) - } - case pdata.DoubleHistogramDataPoint: - metricVal = &CWMetricStats{ - Count: metric.Count(), - Sum: metric.Sum(), - } - case pdata.DoubleSummaryDataPoint: - metricStat := &CWMetricStats{ - Count: metric.Count(), - Sum: metric.Sum(), - } - quantileValues := metric.QuantileValues() - if quantileValues.Len() > 0 { - metricStat.Min = quantileValues.At(0).Value() - metricStat.Max = quantileValues.At(quantileValues.Len() - 1).Value() - } - metricVal = metricStat + timestampMs := time.Now().UnixNano() / int64(time.Millisecond) + if dp.TimestampMs > 0 { + timestampMs = dp.TimestampMs } + + metricVal := dp.Value if metricVal == nil { return nil } @@ -362,58 +245,12 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic cwMetric := &CWMetrics{ Measurements: cwMeasurements, - Timestamp: timestamp, + TimestampMs: timestampMs, Fields: fields, } return cwMetric } -// rate is calculated by valDelta / timeDelta -func calculateRate(fields map[string]interface{}, val float64, timestamp int64) float64 { - keys := make([]string, 0, len(fields)) - var b bytes.Buffer - var metricRate float64 - // hash the key of str: metric + dimension key/value pairs (sorted alpha) - for k := range fields { - keys = append(keys, k) - } - sort.Strings(keys) - for _, k := range keys { - switch v := fields[k].(type) { - case float64: - b.WriteString(k) - continue - case string: - b.WriteString(k) - b.WriteString(v) - default: - continue - } - } - h := sha1.New() // #nosec - h.Write(b.Bytes()) - bs := h.Sum(nil) - hashStr := string(bs) - - // get previous Metric content from map. Need to lock the map until set the new state - currentState.Lock() - if state, ok := currentState.Get(hashStr); ok { - prevStats := state.(*rateState) - deltaTime := timestamp - prevStats.timestamp - deltaVal := val - prevStats.value - if deltaTime > MinTimeDiff.Milliseconds() && deltaVal >= 0 { - metricRate = deltaVal * 1e3 / float64(deltaTime) - } - } - content := &rateState{ - value: val, - timestamp: timestamp, - } - currentState.Set(hashStr, content) - currentState.Unlock() - return metricRate -} - // dimensionRollup creates rolled-up dimensions from the metric's label set. func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []string, instrumentationLibName string) [][]string { var rollupDimensionArray [][]string @@ -437,16 +274,3 @@ func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []stri return rollupDimensionArray } -func needsCalculateRate(pmd *pdata.Metric) bool { - switch pmd.DataType() { - case pdata.MetricDataTypeIntSum: - if pmd.IntSum().AggregationTemporality() == pdata.AggregationTemporalityCumulative { - return true - } - case pdata.MetricDataTypeDoubleSum: - if pmd.DoubleSum().AggregationTemporality() == pdata.AggregationTemporalityCumulative { - return true - } - } - return false -} diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 458233796620..caf6f6bb2c7a 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -578,7 +578,7 @@ func TestTranslateOtToCWMetricWithInstrLibrary(t *testing.T) { } assertCwMeasurementEqual(t, expectedMeasurement, met.Measurements[0]) - assert.Equal(t, int64(1), cwm[1].Fields["spanGaugeCounter"]) + assert.Equal(t, float64(1), cwm[1].Fields["spanGaugeCounter"]) assert.Equal(t, float64(0), cwm[2].Fields["spanDoubleCounter"]) assert.Equal(t, 0.1, cwm[3].Fields["spanGaugeDoubleCounter"]) expectedCwStats := &CWMetricStats{ @@ -645,7 +645,7 @@ func TestTranslateOtToCWMetricWithoutInstrLibrary(t *testing.T) { } assertCwMeasurementEqual(t, expectedMeasurement, met.Measurements[0]) - assert.Equal(t, int64(1), cwm[1].Fields["spanGaugeCounter"]) + assert.Equal(t, float64(1), cwm[1].Fields["spanGaugeCounter"]) assert.Equal(t, float64(0), cwm[2].Fields["spanDoubleCounter"]) assert.Equal(t, 0.1, cwm[3].Fields["spanGaugeDoubleCounter"]) expectedCwStats := &CWMetricStats{ @@ -998,7 +998,7 @@ func TestTranslateCWMetricToEMF(t *testing.T) { fields["spanCounter"] = 0 met := &CWMetrics{ - Timestamp: timestamp, + TimestampMs: timestamp, Fields: fields, Measurements: []CwMeasurement{cwMeasurement}, } @@ -1016,7 +1016,7 @@ func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) { fields["spanCounter"] = 0 met := &CWMetrics{ - Timestamp: timestamp, + TimestampMs: timestamp, Fields: fields, Measurements: nil, } @@ -1107,7 +1107,7 @@ func TestGetCWMetrics(t *testing.T) { }, Fields: map[string]interface{}{ OTelLib: instrumentationLibName, - "foo": int64(1), + "foo": float64(1), "label1": "value1", "label2": "value2", }, @@ -1126,7 +1126,7 @@ func TestGetCWMetrics(t *testing.T) { }, Fields: map[string]interface{}{ OTelLib: instrumentationLibName, - "foo": int64(3), + "foo": float64(3), "label2": "value2", }, }, @@ -1647,6 +1647,14 @@ func TestGetCWMetrics(t *testing.T) { }, } + metadata := CWMetricMetadata{ + Namespace: "Namespace", + TimestampMs: time.Now().UnixNano() / int64(time.Millisecond), + LogGroup: "log-group", + LogStream: "log-stream", + InstrumentationLibraryName: "cloudwatch-otel", + } + for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { oc := consumerdata.MetricsData{ @@ -1669,7 +1677,7 @@ func TestGetCWMetrics(t *testing.T) { assert.Equal(t, 1, metrics.Len()) metric := metrics.At(0) - cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, config) + cwMetrics := getCWMetrics(&metric, metadata, instrumentationLibName, config) assert.Equal(t, len(tc.expected), len(cwMetrics)) for i, expected := range tc.expected { @@ -1696,7 +1704,7 @@ func TestGetCWMetrics(t *testing.T) { logger: zap.New(obs), } - cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, obsConfig) + cwMetrics := getCWMetrics(&metric, metadata, instrumentationLibName, obsConfig) assert.Nil(t, cwMetrics) // Test output warning logs @@ -1715,7 +1723,7 @@ func TestGetCWMetrics(t *testing.T) { }) t.Run("Nil metric", func(t *testing.T) { - cwMetrics := getCWMetrics(nil, namespace, instrumentationLibName, config) + cwMetrics := getCWMetrics(nil, metadata, instrumentationLibName, config) assert.Nil(t, cwMetrics) }) } @@ -1741,11 +1749,11 @@ func TestBuildCWMetric(t *testing.T) { t.Run("Int gauge", func(t *testing.T) { metric.SetDataType(pdata.MetricDataTypeIntGauge) - dp := pdata.NewIntDataPoint() - dp.LabelsMap().InitFromMap(map[string]string{ + dp := DataPoint{} + dp.Value = int64(-17) + dp.Labels = map[string]string{ "label1": "value1", - }) - dp.SetValue(int64(-17)) + } cwMetric := buildCWMetric(dp, &metric, namespace, metricSlice, instrLibName, config) @@ -1767,11 +1775,12 @@ func TestBuildCWMetric(t *testing.T) { t.Run("Double gauge", func(t *testing.T) { metric.SetDataType(pdata.MetricDataTypeDoubleGauge) - dp := pdata.NewDoubleDataPoint() - dp.LabelsMap().InitFromMap(map[string]string{ + + dp := DataPoint{} + dp.Value = 0.3 + dp.Labels = map[string]string{ "label1": "value1", - }) - dp.SetValue(0.3) + } cwMetric := buildCWMetric(dp, &metric, namespace, metricSlice, instrLibName, config) @@ -1794,11 +1803,12 @@ func TestBuildCWMetric(t *testing.T) { t.Run("Int sum", func(t *testing.T) { metric.SetDataType(pdata.MetricDataTypeIntSum) metric.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) - dp := pdata.NewIntDataPoint() - dp.LabelsMap().InitFromMap(map[string]string{ + + dp := DataPoint{} + dp.Value = float64(17) + dp.Labels = map[string]string{ "label1": "value1", - }) - dp.SetValue(int64(-17)) + } cwMetric := buildCWMetric(dp, &metric, namespace, metricSlice, instrLibName, config) @@ -1812,7 +1822,7 @@ func TestBuildCWMetric(t *testing.T) { assertCwMeasurementEqual(t, expectedMeasurement, cwMetric.Measurements[0]) expectedFields := map[string]interface{}{ OTelLib: instrLibName, - "foo": float64(0), + "foo": float64(17), "label1": "value1", } assert.Equal(t, expectedFields, cwMetric.Fields) @@ -1821,11 +1831,12 @@ func TestBuildCWMetric(t *testing.T) { t.Run("Double sum", func(t *testing.T) { metric.SetDataType(pdata.MetricDataTypeDoubleSum) metric.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) - dp := pdata.NewDoubleDataPoint() - dp.LabelsMap().InitFromMap(map[string]string{ + + dp := DataPoint{} + dp.Value = 0.3 + dp.Labels = map[string]string{ "label1": "value1", - }) - dp.SetValue(0.3) + } cwMetric := buildCWMetric(dp, &metric, namespace, metricSlice, instrLibName, config) @@ -1839,7 +1850,7 @@ func TestBuildCWMetric(t *testing.T) { assertCwMeasurementEqual(t, expectedMeasurement, cwMetric.Measurements[0]) expectedFields := map[string]interface{}{ OTelLib: instrLibName, - "foo": float64(0), + "foo": float64(0.3), "label1": "value1", } assert.Equal(t, expectedFields, cwMetric.Fields) @@ -1847,14 +1858,17 @@ func TestBuildCWMetric(t *testing.T) { t.Run("Double histogram", func(t *testing.T) { metric.SetDataType(pdata.MetricDataTypeDoubleHistogram) - dp := pdata.NewDoubleHistogramDataPoint() - dp.LabelsMap().InitFromMap(map[string]string{ + + cWMetricStats := &CWMetricStats{ + Count: uint64(17), + Sum: 17.13, + } + + dp := DataPoint{} + dp.Value = cWMetricStats + dp.Labels = map[string]string{ "label1": "value1", - }) - dp.SetCount(uint64(17)) - dp.SetSum(17.13) - dp.SetBucketCounts([]uint64{1, 2, 3}) - dp.SetExplicitBounds([]float64{1, 2, 3}) + } cwMetric := buildCWMetric(dp, &metric, namespace, metricSlice, instrLibName, config) @@ -1879,7 +1893,7 @@ func TestBuildCWMetric(t *testing.T) { t.Run("Invalid datapoint type", func(t *testing.T) { metric.SetDataType(pdata.MetricDataTypeIntGauge) - dp := pdata.NewIntHistogramDataPoint() + dp := DataPoint{} cwMetric := buildCWMetric(dp, &metric, namespace, metricSlice, instrLibName, config) assert.Nil(t, cwMetric) @@ -1949,9 +1963,10 @@ func TestBuildCWMetric(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - dp := pdata.NewIntDataPoint() - dp.LabelsMap().InitFromMap(tc.labels) - dp.SetValue(int64(-17)) + dp := DataPoint{} + dp.Value = int64(-17) + dp.Labels = tc.labels + config = &Config{ Namespace: namespace, DimensionRollupOption: tc.dimensionRollupOption, @@ -2272,9 +2287,9 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - dp := pdata.NewIntDataPoint() - dp.LabelsMap().InitFromMap(tc.labels) - dp.SetValue(metricValue) + dp := DataPoint{} + dp.Labels = tc.labels + dp.Value = metricValue config := &Config{ Namespace: namespace, DimensionRollupOption: tc.dimensionRollupOption, @@ -2315,30 +2330,6 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { } } -func TestCalculateRate(t *testing.T) { - prevValue := int64(0) - curValue := int64(10) - fields := make(map[string]interface{}) - fields[OTellibDimensionKey] = "cloudwatch-otel" - fields["spanName"] = "test" - fields["spanCounter"] = prevValue - fields["type"] = "Int64" - prevTime := time.Now().UnixNano() / int64(time.Millisecond) - curTime := time.Unix(0, prevTime*int64(time.Millisecond)).Add(time.Second*10).UnixNano() / int64(time.Millisecond) - rate := calculateRate(fields, float64(prevValue), prevTime) - assert.Equal(t, float64(0), rate) - rate = calculateRate(fields, float64(curValue), curTime) - assert.Equal(t, float64(1), rate) - - prevDoubleValue := 0.0 - curDoubleValue := 5.0 - fields["type"] = "Float64" - rate = calculateRate(fields, prevDoubleValue, prevTime) - assert.Equal(t, float64(0), rate) - rate = calculateRate(fields, curDoubleValue, curTime) - assert.Equal(t, 0.5, rate) -} - func TestDimensionRollup(t *testing.T) { testCases := []struct { testName string @@ -2446,31 +2437,6 @@ func TestDimensionRollup(t *testing.T) { } } -func TestNeedsCalculateRate(t *testing.T) { - metric := pdata.NewMetric() - metric.SetDataType(pdata.MetricDataTypeIntGauge) - assert.False(t, needsCalculateRate(&metric)) - metric.SetDataType(pdata.MetricDataTypeDoubleGauge) - assert.False(t, needsCalculateRate(&metric)) - - metric.SetDataType(pdata.MetricDataTypeIntHistogram) - assert.False(t, needsCalculateRate(&metric)) - metric.SetDataType(pdata.MetricDataTypeDoubleHistogram) - assert.False(t, needsCalculateRate(&metric)) - - metric.SetDataType(pdata.MetricDataTypeIntSum) - metric.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) - assert.True(t, needsCalculateRate(&metric)) - metric.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) - assert.False(t, needsCalculateRate(&metric)) - - metric.SetDataType(pdata.MetricDataTypeDoubleSum) - metric.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) - assert.True(t, needsCalculateRate(&metric)) - metric.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) - assert.False(t, needsCalculateRate(&metric)) -} - func BenchmarkTranslateOtToCWMetricWithInstrLibrary(b *testing.B) { md := createMetricTestData() rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0) @@ -2542,7 +2508,7 @@ func BenchmarkTranslateCWMetricToEMF(b *testing.B) { fields["spanCounter"] = 0 met := &CWMetrics{ - Timestamp: timestamp, + TimestampMs: timestamp, Fields: fields, Measurements: []CwMeasurement{cwMeasurement}, } diff --git a/exporter/awsemfexporter/util.go b/exporter/awsemfexporter/util.go index 7e567e43c184..ea009bf48443 100644 --- a/exporter/awsemfexporter/util.go +++ b/exporter/awsemfexporter/util.go @@ -15,7 +15,12 @@ package awsemfexporter import ( + "fmt" + "sort" "strings" + "time" + + "go.opentelemetry.io/collector/translator/conventions" "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" @@ -55,3 +60,72 @@ func replace(s, pattern string, value pdata.AttributeValue, logger *zap.Logger) } return strings.Replace(s, pattern, value.StringVal(), -1) } + +// getNamespace retrieves namespace for given set of metrics from user config. +func getNamespace(rm *pdata.ResourceMetrics, namespace string) string { + if len(namespace) == 0 { + serviceName, svcNameOk := rm.Resource().Attributes().Get(conventions.AttributeServiceName) + serviceNamespace, svcNsOk := rm.Resource().Attributes().Get(conventions.AttributeServiceNamespace) + if svcNameOk && svcNsOk && serviceName.Type() == pdata.AttributeValueSTRING && serviceNamespace.Type() == pdata.AttributeValueSTRING { + namespace = fmt.Sprintf("%s/%s", serviceNamespace.StringVal(), serviceName.StringVal()) + } else if svcNameOk && serviceName.Type() == pdata.AttributeValueSTRING { + namespace = serviceName.StringVal() + } else if svcNsOk && serviceNamespace.Type() == pdata.AttributeValueSTRING { + namespace = serviceNamespace.StringVal() + } + } + + if len(namespace) == 0 { + namespace = defaultNamespace + } + return namespace +} + +// getLogInfo retrieves the log group and log stream names from a given set of metrics. +func getLogInfo(rm *pdata.ResourceMetrics, cWNamespace string, config *Config) (logGroup, logStream string) { + if cWNamespace != "" { + logGroup = fmt.Sprintf("/metrics/%s", cWNamespace) + } + + // Override log group/stream if specified in config. However, in this case, customer won't have correlation experience + if len(config.LogGroupName) > 0 { + logGroup = replacePatterns(config.LogGroupName, rm.Resource().Attributes(), config.logger) + } + if len(config.LogStreamName) > 0 { + logStream = replacePatterns(config.LogStreamName, rm.Resource().Attributes(), config.logger) + } + + return +} + +// createMetricKey generates a hashed key from metric labels and additional parameters +func createMetricKey(labels map[string]string, parameters map[string]string) string { + var sb strings.Builder + keys := make([]string, 0, len(labels)+len(parameters)) + values := make(map[string]string, len(labels)+len(parameters)) + + for k, v := range labels { + keys = append(keys, k) + values[k] = v + } + for k, v := range parameters { + keys = append(keys, k) + values[k] = v + } + + sort.Strings(keys) + for i, key := range keys { + keyValuePair := key + ":" + values[key] + sb.WriteString(keyValuePair) + if i < len(keys)-1 { + sb.WriteString(",") + } + } + + return sb.String() +} + +// unixNanoToMilliseconds converts a timestamp in nanoseconds to milliseconds. +func unixNanoToMilliseconds(timestamp pdata.TimestampUnixNano) int64 { + return int64(uint64(timestamp) / uint64(time.Millisecond)) +} diff --git a/exporter/awsemfexporter/util_test.go b/exporter/awsemfexporter/util_test.go index cc122a3895f2..e5c76e493a2e 100644 --- a/exporter/awsemfexporter/util_test.go +++ b/exporter/awsemfexporter/util_test.go @@ -15,6 +15,11 @@ package awsemfexporter import ( + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/translator/conventions" + "go.opentelemetry.io/collector/translator/internaldata" "testing" "github.com/stretchr/testify/assert" @@ -111,3 +116,204 @@ func TestReplacePatternNilAttrValue(t *testing.T) { assert.Equal(t, "/aws/ecs/containerinsights/undefined/performance", s) } + +func TestGetNamespace(t *testing.T) { + defaultMetric := createMetricTestData() + testCases := []struct { + testName string + metric consumerdata.MetricsData + configNamespace string + namespace string + }{ + { + "non-empty namespace", + defaultMetric, + "namespace", + "namespace", + }, + { + "empty namespace", + defaultMetric, + "", + "myServiceNS/myServiceName", + }, + { + "empty namespace, no service namespace", + consumerdata.MetricsData{ + Resource: &resourcepb.Resource{ + Labels: map[string]string{ + conventions.AttributeServiceName: "myServiceName", + }, + }, + }, + "", + "myServiceName", + }, + { + "empty namespace, no service name", + consumerdata.MetricsData{ + Resource: &resourcepb.Resource{ + Labels: map[string]string{ + conventions.AttributeServiceNamespace: "myServiceNS", + }, + }, + }, + "", + "myServiceNS", + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + rms := internaldata.OCToMetrics(tc.metric) + rm := rms.ResourceMetrics().At(0) + namespace := getNamespace(&rm, tc.configNamespace) + assert.Equal(t, tc.namespace, namespace) + }) + } +} + +func TestGetLogInfo(t *testing.T) { + metric := consumerdata.MetricsData{ + Node: &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: "test-emf"}, + LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"}, + }, + Resource: &resourcepb.Resource{ + Labels: map[string]string{ + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + }, + }, + } + rm := internaldata.OCToMetrics(metric).ResourceMetrics().At(0) + + testCases := []struct { + testName string + namespace string + configLogGroup string + configLogStream string + logGroup string + logStream string + }{ + { + "non-empty namespace, no config", + "namespace", + "", + "", + "/metrics/namespace", + "", + }, + { + "empty namespace, no config", + "", + "", + "", + "", + "", + }, + { + "non-empty namespace, config w/o pattern", + "namespace", + "test-logGroupName", + "test-logStreamName", + "test-logGroupName", + "test-logStreamName", + }, + { + "empty namespace, config w/o pattern", + "", + "test-logGroupName", + "test-logStreamName", + "test-logGroupName", + "test-logStreamName", + }, + { + "non-empty namespace, config w/ pattern", + "namespace", + "/aws/ecs/containerinsights/{ClusterName}/performance", + "{TaskId}", + "/aws/ecs/containerinsights/test-cluster-name/performance", + "test-task-id", + }, + { + "empty namespace, config w/ pattern", + "", + "/aws/ecs/containerinsights/{ClusterName}/performance", + "{TaskId}", + "/aws/ecs/containerinsights/test-cluster-name/performance", + "test-task-id", + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + config := &Config{ + LogGroupName: tc.configLogGroup, + LogStreamName: tc.configLogStream, + } + logGroup, logStream := getLogInfo(&rm, tc.namespace, config) + assert.Equal(t, tc.logGroup, logGroup) + assert.Equal(t, tc.logStream, logStream) + }) + } +} + +func TestCreateMetricKey(t *testing.T) { + testCases := []struct { + testName string + labels map[string]string + params map[string]string + expectedKey string + }{ + { + "single label w/o params", + map[string]string{ + "a": "A", + }, + nil, + "a:A", + }, + { + "single label w/ params", + map[string]string{ + "a": "A", + }, + map[string]string{ + "param1": "foo", + }, + "a:A,param1:foo", + }, + { + "multiple labels w/o params", + map[string]string{ + "b": "B", + "a": "A", + "c": "C", + }, + nil, + "a:A,b:B,c:C", + }, + { + "multiple labels w/ params", + map[string]string{ + "b": "B", + "a": "A", + "c": "C", + }, + map[string]string{ + "param1": "foo", + "bar": "car", + "apple": "banana", + }, + "a:A,apple:banana,b:B,bar:car,c:C,param1:foo", + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + key := createMetricKey(tc.labels, tc.params) + assert.Equal(t, tc.expectedKey, key) + }) + } +} \ No newline at end of file From 4e957041147846affc5cb969335056781750dff7 Mon Sep 17 00:00:00 2001 From: xiami Date: Wed, 3 Feb 2021 19:22:42 -0800 Subject: [PATCH 2/4] Import `label.Distinct` for sorting the metric lebels --- exporter/awsemfexporter/datapoint.go | 49 +++-- exporter/awsemfexporter/datapoint_test.go | 171 ++++++++++++++---- exporter/awsemfexporter/go.mod | 1 + .../mapwithexpiry/map_with_expiry.go | 8 +- exporter/awsemfexporter/metric_translator.go | 25 ++- .../awsemfexporter/metric_translator_test.go | 124 ++++++------- exporter/awsemfexporter/util.go | 28 --- exporter/awsemfexporter/util_test.go | 62 +------ 8 files changed, 255 insertions(+), 213 deletions(-) diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index 33fd9d8d55f7..c2aeafdf2761 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -17,6 +17,8 @@ package awsemfexporter import ( "time" + "go.opentelemetry.io/otel/label" + "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" @@ -24,8 +26,8 @@ import ( ) const ( - CleanInterval = 5 * time.Minute - MinTimeDiff = 50 * time.Millisecond // We assume 50 milli-seconds is the minimal gap between two collected data sample to be valid to calculate delta + cleanInterval = 5 * time.Minute + minTimeDiff = 50 * time.Millisecond // We assume 50 milli-seconds is the minimal gap between two collected data sample to be valid to calculate delta namespaceKey = "CloudWatchNamespace" metricNameKey = "CloudWatchMetricName" @@ -33,7 +35,7 @@ const ( logStreamKey = "CloudWatchLogStream" ) -var currentState = mapwithexpiry.NewMapWithExpiry(CleanInterval) +var currentState = mapwithexpiry.NewMapWithExpiry(cleanInterval) // DataPoint represents a processed metric data point type DataPoint struct { @@ -57,10 +59,18 @@ type DataPoints interface { // rateCalculationMetadata contains the metadata required to perform rate calculation type rateCalculationMetadata struct { needsCalculateRate bool - rateKeyParams map[string]string + rateKeyParams rateKeyParams timestampMs int64 } +type rateKeyParams struct { + namespaceKey string + metricNameKey string + logGroupKey string + logStreamKey string + labels label.Distinct +} + // rateState stores a metric's value type rateState struct { value float64 @@ -96,13 +106,15 @@ type DoubleSummaryDataPointSlice struct { // At retrieves the IntDataPoint at the given index and performs rate calculation if necessary. func (dps IntDataPointSlice) At(i int) DataPoint { metric := dps.IntDataPointSlice.At(i) - labels := createLabels(metric.LabelsMap()) timestampMs := unixNanoToMilliseconds(metric.Timestamp()) + labels := createLabels(metric.LabelsMap()) var metricVal float64 metricVal = float64(metric.Value()) if dps.needsCalculateRate { - rateKey := createMetricKey(labels, dps.rateKeyParams) + sortedLabels := getSortedLabels(metric.LabelsMap()) + dps.rateKeyParams.labels = sortedLabels + rateKey := dps.rateKeyParams rateTS := dps.timestampMs if timestampMs > 0 { // Use metric timestamp if available @@ -127,7 +139,9 @@ func (dps DoubleDataPointSlice) At(i int) DataPoint { var metricVal float64 metricVal = metric.Value() if dps.needsCalculateRate { - rateKey := createMetricKey(labels, dps.rateKeyParams) + sortedLabels := getSortedLabels(metric.LabelsMap()) + dps.rateKeyParams.labels = sortedLabels + rateKey := dps.rateKeyParams rateTS := dps.timestampMs if timestampMs > 0 { // Use metric timestamp if available @@ -181,8 +195,7 @@ func (dps DoubleSummaryDataPointSlice) At(i int) DataPoint { } } -// createLabels converts OTel StringMap labels to a map and optionally adds in the -// OTel instrumentation library name +// createLabels converts OTel StringMap labels to a map func createLabels(labelsMap pdata.StringMap) map[string]string { labels := make(map[string]string, labelsMap.Len()+1) labelsMap.ForEach(func(k, v string) { @@ -192,8 +205,20 @@ func createLabels(labelsMap pdata.StringMap) map[string]string { return labels } +// getSortedLabels converts OTel StringMap labels to sorted labels as label.Distinct +func getSortedLabels(labelsMap pdata.StringMap) label.Distinct { + var kvs []label.KeyValue + var sortable label.Sortable + labelsMap.ForEach(func(k, v string) { + kvs = append(kvs, label.String(k, v)) + }) + set := label.NewSetWithSortable(kvs, &sortable) + + return set.Equivalent() +} + // calculateRate calculates the metric value's rate of change using valDelta / timeDelta. -func calculateRate(metricKey string, val float64, timestampMs int64) float64 { +func calculateRate(metricKey interface{}, val float64, timestampMs int64) float64 { var metricRate float64 // get previous Metric content from map. Need to lock the map until set the new state currentState.Lock() @@ -202,7 +227,7 @@ func calculateRate(metricKey string, val float64, timestampMs int64) float64 { deltaTime := timestampMs - prevStats.timestampMs deltaVal := val - prevStats.value - if deltaTime > MinTimeDiff.Milliseconds() && deltaVal >= 0 { + if deltaTime > minTimeDiff.Milliseconds() && deltaVal >= 0 { metricRate = deltaVal * 1e3 / float64(deltaTime) } } @@ -221,7 +246,7 @@ func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Log return } - rateKeyParams := map[string]string{ + rateKeyParams := rateKeyParams{ namespaceKey: metadata.Namespace, metricNameKey: pmd.Name(), logGroupKey: metadata.LogGroup, diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go index a9981e9a1518..9b06c5154d60 100644 --- a/exporter/awsemfexporter/datapoint_test.go +++ b/exporter/awsemfexporter/datapoint_test.go @@ -245,11 +245,11 @@ func TestIntDataPointSliceAt(t *testing.T) { timestamp := time.Now().UnixNano() / int64(time.Millisecond) instrLibName := "cloudwatch-otel" labels := map[string]string{"label1": "value1"} - rateKeyParams := map[string]string{ - (namespaceKey): "namespace", - (metricNameKey): "foo", - (logGroupKey): "log-group", - (logStreamKey): "log-stream", + rateKeyParams := rateKeyParams{ + namespaceKey: "namespace", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", } testCases := []struct { @@ -290,7 +290,7 @@ func TestIntDataPointSliceAt(t *testing.T) { expectedDP := DataPoint{ Value: tc.value, Labels: map[string]string{ - "label1": "value1", + "label1": "value1", }, } @@ -305,11 +305,11 @@ func TestDoubleDataPointSliceAt(t *testing.T) { timestamp := time.Now().UnixNano() / int64(time.Millisecond) instrLibName := "cloudwatch-otel" labels := map[string]string{"label1": "value1"} - rateKeyParams := map[string]string{ - (namespaceKey): "namespace", - (metricNameKey): "foo", - (logGroupKey): "log-group", - (logStreamKey): "log-stream", + rateKeyParams := rateKeyParams{ + namespaceKey: "namespace", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", } testCases := []struct { @@ -350,7 +350,7 @@ func TestDoubleDataPointSliceAt(t *testing.T) { expectedDP := DataPoint{ Value: tc.value, Labels: map[string]string{ - "label1": "value1", + "label1": "value1", }, } @@ -385,7 +385,7 @@ func TestDoubleHistogramDataPointSliceAt(t *testing.T) { Count: 17, }, Labels: map[string]string{ - "label1": "value1", + "label1": "value1", }, } @@ -425,7 +425,7 @@ func TestDoubleSummaryDataPointSliceAt(t *testing.T) { Sum: 17.13, }, Labels: map[string]string{ - "label1": "value1", + "label1": "value1", }, } @@ -502,11 +502,11 @@ func TestGetDataPoints(t *testing.T) { metadata.InstrumentationLibraryName, rateCalculationMetadata{ false, - map[string]string{ - (namespaceKey): metadata.Namespace, - (metricNameKey): "foo", - (logGroupKey): metadata.LogGroup, - (logStreamKey): metadata.LogStream, + rateKeyParams{ + namespaceKey: metadata.Namespace, + metricNameKey: "foo", + logGroupKey: metadata.LogGroup, + logStreamKey: metadata.LogStream, }, metadata.TimestampMs, }, @@ -520,11 +520,11 @@ func TestGetDataPoints(t *testing.T) { metadata.InstrumentationLibraryName, rateCalculationMetadata{ false, - map[string]string{ - (namespaceKey): metadata.Namespace, - (metricNameKey): "foo", - (logGroupKey): metadata.LogGroup, - (logStreamKey): metadata.LogStream, + rateKeyParams{ + namespaceKey: metadata.Namespace, + metricNameKey: "foo", + logGroupKey: metadata.LogGroup, + logStreamKey: metadata.LogStream, }, metadata.TimestampMs, }, @@ -538,11 +538,11 @@ func TestGetDataPoints(t *testing.T) { metadata.InstrumentationLibraryName, rateCalculationMetadata{ true, - map[string]string{ - (namespaceKey): metadata.Namespace, - (metricNameKey): "foo", - (logGroupKey): metadata.LogGroup, - (logStreamKey): metadata.LogStream, + rateKeyParams{ + namespaceKey: metadata.Namespace, + metricNameKey: "foo", + logGroupKey: metadata.LogGroup, + logStreamKey: metadata.LogStream, }, metadata.TimestampMs, }, @@ -556,11 +556,11 @@ func TestGetDataPoints(t *testing.T) { metadata.InstrumentationLibraryName, rateCalculationMetadata{ true, - map[string]string{ - (namespaceKey): metadata.Namespace, - (metricNameKey): "foo", - (logGroupKey): metadata.LogGroup, - (logStreamKey): metadata.LogStream, + rateKeyParams{ + namespaceKey: metadata.Namespace, + metricNameKey: "foo", + logGroupKey: metadata.LogGroup, + logStreamKey: metadata.LogStream, }, metadata.TimestampMs, }, @@ -706,3 +706,106 @@ func BenchmarkGetDataPoints(b *testing.B) { } } } + +func TestGetSortedLabelsEquals(t *testing.T) { + labelMap1 := pdata.NewStringMap() + labelMap1.Insert("k1", "v1") + labelMap1.Insert("k2", "v2") + + labelMap2 := pdata.NewStringMap() + labelMap2.Insert("k2", "v2") + labelMap2.Insert("k1", "v1") + + sortedLabels1 := getSortedLabels(labelMap1) + sortedLabels2 := getSortedLabels(labelMap2) + + rateKeyParams1 := rateKeyParams{ + namespaceKey: "namespace", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + labels: sortedLabels1, + } + rateKeyParams2 := rateKeyParams{ + namespaceKey: "namespace", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + labels: sortedLabels2, + } + assert.Equal(t, rateKeyParams1, rateKeyParams2) +} + +func TestGetSortedLabelsNotEqual(t *testing.T) { + labelMap1 := pdata.NewStringMap() + labelMap1.Insert("k1", "v1") + labelMap1.Insert("k2", "v2") + + labelMap2 := pdata.NewStringMap() + labelMap2.Insert("k2", "v2") + labelMap2.Insert("k1", "v3") + + sortedLabels1 := getSortedLabels(labelMap1) + sortedLabels2 := getSortedLabels(labelMap2) + + rateKeyParams1 := rateKeyParams{ + namespaceKey: "namespace", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + labels: sortedLabels1, + } + rateKeyParams2 := rateKeyParams{ + namespaceKey: "namespace", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + labels: sortedLabels2, + } + assert.NotEqual(t, rateKeyParams1, rateKeyParams2) +} + +func TestGetSortedLabelsNotEqualOnPram(t *testing.T) { + labelMap1 := pdata.NewStringMap() + labelMap1.Insert("k1", "v1") + labelMap1.Insert("k2", "v2") + + labelMap2 := pdata.NewStringMap() + labelMap2.Insert("k2", "v2") + labelMap2.Insert("k1", "v1") + + sortedLabels1 := getSortedLabels(labelMap1) + sortedLabels2 := getSortedLabels(labelMap2) + + rateKeyParams1 := rateKeyParams{ + namespaceKey: "namespaceA", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + labels: sortedLabels1, + } + rateKeyParams2 := rateKeyParams{ + namespaceKey: "namespaceB", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + labels: sortedLabels2, + } + assert.NotEqual(t, rateKeyParams1, rateKeyParams2) +} + +func TestGetSortedLabelsNotEqualOnEmptyLabel(t *testing.T) { + rateKeyParams1 := rateKeyParams{ + namespaceKey: "namespaceA", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + } + rateKeyParams2 := rateKeyParams{ + namespaceKey: "namespaceA", + metricNameKey: "foo", + logGroupKey: "log-group", + logStreamKey: "log-stream", + } + assert.Equal(t, rateKeyParams1, rateKeyParams2) +} diff --git a/exporter/awsemfexporter/go.mod b/exporter/awsemfexporter/go.mod index bf5be6bfb93d..762e62dacb91 100644 --- a/exporter/awsemfexporter/go.mod +++ b/exporter/awsemfexporter/go.mod @@ -9,5 +9,6 @@ require ( github.com/google/uuid v1.2.0 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.19.0 + go.opentelemetry.io/otel v0.16.0 go.uber.org/zap v1.16.0 ) diff --git a/exporter/awsemfexporter/mapwithexpiry/map_with_expiry.go b/exporter/awsemfexporter/mapwithexpiry/map_with_expiry.go index b9ff1064c566..142a5095784b 100644 --- a/exporter/awsemfexporter/mapwithexpiry/map_with_expiry.go +++ b/exporter/awsemfexporter/mapwithexpiry/map_with_expiry.go @@ -28,11 +28,11 @@ type mapEntry struct { type MapWithExpiry struct { lock *sync.Mutex ttl time.Duration - entries map[string]*mapEntry + entries map[interface{}]*mapEntry } func NewMapWithExpiry(ttl time.Duration) *MapWithExpiry { - return &MapWithExpiry{lock: &sync.Mutex{}, ttl: ttl, entries: make(map[string]*mapEntry)} + return &MapWithExpiry{lock: &sync.Mutex{}, ttl: ttl, entries: make(map[interface{}]*mapEntry)} } func (m *MapWithExpiry) CleanUp(now time.Time) { @@ -43,7 +43,7 @@ func (m *MapWithExpiry) CleanUp(now time.Time) { } } -func (m *MapWithExpiry) Get(key string) (interface{}, bool) { +func (m *MapWithExpiry) Get(key interface{}) (interface{}, bool) { res, ok := m.entries[key] if ok { return res.content, true @@ -51,7 +51,7 @@ func (m *MapWithExpiry) Get(key string) (interface{}, bool) { return nil, false } -func (m *MapWithExpiry) Set(key string, content interface{}) { +func (m *MapWithExpiry) Set(key interface{}, content interface{}) { m.entries[key] = &mapEntry{content: content, creation: time.Now()} } diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index d17d7acb726a..2f9352f1e1a6 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -24,7 +24,7 @@ import ( const ( // OTel instrumentation lib name as dimension - OTellibDimensionKey = "OTelLib" + oTellibDimensionKey = "OTelLib" defaultNamespace = "default" noInstrumentationLibraryName = "Undefined" @@ -32,8 +32,8 @@ const ( maximumLogEventsPerPut = 10000 // DimensionRollupOptions - ZeroAndSingleDimensionRollup = "ZeroAndSingleDimensionRollup" - SingleDimensionRollupOnly = "SingleDimensionRollupOnly" + zeroAndSingleDimensionRollup = "ZeroAndSingleDimensionRollup" + singleDimensionRollupOnly = "SingleDimensionRollupOnly" ) // CWMetrics defines @@ -188,8 +188,8 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic // Add OTel instrumentation lib name as an additional dimension if it is defined if instrumentationLibName != noInstrumentationLibraryName { - labels[OTellibDimensionKey] = instrumentationLibName - fields[OTellibDimensionKey] = instrumentationLibName + labels[oTellibDimensionKey] = instrumentationLibName + fields[oTellibDimensionKey] = instrumentationLibName } // Create list of dimension sets @@ -204,14 +204,14 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic if instrumentationLibName != noInstrumentationLibraryName { // If OTel instrumentation lib name is defined, add instrumentation lib // name as a dimension - dims = append(dims, OTellibDimensionKey) + dims = append(dims, oTellibDimensionKey) } if len(rollupDimensionArray) > 0 { // Perform de-duplication check for edge case with a single label and single roll-up // is activated - if len(labelsSlice) > 1 || (dimensionRollupOption != SingleDimensionRollupOnly && - dimensionRollupOption != ZeroAndSingleDimensionRollup) { + if len(labelsSlice) > 1 || (dimensionRollupOption != singleDimensionRollupOnly && + dimensionRollupOption != zeroAndSingleDimensionRollup) { dimensions = [][]string{dims} } dimensions = append(dimensions, rollupDimensionArray...) @@ -245,7 +245,7 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic cwMetric := &CWMetrics{ Measurements: cwMeasurements, - TimestampMs: timestampMs, + TimestampMs: timestampMs, Fields: fields, } return cwMetric @@ -256,15 +256,15 @@ func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []stri var rollupDimensionArray [][]string dimensionZero := make([]string, 0) if instrumentationLibName != noInstrumentationLibraryName { - dimensionZero = append(dimensionZero, OTellibDimensionKey) + dimensionZero = append(dimensionZero, oTellibDimensionKey) } - if dimensionRollupOption == ZeroAndSingleDimensionRollup { + if dimensionRollupOption == zeroAndSingleDimensionRollup { //"Zero" dimension rollup if len(originalDimensionSlice) > 0 { rollupDimensionArray = append(rollupDimensionArray, dimensionZero) } } - if dimensionRollupOption == ZeroAndSingleDimensionRollup || dimensionRollupOption == SingleDimensionRollupOnly { + if dimensionRollupOption == zeroAndSingleDimensionRollup || dimensionRollupOption == singleDimensionRollupOnly { //"One" dimension rollup for _, dimensionKey := range originalDimensionSlice { rollupDimensionArray = append(rollupDimensionArray, append(dimensionZero, dimensionKey)) @@ -273,4 +273,3 @@ func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []stri return rollupDimensionArray } - diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index caf6f6bb2c7a..70c2345f2b68 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -543,7 +543,7 @@ func assertCwStatsEqual(t *testing.T, expected, actual *CWMetricStats) { func TestTranslateOtToCWMetricWithInstrLibrary(t *testing.T) { config := &Config{ Namespace: "", - DimensionRollupOption: ZeroAndSingleDimensionRollup, + DimensionRollupOption: zeroAndSingleDimensionRollup, logger: zap.NewNop(), } md := createMetricTestData() @@ -564,10 +564,10 @@ func TestTranslateOtToCWMetricWithInstrLibrary(t *testing.T) { expectedMeasurement := CwMeasurement{ Namespace: "myServiceNS/myServiceName", Dimensions: [][]string{ - {OTellibDimensionKey, "isItAnError", "spanName"}, - {OTellibDimensionKey}, - {OTellibDimensionKey, "spanName"}, - {OTellibDimensionKey, "isItAnError"}, + {oTellibDimensionKey, "isItAnError", "spanName"}, + {oTellibDimensionKey}, + {oTellibDimensionKey, "spanName"}, + {oTellibDimensionKey, "isItAnError"}, }, Metrics: []map[string]string{ { @@ -613,7 +613,7 @@ func TestTranslateOtToCWMetricWithInstrLibrary(t *testing.T) { func TestTranslateOtToCWMetricWithoutInstrLibrary(t *testing.T) { config := &Config{ Namespace: "", - DimensionRollupOption: ZeroAndSingleDimensionRollup, + DimensionRollupOption: zeroAndSingleDimensionRollup, logger: zap.NewNop(), } md := createMetricTestData() @@ -625,7 +625,7 @@ func TestTranslateOtToCWMetricWithoutInstrLibrary(t *testing.T) { assert.Equal(t, 1, len(cwm[0].Measurements)) met := cwm[0] - assert.NotContains(t, met.Fields, OTellibDimensionKey) + assert.NotContains(t, met.Fields, oTellibDimensionKey) assert.Equal(t, met.Fields["spanCounter"], float64(0)) expectedMeasurement := CwMeasurement{ @@ -677,7 +677,7 @@ func TestTranslateOtToCWMetricWithoutInstrLibrary(t *testing.T) { func TestTranslateOtToCWMetricWithNameSpace(t *testing.T) { config := &Config{ Namespace: "", - DimensionRollupOption: ZeroAndSingleDimensionRollup, + DimensionRollupOption: zeroAndSingleDimensionRollup, } md := consumerdata.MetricsData{ Node: &commonpb.Node{ @@ -853,12 +853,12 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { "has match w/ Zero + Single dim rollup", []string{"spanCounter"}, nil, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"spanName", "isItAnError"}, - {"spanName", OTellibDimensionKey}, - {OTellibDimensionKey, "isItAnError"}, - {OTellibDimensionKey}, + {"spanName", oTellibDimensionKey}, + {oTellibDimensionKey, "isItAnError"}, + {oTellibDimensionKey}, }, 1, }, @@ -869,7 +869,7 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { "", [][]string{ {"spanName", "isItAnError"}, - {"spanName", OTellibDimensionKey}, + {"spanName", oTellibDimensionKey}, }, 1, }, @@ -885,7 +885,7 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { "", [][]string{ {"spanName", "isItAnError"}, - {"spanName", OTellibDimensionKey}, + {"spanName", oTellibDimensionKey}, }, 1, }, @@ -906,11 +906,11 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { "No match w/ rollup", []string{"invalid"}, nil, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ - {OTellibDimensionKey, "spanName"}, - {OTellibDimensionKey, "isItAnError"}, - {OTellibDimensionKey}, + {oTellibDimensionKey, "spanName"}, + {oTellibDimensionKey, "isItAnError"}, + {oTellibDimensionKey}, }, 1, }, @@ -927,7 +927,7 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { for _, tc := range testCases { m := MetricDeclaration{ - Dimensions: [][]string{{"isItAnError", "spanName"}, {"spanName", OTellibDimensionKey}}, + Dimensions: [][]string{{"isItAnError", "spanName"}, {"spanName", oTellibDimensionKey}}, MetricNameSelectors: tc.metricNameSelectors, LabelMatchers: tc.labelMatchers, } @@ -956,7 +956,7 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { t.Run("No instrumentation library name w/ no dim rollup", func(t *testing.T) { rm = internaldata.OCToMetrics(md).ResourceMetrics().At(0) m := MetricDeclaration{ - Dimensions: [][]string{{"isItAnError", "spanName"}, {"spanName", OTellibDimensionKey}}, + Dimensions: [][]string{{"isItAnError", "spanName"}, {"spanName", oTellibDimensionKey}}, MetricNameSelectors: []string{"spanCounter"}, } config := &Config{ @@ -985,7 +985,7 @@ func TestTranslateOtToCWMetricWithFiltering(t *testing.T) { func TestTranslateCWMetricToEMF(t *testing.T) { cwMeasurement := CwMeasurement{ Namespace: "test-emf", - Dimensions: [][]string{{OTellibDimensionKey}, {OTellibDimensionKey, "spanName"}}, + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, Metrics: []map[string]string{{ "Name": "spanCounter", "Unit": "Count", @@ -993,7 +993,7 @@ func TestTranslateCWMetricToEMF(t *testing.T) { } timestamp := int64(1596151098037) fields := make(map[string]interface{}) - fields[OTellibDimensionKey] = "cloudwatch-otel" + fields[oTellibDimensionKey] = "cloudwatch-otel" fields["spanName"] = "test" fields["spanCounter"] = 0 @@ -1011,7 +1011,7 @@ func TestTranslateCWMetricToEMF(t *testing.T) { func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) { timestamp := int64(1596151098037) fields := make(map[string]interface{}) - fields[OTellibDimensionKey] = "cloudwatch-otel" + fields[oTellibDimensionKey] = "cloudwatch-otel" fields["spanName"] = "test" fields["spanCounter"] = 0 @@ -1039,7 +1039,7 @@ func TestTranslateCWMetricToEMFNoMeasurements(t *testing.T) { func TestGetCWMetrics(t *testing.T) { namespace := "Namespace" - OTelLib := OTellibDimensionKey + OTelLib := oTellibDimensionKey instrumentationLibName := "InstrLibName" config := &Config{ Namespace: "", @@ -1731,7 +1731,7 @@ func TestGetCWMetrics(t *testing.T) { func TestBuildCWMetric(t *testing.T) { namespace := "Namespace" instrLibName := "InstrLibName" - OTelLib := OTellibDimensionKey + OTelLib := oTellibDimensionKey config := &Config{ Namespace: "", DimensionRollupOption: "", @@ -1917,7 +1917,7 @@ func TestBuildCWMetric(t *testing.T) { { "Single label w/ single rollup", map[string]string{"a": "foo"}, - SingleDimensionRollupOnly, + singleDimensionRollupOnly, [][]string{ {"a", OTelLib}, }, @@ -1925,7 +1925,7 @@ func TestBuildCWMetric(t *testing.T) { { "Single label w/ zero + single rollup", map[string]string{"a": "foo"}, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"a", OTelLib}, {OTelLib}, @@ -1950,7 +1950,7 @@ func TestBuildCWMetric(t *testing.T) { "b": "bar", "c": "car", }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"a", "b", "c", OTelLib}, {OTelLib, "a"}, @@ -1973,7 +1973,7 @@ func TestBuildCWMetric(t *testing.T) { } expectedFields := map[string]interface{}{ - OTellibDimensionKey: OTelLib, + oTellibDimensionKey: OTelLib, "foo": int64(-17), } for k, v := range tc.labels { @@ -1999,7 +1999,7 @@ func TestBuildCWMetric(t *testing.T) { func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { namespace := "Namespace" - OTelLib := OTellibDimensionKey + OTelLib := oTellibDimensionKey instrumentationLibName := "cloudwatch-otel" metricName := "metric1" metricValue := int64(-17) @@ -2046,7 +2046,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - SingleDimensionRollupOnly, + singleDimensionRollupOnly, [][]string{{"a"}, {"a", OTelLib}}, }, { @@ -2058,7 +2058,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{{"a"}, {"a", OTelLib}, {OTelLib}}, }, { @@ -2094,7 +2094,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"a"}, {OTelLib, "a"}, @@ -2135,7 +2135,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"a", "b"}, {"b"}, @@ -2153,7 +2153,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"b"}, {OTelLib, "a"}, @@ -2170,7 +2170,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"a", "b"}, {"b"}, @@ -2223,7 +2223,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, [][]string{ {"a", "b"}, {"b"}, @@ -2280,7 +2280,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { MetricNameSelectors: []string{metricName}, }, }, - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, nil, }, } @@ -2302,7 +2302,7 @@ func TestBuildCWMetricWithMetricDeclarations(t *testing.T) { } expectedFields := map[string]interface{}{ - OTellibDimensionKey: instrumentationLibName, + oTellibDimensionKey: instrumentationLibName, metricName: metricValue, } for k, v := range tc.labels { @@ -2354,7 +2354,7 @@ func TestDimensionRollup(t *testing.T) { }, { "single dim w/o instrumentation library name", - SingleDimensionRollupOnly, + singleDimensionRollupOnly, []string{"a", "b", "c"}, noInstrumentationLibraryName, [][]string{ @@ -2365,32 +2365,32 @@ func TestDimensionRollup(t *testing.T) { }, { "single dim w/ instrumentation library name", - SingleDimensionRollupOnly, + singleDimensionRollupOnly, []string{"a", "b", "c"}, "cloudwatch-otel", [][]string{ - {OTellibDimensionKey, "a"}, - {OTellibDimensionKey, "b"}, - {OTellibDimensionKey, "c"}, + {oTellibDimensionKey, "a"}, + {oTellibDimensionKey, "b"}, + {oTellibDimensionKey, "c"}, }, }, { "single dim w/o instrumentation library name and only one label", - SingleDimensionRollupOnly, + singleDimensionRollupOnly, []string{"a"}, noInstrumentationLibraryName, [][]string{{"a"}}, }, { "single dim w/ instrumentation library name and only one label", - SingleDimensionRollupOnly, + singleDimensionRollupOnly, []string{"a"}, "cloudwatch-otel", - [][]string{{OTellibDimensionKey, "a"}}, + [][]string{{oTellibDimensionKey, "a"}}, }, { "zero + single dim w/o instrumentation library name", - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, []string{"a", "b", "c"}, noInstrumentationLibraryName, [][]string{ @@ -2402,27 +2402,27 @@ func TestDimensionRollup(t *testing.T) { }, { "zero + single dim w/ instrumentation library name", - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, []string{"a", "b", "c", "A"}, "cloudwatch-otel", [][]string{ - {OTellibDimensionKey}, - {OTellibDimensionKey, "a"}, - {OTellibDimensionKey, "b"}, - {OTellibDimensionKey, "c"}, - {OTellibDimensionKey, "A"}, + {oTellibDimensionKey}, + {oTellibDimensionKey, "a"}, + {oTellibDimensionKey, "b"}, + {oTellibDimensionKey, "c"}, + {oTellibDimensionKey, "A"}, }, }, { "zero dim rollup w/o instrumentation library name and no labels", - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, []string{}, noInstrumentationLibraryName, nil, }, { "zero dim rollup w/ instrumentation library name and no labels", - ZeroAndSingleDimensionRollup, + zeroAndSingleDimensionRollup, []string{}, "cloudwatch-otel", nil, @@ -2445,7 +2445,7 @@ func BenchmarkTranslateOtToCWMetricWithInstrLibrary(b *testing.B) { ilm.InstrumentationLibrary().SetName("cloudwatch-lib") config := &Config{ Namespace: "", - DimensionRollupOption: ZeroAndSingleDimensionRollup, + DimensionRollupOption: zeroAndSingleDimensionRollup, } b.ResetTimer() @@ -2459,7 +2459,7 @@ func BenchmarkTranslateOtToCWMetricWithoutInstrLibrary(b *testing.B) { rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0) config := &Config{ Namespace: "", - DimensionRollupOption: ZeroAndSingleDimensionRollup, + DimensionRollupOption: zeroAndSingleDimensionRollup, } b.ResetTimer() @@ -2482,7 +2482,7 @@ func BenchmarkTranslateOtToCWMetricWithFiltering(b *testing.B) { m.Init(logger) config := &Config{ Namespace: "", - DimensionRollupOption: ZeroAndSingleDimensionRollup, + DimensionRollupOption: zeroAndSingleDimensionRollup, MetricDeclarations: []*MetricDeclaration{&m}, } @@ -2495,7 +2495,7 @@ func BenchmarkTranslateOtToCWMetricWithFiltering(b *testing.B) { func BenchmarkTranslateCWMetricToEMF(b *testing.B) { cwMeasurement := CwMeasurement{ Namespace: "test-emf", - Dimensions: [][]string{{OTellibDimensionKey}, {OTellibDimensionKey, "spanName"}}, + Dimensions: [][]string{{oTellibDimensionKey}, {oTellibDimensionKey, "spanName"}}, Metrics: []map[string]string{{ "Name": "spanCounter", "Unit": "Count", @@ -2503,7 +2503,7 @@ func BenchmarkTranslateCWMetricToEMF(b *testing.B) { } timestamp := int64(1596151098037) fields := make(map[string]interface{}) - fields[OTellibDimensionKey] = "cloudwatch-otel" + fields[oTellibDimensionKey] = "cloudwatch-otel" fields["spanName"] = "test" fields["spanCounter"] = 0 @@ -2523,6 +2523,6 @@ func BenchmarkTranslateCWMetricToEMF(b *testing.B) { func BenchmarkDimensionRollup(b *testing.B) { dimensions := []string{"a", "b", "c"} for n := 0; n < b.N; n++ { - dimensionRollup(ZeroAndSingleDimensionRollup, dimensions, "cloudwatch-otel") + dimensionRollup(zeroAndSingleDimensionRollup, dimensions, "cloudwatch-otel") } } diff --git a/exporter/awsemfexporter/util.go b/exporter/awsemfexporter/util.go index ea009bf48443..acb4895a52e2 100644 --- a/exporter/awsemfexporter/util.go +++ b/exporter/awsemfexporter/util.go @@ -16,7 +16,6 @@ package awsemfexporter import ( "fmt" - "sort" "strings" "time" @@ -98,33 +97,6 @@ func getLogInfo(rm *pdata.ResourceMetrics, cWNamespace string, config *Config) ( return } -// createMetricKey generates a hashed key from metric labels and additional parameters -func createMetricKey(labels map[string]string, parameters map[string]string) string { - var sb strings.Builder - keys := make([]string, 0, len(labels)+len(parameters)) - values := make(map[string]string, len(labels)+len(parameters)) - - for k, v := range labels { - keys = append(keys, k) - values[k] = v - } - for k, v := range parameters { - keys = append(keys, k) - values[k] = v - } - - sort.Strings(keys) - for i, key := range keys { - keyValuePair := key + ":" + values[key] - sb.WriteString(keyValuePair) - if i < len(keys)-1 { - sb.WriteString(",") - } - } - - return sb.String() -} - // unixNanoToMilliseconds converts a timestamp in nanoseconds to milliseconds. func unixNanoToMilliseconds(timestamp pdata.TimestampUnixNano) int64 { return int64(uint64(timestamp) / uint64(time.Millisecond)) diff --git a/exporter/awsemfexporter/util_test.go b/exporter/awsemfexporter/util_test.go index e5c76e493a2e..f6f3d5010742 100644 --- a/exporter/awsemfexporter/util_test.go +++ b/exporter/awsemfexporter/util_test.go @@ -15,12 +15,13 @@ package awsemfexporter import ( + "testing" + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/translator/conventions" "go.opentelemetry.io/collector/translator/internaldata" - "testing" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer/pdata" @@ -258,62 +259,3 @@ func TestGetLogInfo(t *testing.T) { }) } } - -func TestCreateMetricKey(t *testing.T) { - testCases := []struct { - testName string - labels map[string]string - params map[string]string - expectedKey string - }{ - { - "single label w/o params", - map[string]string{ - "a": "A", - }, - nil, - "a:A", - }, - { - "single label w/ params", - map[string]string{ - "a": "A", - }, - map[string]string{ - "param1": "foo", - }, - "a:A,param1:foo", - }, - { - "multiple labels w/o params", - map[string]string{ - "b": "B", - "a": "A", - "c": "C", - }, - nil, - "a:A,b:B,c:C", - }, - { - "multiple labels w/ params", - map[string]string{ - "b": "B", - "a": "A", - "c": "C", - }, - map[string]string{ - "param1": "foo", - "bar": "car", - "apple": "banana", - }, - "a:A,apple:banana,b:B,bar:car,c:C,param1:foo", - }, - } - - for _, tc := range testCases { - t.Run(tc.testName, func(t *testing.T) { - key := createMetricKey(tc.labels, tc.params) - assert.Equal(t, tc.expectedKey, key) - }) - } -} \ No newline at end of file From 8490be5245dcf084f05f94f4ab8292ec9bbe893d Mon Sep 17 00:00:00 2001 From: xiami Date: Wed, 3 Feb 2021 21:34:49 -0800 Subject: [PATCH 3/4] fix the lint errors --- exporter/awsemfexporter/datapoint.go | 15 +++++---------- exporter/awsemfexporter/datapoint_test.go | 8 ++++---- exporter/awsemfexporter/go.mod | 2 +- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index c2aeafdf2761..f87ebf720b7a 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -28,11 +28,6 @@ import ( const ( cleanInterval = 5 * time.Minute minTimeDiff = 50 * time.Millisecond // We assume 50 milli-seconds is the minimal gap between two collected data sample to be valid to calculate delta - - namespaceKey = "CloudWatchNamespace" - metricNameKey = "CloudWatchMetricName" - logGroupKey = "CloudWatchLogGroup" - logStreamKey = "CloudWatchLogStream" ) var currentState = mapwithexpiry.NewMapWithExpiry(cleanInterval) @@ -246,7 +241,7 @@ func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Log return } - rateKeyParams := rateKeyParams{ + rateKeys := rateKeyParams{ namespaceKey: metadata.Namespace, metricNameKey: pmd.Name(), logGroupKey: metadata.LogGroup, @@ -260,7 +255,7 @@ func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Log metadata.InstrumentationLibraryName, rateCalculationMetadata{ false, - rateKeyParams, + rateKeys, metadata.TimestampMs, }, metric.DataPoints(), @@ -271,7 +266,7 @@ func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Log metadata.InstrumentationLibraryName, rateCalculationMetadata{ false, - rateKeyParams, + rateKeys, metadata.TimestampMs, }, metric.DataPoints(), @@ -282,7 +277,7 @@ func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Log metadata.InstrumentationLibraryName, rateCalculationMetadata{ metric.AggregationTemporality() == pdata.AggregationTemporalityCumulative, - rateKeyParams, + rateKeys, metadata.TimestampMs, }, metric.DataPoints(), @@ -293,7 +288,7 @@ func getDataPoints(pmd *pdata.Metric, metadata CWMetricMetadata, logger *zap.Log metadata.InstrumentationLibraryName, rateCalculationMetadata{ metric.AggregationTemporality() == pdata.AggregationTemporalityCumulative, - rateKeyParams, + rateKeys, metadata.TimestampMs, }, metric.DataPoints(), diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go index 9b06c5154d60..3c84245be10b 100644 --- a/exporter/awsemfexporter/datapoint_test.go +++ b/exporter/awsemfexporter/datapoint_test.go @@ -245,7 +245,7 @@ func TestIntDataPointSliceAt(t *testing.T) { timestamp := time.Now().UnixNano() / int64(time.Millisecond) instrLibName := "cloudwatch-otel" labels := map[string]string{"label1": "value1"} - rateKeyParams := rateKeyParams{ + rateKeys := rateKeyParams{ namespaceKey: "namespace", metricNameKey: "foo", logGroupKey: "log-group", @@ -281,7 +281,7 @@ func TestIntDataPointSliceAt(t *testing.T) { instrLibName, rateCalculationMetadata{ tc.needsCalculateRate, - rateKeyParams, + rateKeys, timestamp, }, testDPS, @@ -305,7 +305,7 @@ func TestDoubleDataPointSliceAt(t *testing.T) { timestamp := time.Now().UnixNano() / int64(time.Millisecond) instrLibName := "cloudwatch-otel" labels := map[string]string{"label1": "value1"} - rateKeyParams := rateKeyParams{ + rateKeys := rateKeyParams{ namespaceKey: "namespace", metricNameKey: "foo", logGroupKey: "log-group", @@ -341,7 +341,7 @@ func TestDoubleDataPointSliceAt(t *testing.T) { instrLibName, rateCalculationMetadata{ tc.needsCalculateRate, - rateKeyParams, + rateKeys, timestamp, }, testDPS, diff --git a/exporter/awsemfexporter/go.mod b/exporter/awsemfexporter/go.mod index 762e62dacb91..c386d9835850 100644 --- a/exporter/awsemfexporter/go.mod +++ b/exporter/awsemfexporter/go.mod @@ -9,6 +9,6 @@ require ( github.com/google/uuid v1.2.0 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.19.0 - go.opentelemetry.io/otel v0.16.0 + go.opentelemetry.io/otel v0.15.0 go.uber.org/zap v1.16.0 ) From 9437b099b62c61cf0582a3d211856cf834d56577 Mon Sep 17 00:00:00 2001 From: xiami Date: Wed, 3 Feb 2021 21:54:41 -0800 Subject: [PATCH 4/4] fix the lint errors --- exporter/awsemfexporter/datapoint.go | 3 +-- exporter/awsemfexporter/util.go | 3 +-- exporter/awsemfexporter/util_test.go | 5 ++--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index f87ebf720b7a..f489c6461d69 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -17,9 +17,8 @@ package awsemfexporter import ( "time" - "go.opentelemetry.io/otel/label" - "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/otel/label" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/mapwithexpiry" diff --git a/exporter/awsemfexporter/util.go b/exporter/awsemfexporter/util.go index acb4895a52e2..7435d4cea38b 100644 --- a/exporter/awsemfexporter/util.go +++ b/exporter/awsemfexporter/util.go @@ -19,9 +19,8 @@ import ( "strings" "time" - "go.opentelemetry.io/collector/translator/conventions" - "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/translator/conventions" "go.uber.org/zap" ) diff --git a/exporter/awsemfexporter/util_test.go b/exporter/awsemfexporter/util_test.go index f6f3d5010742..39fa2a3350da 100644 --- a/exporter/awsemfexporter/util_test.go +++ b/exporter/awsemfexporter/util_test.go @@ -19,12 +19,11 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" + "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/conventions" "go.opentelemetry.io/collector/translator/internaldata" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" )