Skip to content

Commit

Permalink
feat: add ability to send metrics to generic OTLP ingest point. DRY u…
Browse files Browse the repository at this point in the history
…p code to avoid duplication since a lot of it is shared.
  • Loading branch information
tylerburdsall committed Mar 8, 2024
1 parent 9f2650f commit ee15b0d
Showing 1 changed file with 166 additions and 32 deletions.
198 changes: 166 additions & 32 deletions kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package goodmetrics

import goodmetrics.MetricsSetups.Companion.configureBatchedPreaggregatedOtlpSink
import goodmetrics.MetricsSetups.Companion.configureBatchedUnaryOtlpSink
import goodmetrics.downstream.CompressionMode
import goodmetrics.downstream.GoodmetricsClient
import goodmetrics.downstream.GrpcTrailerLoggerInterceptor
Expand Down Expand Up @@ -54,10 +56,10 @@ class MetricsSetups private constructor() {

/**
* preaggregated metrics in lightstep appear as Distributions for `Metrics::distribution`s
* and as Delta temporality Sums for `Metrics::measure`ments.
* and as Delta temporality Sums for `Metrics::measurements`.
*
* raw, unary metrics in lightstep also appear as Distribution for `Metrics::distribution`s
* however `Metrics::measure`ments appear as Delta temporality Gauge so you can look at those
* however `Metrics::measurements` appear as Delta temporality Gauge so you can look at those
* values with more flexibility in a way that makes more sense for raw emissions. It's a sharper
* edged tool here, and there might be a better representation - to which goodmetrics will change
* upon discovery.
Expand Down Expand Up @@ -95,6 +97,7 @@ class MetricsSetups private constructor() {
): ConfiguredMetrics {
val client = opentelemetryClient(
lightstepAccessToken,
"lightstep-access-token",
lightstepUrl,
lightstepPort,
prescientDimensions,
Expand All @@ -104,23 +107,16 @@ class MetricsSetups private constructor() {
logRawPayload,
compressionMode,
)

val unarySink = configureBatchedUnaryLightstepSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary)
val preaggregatedSink = configureBatchedPreaggregatedLightstepSink(aggregationWidth, preaggregatedBatchMaxMetricsCount, preaggregatedBatchMaxAge, client, logError, onSendPreaggregated)

val unaryFactory = MetricsFactory(
sink = unarySink,
timeSource = NanoTimeSource.preciseNanoTime,
totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds,
)
val preaggregatedFactory = MetricsFactory(
sink = preaggregatedSink,
timeSource = NanoTimeSource.fastNanoTime,
totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds,
)
return ConfiguredMetrics(
unaryMetricsFactory = unaryFactory,
preaggregatedMetricsFactory = preaggregatedFactory,
return configureMetricsForOtlpClient(
client,
unaryBatchSizeMaxMetricsCount,
unaryBatchMaxAge,
logError,
aggregationWidth,
preaggregatedBatchMaxMetricsCount,
preaggregatedBatchMaxAge,
onSendUnary,
onSendPreaggregated
)
}

Expand Down Expand Up @@ -152,6 +148,7 @@ class MetricsSetups private constructor() {
): MetricsFactory {
val client = opentelemetryClient(
lightstepAccessToken,
"lightstep-access-token",
lightstepUrl,
lightstepPort,
prescientDimensions,
Expand All @@ -160,7 +157,142 @@ class MetricsSetups private constructor() {
timeout,
compressionMode = compressionMode,
)
return configureNativeOtlpForLambda(client, logError, onSendUnary)
}


/**
* Not using Lightstep? Use this instead to send your metrics to an ingest endpoint that accepts Otlp metrics.
*/
fun CoroutineScope.rawNativeOtlp(
accessToken: String,
authHeaderName: String,
prescientDimensions: PrescientDimensions,
aggregationWidth: Duration,
logError: (message: String, exception: Exception) -> Unit,
onIngestTrailers: (Status, Metadata) -> Unit = { status, trailers ->
println("got trailers from ingest. Status: $status, Trailers: $trailers")
},
ingestUrl: String,
ingestPort: Int = 443,
connectionSecurityMode: SecurityMode = SecurityMode.Tls,
timeout: Duration = 5.seconds,
unaryBatchSizeMaxMetricsCount: Int = 1000,
unaryBatchMaxAge: Duration = 10.seconds,
preaggregatedBatchMaxMetricsCount: Int = 1000,
preaggregatedBatchMaxAge: Duration = 10.seconds,
onSendUnary: (List<Metrics>) -> Unit = {},
onSendPreaggregated: (List<AggregatedBatch>) -> Unit = {},
/**
* This is verbose but can be helpful when debugging ingest data format issues.
* It shows you exactly what protocol buffers structure is sent.
* Log with caution.
*/
logRawPayload: (ResourceMetrics) -> Unit = {},
compressionMode: CompressionMode = CompressionMode.Gzip,
): ConfiguredMetrics {
val client = opentelemetryClient(
accessToken,
authHeaderName,
ingestUrl,
ingestPort,
prescientDimensions,
connectionSecurityMode,
onIngestTrailers,
timeout,
logRawPayload,
compressionMode,
)
return configureMetricsForOtlpClient(
client,
unaryBatchSizeMaxMetricsCount,
unaryBatchMaxAge,
logError,
aggregationWidth,
preaggregatedBatchMaxMetricsCount,
preaggregatedBatchMaxAge,
onSendUnary,
onSendPreaggregated
)
}

/**
* The simplest configuration of metrics - it sends what you record, when you finish
* recording it.
*
* Calling `metricsFactory.record { metrics -> [...] }` will see goodmetrics invoke
* your ingest API _synchronously_ within the `}` scope end. If you are using
* this in Lambda to record an execution, it will report before the execution completes.
*
* If you want preaggregated metrics in lambda or multiple recorded workflows per lambda
* execution you might need to do some work - but probably you just wish you could emit
* 1 row with a bunch of measurements per execution and this does that.
*/
fun rawNativeOtlpButItSendsMetricsUponRecordingForLambda(
accessToken: String,
authHeaderName: String,
prescientDimensions: PrescientDimensions,
logError: (message: String, exception: Exception) -> Unit,
onIngestTrailers: (Status, Metadata) -> Unit = { status, trailers ->
println("got trailers from ingest. Status: $status, Trailers: $trailers")
},
ingestUrl: String,
ingestPort: Int = 443,
connectionSecurityMode: SecurityMode = SecurityMode.Tls,
timeout: Duration = 5.seconds,
onSendUnary: (List<Metrics>) -> Unit = {},
compressionMode: CompressionMode = CompressionMode.None,
): MetricsFactory {
val client = opentelemetryClient(
accessToken,
authHeaderName,
ingestUrl,
ingestPort,
prescientDimensions,
connectionSecurityMode,
onIngestTrailers,
timeout,
compressionMode = compressionMode,
)
return configureNativeOtlpForLambda(client, logError, onSendUnary)
}


private fun CoroutineScope.configureMetricsForOtlpClient(
client: OpentelemetryClient,
unaryBatchSizeMaxMetricsCount: Int,
unaryBatchMaxAge: Duration,
logError: (message: String, exception: Exception) -> Unit,
aggregationWidth: Duration,
preaggregatedBatchMaxMetricsCount: Int,
preaggregatedBatchMaxAge: Duration,
onSendUnary: (List<Metrics>) -> Unit,
onSendPreaggregated: (List<AggregatedBatch>) -> Unit
): ConfiguredMetrics {
val unarySink = configureBatchedUnaryOtlpSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary)
val preaggregatedSink = configureBatchedPreaggregatedOtlpSink(aggregationWidth, preaggregatedBatchMaxMetricsCount, preaggregatedBatchMaxAge, client, logError, onSendPreaggregated)

val unaryFactory = MetricsFactory(
sink = unarySink,
timeSource = NanoTimeSource.preciseNanoTime,
totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds,
)
val preaggregatedFactory = MetricsFactory(
sink = preaggregatedSink,
timeSource = NanoTimeSource.fastNanoTime,
totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds,
)
return ConfiguredMetrics(
unaryMetricsFactory = unaryFactory,
preaggregatedMetricsFactory = preaggregatedFactory,
)
}

private fun configureNativeOtlpForLambda(
client: OpentelemetryClient,
logError: (message: String, exception: Exception) -> Unit,
onSendUnary: (List<Metrics>) -> Unit,
): MetricsFactory {
val unarySink = object : MetricsSink {
override fun emit(metrics: Metrics) {
runBlocking {
Expand All @@ -185,7 +317,8 @@ class MetricsSetups private constructor() {
)
}

private fun CoroutineScope.configureBatchedUnaryLightstepSink(

private fun CoroutineScope.configureBatchedUnaryOtlpSink(
batchSize: Int,
batchMaxAge: Duration,
client: OpentelemetryClient,
Expand Down Expand Up @@ -214,7 +347,7 @@ class MetricsSetups private constructor() {
return unarySink
}

private fun CoroutineScope.configureBatchedPreaggregatedLightstepSink(
private fun CoroutineScope.configureBatchedPreaggregatedOtlpSink(
aggregationWidth: Duration,
batchSize: Int,
batchMaxAge: Duration,
Expand Down Expand Up @@ -245,30 +378,31 @@ class MetricsSetups private constructor() {
}

private fun opentelemetryClient(
lightstepAccessToken: String,
lightstepUrl: String,
lightstepPort: Int,
authToken: String,
authHeaderName: String,
ingestUrl: String,
ingestPort: Int,
prescientDimensions: PrescientDimensions,
lightstepConnectionSecurityMode: SecurityMode,
onLightstepTrailers: (Status, Metadata) -> Unit,
securityMode: SecurityMode,
onIngestTrailers: (Status, Metadata) -> Unit,
timeout: Duration,
logRawPayload: (ResourceMetrics) -> Unit = { },
compressionMode: CompressionMode,
): OpentelemetryClient {
val authHeader = Metadata()
authHeader.put(
Metadata.Key.of("lightstep-access-token", Metadata.ASCII_STRING_MARSHALLER),
lightstepAccessToken
Metadata.Key.of(authHeaderName, Metadata.ASCII_STRING_MARSHALLER),
authToken
)

return OpentelemetryClient.connect(
sillyOtlpHostname = lightstepUrl,
port = lightstepPort,
sillyOtlpHostname = ingestUrl,
port = ingestPort,
prescientDimensions = prescientDimensions,
securityMode = lightstepConnectionSecurityMode,
securityMode = securityMode,
interceptors = listOf(
MetadataUtils.newAttachHeadersInterceptor(authHeader),
GrpcTrailerLoggerInterceptor(onLightstepTrailers),
GrpcTrailerLoggerInterceptor(onIngestTrailers),
),
timeout = timeout,
logRawPayload = logRawPayload,
Expand Down

0 comments on commit ee15b0d

Please sign in to comment.