Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add compression settings for otlp #28

Merged
merged 1 commit into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion kotlin/goodmetrics/src/main/kotlin/goodmetrics/MetricsSetups.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package goodmetrics

import goodmetrics.downstream.CompressionMode
import goodmetrics.downstream.GoodmetricsClient
import goodmetrics.downstream.GrpcTrailerLoggerInterceptor
import goodmetrics.downstream.OpentelemetryClient
Expand Down Expand Up @@ -85,6 +86,12 @@ class MetricsSetups private constructor() {
* Log with caution.
*/
logRawPayload: (ResourceMetrics) -> Unit = {},
/**
* Lightstep claims:
* > there is no compression on this traffic; I'm not sure what SDK you are using or if hand rolled but if you turn on compression this will go away.
* So I'll default the special lightstep configuration to use gzip compression. You can disable this if you want.
*/
compressionMode: CompressionMode = CompressionMode.Gzip,
): ConfiguredMetrics {
val client = opentelemetryClient(
lightstepAccessToken,
Expand All @@ -95,6 +102,7 @@ class MetricsSetups private constructor() {
onLightstepTrailers,
timeout,
logRawPayload,
compressionMode,
)

val unarySink = configureBatchedUnaryLightstepSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary)
Expand Down Expand Up @@ -140,6 +148,7 @@ class MetricsSetups private constructor() {
lightstepConnectionSecurityMode: SecurityMode = SecurityMode.Tls,
timeout: Duration = 5.seconds,
onSendUnary: (List<Metrics>) -> Unit = {},
compressionMode: CompressionMode = CompressionMode.None,
): MetricsFactory {
val client = opentelemetryClient(
lightstepAccessToken,
Expand All @@ -148,7 +157,8 @@ class MetricsSetups private constructor() {
prescientDimensions,
lightstepConnectionSecurityMode,
onLightstepTrailers,
timeout
timeout,
compressionMode = compressionMode,
)

val unarySink = MetricsSink { metrics ->
Expand Down Expand Up @@ -237,6 +247,7 @@ class MetricsSetups private constructor() {
onLightstepTrailers: (Status, Metadata) -> Unit,
timeout: Duration,
logRawPayload: (ResourceMetrics) -> Unit = { },
compressionMode: CompressionMode,
): OpentelemetryClient {
val authHeader = Metadata()
authHeader.put(
Expand All @@ -255,6 +266,7 @@ class MetricsSetups private constructor() {
),
timeout = timeout,
logRawPayload = logRawPayload,
compressionMode = compressionMode,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import goodmetrics.pipeline.bucket
import goodmetrics.pipeline.bucketBelow
import io.grpc.CallOptions
import io.grpc.ClientInterceptor
import io.grpc.Deadline
import io.grpc.ManagedChannel
import io.grpc.netty.GrpcSslContexts
import io.grpc.netty.NettyChannelBuilder
Expand Down Expand Up @@ -56,6 +55,12 @@ enum class SecurityMode {
Tls,
}

sealed interface CompressionMode {
object None : CompressionMode
object Gzip : CompressionMode
data class IKnowWhatIWant(val explicitMode: String) : CompressionMode
}

/**
* This client should be used as a last resort, in defeat, if you
* cannot use the goodmetrics protocol. Opentelemetry is highly
Expand All @@ -67,7 +72,8 @@ class OpentelemetryClient(
private val channel: ManagedChannel,
private val prescientDimensions: PrescientDimensions,
private val timeout: Duration,
private val logRawPayload: (ResourceMetrics) -> Unit = { }
private val logRawPayload: (ResourceMetrics) -> Unit = { },
private val compressionMode: CompressionMode,
) {
companion object {
fun connect(
Expand All @@ -81,6 +87,7 @@ class OpentelemetryClient(
interceptors: List<ClientInterceptor>,
timeout: Duration = 5.seconds,
logRawPayload: (ResourceMetrics) -> Unit = { },
compressionMode: CompressionMode = CompressionMode.None,
): OpentelemetryClient {
val channelBuilder = NettyChannelBuilder.forAddress(sillyOtlpHostname, port)
when (securityMode) {
Expand All @@ -100,14 +107,20 @@ class OpentelemetryClient(
}
}
channelBuilder.intercept(interceptors)
return OpentelemetryClient(channelBuilder.build(), prescientDimensions, timeout, logRawPayload)
return OpentelemetryClient(channelBuilder.build(), prescientDimensions, timeout, logRawPayload, compressionMode)
}
}
private fun stub(): MetricsServiceGrpcKt.MetricsServiceCoroutineStub = MetricsServiceGrpcKt.MetricsServiceCoroutineStub(
channel,
CallOptions.DEFAULT
.withDeadline(Deadline.after(timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS))
)
private fun stub(): MetricsServiceGrpcKt.MetricsServiceCoroutineStub {
val defaultCallOptions = CallOptions.DEFAULT
.withDeadlineAfter(timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
val callOptions = when (compressionMode) {
CompressionMode.None -> defaultCallOptions
CompressionMode.Gzip -> defaultCallOptions.withCompression("gzip")
is CompressionMode.IKnowWhatIWant -> defaultCallOptions.withCompression(compressionMode.explicitMode)
}

return MetricsServiceGrpcKt.MetricsServiceCoroutineStub(channel, callOptions)
}

suspend fun sendMetricsBatch(batch: List<Metrics>) {
val resourceMetricsBatch = asResourceMetrics(batch)
Expand Down