diff --git a/.chloggen/metricstransform-processor-scale-exp-hist.yaml b/.chloggen/metricstransform-processor-scale-exp-hist.yaml new file mode 100644 index 000000000000..ba3d39eb08b9 --- /dev/null +++ b/.chloggen/metricstransform-processor-scale-exp-hist.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: metricstransformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add scaling exponential histogram support + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29803] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/metricstransformprocessor/README.md b/processor/metricstransformprocessor/README.md index 2cbb3da3237f..bac2242119d5 100644 --- a/processor/metricstransformprocessor/README.md +++ b/processor/metricstransformprocessor/README.md @@ -107,7 +107,7 @@ processors: label_set: [labels...] # aggregation_type defines how data points will be aggregated; if action is aggregate_labels or aggregate_label_values, aggregation_type is required aggregation_type: {sum, mean, min, max, count, median} - # experimental_scale specifies the scalar to apply to values + # experimental_scale specifies the scalar to apply to values. Scaling exponential histograms inherently involves some loss of accuracy. experimental_scale: # value_actions contain a list of operations that will be performed on the selected label value_actions: diff --git a/processor/metricstransformprocessor/metrics_testcase_builder_test.go b/processor/metricstransformprocessor/metrics_testcase_builder_test.go index d402c69b5820..dc2142b2ca76 100644 --- a/processor/metricstransformprocessor/metrics_testcase_builder_test.go +++ b/processor/metricstransformprocessor/metrics_testcase_builder_test.go @@ -116,6 +116,66 @@ func (b builder) addHistogramDatapointWithMinMaxAndExemplars(start, ts pcommon.T return b } +type expHistogramConfig struct { + count uint64 + sum float64 + min float64 + max float64 + zeroThreshold float64 + zeroCount uint64 + scale int32 + positiveOffset int32 + positiveCount []uint64 + negativeOffset int32 + negativeCount []uint64 + exemplarValues []float64 +} + +func (b builder) addExpHistogramDatapoint(config expHistogramConfig) builder { + if b.metric.Type() != pmetric.MetricTypeExponentialHistogram { + panic(b.metric.Type().String()) + } + dp := b.metric.ExponentialHistogram().DataPoints().AppendEmpty() + dp.SetCount(config.count) + dp.SetSum(config.sum) + dp.SetMin(config.min) + dp.SetMax(config.max) + dp.SetZeroThreshold(config.zeroThreshold) + dp.SetZeroCount(config.zeroCount) + dp.SetScale(config.scale) + dp.Positive().SetOffset(config.positiveOffset) + dp.Positive().BucketCounts().FromRaw(config.positiveCount) + dp.Negative().SetOffset(config.negativeOffset) + dp.Negative().BucketCounts().FromRaw(config.negativeCount) + for ei := 0; ei < len(config.exemplarValues); ei++ { + exemplar := dp.Exemplars().AppendEmpty() + exemplar.SetTimestamp(1) + exemplar.SetDoubleValue(config.exemplarValues[ei]) + } + dp.SetStartTimestamp(1) + dp.SetTimestamp(1) + return b +} + +func buildExpHistogramBucket(m map[int]uint64) []uint64 { + if len(m) == 0 { + return []uint64{} + } + maxIndex := 0 + for index := range m { + if index > maxIndex { + maxIndex = index + } + } + + result := make([]uint64, maxIndex+1) + for index, count := range m { + result[index] = count + } + + return result +} + // setUnit sets the unit of this metric func (b builder) setUnit(unit string) builder { b.metric.SetUnit(unit) diff --git a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go index 85337e5d842b..cc6e5ff742d7 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go @@ -1634,6 +1634,151 @@ var ( addHistogramDatapointWithMinMaxAndExemplars(2, 2, 2, 40, 10, 30, []float64{20}, []uint64{1, 2}, []float64{10, 30}).build(), }, }, + { + name: "metric_experimental_scale_value_exp_histogram", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: 1000, + }, + }, + }, + }, + { + MetricIncludeFilter: internalFilterStrict{include: "metric2"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: .1, + }, + }, + }, + }, + { + MetricIncludeFilter: internalFilterStrict{include: "metric3"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: 100000, + }, + }, + }, + }, + { + MetricIncludeFilter: internalFilterStrict{include: "metric4"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: 42.123, + }, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric1"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 5, + sum: 1359, + scale: 4, + min: 10, + max: 500, + zeroThreshold: 5, + zeroCount: 1, + positiveOffset: 53, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 53: 1, 74: 1, 90: 2}), // 10, 100, 250, 499, 500 + exemplarValues: []float64{100, 300}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric2"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 10100.000123, + scale: 2, + min: 0.000123, + max: 10000, + positiveOffset: -52, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 78: 1, 105: 1}), // 0.000123, 100, 10000 + exemplarValues: []float64{100, 300}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric3"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 4.3678, + scale: 7, + min: 1.123, + max: 1.789, + positiveOffset: 21, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 48: 1, 86: 1}), // 1.123, 1.456, 1.789 + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric4"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 6.00003, + scale: 20, + min: 2, + max: 2.00002, + negativeOffset: 1048575, + negativeCount: buildExpHistogramBucket(map[int]uint64{0: 1, 8: 1, 16: 1}), // 2, 2.00001, 2.00002 + }).build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric1"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 5, + sum: 1359000, + scale: 4, + min: 10000, + max: 500000, + zeroThreshold: 5000, + zeroCount: 1, + positiveOffset: 212, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 53: 1, 74: 1, 90: 2}), + exemplarValues: []float64{100000, 300000}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric2"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 1010.0000123, + scale: 2, + min: 0.0000123, + max: 1000, + positiveOffset: -65, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 78: 1, 105: 1}), + exemplarValues: []float64{10, 30}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric3"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 436780, + scale: 7, + min: 112300, + max: 178900, + positiveOffset: 2147, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 48: 1, 86: 1}), + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric4"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 252.73926368999997, + scale: 20, + min: 84.246, + max: 84.24684246, + negativeOffset: 6707253, + negativeCount: buildExpHistogramBucket(map[int]uint64{0: 1, 8: 1, 16: 1}), + }).build(), + }, + }, { name: "metric_experimental_scale_with_attr_filtering", transforms: []internalTransform{ diff --git a/processor/metricstransformprocessor/operation_scale_value.go b/processor/metricstransformprocessor/operation_scale_value.go index 0200b3eb36a4..c119d30c81ea 100644 --- a/processor/metricstransformprocessor/operation_scale_value.go +++ b/processor/metricstransformprocessor/operation_scale_value.go @@ -4,6 +4,8 @@ package metricstransformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor" import ( + "math" + "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -19,6 +21,9 @@ func scaleValueOp(metric pmetric.Metric, op internalOperation, f internalFilter) case pmetric.MetricTypeHistogram: scaleHistogramOp(metric, op, f) return + case pmetric.MetricTypeExponentialHistogram: + scaleExpHistogramOp(metric, op, f) + return default: return } @@ -60,14 +65,71 @@ func scaleHistogramOp(metric pmetric.Metric, op internalOperation, f internalFil bounds.SetAt(bi, bounds.At(bi)*op.configOperation.Scale) } - for exemplars, ei := dp.Exemplars(), 0; ei < exemplars.Len(); ei++ { - exemplar := exemplars.At(ei) - switch exemplar.ValueType() { - case pmetric.ExemplarValueTypeInt: - exemplar.SetIntValue(int64(float64(exemplar.IntValue()) * op.configOperation.Scale)) - case pmetric.ExemplarValueTypeDouble: - exemplar.SetDoubleValue(exemplar.DoubleValue() * op.configOperation.Scale) - } + scaleExemplars(dp.Exemplars(), &op) + } +} + +func scaleExpHistogramOp(metric pmetric.Metric, op internalOperation, f internalFilter) { + var dps = metric.ExponentialHistogram().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + if !f.matchAttrs(dp.Attributes()) { + continue + } + + if dp.HasSum() { + dp.SetSum(dp.Sum() * op.configOperation.Scale) + } + if dp.HasMin() { + dp.SetMin(dp.Min() * op.configOperation.Scale) + } + if dp.HasMax() { + dp.SetMax(dp.Max() * op.configOperation.Scale) + } + + dp.SetZeroThreshold(dp.ZeroThreshold() * op.configOperation.Scale) + + // For the buckets, we only need to change the offset. + // The bucket counts and the scale remain the same. + if len(dp.Positive().BucketCounts().AsRaw()) != 0 { + dp.Positive().SetOffset(updateOffset(dp.Scale(), dp.Positive().Offset(), &op)) + } + + if len(dp.Negative().BucketCounts().AsRaw()) != 0 { + dp.Negative().SetOffset(updateOffset(dp.Scale(), dp.Negative().Offset(), &op)) + } + + scaleExemplars(dp.Exemplars(), &op) + } +} + +func updateOffset(scale int32, offset int32, op *internalOperation) int32 { + // Take the middle of the first bucket. + base := math.Pow(2, math.Pow(2, float64(-scale))) + value := (math.Pow(base, float64(offset)) + math.Pow(base, float64(offset+1))) / 2 + + // Scale it according to the config. + scaledValue := value * op.configOperation.Scale + + // Find the new offset by mapping the scaled value. + return mapToIndex(scaledValue, int(scale)) +} + +// mapToIndex returns the index that corresponds to the given value on the scale. +// See https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function. +func mapToIndex(value float64, scale int) int32 { + scaleFactor := math.Ldexp(math.Log2E, scale) + return int32(math.Ceil(math.Log(value)*scaleFactor) - 1) +} + +func scaleExemplars(exemplars pmetric.ExemplarSlice, op *internalOperation) { + for e, ei := exemplars, 0; ei < e.Len(); ei++ { + exemplar := e.At(ei) + switch exemplar.ValueType() { + case pmetric.ExemplarValueTypeInt: + exemplar.SetIntValue(int64(float64(exemplar.IntValue()) * op.configOperation.Scale)) + case pmetric.ExemplarValueTypeDouble: + exemplar.SetDoubleValue(exemplar.DoubleValue() * op.configOperation.Scale) } } }