Skip to content

Commit

Permalink
sdk-metrics: add Aggregator.synchronous and Aggregator.asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Apr 19, 2024
1 parent ade6174 commit 85c4c82
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ sealed abstract class Aggregation(

object Aggregation {

private object Defaults {
private[metrics] object Defaults {
// See https://opentelemetry.io/docs/specs/otel/metrics/sdk/#explicit-bucket-histogram-aggregation
val Boundaries: BucketBoundaries = BucketBoundaries(
Vector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@

package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.effect.Temporal
import cats.effect.std.Random
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.metrics.BucketBoundaries
import org.typelevel.otel4s.metrics.MeasurementValue
import org.typelevel.otel4s.sdk.TelemetryResource
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.metrics.Aggregation
import org.typelevel.otel4s.sdk.metrics.InstrumentType
import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.PointData
import org.typelevel.otel4s.sdk.metrics.data.TimeWindow
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.internal.AsynchronousMeasurement
import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor

/** Aggregators are responsible for holding aggregated values and taking a
Expand Down Expand Up @@ -183,4 +193,98 @@ private[metrics] object Aggregator {
): F[Unit]
}

/** Creates a [[Synchronous]] aggregator based on the given `aggregation`.
*
* @param aggregation
* the aggregation to use
*
* @param descriptor
* the descriptor of the instrument
*
* @param filter
* used by the exemplar reservoir to filter the offered values
*
* @param traceContextLookup
* used by the exemplar reservoir to extract tracing information from the
* context
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def synchronous[F[_]: Temporal: Random, A: MeasurementValue: Numeric](
aggregation: Aggregation.Synchronous,
descriptor: InstrumentDescriptor.Synchronous,
filter: ExemplarFilter,
traceContextLookup: TraceContextLookup
): Aggregator.Synchronous[F, A] = {
def sum: Aggregator.Synchronous[F, A] =
SumAggregator.synchronous(
Runtime.getRuntime.availableProcessors,
filter,
traceContextLookup
)

def lastValue: Aggregator.Synchronous[F, A] =
LastValueAggregator.synchronous[F, A]

def histogram(boundaries: BucketBoundaries): Aggregator.Synchronous[F, A] =
ExplicitBucketHistogramAggregator(boundaries, filter, traceContextLookup)

aggregation match {
case Aggregation.Default =>
descriptor.instrumentType match {
case InstrumentType.Counter => sum
case InstrumentType.UpDownCounter => sum
case InstrumentType.Histogram =>
histogram(Aggregation.Defaults.Boundaries)
}

case Aggregation.Sum => sum
case Aggregation.LastValue => lastValue

case Aggregation.ExplicitBucketHistogram(boundaries) =>
histogram(boundaries)
}
}

/** Creates an [[Asynchronous]] aggregator based on the given `aggregation`.
*
* @param aggregation
* the aggregation to use
*
* @param descriptor
* the descriptor of the instrument
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def asynchronous[F[_]: Applicative, A: MeasurementValue: Numeric](
aggregation: Aggregation.Asynchronous,
descriptor: InstrumentDescriptor.Asynchronous
): Aggregator.Asynchronous[F, A] = {
def sum: Aggregator.Asynchronous[F, A] =
SumAggregator.asynchronous[F, A]

def lastValue: Aggregator.Asynchronous[F, A] =
LastValueAggregator.asynchronous[F, A]

aggregation match {
case Aggregation.Default =>
descriptor.instrumentType match {
case InstrumentType.ObservableCounter => sum
case InstrumentType.ObservableUpDownCounter => sum
case InstrumentType.ObservableGauge => lastValue
}

case Aggregation.Sum => sum
case Aggregation.LastValue => lastValue
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ private object ExplicitBucketHistogramAggregator {
* the bucket boundaries to aggregate values at
*
* @param filter
* filters the offered values
* used by the exemplar reservoir to filter the offered values
*
* @param lookup
* extracts tracing information from the context
* used by the exemplar reservoir to extract tracing information from the
* context
*
* @tparam F
* the higher-kinded type of a polymorphic effect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ private object SumAggregator {
* the maximum number of exemplars to preserve
*
* @param filter
* filters the offered values
* used by the exemplar reservoir to filter the offered values
*
* @param lookup
* extracts tracing information from the context
* used by the exemplar reservoir to extract tracing information from the
* context
*
* @tparam F
* the higher-kinded type of a polymorphic effect
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.effect.IO
import cats.effect.std.Random
import munit.CatsEffectSuite
import munit.ScalaCheckEffectSuite
import org.scalacheck.Gen
import org.scalacheck.effect.PropF
import org.typelevel.otel4s.sdk.metrics.Aggregation
import org.typelevel.otel4s.sdk.metrics.InstrumentType
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.MetricPoints
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor
import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens

class AggregatorSuite extends CatsEffectSuite with ScalaCheckEffectSuite {

private val synchronousAggregationGen: Gen[Aggregation.Synchronous] =
Gen.oneOf(
Gen.const(Aggregation.Default),
Gen.const(Aggregation.Sum),
Gen.const(Aggregation.LastValue),
Gens.bucketBoundaries.map(Aggregation.ExplicitBucketHistogram)
)

private val asynchronousAggregationGen: Gen[Aggregation.Asynchronous] =
Gen.oneOf(
Gen.const(Aggregation.Default),
Gen.const(Aggregation.Sum),
Gen.const(Aggregation.LastValue)
)

test("create an aggregator for a synchronous instrument") {
PropF.forAllF(
synchronousAggregationGen,
Gens.synchronousInstrumentDescriptor,
Gens.telemetryResource,
Gens.instrumentationScope,
Gens.aggregationTemporality
) { (aggregation, descriptor, resource, scope, temporality) =>
Random.scalaUtilRandom[IO].flatMap { implicit R: Random[IO] =>
val aggregator = Aggregator.synchronous[IO, Long](
aggregation,
descriptor,
ExemplarFilter.alwaysOn,
TraceContextLookup.noop
)

val expected = {
def sum = {
val monotonic =
descriptor.instrumentType match {
case InstrumentType.Counter => true
case InstrumentType.Histogram => true
case _ => false
}

MetricPoints.sum(Vector.empty, monotonic, temporality)
}

def lastValue =
MetricPoints.gauge(Vector.empty)

def histogram =
MetricPoints.histogram(Vector.empty, temporality)

val metricPoints = aggregation match {
case Aggregation.Default =>
descriptor.instrumentType match {
case InstrumentType.Counter => sum
case InstrumentType.UpDownCounter => sum
case InstrumentType.Histogram => histogram
}

case Aggregation.Sum => sum
case Aggregation.LastValue => lastValue
case Aggregation.ExplicitBucketHistogram(_) => histogram
}

MetricData(
resource = resource,
scope = scope,
name = descriptor.name.toString,
description = descriptor.description,
unit = descriptor.unit,
data = metricPoints
)
}

for {
result <- aggregator.toMetricData(
resource,
scope,
MetricDescriptor(None, descriptor),
Vector.empty,
temporality
)
} yield assertEquals(result, expected)
}
}
}

test("create an aggregator for an asynchronous instrument") {
PropF.forAllF(
asynchronousAggregationGen,
Gens.asynchronousInstrumentDescriptor,
Gens.telemetryResource,
Gens.instrumentationScope,
Gens.aggregationTemporality
) { (aggregation, descriptor, resource, scope, temporality) =>
val aggregator = Aggregator.asynchronous[IO, Long](
aggregation,
descriptor
)

val expected = {
def sum = {
val monotonic =
descriptor.instrumentType match {
case InstrumentType.ObservableCounter => true
case _ => false
}

MetricPoints.sum(Vector.empty, monotonic, temporality)
}

def lastValue =
MetricPoints.gauge(Vector.empty)

val metricPoints = aggregation match {
case Aggregation.Default =>
descriptor.instrumentType match {
case InstrumentType.ObservableCounter => sum
case InstrumentType.ObservableUpDownCounter => sum
case InstrumentType.ObservableGauge => lastValue
}

case Aggregation.Sum => sum
case Aggregation.LastValue => lastValue
}

MetricData(
resource = resource,
scope = scope,
name = descriptor.name.toString,
description = descriptor.description,
unit = descriptor.unit,
data = metricPoints
)
}

for {
result <- aggregator.toMetricData(
resource,
scope,
MetricDescriptor(None, descriptor),
Vector.empty,
temporality
)
} yield assertEquals(result, expected)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ trait Gens extends org.typelevel.otel4s.sdk.scalacheck.Gens {
case tpe: InstrumentType.Asynchronous => tpe
})

val synchronousInstrumentDescriptor: Gen[InstrumentDescriptor] =
val synchronousInstrumentDescriptor: Gen[InstrumentDescriptor.Synchronous] =
for {
tpe <- Gens.synchronousInstrumentType
name <- Gens.ciString
description <- Gen.option(Gen.alphaNumStr)
unit <- Gen.option(Gen.alphaNumStr)
} yield InstrumentDescriptor.synchronous(name, description, unit, tpe)

val asynchronousInstrumentDescriptor: Gen[InstrumentDescriptor] =
val asynchronousInstrumentDescriptor: Gen[InstrumentDescriptor.Asynchronous] =
for {
tpe <- Gens.asynchronousInstrumentType
name <- Gens.ciString
Expand Down

0 comments on commit 85c4c82

Please sign in to comment.