Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for Otlp exponential histograms #38

Merged
merged 5 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import goodmetrics.pipeline.AggregatedBatch
import goodmetrics.pipeline.Aggregator
import goodmetrics.pipeline.BatchSender.Companion.launchSender
import goodmetrics.pipeline.Batcher
import goodmetrics.pipeline.DistributionMode
import goodmetrics.pipeline.MetricsSink
import goodmetrics.pipeline.SynchronizingBuffer
import io.grpc.Metadata
Expand All @@ -31,7 +32,7 @@ data class ConfiguredMetrics(

class MetricsSetups private constructor() {
companion object {
fun CoroutineScope.goodMetrics(goodmetricsHost: String = "localhost", port: Int = 9573, aggregationWidth: Duration = 10.seconds): ConfiguredMetrics {
fun CoroutineScope.goodMetrics(goodmetricsHost: String = "localhost", port: Int = 9573, aggregationWidth: Duration = 10.seconds, distributionMode: DistributionMode = DistributionMode.Histogram): ConfiguredMetrics {
val unaryIncomingBuffer = SynchronizingBuffer()
val unaryFactory = MetricsFactory(unaryIncomingBuffer, timeSource = NanoTimeSource.preciseNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)

Expand All @@ -40,7 +41,7 @@ class MetricsSetups private constructor() {
sendMetricsBatch(batch)
}

val preaggregatedIncomingBuffer = Aggregator(aggregationWidth)
val preaggregatedIncomingBuffer = Aggregator(aggregationWidth, distributionMode = distributionMode)
val preaggregatedFactory = MetricsFactory(preaggregatedIncomingBuffer, timeSource = NanoTimeSource.fastNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)

val preaggregatedBatcher = Batcher(preaggregatedIncomingBuffer)
Expand Down Expand Up @@ -362,8 +363,10 @@ class MetricsSetups private constructor() {
client: OpentelemetryClient,
logError: (message: String, exception: Exception) -> Unit,
onSendPreaggregated: (List<AggregatedBatch>) -> Unit,
// By default, use Histogram distribution mode since ExponentialHistograms are not frequently used
distributionMode: DistributionMode = DistributionMode.Histogram,
): Aggregator {
val sink = Aggregator(aggregationWidth = aggregationWidth)
val sink = Aggregator(aggregationWidth = aggregationWidth, distributionMode = distributionMode)
val batcher = Batcher(
sink,
batchSize = batchSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.goodmetrics.statisticSet
import io.grpc.netty.GrpcSslContexts
import io.grpc.netty.NettyChannelBuilder
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import kotlin.math.roundToLong

class GoodmetricsClient private constructor(
private val stub: MetricsGrpcKt.MetricsCoroutineStub,
Expand Down Expand Up @@ -156,6 +157,14 @@ fun Aggregation.toProto(): Measurement = measurement {
}
}
}
is Aggregation.ExponentialHistogram -> {
val bucketCounts = this@toProto.valueCounts().map { (value, count) -> Pair(value.roundToLong(), count)}.toMap()
histogram = histogram {
for ((bucket, count) in bucketCounts) {
buckets[bucket] = count
}
}
}
is Aggregation.StatisticSet -> {
statisticSet = statisticSet {
minimum = min.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import goodmetrics.io.opentelemetry.proto.common.v1.anyValue
import goodmetrics.io.opentelemetry.proto.common.v1.instrumentationScope
import goodmetrics.io.opentelemetry.proto.common.v1.keyValue
import goodmetrics.io.opentelemetry.proto.metrics.v1.AggregationTemporality
import goodmetrics.io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPointKt.buckets
import goodmetrics.io.opentelemetry.proto.metrics.v1.Metric
import goodmetrics.io.opentelemetry.proto.metrics.v1.ResourceMetrics
import goodmetrics.io.opentelemetry.proto.metrics.v1.ScopeMetrics
import goodmetrics.io.opentelemetry.proto.metrics.v1.exponentialHistogram
import goodmetrics.io.opentelemetry.proto.metrics.v1.exponentialHistogramDataPoint
import goodmetrics.io.opentelemetry.proto.metrics.v1.gauge
import goodmetrics.io.opentelemetry.proto.metrics.v1.histogram
import goodmetrics.io.opentelemetry.proto.metrics.v1.histogramDataPoint
Expand Down Expand Up @@ -184,6 +187,15 @@ class OpentelemetryClient(
}
)
}
is Aggregation.ExponentialHistogram -> {
yield(
metric {
name = "${this@asGoofyOtlpMetricSequence.metric}_$measurementName"
unit = "1"
exponentialHistogram = aggregation.asOtlpExponentialHistogram(otlpDimensions, this@asGoofyOtlpMetricSequence.timestampNanos, aggregationWidth)
}
)
}
is Aggregation.StatisticSet -> {
yieldAll(aggregation.statisticSetToOtlp(this@asGoofyOtlpMetricSequence.metric, measurementName, timestampNanos, aggregationWidth, otlpDimensions))
}
Expand Down Expand Up @@ -355,6 +367,39 @@ class OpentelemetryClient(
)
}

private fun Aggregation.ExponentialHistogram.asOtlpExponentialHistogram(
otlpDimensions: Iterable<KeyValue>,
timestampNanos: Long,
aggregationWidth: Duration,
) = exponentialHistogram {
aggregationTemporality = AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
dataPoints.add(
exponentialHistogramDataPoint {
attributes.addAll(otlpDimensions)
startTimeUnixNano = timestampNanos - aggregationWidth.inWholeNanoseconds
timeUnixNano = timestampNanos
count = this@asOtlpExponentialHistogram.count()
sum = if(this@asOtlpExponentialHistogram.hasNegatives()) 0.0 else (this@asOtlpExponentialHistogram.sum())
min = this@asOtlpExponentialHistogram.min()
max = this@asOtlpExponentialHistogram.max()
scale = this@asOtlpExponentialHistogram.scale()
zeroCount = 0
positive = buckets {
offset = this@asOtlpExponentialHistogram.bucketStartOffset()
bucketCounts.addAll(
this@asOtlpExponentialHistogram.takePositives()
)
}
negative = buckets {
offset = this@asOtlpExponentialHistogram.bucketStartOffset()
bucketCounts.addAll(
this@asOtlpExponentialHistogram.takeNegatives()
)
}
}
)
}

private val library = instrumentationScope {
name = "goodmetrics_kotlin"
version = OpentelemetryClient::class.java.`package`.implementationVersion ?: "development"
Expand Down
Loading
Loading