diff --git a/.chloggen/cds-1320.yaml b/.chloggen/cds-1320.yaml new file mode 100644 index 000000000000..bdca6d2069e8 --- /dev/null +++ b/.chloggen/cds-1320.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add custom function to the transform processor to convert exponential histograms to explicit histograms." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33827] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 57441da0b189..455b486fd428 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -220,6 +220,7 @@ In addition to OTTL functions, the processor defines its own functions to help w - [copy_metric](#copy_metric) - [scale_metric](#scale_metric) - [aggregate_on_attributes](#aggregate_on_attributes) +- [convert_exponential_histogram_to_histogram](#convert_exponential_histogram_to_histogram) - [aggregate_on_attribute_value](#aggregate_on_attribute_value) ### convert_sum_to_gauge @@ -356,6 +357,84 @@ Examples: - `copy_metric(desc="new desc") where description == "old desc"` + +### convert_exponential_histogram_to_histogram + +__Warning:__ The approach used in this function to convert exponential histograms to explicit histograms __is not__ part of the __OpenTelemetry Specification__. + +`convert_exponential_histogram_to_histogram(distribution, [ExplicitBounds])` + +The `convert_exponential_histogram_to_histogram` function converts an ExponentialHistogram to an Explicit (_normal_) Histogram. + +This function requires 2 arguments: + +- `distribution` - This argument defines the distribution algorithm used to allocate the exponential histogram datapoints into a new Explicit Histogram. There are 4 options: +
+  - __upper__ - This approach identifies the highest possible value of each exponential bucket (_the upper bound_) and uses it to distribute the datapoints by comparing the upper bound of each bucket with the ExplicitBounds provided. This approach works better for small/narrow exponential histograms, where the difference between the upper and lower bounds is small.
+
+    _For example, given:_
+    1. count = 10
+    2. Boundaries: [5, 10, 15, 20, 25]
+    3. Upper Bound: 15
+    _Process:_
+    4. Start with zeros: [0, 0, 0, 0, 0]
+    5. Iterate the boundaries and compare $upper = 15$ with each boundary:
+       - $15 > 5$ (_skip_)
+       - $15 > 10$ (_skip_)
+       - $15 \leq 15$ (allocate count to this boundary)
+    6. Allocate count: [0, 0, __10__, 0, 0]
+    7. Final Counts: [0, 0, __10__, 0, 0]
+<br>
+ - __midpoint__ - This approach works in a similar way to the __upper__ approach, but instead of using the upper bound, it uses the midpoint of each exponential bucket. The midpoint is identified by calculating the average of the upper and lower bounds. This approach also works better for small/narrow exponential histograms. +
+
+  >The __uniform__ and __random__ distribution algorithms both utilise the concept of intersecting boundaries.
+  An intersecting boundary is any boundary in the `ExplicitBounds` array that falls on or between the lower and upper bounds of an Exponential Histogram bucket.
+  _For example:_ if you have an Exponential Histogram bucket with a lower bound of 10 and an upper bound of 20, and your boundaries array is [5, 10, 15, 20, 25], the intersecting boundaries are 10, 15, and 20 because they lie within the range [10, 20]. The sketch after this note shows how these boundaries are located.
+<br>
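+  A minimal sketch of how the intersecting boundaries are located for one exponential bucket, mirroring the scan used by the implementation in this change (`lower`, `upper`, and `boundaries` stand in for the bucket's bounds and the `ExplicitBounds` array):
+
+  ```go
+  // advance to the first boundary at or above the bucket's lower bound
+  start := 0
+  for start < len(boundaries) && boundaries[start] < lower {
+      start++
+  }
+  // advance to the first boundary at or above the bucket's upper bound
+  end := start
+  for end < len(boundaries) && boundaries[end] < upper {
+      end++
+  }
+  // clamp so the end index stays within the boundaries slice
+  if end > len(boundaries)-1 {
+      end = len(boundaries) - 1
+  }
+  // boundaries[start] through boundaries[end] are the intersecting boundaries
+  ```
+<br>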
+  - __uniform__ - This approach distributes the datapoints for each bucket uniformly across the intersecting __ExplicitBounds__. The algorithm works as follows:
+
+    - If there are valid intersecting boundaries, the function divides the count evenly across them, allocating the same share to each boundary.
+    - If there is a remainder after dividing the count equally, it distributes the remainder by incrementing the count for some of the boundaries until the remainder is exhausted.
+
+    _For example, given:_
+    1. count = 10
+    2. Exponential Histogram Bounds: [10, 20]
+    3. Boundaries: [5, 10, 15, 20, 25]
+    4. Intersecting Boundaries: [10, 15, 20]
+    5. Number of Intersecting Boundaries: 3
+    6. Using the formula: $count / numOfIntersections = 10 / 3 = 3$, remainder $1$
+    _Uniform Allocation:_
+    7. Start with zeros: [0, 0, 0, 0, 0]
+    8. Allocate 3 to each: [0, 3, 3, 3, 0]
+    9. Distribute the remainder of 1: [0, 4, 3, 3, 0]
+    10. Final Counts: [0, 4, 3, 3, 0]
+<br>
+  - __random__ - This approach distributes the datapoints for each bucket randomly across the intersecting __ExplicitBounds__. It works in a similar manner to the uniform distribution algorithm, with the main difference being that counts are distributed randomly instead of uniformly. It works as follows:
+    - If there are valid intersecting boundaries, calculate the proportion of the count that should be allocated to each boundary, based on the overlap of the boundary with the provided range (lower to upper).
+    - For each boundary, a random fraction of the calculated proportion is allocated.
+    - Any remaining count (_due to rounding or random distribution_) is then distributed randomly among the intersecting boundaries.
+    - If the bucket range does not intersect with any boundaries, the entire count is assigned to the start boundary.
+<br>
+- `ExplicitBounds` represents the list of bucket boundaries for the new histogram. This argument is __required__ and __cannot be empty__.
+
+__WARNINGS:__
+
+- The process of converting an ExponentialHistogram to an Explicit Histogram is not perfect and may result in a loss of precision. It is important to define an appropriate set of bucket boundaries and identify the best distribution approach for your data in order to minimize this loss.
+
+  For example, selecting boundaries that are too high or too low may result in histogram buckets that are too wide or too narrow, respectively.
+
+- __Negative Bucket Counts__ are not supported in Explicit Histograms; as such, negative bucket counts are ignored.
+
+- __ZeroCounts__ are only allocated if the ExplicitBounds array contains a zero boundary. That is, if the Explicit Boundaries that you provide do not start with `0`, the function will not allocate any zero counts from the Exponential Histogram.
+
+This function should only be used when Exponential Histograms are not suitable for downstream consumers, or if upstream metric sources are unable to generate Explicit Histograms.
+
+__Example__:
+
+- `convert_exponential_histogram_to_histogram("random", [0.0, 10.0, 100.0, 1000.0, 10000.0])`
+
 ### scale_metric

 `scale_metric(factor, Optional[unit])`
@@ -462,6 +541,7 @@ statements:

 To aggregate only using a specified set of attributes, you can use `keep_matching_keys`.

+
 ## Examples

 ### Perform transformation if field does not exist
diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod
index e5e6c1c0048d..19629b783abf 100644
--- a/processor/transformprocessor/go.mod
+++ b/processor/transformprocessor/go.mod
@@ -25,7 +25,10 @@ require (
 	go.uber.org/zap v1.27.0
 )

-require go.opentelemetry.io/collector/consumer/consumertest v0.109.1-0.20240918193345-a3c0565031b0
+require (
+	go.opentelemetry.io/collector/consumer/consumertest v0.109.1-0.20240918193345-a3c0565031b0
+	golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
+)

 require (
 	github.com/alecthomas/participle/v2 v2.1.1 // indirect
@@ -66,7 +69,6 @@ require (
 	go.opentelemetry.io/otel v1.30.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.30.0 // indirect
 	go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
-	golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
 	golang.org/x/net v0.29.0 // indirect
 	golang.org/x/sys v0.25.0 // indirect
 	golang.org/x/text v0.18.0 // indirect
diff --git a/processor/transformprocessor/internal/metrics/func_convert_exponential_hist_to_explicit_hist.go b/processor/transformprocessor/internal/metrics/func_convert_exponential_hist_to_explicit_hist.go
new file mode 100644
index 000000000000..9472cdf3e99f
--- /dev/null
+++ b/processor/transformprocessor/internal/metrics/func_convert_exponential_hist_to_explicit_hist.go
@@ -0,0 +1,307 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
+
+import (
+	"context"
+	"fmt"
+	"math"
+
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"golang.org/x/exp/rand"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
+)
+
+type convertExponentialHistToExplicitHistArguments struct {
+	DistributionFn string
+	ExplicitBounds []float64
+}
+
+// distributionFnMap maps each distribution argument to its allocation algorithm
+var distributionFnMap = 
map[string]distAlgorithm{
+	"upper":    upperAlgorithm,
+	"midpoint": midpointAlgorithm,
+	"random":   randomAlgorithm,
+	"uniform":  uniformAlgorithm,
+}
+
+func newConvertExponentialHistToExplicitHistFactory() ottl.Factory[ottlmetric.TransformContext] {
+	return ottl.NewFactory("convert_exponential_histogram_to_histogram",
+		&convertExponentialHistToExplicitHistArguments{}, createConvertExponentialHistToExplicitHistFunction)
+}
+
+func createConvertExponentialHistToExplicitHistFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
+	args, ok := oArgs.(*convertExponentialHistToExplicitHistArguments)
+	if !ok {
+		return nil, fmt.Errorf("createConvertExponentialHistToExplicitHistFunction args must be of type *convertExponentialHistToExplicitHistArguments")
+	}
+
+	// default to the random distribution algorithm when none is specified
+	if len(args.DistributionFn) == 0 {
+		args.DistributionFn = "random"
+	}
+
+	if _, ok := distributionFnMap[args.DistributionFn]; !ok {
+		return nil, fmt.Errorf("invalid conversion function: %s, must be one of [upper, midpoint, random, uniform]", args.DistributionFn)
+	}
+
+	return convertExponentialHistToExplicitHist(args.DistributionFn, args.ExplicitBounds)
+}
+
+// convertExponentialHistToExplicitHist converts an exponential histogram to a bucketed histogram
+func convertExponentialHistToExplicitHist(distributionFn string, explicitBounds []float64) (ottl.ExprFunc[ottlmetric.TransformContext], error) {
+	if len(explicitBounds) == 0 {
+		return nil, fmt.Errorf("explicit bounds cannot be empty: %v", explicitBounds)
+	}
+
+	distFn, ok := distributionFnMap[distributionFn]
+	if !ok {
+		return nil, fmt.Errorf("invalid distribution algorithm: %s, must be one of [upper, midpoint, random, uniform]", distributionFn)
+	}
+
+	return func(_ context.Context, tCtx ottlmetric.TransformContext) (any, error) {
+		metric := tCtx.GetMetric()
+
+		// only execute on exponential histograms
+		if metric.Type() != pmetric.MetricTypeExponentialHistogram {
+			return nil, nil
+		}
+
+		explicitHist := pmetric.NewHistogram()
+		dps := metric.ExponentialHistogram().DataPoints()
+		explicitHist.SetAggregationTemporality(metric.ExponentialHistogram().AggregationTemporality())
+
+		// map over each exponential histogram data point and calculate the bucket counts
+		for i := 0; i < dps.Len(); i++ {
+			expDataPoint := dps.At(i)
+			bucketCounts := calculateBucketCounts(expDataPoint, explicitBounds, distFn)
+			explicitHistDp := explicitHist.DataPoints().AppendEmpty()
+			explicitHistDp.SetStartTimestamp(expDataPoint.StartTimestamp())
+			explicitHistDp.SetTimestamp(expDataPoint.Timestamp())
+			explicitHistDp.SetCount(expDataPoint.Count())
+			explicitHistDp.SetSum(expDataPoint.Sum())
+			explicitHistDp.SetMin(expDataPoint.Min())
+			explicitHistDp.SetMax(expDataPoint.Max())
+			expDataPoint.Exemplars().CopyTo(explicitHistDp.Exemplars())
+			explicitHistDp.ExplicitBounds().FromRaw(explicitBounds)
+			explicitHistDp.BucketCounts().FromRaw(bucketCounts)
+			expDataPoint.Attributes().CopyTo(explicitHistDp.Attributes())
+		}
+
+		// build a new metric with the converted histogram and overwrite the original metric in place
+		newMetric := pmetric.NewMetric()
+		newMetric.SetName(metric.Name())
+		newMetric.SetDescription(metric.Description())
+		newMetric.SetUnit(metric.Unit())
+		explicitHist.CopyTo(newMetric.SetEmptyHistogram())
+		newMetric.CopyTo(metric)
+
+		return nil, nil
+	}, nil
+}
+
+type distAlgorithm func(count uint64, upper, lower float64, boundaries []float64, bucketCountsDst *[]uint64)
+
+func calculateBucketCounts(dp pmetric.ExponentialHistogramDataPoint, boundaries []float64, distFn distAlgorithm) 
[]uint64 {
+	scale := int(dp.Scale())
+	factor := math.Ldexp(math.Ln2, -scale)
+	posB := dp.Positive().BucketCounts()
+	bucketCounts := make([]uint64, len(boundaries))
+
+	// add the zero count to the first bucket if the boundaries start at zero
+	if zerocount := dp.ZeroCount(); zerocount > 0 && boundaries[0] == 0 {
+		bucketCounts[0] += zerocount
+	}
+
+	for pos := 0; pos < posB.Len(); pos++ {
+		index := dp.Positive().Offset() + int32(pos)
+		upper := math.Exp(float64(index+1) * factor)
+		lower := math.Exp(float64(index) * factor)
+		count := posB.At(pos)
+		runDistFn := true
+
+		// if the lower bound is greater than the last boundary, add the count to the overflow bucket
+		if lower > boundaries[len(boundaries)-1] {
+			bucketCounts[len(boundaries)-1] += count
+			continue
+		}
+
+		// if the exponential bucket fits entirely within a single explicit bucket,
+		// assign the whole count to that bucket and skip the distribution algorithm
+		for bIndex := 1; bIndex < len(boundaries); bIndex++ {
+			if lower > boundaries[bIndex-1] && upper <= boundaries[bIndex] {
+				bucketCounts[bIndex-1] += count
+				runDistFn = false
+				break
+			}
+		}
+
+		if runDistFn {
+			distFn(count, upper, lower, boundaries, &bucketCounts)
+		}
+	}
+
+	return bucketCounts
+}
+
+// upperAlgorithm calculates the bucket counts for a given exponential histogram data point.
+// The algorithm is inspired by the logExponentialHistogramDataPoints function used to print
+// exponential histograms in the OpenTelemetry Collector, found here:
+// https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/internal/otlptext/databuffer.go#L144-L201
+//
+// The upper bound passed to this function is derived by the caller (calculateBucketCounts) as follows:
+//
+// - factor is calculated as math.Ldexp(math.Ln2, -scale)
+//
+// - the bucket counts and positions (pos) in the exponential histogram datapoint are iterated
+//
+// - the index is calculated by adding the exponential offset to the positive bucket position (pos)
+//
+// - the factor is then used to calculate the upper bound of the bucket, which is calculated as
+// upper = math.Exp((index+1) * factor)
+var upperAlgorithm distAlgorithm = func(count uint64,
+	upper, _ float64, boundaries []float64,
+	bucketCountsDst *[]uint64) {
+	// At this point we know that the upper bound represents the highest value that can be in this bucket, so we take the
+	// upper bound and compare it to each of the explicit boundaries provided by the user until we find a boundary
+	// that fits, that is, the first instance where upper bound <= explicit boundary.
+	for j, boundary := range boundaries {
+		if upper <= boundary {
+			(*bucketCountsDst)[j] += count
+			return
+		}
+	}
+	(*bucketCountsDst)[len(boundaries)-1] += count // Overflow bucket
+}
+
+// midpointAlgorithm calculates the bucket counts for a given exponential histogram data point.
+// This algorithm is similar to upperAlgorithm, but instead of using the upper bound of the bucket
+// to determine the bucket, it uses the midpoint of the upper and lower bounds.
+// The midpoint is calculated as (upper + lower) / 2.
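+//
+// For example, with scale 0 the exponential bucket (32, 64] has midpoint 48;
+// given boundaries [10, 20, 30, 40, 50, 60], the first boundary at or above 48
+// is 50 (index 4), so the count is recorded at index 3, the bucket whose upper
+// bound is 40.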
+var midpointAlgorithm distAlgorithm = func(count uint64,
+	upper, lower float64, boundaries []float64,
+	bucketCountsDst *[]uint64) {
+	midpoint := (upper + lower) / 2
+
+	for j, boundary := range boundaries {
+		if midpoint <= boundary {
+			// allocate to the bucket below the first boundary at or above the midpoint
+			if j > 0 {
+				(*bucketCountsDst)[j-1] += count
+				return
+			}
+			(*bucketCountsDst)[j] += count
+			return
+		}
+	}
+	(*bucketCountsDst)[len(boundaries)-1] += count // Overflow bucket
+}
+
+// uniformAlgorithm distributes counts from a given set of bucket sources into a set of linear boundaries using uniform distribution
+var uniformAlgorithm distAlgorithm = func(count uint64,
+	upper, lower float64, boundaries []float64,
+	bucketCountsDst *[]uint64) {
+	// Find the boundaries that intersect with the bucket range
+	var start, end int
+	for start = 0; start < len(boundaries); start++ {
+		if lower <= boundaries[start] {
+			break
+		}
+	}
+
+	for end = start; end < len(boundaries); end++ {
+		if upper <= boundaries[end] {
+			break
+		}
+	}
+
+	// make sure the end value does not exceed the length of the boundaries
+	if end > len(boundaries)-1 {
+		end = len(boundaries) - 1
+	}
+
+	// Distribute the count uniformly across the intersecting boundaries
+	if end > start {
+		countPerBoundary := count / uint64(end-start+1)
+		remainder := count % uint64(end-start+1)
+
+		for j := start; j <= end; j++ {
+			(*bucketCountsDst)[j] += countPerBoundary
+			if remainder > 0 {
+				(*bucketCountsDst)[j]++
+				remainder--
+			}
+		}
+	} else {
+		// Handle the case where the bucket range does not intersect with any boundaries
+		(*bucketCountsDst)[start] += count
+	}
+}
+
+// randomAlgorithm distributes counts from a given set of bucket sources into a set of linear boundaries using random distribution
+var randomAlgorithm distAlgorithm = func(count uint64,
+	upper, lower float64, boundaries []float64,
+	bucketCountsDst *[]uint64) {
+	// Find the boundaries that intersect with the bucket range
+	start := 0
+	for start < len(boundaries) && boundaries[start] < lower {
+		start++
+	}
+	end := start
+	for end < len(boundaries) && boundaries[end] < upper {
+		end++
+	}
+
+	// make sure the end value does not exceed the length of the boundaries
+	if end > len(boundaries)-1 {
+		end = len(boundaries) - 1
+	}
+
+	// Randomly distribute the count across the intersecting boundaries
+	if end > start {
+		rangeWidth := upper - lower
+		totalAllocated := uint64(0)
+
+		for j := start; j <= end; j++ {
+			var boundaryLower, boundaryUpper float64
+			if j == 0 {
+				// For the first boundary, set the lower limit to the bucket's lower bound
+				boundaryLower = lower
+			} else {
+				// Otherwise, set it to the previous boundary
+				boundaryLower = boundaries[j-1]
+			}
+			if j == len(boundaries) {
+				// For the last boundary, set the upper limit to the bucket's upper bound
+				boundaryUpper = upper
+			} else {
+				// Otherwise, set it to the current boundary
+				boundaryUpper = boundaries[j]
+			}
+
+			// Calculate the overlap width between the boundary range and the bucket range
+			overlapWidth := math.Min(boundaryUpper, upper) - math.Max(boundaryLower, lower)
+			// Proportionally allocate the count based on the overlap width
+			allocatedCount := uint64(float64(count) * (overlapWidth / rangeWidth))
+
+			// Randomly assign the counts to the boundaries
+			randomlyAllocatedCount := uint64(rand.Float64() * float64(allocatedCount))
+			(*bucketCountsDst)[j] += randomlyAllocatedCount
+			totalAllocated += randomlyAllocatedCount
+		}
+
+		// Distribute any remaining count
+		remainingCount := count - totalAllocated
+		for remainingCount > 0 {
+			randomBoundary := 
rand.Intn(end-start+1) + start
+			(*bucketCountsDst)[randomBoundary]++
+			remainingCount--
+		}
+	} else {
+		// If the bucket range does not intersect with any boundaries, assign the entire count to the start boundary
+		(*bucketCountsDst)[start] += count
+	}
+}
diff --git a/processor/transformprocessor/internal/metrics/func_convert_exponential_hist_to_explicit_hist_test.go b/processor/transformprocessor/internal/metrics/func_convert_exponential_hist_to_explicit_hist_test.go
new file mode 100644
index 000000000000..1dd76ad4f097
--- /dev/null
+++ b/processor/transformprocessor/internal/metrics/func_convert_exponential_hist_to_explicit_hist_test.go
@@ -0,0 +1,788 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package metrics
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
+)
+
+var nonExponentialHist = func() pmetric.Metric {
+	m := pmetric.NewMetric()
+	m.SetName("not-exponentialhist")
+	m.SetEmptyGauge()
+	return m
+}
+
+func TestUpper_convert_exponential_hist_to_explicit_hist(t *testing.T) {
+	ts := pcommon.NewTimestampFromTime(time.Now())
+	defaultTestMetric := func() pmetric.Metric {
+		exponentialHistInput := pmetric.NewMetric()
+		exponentialHistInput.SetName("response_time")
+		dp := exponentialHistInput.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
+		exponentialHistInput.ExponentialHistogram().SetAggregationTemporality(1)
+		dp.SetCount(2)
+		dp.SetScale(7)
+		dp.SetSum(361)
+		dp.SetMax(195)
+		dp.SetMin(166)
+
+		dp.SetTimestamp(ts)
+
+		// set attributes
+		dp.Attributes().PutStr("metric_type", "timing")
+
+		// set bucket counts: one count in the lowest bucket, one in the highest
+		dp.Positive().BucketCounts().Append(
+			1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1)
+
+		dp.Positive().SetOffset(944)
+		return exponentialHistInput
+	}
+
+	tests := []struct {
+		name         string
+		input        func() pmetric.Metric
+		arg          []float64 // ExplicitBounds
+		distribution string
+		want         func(pmetric.Metric)
+	}{
+		{
+			// having explicit bounds that are all smaller than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the overflow bucket
+			name:         "convert exponential histogram to explicit histogram with smaller bounds using upper distribution",
+			input:        defaultTestMetric,
+			arg:          []float64{1.0, 2.0, 3.0, 4.0, 5.0},
+			distribution: "upper",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("response_time")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(2)
+				dp.SetSum(361)
+				dp.SetMax(195)
+				dp.SetMin(166)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 0, 0, 2) // expect all counts in the overflow bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1.0, 2.0, 3.0, 4.0, 5.0)
+			},
+		},
+		{
+			// having explicit bounds that are all larger than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the 1st bucket
+			name:         "convert exponential histogram to explicit histogram with large bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1000.0, 2000.0, 3000.0, 4000.0, 5000.0},
+			distribution: 
"upper", + want: func(metric pmetric.Metric) { + + metric.SetName("response_time") + dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty() + metric.Histogram().SetAggregationTemporality(1) + dp.SetCount(2) + dp.SetSum(361) + dp.SetMax(195) + dp.SetMin(166) + dp.SetTimestamp(ts) + + // set attributes + dp.Attributes().PutStr("metric_type", "timing") + + // set bucket counts + dp.BucketCounts().Append(2, 0, 0, 0, 0) // expect all counts in the 1st bucket + + // set explictbounds + dp.ExplicitBounds().Append(1000.0, 2000.0, 3000.0, 4000.0, 5000.0) + + }, + }, + { + + name: "convert exponential histogram to explicit history", + input: defaultTestMetric, + arg: []float64{160.0, 170.0, 180.0, 190.0, 200.0}, + distribution: "upper", + want: func(metric pmetric.Metric) { + + metric.SetName("response_time") + dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty() + metric.Histogram().SetAggregationTemporality(1) + dp.SetCount(2) + dp.SetSum(361) + dp.SetMax(195) + dp.SetMin(166) + dp.SetTimestamp(ts) + + // set attributes + dp.Attributes().PutStr("metric_type", "timing") + + // set bucket counts + dp.BucketCounts().Append(1, 0, 0, 1, 0) + + // set explictbounds + dp.ExplicitBounds().Append(160.0, 170.0, 180.0, 190.0, 200.0) + + }, + }, + { + name: "convert exponential histogram to explicit history with 0 scale", + input: defaultTestMetric, + arg: []float64{160.0, 170.0, 180.0, 190.0, 200.0}, + distribution: "upper", + want: func(metric pmetric.Metric) { + + metric.SetName("response_time") + dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty() + metric.Histogram().SetAggregationTemporality(1) + dp.SetCount(2) + dp.SetSum(361) + dp.SetMax(195) + dp.SetMin(166) + dp.SetTimestamp(ts) + + // set attributes + dp.Attributes().PutStr("metric_type", "timing") + + // set bucket counts + dp.BucketCounts().Append(1, 0, 0, 1, 0) + + // set explictbounds + dp.ExplicitBounds().Append(160.0, 170.0, 180.0, 190.0, 200.0) + + }, + }, + { + // 0 scale exponential histogram will result in an extremely large upper bound + // resulting in all the counts being in buckets much larger than the explicit bounds + // thus all counts will be in the overflow bucket + name: "0 scale expontential histogram given using upper distribute", + input: func() pmetric.Metric { + m := pmetric.NewMetric() + defaultTestMetric().CopyTo(m) + m.ExponentialHistogram().DataPoints().At(0).SetScale(0) + return m + }, + arg: []float64{160.0, 170.0, 180.0, 190.0, 200.0}, + distribution: "upper", + want: func(metric pmetric.Metric) { + metric.SetName("response_time") + dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty() + metric.Histogram().SetAggregationTemporality(1) + dp.SetCount(2) + dp.SetSum(361) + dp.SetMax(195) + dp.SetMin(166) + dp.SetTimestamp(ts) + + // set attributes + dp.Attributes().PutStr("metric_type", "timing") + + // set bucket counts + dp.BucketCounts().Append(0, 0, 0, 0, 2) + + // set explictbounds + dp.ExplicitBounds().Append(160.0, 170.0, 180.0, 190.0, 200.0) + }, + }, + { + name: "empty expontential histogram given using upper distribute", + input: func() pmetric.Metric { + m := pmetric.NewMetric() + m.SetName("empty") + m.SetEmptyExponentialHistogram() + return m + }, + arg: []float64{160.0, 170.0, 180.0, 190.0, 200.0}, + distribution: "upper", + want: func(metric pmetric.Metric) { + metric.SetName("empty") + metric.SetEmptyHistogram() + }, + }, + { + name: "non-expontential histogram", + arg: []float64{0}, + distribution: "upper", + input: nonExponentialHist, + want: func(metric pmetric.Metric) { + 
nonExponentialHist().CopyTo(metric)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			metric := pmetric.NewMetric()
+			tt.input().CopyTo(metric)
+
+			ctx := ottlmetric.NewTransformContext(metric, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())
+
+			exprFunc, err := convertExponentialHistToExplicitHist(tt.distribution, tt.arg)
+			assert.NoError(t, err)
+			_, err = exprFunc(nil, ctx)
+			assert.NoError(t, err)
+
+			expected := pmetric.NewMetric()
+			tt.want(expected)
+
+			assert.Equal(t, expected, metric)
+		})
+	}
+}
+
+func TestMidpoint_convert_exponential_hist_to_explicit_hist(t *testing.T) {
+	ts := pcommon.NewTimestampFromTime(time.Now())
+	defaultTestMetric := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetName("test-metric")
+		dp := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
+		m.ExponentialHistogram().SetAggregationTemporality(1)
+		dp.SetCount(44)
+		dp.SetScale(0)
+		dp.SetSum(999)
+		dp.SetMax(245)
+		dp.SetMin(40)
+
+		dp.SetTimestamp(ts)
+
+		dp.Attributes().PutStr("metric_type", "timing")
+		dp.Positive().SetOffset(5)
+		dp.Positive().BucketCounts().FromRaw([]uint64{10, 22, 12})
+		return m
+	}
+
+	tests := []struct {
+		name         string
+		input        func() pmetric.Metric
+		arg          []float64 // ExplicitBounds
+		distribution string
+		want         func(pmetric.Metric)
+	}{
+		{
+			// having explicit bounds that are all smaller than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the overflow bucket
+			name:         "convert exponential histogram to explicit histogram with smaller bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1.0, 2.0, 3.0, 4.0, 5.0},
+			distribution: "midpoint",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 0, 0, 44) // expect all counts in the overflow bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1.0, 2.0, 3.0, 4.0, 5.0)
+			},
+		},
+		{
+			// having explicit bounds that are all larger than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the 1st bucket
+			name:         "convert exponential histogram to explicit histogram with large bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1000.0, 2000.0, 3000.0, 4000.0, 5000.0},
+			distribution: "midpoint",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(44, 0, 0, 0, 0) // expect all counts in the 1st bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1000.0, 2000.0, 3000.0, 4000.0, 5000.0)
+			},
+		},
+		{
+			name:         "convert exponential histogram to explicit hist",
+			input:        defaultTestMetric,
+			arg:          []float64{10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0},
+			distribution: "midpoint",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp 
:= metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 0, 10, 0, 0, 0, 0, 22, 12)
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0)
+			},
+		},
+		{
+			name: "convert exponential histogram to explicit hist with zero count",
+			input: func() pmetric.Metric {
+				m := defaultTestMetric()
+				m.ExponentialHistogram().DataPoints().At(0).SetZeroCount(5)
+				return m
+			},
+			arg:          []float64{0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0},
+			distribution: "midpoint",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(5, 0, 0, 0, 10, 0, 0, 0, 0, 22, 12)
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0)
+			},
+		},
+		{
+			name: "empty exponential histogram given",
+			input: func() pmetric.Metric {
+				m := pmetric.NewMetric()
+				m.SetName("empty")
+				m.SetEmptyExponentialHistogram()
+				return m
+			},
+			arg:          []float64{160.0, 170.0, 180.0, 190.0, 200.0},
+			distribution: "midpoint",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("empty")
+				metric.SetEmptyHistogram()
+			},
+		},
+		{
+			name:         "non-exponential histogram given using midpoint distribution",
+			arg:          []float64{0},
+			distribution: "midpoint",
+			input:        nonExponentialHist,
+			want: func(metric pmetric.Metric) {
+				nonExponentialHist().CopyTo(metric)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			metric := pmetric.NewMetric()
+			tt.input().CopyTo(metric)
+
+			ctx := ottlmetric.NewTransformContext(metric, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())
+
+			exprFunc, err := convertExponentialHistToExplicitHist(tt.distribution, tt.arg)
+			assert.NoError(t, err)
+			_, err = exprFunc(nil, ctx)
+			assert.NoError(t, err)
+
+			expected := pmetric.NewMetric()
+			tt.want(expected)
+
+			assert.Equal(t, expected, metric)
+		})
+	}
+}
+
+func TestUniform_convert_exponential_hist_to_explicit_hist(t *testing.T) {
+	ts := pcommon.NewTimestampFromTime(time.Now())
+	defaultTestMetric := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetName("test-metric")
+		dp := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
+		m.ExponentialHistogram().SetAggregationTemporality(1)
+		dp.SetCount(44)
+		dp.SetScale(0)
+		dp.SetSum(999)
+		dp.SetMax(245)
+		dp.SetMin(40)
+
+		dp.SetTimestamp(ts)
+
+		dp.Attributes().PutStr("metric_type", "timing")
+		dp.Positive().SetOffset(5)
+		dp.Positive().BucketCounts().FromRaw([]uint64{10, 22, 12})
+		return m
+	}
+
+	tests := []struct {
+		name         string
+		input        func() pmetric.Metric
+		arg          []float64 // ExplicitBounds
+		distribution string
+		want         func(pmetric.Metric)
+	}{
+		{
+			// having explicit bounds that are all smaller than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the overflow bucket
+			name:         "convert 
exponential histogram to explicit histogram with smaller bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1.0, 2.0, 3.0, 4.0, 5.0},
+			distribution: "uniform",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 0, 0, 44) // expect all counts in the overflow bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1.0, 2.0, 3.0, 4.0, 5.0)
+			},
+		},
+		{
+			// having explicit bounds that are all larger than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the 1st bucket
+			name:         "convert exponential histogram to explicit histogram with large bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1000.0, 2000.0, 3000.0, 4000.0, 5000.0},
+			distribution: "uniform",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(44, 0, 0, 0, 0) // expect all counts in the 1st bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1000.0, 2000.0, 3000.0, 4000.0, 5000.0)
+			},
+		},
+		{
+			name:         "convert exponential histogram to explicit hist",
+			input:        defaultTestMetric,
+			arg:          []float64{10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0},
+			distribution: "uniform",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 0, 3, 3, 2, 8, 6, 5, 17)
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			metric := pmetric.NewMetric()
+			tt.input().CopyTo(metric)
+
+			ctx := ottlmetric.NewTransformContext(metric, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())
+
+			exprFunc, err := convertExponentialHistToExplicitHist(tt.distribution, tt.arg)
+			assert.NoError(t, err)
+			_, err = exprFunc(nil, ctx)
+			assert.NoError(t, err)
+
+			expected := pmetric.NewMetric()
+			tt.want(expected)
+
+			assert.Equal(t, expected, metric)
+		})
+	}
+}
+
+func TestRandom_convert_exponential_hist_to_explicit_hist(t *testing.T) {
+	ts := pcommon.NewTimestampFromTime(time.Now())
+	defaultTestMetric := func() pmetric.Metric {
+		m := pmetric.NewMetric()
+		m.SetName("test-metric")
+		dp := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
+		m.ExponentialHistogram().SetAggregationTemporality(1)
+		dp.SetCount(44)
+		dp.SetScale(0)
+		dp.SetSum(999)
+		dp.SetMax(245)
+		dp.SetMin(40)
+
+		dp.SetTimestamp(ts)
+
+		dp.Attributes().PutStr("metric_type", "timing")
+		dp.Positive().SetOffset(5)
+		
dp.Positive().BucketCounts().FromRaw([]uint64{10, 22, 12})
+		return m
+	}
+
+	tests := []struct {
+		name         string
+		input        func() pmetric.Metric
+		arg          []float64 // ExplicitBounds
+		distribution string
+		want         func(pmetric.Metric)
+	}{
+		{
+			// having explicit bounds that are all smaller than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the overflow bucket
+			name:         "convert exponential histogram to explicit histogram with smaller bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1.0, 2.0, 3.0, 4.0, 5.0},
+			distribution: "random",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 0, 0, 44) // expect all counts in the overflow bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1.0, 2.0, 3.0, 4.0, 5.0)
+			},
+		},
+		{
+			// having explicit bounds that are all larger than the exponential histogram's bucket range
+			// will result in all of the exponential histogram's data points being placed in the 1st bucket
+			name:         "convert exponential histogram to explicit histogram with large bounds",
+			input:        defaultTestMetric,
+			arg:          []float64{1000.0, 2000.0, 3000.0, 4000.0, 5000.0},
+			distribution: "random",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(44, 0, 0, 0, 0) // expect all counts in the 1st bucket
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(1000.0, 2000.0, 3000.0, 4000.0, 5000.0)
+			},
+		},
+		{
+			name:         "convert exponential histogram to explicit hist",
+			input:        defaultTestMetric,
+			arg:          []float64{10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0},
+			distribution: "random",
+			want: func(metric pmetric.Metric) {
+				metric.SetName("test-metric")
+				dp := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
+				metric.Histogram().SetAggregationTemporality(1)
+				dp.SetCount(44)
+				dp.SetSum(999)
+				dp.SetMax(245)
+				dp.SetMin(40)
+				dp.SetTimestamp(ts)
+
+				// set attributes
+				dp.Attributes().PutStr("metric_type", "timing")
+
+				// set bucket counts
+				dp.BucketCounts().Append(0, 0, 3, 3, 2, 7, 5, 4, 4, 16)
+
+				// set explicit bounds
+				dp.ExplicitBounds().Append(10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			metric := pmetric.NewMetric()
+			tt.input().CopyTo(metric)
+
+			ctx := ottlmetric.NewTransformContext(metric, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())
+
+			exprFunc, err := convertExponentialHistToExplicitHist(tt.distribution, tt.arg)
+			assert.NoError(t, err)
+			_, err = exprFunc(nil, ctx)
+			assert.NoError(t, err)
+
+			expected := pmetric.NewMetric()
+			tt.want(expected)
+
+			// since the bucket counts are randomly distributed, we can't predict the exact output;
+			// thus we only check that the metric dimensions are as expected.
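+			// concretely, the checks below cover: the bucket-count and bounds lengths,
+			// the total count across buckets, zeroed buckets below the recorded min,
+			// and a non-empty overflow bucket.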
+			if tt.name == "convert exponential histogram to explicit hist" {
+				expectedDp := expected.Histogram().DataPoints().At(0)
+				dp := metric.Histogram().DataPoints().At(0)
+				assert.Equal(t,
+					expectedDp.BucketCounts().Len(),
+					dp.BucketCounts().Len())
+
+				var count uint64
+				for i := 0; i < dp.BucketCounts().Len(); i++ {
+					count += dp.BucketCounts().At(i)
+				}
+
+				assert.Equal(t, expectedDp.Count(), count)
+				assert.Equal(t, expectedDp.ExplicitBounds().Len(), dp.ExplicitBounds().Len())
+
+				// even though the distribution is random, we know that for this
+				// particular test case the min value is 40, therefore the first 3 bucket
+				// counts should be 0, as they cover values up to 30
+				for i := 0; i < 3; i++ {
+					assert.Equal(t, uint64(0), dp.BucketCounts().At(i), "bucket %d", i)
+				}
+
+				// since the max value in the exponential histogram is 245,
+				// the overflow bucket must have a count > 0
+				overflow := dp.BucketCounts().At(dp.BucketCounts().Len() - 1)
+				assert.Positive(t, overflow, "overflow bucket count should be > 0")
+				return
+			}
+
+			assert.Equal(t, expected, metric)
+		})
+	}
+}
+
+func Test_convertExponentialHistToExplicitHist_validate(t *testing.T) {
+	tests := []struct {
+		name                    string
+		sliceExplicitBoundsArgs []float64
+	}{
+		{
+			name:                    "empty explicit bounds",
+			sliceExplicitBoundsArgs: []float64{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			_, err := convertExponentialHistToExplicitHist("random", tt.sliceExplicitBoundsArgs)
+			assert.Error(t, err)
+			assert.Contains(t, err.Error(), "explicit bounds cannot be empty")
+		})
+	}
+}
diff --git a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go
index e9a4462f0a5e..23851dcd5ae0 100644
--- a/processor/transformprocessor/internal/metrics/functions.go
+++ b/processor/transformprocessor/internal/metrics/functions.go
@@ -51,6 +51,7 @@ func MetricFunctions() map[string]ottl.Factory[ottlmetric.TransformContext] {
 		newCopyMetricFactory(),
 		newScaleMetricFactory(),
 		newAggregateOnAttributesFactory(),
+		newConvertExponentialHistToExplicitHistFactory(),
 		newAggregateOnAttributeValueFactory(),
 	)
diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go
index a54c9274d1a0..10e959be2df8 100644
--- a/processor/transformprocessor/internal/metrics/functions_test.go
+++ b/processor/transformprocessor/internal/metrics/functions_test.go
@@ -67,6 +67,7 @@ func Test_MetricFunctions(t *testing.T) {
 	expected["extract_count_metric"] = newExtractCountMetricFactory()
 	expected["copy_metric"] = newCopyMetricFactory()
 	expected["scale_metric"] = newScaleMetricFactory()
+	expected["convert_exponential_histogram_to_histogram"] = newConvertExponentialHistToExplicitHistFactory()
 	actual := MetricFunctions()
 	require.Equal(t, len(expected), len(actual))