Skip to content

Commit

Permalink
add compression settings for otlp
Browse files Browse the repository at this point in the history
Lightstep supports gzip compression. This updates the default lightstep
configuration mode to use gzip and adds configurability for it.
  • Loading branch information
kvcache authored and kvc0 committed Sep 16, 2022
1 parent 63f7022 commit b2e3976
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
14 changes: 13 additions & 1 deletion kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package goodmetrics

import goodmetrics.downstream.CompressionMode
import goodmetrics.downstream.GoodmetricsClient
import goodmetrics.downstream.GrpcTrailerLoggerInterceptor
import goodmetrics.downstream.OpentelemetryClient
Expand Down Expand Up @@ -85,6 +86,12 @@ class MetricsSetups private constructor() {
* Log with caution.
*/
logRawPayload: (ResourceMetrics) -> Unit = {},
/**
* Lightstep claims:
* > there is no compression on this traffic; I'm not sure what SDK you are using or if hand rolled but if you turn on compression this will go away.
* So I'll default the special lightstep configuration to use gzip compression. You can disable this if you want.
*/
compressionMode: CompressionMode = CompressionMode.Gzip,
): ConfiguredMetrics {
val client = opentelemetryClient(
lightstepAccessToken,
Expand All @@ -95,6 +102,7 @@ class MetricsSetups private constructor() {
onLightstepTrailers,
timeout,
logRawPayload,
compressionMode,
)

val unarySink = configureBatchedUnaryLightstepSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary)
Expand Down Expand Up @@ -140,6 +148,7 @@ class MetricsSetups private constructor() {
lightstepConnectionSecurityMode: SecurityMode = SecurityMode.Tls,
timeout: Duration = 5.seconds,
onSendUnary: (List<Metrics>) -> Unit = {},
compressionMode: CompressionMode = CompressionMode.None,
): MetricsFactory {
val client = opentelemetryClient(
lightstepAccessToken,
Expand All @@ -148,7 +157,8 @@ class MetricsSetups private constructor() {
prescientDimensions,
lightstepConnectionSecurityMode,
onLightstepTrailers,
timeout
timeout,
compressionMode = compressionMode,
)

val unarySink = MetricsSink { metrics ->
Expand Down Expand Up @@ -237,6 +247,7 @@ class MetricsSetups private constructor() {
onLightstepTrailers: (Status, Metadata) -> Unit,
timeout: Duration,
logRawPayload: (ResourceMetrics) -> Unit = { },
compressionMode: CompressionMode,
): OpentelemetryClient {
val authHeader = Metadata()
authHeader.put(
Expand All @@ -255,6 +266,7 @@ class MetricsSetups private constructor() {
),
timeout = timeout,
logRawPayload = logRawPayload,
compressionMode = compressionMode,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import goodmetrics.pipeline.bucket
import goodmetrics.pipeline.bucketBelow
import io.grpc.CallOptions
import io.grpc.ClientInterceptor
import io.grpc.Deadline
import io.grpc.ManagedChannel
import io.grpc.netty.GrpcSslContexts
import io.grpc.netty.NettyChannelBuilder
Expand Down Expand Up @@ -56,6 +55,12 @@ enum class SecurityMode {
Tls,
}

sealed interface CompressionMode {
object None : CompressionMode
object Gzip : CompressionMode
data class IKnowWhatIWant(val explicitMode: String) : CompressionMode
}

/**
* This client should be used as a last resort, in defeat, if you
* cannot use the goodmetrics protocol. Opentelemetry is highly
Expand All @@ -67,7 +72,8 @@ class OpentelemetryClient(
private val channel: ManagedChannel,
private val prescientDimensions: PrescientDimensions,
private val timeout: Duration,
private val logRawPayload: (ResourceMetrics) -> Unit = { }
private val logRawPayload: (ResourceMetrics) -> Unit = { },
private val compressionMode: CompressionMode,
) {
companion object {
fun connect(
Expand All @@ -81,6 +87,7 @@ class OpentelemetryClient(
interceptors: List<ClientInterceptor>,
timeout: Duration = 5.seconds,
logRawPayload: (ResourceMetrics) -> Unit = { },
compressionMode: CompressionMode = CompressionMode.None,
): OpentelemetryClient {
val channelBuilder = NettyChannelBuilder.forAddress(sillyOtlpHostname, port)
when (securityMode) {
Expand All @@ -100,14 +107,20 @@ class OpentelemetryClient(
}
}
channelBuilder.intercept(interceptors)
return OpentelemetryClient(channelBuilder.build(), prescientDimensions, timeout, logRawPayload)
return OpentelemetryClient(channelBuilder.build(), prescientDimensions, timeout, logRawPayload, compressionMode)
}
}
private fun stub(): MetricsServiceGrpcKt.MetricsServiceCoroutineStub = MetricsServiceGrpcKt.MetricsServiceCoroutineStub(
channel,
CallOptions.DEFAULT
.withDeadline(Deadline.after(timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS))
)
private fun stub(): MetricsServiceGrpcKt.MetricsServiceCoroutineStub {
val defaultCallOptions = CallOptions.DEFAULT
.withDeadlineAfter(timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
val callOptions = when (compressionMode) {
CompressionMode.None -> defaultCallOptions
CompressionMode.Gzip -> defaultCallOptions.withCompression("gzip")
is CompressionMode.IKnowWhatIWant -> defaultCallOptions.withCompression(compressionMode.explicitMode)
}

return MetricsServiceGrpcKt.MetricsServiceCoroutineStub(channel, callOptions)
}

suspend fun sendMetricsBatch(batch: List<Metrics>) {
val resourceMetricsBatch = asResourceMetrics(batch)
Expand Down

0 comments on commit b2e3976

Please sign in to comment.