Skip to content

Commit

Permalink
add lightstep MetricsSetup
Browse files Browse the repository at this point in the history
* Lots of configurability, might need more for stuff like retries.
* Adds a timeout to Lightstep requests.
* Fixes distributions for unary metrics.
  There still seems to be a bug with the way lightstep interprets
  the histogram with 1 bucket though. Unclear if there are more
  problems with it.
* switch the lightstep tester around to using the standard setup.
  • Loading branch information
kvcache authored and kvc0 committed Jul 14, 2022
1 parent 89130a8 commit 3391f2d
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 146 deletions.
184 changes: 165 additions & 19 deletions kotlin/lib/src/main/kotlin/goodmetrics/MetricsSetups.kt
Original file line number Diff line number Diff line change
@@ -1,49 +1,195 @@
package goodmetrics

import goodmetrics.downstream.GoodmetricsClient
import goodmetrics.downstream.GrpcTrailerLoggerInterceptor
import goodmetrics.downstream.OpentelemetryClient
import goodmetrics.downstream.PrescientDimensions
import goodmetrics.downstream.SecurityMode
import goodmetrics.pipeline.AggregatedBatch
import goodmetrics.pipeline.Aggregator
import goodmetrics.pipeline.BatchSender.Companion.launchSender
import goodmetrics.pipeline.Batcher
import goodmetrics.pipeline.SynchronizingBuffer
import io.grpc.Metadata
import io.grpc.Status
import io.grpc.stub.MetadataUtils
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

data class ConfiguredMetrics(
val emitterJob: Job,
val metricsFactory: MetricsFactory,
val unaryMetricsFactory: MetricsFactory,
val preaggregatedMetricsFactory: MetricsFactory,
)

class MetricsSetups private constructor() {
companion object {
fun CoroutineScope.rowPerMetric(goodmetricsHost: String = "localhost", port: Int = 9573): ConfiguredMetrics {
val incomingBuffer = SynchronizingBuffer()
val factory = MetricsFactory(incomingBuffer, timeSource = NanoTimeSource.preciseNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)
fun CoroutineScope.goodMetrics(goodmetricsHost: String = "localhost", port: Int = 9573, aggregationWidth: Duration = 10.seconds): ConfiguredMetrics {
val unaryIncomingBuffer = SynchronizingBuffer()
val unaryFactory = MetricsFactory(unaryIncomingBuffer, timeSource = NanoTimeSource.preciseNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)

val batched = Batcher(incomingBuffer)
val emitterJob = launchSender(batched, GoodmetricsClient.connect(goodmetricsHost, port)) { batch ->
val unaryBatcher = Batcher(unaryIncomingBuffer)
launchSender(unaryBatcher, GoodmetricsClient.connect(goodmetricsHost, port)) { batch ->
sendMetricsBatch(batch)
}

val preaggregatedIncomingBuffer = Aggregator(aggregationWidth)
val preaggregatedFactory = MetricsFactory(preaggregatedIncomingBuffer, timeSource = NanoTimeSource.fastNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)

val preaggregatedBatcher = Batcher(preaggregatedIncomingBuffer)
launchSender(preaggregatedBatcher, GoodmetricsClient.connect(goodmetricsHost, port)) { batch ->
sendPreaggregatedMetrics(batch)
}

return ConfiguredMetrics(
emitterJob,
factory,
unaryFactory,
preaggregatedFactory,
)
}

fun CoroutineScope.preaggregated(goodmetricsHost: String = "localhost", port: Int = 9573, aggregationWidth: Duration = 10.seconds): ConfiguredMetrics {
val incomingBuffer = Aggregator(aggregationWidth)
val factory = MetricsFactory(incomingBuffer, timeSource = NanoTimeSource.fastNanoTime, totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds)
/**
* preaggregated metrics in lightstep appear as Distributions for `Metrics::distribution`s
* and as Delta temporality Sums for `Metrics::measure`ments.
*
* raw, unary metrics in lightstep also appear as Distribution for `Metrics::distribution`s
* however `Metrics::measure`ments appear as Delta temporality Gauge so you can look at those
* values with more flexibility in a way that makes more sense for raw emissions. It's a sharper
* edged tool here, and there might be a better representation - to which goodmetrics will change
* upon discovery.
*/
fun CoroutineScope.lightstepNativeOtlp(
lightstepAccessToken: String,
prescientDimensions: PrescientDimensions,
aggregationWidth: Duration,
logError: (message: String, exception: Exception) -> Unit,
onLightstepTrailers: (Status, Metadata) -> Unit = { status, trailers ->
println("got trailers from lightstep. Status: $status, Trailers: $trailers")
},
lightstepUrl: String = "ingest.lightstep.com",
lightstepPort: Int = 443,
lightstepConnectionSecurityMode: SecurityMode = SecurityMode.Tls,
timeout: Duration = 5.seconds,
unaryBatchSizeMaxMetricsCount: Int = 1000,
unaryBatchMaxAge: Duration = 10.seconds,
preaggregatedBatchMaxMetricsCount: Int = 1000,
preaggregatedBatchMaxAge: Duration = 10.seconds,
onSendUnary: (List<Metrics>) -> Unit = {},
onSendPreaggregated: (List<AggregatedBatch>) -> Unit = {},
): ConfiguredMetrics {
val client = opentelemetryClient(
lightstepAccessToken,
lightstepUrl,
lightstepPort,
prescientDimensions,
lightstepConnectionSecurityMode,
onLightstepTrailers,
timeout
)

val batched = Batcher(incomingBuffer)
val emitterJob = launchSender(batched, GoodmetricsClient.connect(goodmetricsHost, port)) { batch ->
sendPreaggregatedMetrics(batch)
}
val unarySink = configureBatchedUnaryLightstepSink(unaryBatchSizeMaxMetricsCount, unaryBatchMaxAge, client, logError, onSendUnary)
val preaggregatedSink = configureBatchedPreaggregatedLightstepSink(aggregationWidth, preaggregatedBatchMaxMetricsCount, preaggregatedBatchMaxAge, client, logError, onSendPreaggregated)

val unaryFactory = MetricsFactory(
sink = unarySink,
timeSource = NanoTimeSource.preciseNanoTime,
totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds,
)
val preaggregatedFactory = MetricsFactory(
sink = preaggregatedSink,
timeSource = NanoTimeSource.fastNanoTime,
totaltimeType = MetricsFactory.TotaltimeType.DistributionMicroseconds,
)
return ConfiguredMetrics(
emitterJob,
factory,
unaryMetricsFactory = unaryFactory,
preaggregatedMetricsFactory = preaggregatedFactory,
)
}

private fun CoroutineScope.configureBatchedUnaryLightstepSink(
batchSize: Int,
batchMaxAge: Duration,
client: OpentelemetryClient,
logError: (message: String, exception: Exception) -> Unit,
onSendUnary: (List<Metrics>) -> Unit,
): SynchronizingBuffer {
val unarySink = SynchronizingBuffer()
val unaryBatcher = Batcher(
unarySink,
batchSize = batchSize,
batchAge = batchMaxAge,
)

launch {
// Launch the sender on a background coroutine.
unaryBatcher.consume()
.collect { metrics ->
onSendUnary(metrics)
try {
client.sendMetricsBatch(metrics)
} catch (e: Exception) {
logError("error sending unary batch", e)
}
}
}
return unarySink
}

private fun CoroutineScope.configureBatchedPreaggregatedLightstepSink(
aggregationWidth: Duration,
batchSize: Int,
batchMaxAge: Duration,
client: OpentelemetryClient,
logError: (message: String, exception: Exception) -> Unit,
onSendPreaggregated: (List<AggregatedBatch>) -> Unit,
): Aggregator {
val sink = Aggregator(aggregationWidth = aggregationWidth)
val batcher = Batcher(
sink,
batchSize = batchSize,
batchAge = batchMaxAge,
)

launch {
// Launch the sender on a background coroutine.
batcher.consume()
.collect { metrics ->
onSendPreaggregated(metrics)
try {
client.sendPreaggregatedBatch(metrics)
} catch (e: Exception) {
logError("error sending preaggregated batch", e)
}
}
}
return sink
}

private fun opentelemetryClient(
lightstepAccessToken: String,
lightstepUrl: String,
lightstepPort: Int,
prescientDimensions: PrescientDimensions,
lightstepConnectionSecurityMode: SecurityMode,
onLightstepTrailers: (Status, Metadata) -> Unit,
timeout: Duration
): OpentelemetryClient {
val authHeader = Metadata()
authHeader.put(
Metadata.Key.of("lightstep-access-token", Metadata.ASCII_STRING_MARSHALLER),
lightstepAccessToken
)

return OpentelemetryClient.connect(
sillyOtlpHostname = lightstepUrl,
port = lightstepPort,
prescientDimensions = prescientDimensions,
securityMode = lightstepConnectionSecurityMode,
interceptors = listOf(
MetadataUtils.newAttachHeadersInterceptor(authHeader),
GrpcTrailerLoggerInterceptor(onLightstepTrailers),
),
timeout = timeout
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import io.grpc.MethodDescriptor
import io.grpc.Status
import io.grpc.Metadata as GrpcMetadata

class GrpcTrailerLoggerInterceptor (
class GrpcTrailerLoggerInterceptor(
private val onTrailers: (Status, GrpcMetadata) -> Unit
) : ClientInterceptor {
override fun <ReqT, RespT> interceptCall(
method: MethodDescriptor<ReqT, RespT>, callOptions: CallOptions, next: Channel
method: MethodDescriptor<ReqT, RespT>,
callOptions: CallOptions,
next: Channel
): ClientCall<ReqT, RespT> {
return TrailersLoggingClientCall(next.newCall(method, callOptions), onTrailers)
}
Expand Down
Loading

0 comments on commit 3391f2d

Please sign in to comment.