diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/Aggregation.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/Aggregation.scala index 411bf937e..c0b869618 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/Aggregation.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/Aggregation.scala @@ -41,7 +41,7 @@ sealed abstract class Aggregation( object Aggregation { - private object Defaults { + private[metrics] object Defaults { // See https://opentelemetry.io/docs/specs/otel/metrics/sdk/#explicit-bucket-histogram-aggregation val Boundaries: BucketBoundaries = BucketBoundaries( Vector( diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/Aggregator.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/Aggregator.scala index f4d16d00c..d655b9411 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/Aggregator.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/Aggregator.scala @@ -16,15 +16,25 @@ package org.typelevel.otel4s.sdk.metrics.aggregation +import cats.Applicative +import cats.effect.Temporal +import cats.effect.std.Random import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.BucketBoundaries +import org.typelevel.otel4s.metrics.MeasurementValue import org.typelevel.otel4s.sdk.TelemetryResource import org.typelevel.otel4s.sdk.common.InstrumentationScope import org.typelevel.otel4s.sdk.context.Context +import org.typelevel.otel4s.sdk.metrics.Aggregation +import org.typelevel.otel4s.sdk.metrics.InstrumentType import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality import org.typelevel.otel4s.sdk.metrics.data.MetricData import org.typelevel.otel4s.sdk.metrics.data.PointData import org.typelevel.otel4s.sdk.metrics.data.TimeWindow +import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter +import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup import org.typelevel.otel4s.sdk.metrics.internal.AsynchronousMeasurement +import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor /** Aggregators are responsible for holding aggregated values and taking a @@ -183,4 +193,98 @@ private[metrics] object Aggregator { ): F[Unit] } + /** Creates a [[Synchronous]] aggregator based on the given `aggregation`. + * + * @param aggregation + * the aggregation to use + * + * @param descriptor + * the descriptor of the instrument + * + * @param filter + * used by the exemplar reservoir to filter the offered values + * + * @param traceContextLookup + * used by the exemplar reservoir to extract tracing information from the + * context + * + * @tparam F + * the higher-kinded type of a polymorphic effect + * + * @tparam A + * the type of the values to record + */ + def synchronous[F[_]: Temporal: Random, A: MeasurementValue: Numeric]( + aggregation: Aggregation.Synchronous, + descriptor: InstrumentDescriptor.Synchronous, + filter: ExemplarFilter, + traceContextLookup: TraceContextLookup + ): Aggregator.Synchronous[F, A] = { + def sum: Aggregator.Synchronous[F, A] = + SumAggregator.synchronous( + Runtime.getRuntime.availableProcessors, + filter, + traceContextLookup + ) + + def lastValue: Aggregator.Synchronous[F, A] = + LastValueAggregator.synchronous[F, A] + + def histogram(boundaries: BucketBoundaries): Aggregator.Synchronous[F, A] = + ExplicitBucketHistogramAggregator(boundaries, filter, traceContextLookup) + + aggregation match { + case Aggregation.Default => + descriptor.instrumentType match { + case InstrumentType.Counter => sum + case InstrumentType.UpDownCounter => sum + case InstrumentType.Histogram => + histogram(Aggregation.Defaults.Boundaries) + } + + case Aggregation.Sum => sum + case Aggregation.LastValue => lastValue + + case Aggregation.ExplicitBucketHistogram(boundaries) => + histogram(boundaries) + } + } + + /** Creates an [[Asynchronous]] aggregator based on the given `aggregation`. + * + * @param aggregation + * the aggregation to use + * + * @param descriptor + * the descriptor of the instrument + * + * @tparam F + * the higher-kinded type of a polymorphic effect + * + * @tparam A + * the type of the values to record + */ + def asynchronous[F[_]: Applicative, A: MeasurementValue: Numeric]( + aggregation: Aggregation.Asynchronous, + descriptor: InstrumentDescriptor.Asynchronous + ): Aggregator.Asynchronous[F, A] = { + def sum: Aggregator.Asynchronous[F, A] = + SumAggregator.asynchronous[F, A] + + def lastValue: Aggregator.Asynchronous[F, A] = + LastValueAggregator.asynchronous[F, A] + + aggregation match { + case Aggregation.Default => + descriptor.instrumentType match { + case InstrumentType.ObservableCounter => sum + case InstrumentType.ObservableUpDownCounter => sum + case InstrumentType.ObservableGauge => lastValue + } + + case Aggregation.Sum => sum + case Aggregation.LastValue => lastValue + } + } + } diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala index 9eb0ba318..676bc0336 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala @@ -90,10 +90,11 @@ private object ExplicitBucketHistogramAggregator { * the bucket boundaries to aggregate values at * * @param filter - * filters the offered values + * used by the exemplar reservoir to filter the offered values * * @param lookup - * extracts tracing information from the context + * used by the exemplar reservoir to extract tracing information from the + * context * * @tparam F * the higher-kinded type of a polymorphic effect diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/SumAggregator.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/SumAggregator.scala index 5e927a1a4..d5ddd5707 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/SumAggregator.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/SumAggregator.scala @@ -50,10 +50,11 @@ private object SumAggregator { * the maximum number of exemplars to preserve * * @param filter - * filters the offered values + * used by the exemplar reservoir to filter the offered values * * @param lookup - * extracts tracing information from the context + * used by the exemplar reservoir to extract tracing information from the + * context * * @tparam F * the higher-kinded type of a polymorphic effect diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/aggregation/AggregatorSuite.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/aggregation/AggregatorSuite.scala new file mode 100644 index 000000000..884a12501 --- /dev/null +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/aggregation/AggregatorSuite.scala @@ -0,0 +1,182 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics.aggregation + +import cats.effect.IO +import cats.effect.std.Random +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Gen +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.sdk.metrics.Aggregation +import org.typelevel.otel4s.sdk.metrics.InstrumentType +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.metrics.data.MetricPoints +import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter +import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup +import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor +import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens + +class AggregatorSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + + private val synchronousAggregationGen: Gen[Aggregation.Synchronous] = + Gen.oneOf( + Gen.const(Aggregation.Default), + Gen.const(Aggregation.Sum), + Gen.const(Aggregation.LastValue), + Gens.bucketBoundaries.map(Aggregation.ExplicitBucketHistogram) + ) + + private val asynchronousAggregationGen: Gen[Aggregation.Asynchronous] = + Gen.oneOf( + Gen.const(Aggregation.Default), + Gen.const(Aggregation.Sum), + Gen.const(Aggregation.LastValue) + ) + + test("create an aggregator for a synchronous instrument") { + PropF.forAllF( + synchronousAggregationGen, + Gens.synchronousInstrumentDescriptor, + Gens.telemetryResource, + Gens.instrumentationScope, + Gens.aggregationTemporality + ) { (aggregation, descriptor, resource, scope, temporality) => + Random.scalaUtilRandom[IO].flatMap { implicit R: Random[IO] => + val aggregator = Aggregator.synchronous[IO, Long]( + aggregation, + descriptor, + ExemplarFilter.alwaysOn, + TraceContextLookup.noop + ) + + val expected = { + def sum = { + val monotonic = + descriptor.instrumentType match { + case InstrumentType.Counter => true + case InstrumentType.Histogram => true + case _ => false + } + + MetricPoints.sum(Vector.empty, monotonic, temporality) + } + + def lastValue = + MetricPoints.gauge(Vector.empty) + + def histogram = + MetricPoints.histogram(Vector.empty, temporality) + + val metricPoints = aggregation match { + case Aggregation.Default => + descriptor.instrumentType match { + case InstrumentType.Counter => sum + case InstrumentType.UpDownCounter => sum + case InstrumentType.Histogram => histogram + } + + case Aggregation.Sum => sum + case Aggregation.LastValue => lastValue + case Aggregation.ExplicitBucketHistogram(_) => histogram + } + + MetricData( + resource = resource, + scope = scope, + name = descriptor.name.toString, + description = descriptor.description, + unit = descriptor.unit, + data = metricPoints + ) + } + + for { + result <- aggregator.toMetricData( + resource, + scope, + MetricDescriptor(None, descriptor), + Vector.empty, + temporality + ) + } yield assertEquals(result, expected) + } + } + } + + test("create an aggregator for an asynchronous instrument") { + PropF.forAllF( + asynchronousAggregationGen, + Gens.asynchronousInstrumentDescriptor, + Gens.telemetryResource, + Gens.instrumentationScope, + Gens.aggregationTemporality + ) { (aggregation, descriptor, resource, scope, temporality) => + val aggregator = Aggregator.asynchronous[IO, Long]( + aggregation, + descriptor + ) + + val expected = { + def sum = { + val monotonic = + descriptor.instrumentType match { + case InstrumentType.ObservableCounter => true + case _ => false + } + + MetricPoints.sum(Vector.empty, monotonic, temporality) + } + + def lastValue = + MetricPoints.gauge(Vector.empty) + + val metricPoints = aggregation match { + case Aggregation.Default => + descriptor.instrumentType match { + case InstrumentType.ObservableCounter => sum + case InstrumentType.ObservableUpDownCounter => sum + case InstrumentType.ObservableGauge => lastValue + } + + case Aggregation.Sum => sum + case Aggregation.LastValue => lastValue + } + + MetricData( + resource = resource, + scope = scope, + name = descriptor.name.toString, + description = descriptor.description, + unit = descriptor.unit, + data = metricPoints + ) + } + + for { + result <- aggregator.toMetricData( + resource, + scope, + MetricDescriptor(None, descriptor), + Vector.empty, + temporality + ) + } yield assertEquals(result, expected) + } + } + +} diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/scalacheck/Gens.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/scalacheck/Gens.scala index 76923ea37..ffd106ebf 100644 --- a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/scalacheck/Gens.scala +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/scalacheck/Gens.scala @@ -60,7 +60,7 @@ trait Gens extends org.typelevel.otel4s.sdk.scalacheck.Gens { case tpe: InstrumentType.Asynchronous => tpe }) - val synchronousInstrumentDescriptor: Gen[InstrumentDescriptor] = + val synchronousInstrumentDescriptor: Gen[InstrumentDescriptor.Synchronous] = for { tpe <- Gens.synchronousInstrumentType name <- Gens.ciString @@ -68,7 +68,7 @@ trait Gens extends org.typelevel.otel4s.sdk.scalacheck.Gens { unit <- Gen.option(Gen.alphaNumStr) } yield InstrumentDescriptor.synchronous(name, description, unit, tpe) - val asynchronousInstrumentDescriptor: Gen[InstrumentDescriptor] = + val asynchronousInstrumentDescriptor: Gen[InstrumentDescriptor.Asynchronous] = for { tpe <- Gens.asynchronousInstrumentType name <- Gens.ciString