From ee15b0d38ad3c5ea03b2513b7fc494777f2f4520 Mon Sep 17 00:00:00 2001 From: Tyler Burdsall Date: Fri, 8 Mar 2024 14:55:27 -0800 Subject: [PATCH] feat: add ability to send metrics to generic OTLP ingest point. DRY up code to avoid duplication since a lot of it is shared. --- .../main/kotlin/goodmetrics/MetricsSetups.kt | 198 +++++++++++++++--- 1 file changed, 166 insertions(+), 32 deletions(-) diff --git a/kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt b/kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt index 353519f..0623f94 100644 --- a/kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt +++ b/kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt @@ -1,5 +1,7 @@ package goodmetrics +import goodmetrics.MetricsSetups.Companion.configureBatchedPreaggregatedOtlpSink +import goodmetrics.MetricsSetups.Companion.configureBatchedUnaryOtlpSink import goodmetrics.downstream.CompressionMode import goodmetrics.downstream.GoodmetricsClient import goodmetrics.downstream.GrpcTrailerLoggerInterceptor @@ -54,10 +56,10 @@ class MetricsSetups private constructor() { /** * preaggregated metrics in lightstep appear as Distributions for `Metrics::distribution`s - * and as Delta temporality Sums for `Metrics::measure`ments. + * and as Delta temporality Sums for `Metrics::measurements`. * * raw, unary metrics in lightstep also appear as Distribution for `Metrics::distribution`s - * however `Metrics::measure`ments appear as Delta temporality Gauge so you can look at those + * however `Metrics::measurements` appear as Delta temporality Gauge so you can look at those * values with more flexibility in a way that makes more sense for raw emissions. It's a sharper * edged tool here, and there might be a better representation - to which goodmetrics will change * upon discovery. @@ -95,6 +97,7 @@ class MetricsSetups private constructor() { ): ConfiguredMetrics { val client = opentelemetryClient( lightstepAccessToken, + "lightstep-access-token", lightstepUrl, lightstepPort, prescientDimensions, @@ -104,23 +107,16 @@ class MetricsSetups private constructor() { logRawPayload, compressionMode, ) - - val unarySink = configureBatchedUnaryLightstepSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary) - val preaggregatedSink = configureBatchedPreaggregatedLightstepSink(aggregationWidth, preaggregatedBatchMaxMetricsCount, preaggregatedBatchMaxAge, client, logError, onSendPreaggregated) - - val unaryFactory = MetricsFactory( - sink = unarySink, - timeSource = NanoTimeSource.preciseNanoTime, - totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds, - ) - val preaggregatedFactory = MetricsFactory( - sink = preaggregatedSink, - timeSource = NanoTimeSource.fastNanoTime, - totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds, - ) - return ConfiguredMetrics( - unaryMetricsFactory = unaryFactory, - preaggregatedMetricsFactory = preaggregatedFactory, + return configureMetricsForOtlpClient( + client, + unaryBatchSizeMaxMetricsCount, + unaryBatchMaxAge, + logError, + aggregationWidth, + preaggregatedBatchMaxMetricsCount, + preaggregatedBatchMaxAge, + onSendUnary, + onSendPreaggregated ) } @@ -152,6 +148,7 @@ class MetricsSetups private constructor() { ): MetricsFactory { val client = opentelemetryClient( lightstepAccessToken, + "lightstep-access-token", lightstepUrl, lightstepPort, prescientDimensions, @@ -160,7 +157,142 @@ class MetricsSetups private constructor() { timeout, compressionMode = compressionMode, ) + return configureNativeOtlpForLambda(client, logError, onSendUnary) + } + + + /** + * Not using Lightstep? Use this instead to send your metrics to an ingest endpoint that accepts Otlp metrics. + */ + fun CoroutineScope.rawNativeOtlp( + accessToken: String, + authHeaderName: String, + prescientDimensions: PrescientDimensions, + aggregationWidth: Duration, + logError: (message: String, exception: Exception) -> Unit, + onIngestTrailers: (Status, Metadata) -> Unit = { status, trailers -> + println("got trailers from ingest. Status: $status, Trailers: $trailers") + }, + ingestUrl: String, + ingestPort: Int = 443, + connectionSecurityMode: SecurityMode = SecurityMode.Tls, + timeout: Duration = 5.seconds, + unaryBatchSizeMaxMetricsCount: Int = 1000, + unaryBatchMaxAge: Duration = 10.seconds, + preaggregatedBatchMaxMetricsCount: Int = 1000, + preaggregatedBatchMaxAge: Duration = 10.seconds, + onSendUnary: (List) -> Unit = {}, + onSendPreaggregated: (List) -> Unit = {}, + /** + * This is verbose but can be helpful when debugging ingest data format issues. + * It shows you exactly what protocol buffers structure is sent. + * Log with caution. + */ + logRawPayload: (ResourceMetrics) -> Unit = {}, + compressionMode: CompressionMode = CompressionMode.Gzip, + ): ConfiguredMetrics { + val client = opentelemetryClient( + accessToken, + authHeaderName, + ingestUrl, + ingestPort, + prescientDimensions, + connectionSecurityMode, + onIngestTrailers, + timeout, + logRawPayload, + compressionMode, + ) + return configureMetricsForOtlpClient( + client, + unaryBatchSizeMaxMetricsCount, + unaryBatchMaxAge, + logError, + aggregationWidth, + preaggregatedBatchMaxMetricsCount, + preaggregatedBatchMaxAge, + onSendUnary, + onSendPreaggregated + ) + } + + /** + * The simplest configuration of metrics - it sends what you record, when you finish + * recording it. + * + * Calling `metricsFactory.record { metrics -> [...] }` will see goodmetrics invoke + * your ingest API _synchronously_ within the `}` scope end. If you are using + * this in Lambda to record an execution, it will report before the execution completes. + * + * If you want preaggregated metrics in lambda or multiple recorded workflows per lambda + * execution you might need to do some work - but probably you just wish you could emit + * 1 row with a bunch of measurements per execution and this does that. + */ + fun rawNativeOtlpButItSendsMetricsUponRecordingForLambda( + accessToken: String, + authHeaderName: String, + prescientDimensions: PrescientDimensions, + logError: (message: String, exception: Exception) -> Unit, + onIngestTrailers: (Status, Metadata) -> Unit = { status, trailers -> + println("got trailers from ingest. Status: $status, Trailers: $trailers") + }, + ingestUrl: String, + ingestPort: Int = 443, + connectionSecurityMode: SecurityMode = SecurityMode.Tls, + timeout: Duration = 5.seconds, + onSendUnary: (List) -> Unit = {}, + compressionMode: CompressionMode = CompressionMode.None, + ): MetricsFactory { + val client = opentelemetryClient( + accessToken, + authHeaderName, + ingestUrl, + ingestPort, + prescientDimensions, + connectionSecurityMode, + onIngestTrailers, + timeout, + compressionMode = compressionMode, + ) + return configureNativeOtlpForLambda(client, logError, onSendUnary) + } + + + private fun CoroutineScope.configureMetricsForOtlpClient( + client: OpentelemetryClient, + unaryBatchSizeMaxMetricsCount: Int, + unaryBatchMaxAge: Duration, + logError: (message: String, exception: Exception) -> Unit, + aggregationWidth: Duration, + preaggregatedBatchMaxMetricsCount: Int, + preaggregatedBatchMaxAge: Duration, + onSendUnary: (List) -> Unit, + onSendPreaggregated: (List) -> Unit + ): ConfiguredMetrics { + val unarySink = configureBatchedUnaryOtlpSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary) + val preaggregatedSink = configureBatchedPreaggregatedOtlpSink(aggregationWidth, preaggregatedBatchMaxMetricsCount, preaggregatedBatchMaxAge, client, logError, onSendPreaggregated) + + val unaryFactory = MetricsFactory( + sink = unarySink, + timeSource = NanoTimeSource.preciseNanoTime, + totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds, + ) + val preaggregatedFactory = MetricsFactory( + sink = preaggregatedSink, + timeSource = NanoTimeSource.fastNanoTime, + totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds, + ) + return ConfiguredMetrics( + unaryMetricsFactory = unaryFactory, + preaggregatedMetricsFactory = preaggregatedFactory, + ) + } + private fun configureNativeOtlpForLambda( + client: OpentelemetryClient, + logError: (message: String, exception: Exception) -> Unit, + onSendUnary: (List) -> Unit, + ): MetricsFactory { val unarySink = object : MetricsSink { override fun emit(metrics: Metrics) { runBlocking { @@ -185,7 +317,8 @@ class MetricsSetups private constructor() { ) } - private fun CoroutineScope.configureBatchedUnaryLightstepSink( + + private fun CoroutineScope.configureBatchedUnaryOtlpSink( batchSize: Int, batchMaxAge: Duration, client: OpentelemetryClient, @@ -214,7 +347,7 @@ class MetricsSetups private constructor() { return unarySink } - private fun CoroutineScope.configureBatchedPreaggregatedLightstepSink( + private fun CoroutineScope.configureBatchedPreaggregatedOtlpSink( aggregationWidth: Duration, batchSize: Int, batchMaxAge: Duration, @@ -245,30 +378,31 @@ class MetricsSetups private constructor() { } private fun opentelemetryClient( - lightstepAccessToken: String, - lightstepUrl: String, - lightstepPort: Int, + authToken: String, + authHeaderName: String, + ingestUrl: String, + ingestPort: Int, prescientDimensions: PrescientDimensions, - lightstepConnectionSecurityMode: SecurityMode, - onLightstepTrailers: (Status, Metadata) -> Unit, + securityMode: SecurityMode, + onIngestTrailers: (Status, Metadata) -> Unit, timeout: Duration, logRawPayload: (ResourceMetrics) -> Unit = { }, compressionMode: CompressionMode, ): OpentelemetryClient { val authHeader = Metadata() authHeader.put( - Metadata.Key.of("lightstep-access-token", Metadata.ASCII_STRING_MARSHALLER), - lightstepAccessToken + Metadata.Key.of(authHeaderName, Metadata.ASCII_STRING_MARSHALLER), + authToken ) return OpentelemetryClient.connect( - sillyOtlpHostname = lightstepUrl, - port = lightstepPort, + sillyOtlpHostname = ingestUrl, + port = ingestPort, prescientDimensions = prescientDimensions, - securityMode = lightstepConnectionSecurityMode, + securityMode = securityMode, interceptors = listOf( MetadataUtils.newAttachHeadersInterceptor(authHeader), - GrpcTrailerLoggerInterceptor(onLightstepTrailers), + GrpcTrailerLoggerInterceptor(onIngestTrailers), ), timeout = timeout, logRawPayload = logRawPayload,