Skip to content

Commit

Permalink
feat: add support for Otlp exponential histograms (#38)
Browse files Browse the repository at this point in the history
Adds support for Otlp exponential histograms. This is more or less the JVM implementation
of the goodmetrics_rs feature.

Unit tests were grabbed from the latest main of the Rust implementation's unit tests.

I also added a new sealed interface called DistributionMode so consumers can explicitly set which aggregation mode they would
like to use. Considering exponential histograms are a "newer" technology, the default is set to Histogram so this new version
is not a breaking change.
  • Loading branch information
tylerburdsall authored Mar 20, 2024
1 parent 20c421a commit 81b1520
Show file tree
Hide file tree
Showing 5 changed files with 544 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import goodmetrics.pipeline.AggregatedBatch
import goodmetrics.pipeline.Aggregator
import goodmetrics.pipeline.BatchSender.Companion.launchSender
import goodmetrics.pipeline.Batcher
import goodmetrics.pipeline.DistributionMode
import goodmetrics.pipeline.MetricsSink
import goodmetrics.pipeline.SynchronizingBuffer
import io.grpc.Metadata
Expand All @@ -31,7 +32,7 @@ data class ConfiguredMetrics(

class MetricsSetups private constructor() {
companion object {
fun CoroutineScope.goodMetrics(goodmetricsHost: String = "localhost", port: Int = 9573, aggregationWidth: Duration = 10.seconds): ConfiguredMetrics {
fun CoroutineScope.goodMetrics(goodmetricsHost: String = "localhost", port: Int = 9573, aggregationWidth: Duration = 10.seconds, distributionMode: DistributionMode = DistributionMode.Histogram): ConfiguredMetrics {
val unaryIncomingBuffer = SynchronizingBuffer()
val unaryFactory = MetricsFactory(unaryIncomingBuffer, timeSource = NanoTimeSource.preciseNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)

Expand All @@ -40,7 +41,7 @@ class MetricsSetups private constructor() {
sendMetricsBatch(batch)
}

val preaggregatedIncomingBuffer = Aggregator(aggregationWidth)
val preaggregatedIncomingBuffer = Aggregator(aggregationWidth, distributionMode = distributionMode)
val preaggregatedFactory = MetricsFactory(preaggregatedIncomingBuffer, timeSource = NanoTimeSource.fastNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)

val preaggregatedBatcher = Batcher(preaggregatedIncomingBuffer)
Expand Down Expand Up @@ -362,8 +363,10 @@ class MetricsSetups private constructor() {
client: OpentelemetryClient,
logError: (message: String, exception: Exception) -> Unit,
onSendPreaggregated: (List<AggregatedBatch>) -> Unit,
// By default, use Histogram distribution mode since ExponentialHistograms are not frequently used
distributionMode: DistributionMode = DistributionMode.Histogram,
): Aggregator {
val sink = Aggregator(aggregationWidth = aggregationWidth)
val sink = Aggregator(aggregationWidth = aggregationWidth, distributionMode = distributionMode)
val batcher = Batcher(
sink,
batchSize = batchSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.goodmetrics.statisticSet
import io.grpc.netty.GrpcSslContexts
import io.grpc.netty.NettyChannelBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import kotlin.math.roundToLong

class GoodmetricsClient private constructor(
private val stub: MetricsGrpcKt.MetricsCoroutineStub,
Expand Down Expand Up @@ -156,6 +157,14 @@ fun Aggregation.toProto(): Measurement = measurement {
}
}
}
is Aggregation.ExponentialHistogram -> {
val bucketCounts = this@toProto.valueCounts().map { (value, count) -> Pair(value.roundToLong(), count)}.toMap()
histogram = histogram {
for ((bucket, count) in bucketCounts) {
buckets[bucket] = count
}
}
}
is Aggregation.StatisticSet -> {
statisticSet = statisticSet {
minimum = min.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import goodmetrics.io.opentelemetry.proto.common.v1.anyValue
import goodmetrics.io.opentelemetry.proto.common.v1.instrumentationScope
import goodmetrics.io.opentelemetry.proto.common.v1.keyValue
import goodmetrics.io.opentelemetry.proto.metrics.v1.AggregationTemporality
import goodmetrics.io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPointKt.buckets
import goodmetrics.io.opentelemetry.proto.metrics.v1.Metric
import goodmetrics.io.opentelemetry.proto.metrics.v1.ResourceMetrics
import goodmetrics.io.opentelemetry.proto.metrics.v1.ScopeMetrics
import goodmetrics.io.opentelemetry.proto.metrics.v1.exponentialHistogram
import goodmetrics.io.opentelemetry.proto.metrics.v1.exponentialHistogramDataPoint
import goodmetrics.io.opentelemetry.proto.metrics.v1.gauge
import goodmetrics.io.opentelemetry.proto.metrics.v1.histogram
import goodmetrics.io.opentelemetry.proto.metrics.v1.histogramDataPoint
Expand Down Expand Up @@ -184,6 +187,15 @@ class OpentelemetryClient(
}
)
}
is Aggregation.ExponentialHistogram -> {
yield(
metric {
name = "${this@asGoofyOtlpMetricSequence.metric}_$measurementName"
unit = "1"
exponentialHistogram = aggregation.asOtlpExponentialHistogram(otlpDimensions, this@asGoofyOtlpMetricSequence.timestampNanos, aggregationWidth)
}
)
}
is Aggregation.StatisticSet -> {
yieldAll(aggregation.statisticSetToOtlp(this@asGoofyOtlpMetricSequence.metric, measurementName, timestampNanos, aggregationWidth, otlpDimensions))
}
Expand Down Expand Up @@ -355,6 +367,39 @@ class OpentelemetryClient(
)
}

private fun Aggregation.ExponentialHistogram.asOtlpExponentialHistogram(
otlpDimensions: Iterable<KeyValue>,
timestampNanos: Long,
aggregationWidth: Duration,
) = exponentialHistogram {
aggregationTemporality = AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
dataPoints.add(
exponentialHistogramDataPoint {
attributes.addAll(otlpDimensions)
startTimeUnixNano = timestampNanos - aggregationWidth.inWholeNanoseconds
timeUnixNano = timestampNanos
count = this@asOtlpExponentialHistogram.count()
sum = if(this@asOtlpExponentialHistogram.hasNegatives()) 0.0 else (this@asOtlpExponentialHistogram.sum())
min = this@asOtlpExponentialHistogram.min()
max = this@asOtlpExponentialHistogram.max()
scale = this@asOtlpExponentialHistogram.scale()
zeroCount = 0
positive = buckets {
offset = this@asOtlpExponentialHistogram.bucketStartOffset()
bucketCounts.addAll(
this@asOtlpExponentialHistogram.takePositives()
)
}
negative = buckets {
offset = this@asOtlpExponentialHistogram.bucketStartOffset()
bucketCounts.addAll(
this@asOtlpExponentialHistogram.takeNegatives()
)
}
}
)
}

private val library = instrumentationScope {
name = "goodmetrics_kotlin"
version = OpentelemetryClient::class.java.`package`.implementationVersion ?: "development"
Expand Down
Loading

0 comments on commit 81b1520

Please sign in to comment.