diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 89bc5c453..4baa30cd1 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,7 +12,7 @@ okio-version = "3.6.0" otel-version = "1.32.0" slf4j-version = "2.0.9" slf4j-v1x-version = "1.7.36" -crt-kotlin-version = "0.8.2" +crt-kotlin-version = "0.8.3" # codegen smithy-version = "1.42.0" diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index ea8c8773d..7424a4293 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -9,6 +9,7 @@ import aws.smithy.kotlin.runtime.http.HttpCall import aws.smithy.kotlin.runtime.http.config.EngineFactory import aws.smithy.kotlin.runtime.http.engine.* import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics +import aws.smithy.kotlin.runtime.http.engine.internal.ThreadState import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.net.TlsVersion import aws.smithy.kotlin.runtime.operation.ExecutionContext @@ -16,6 +17,7 @@ import aws.smithy.kotlin.runtime.time.Instant import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds import kotlinx.coroutines.job import okhttp3.* +import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import kotlin.time.toJavaDuration import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion @@ -40,8 +42,13 @@ public class OkHttpEngine( override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke } - private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) - private val client = config.buildClient(metrics) + private val dispatcher = config.buildDispatcher() + private val metrics = HttpClientMetrics( + scope = TELEMETRY_SCOPE, + provider = config.telemetryProvider, + threadStateCallback = dispatcher::getThreadState, + ) + private val client = config.buildClient(dispatcher, metrics) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { val callContext = callContext() @@ -68,10 +75,21 @@ public class OkHttpEngine( } } +private fun Dispatcher.getThreadState(): ThreadState? = (executorService as? ThreadPoolExecutor)?.let { executor -> + val active = executor.activeCount.toLong() + val idle = executor.poolSize.toLong() - active + ThreadState(idle, active) +} + +private fun OkHttpEngineConfig.buildDispatcher() = Dispatcher().apply { + maxRequests = maxConcurrency.toInt() + maxRequestsPerHost = maxConcurrencyPerHost.toInt() +} + /** * Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client */ -private fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient { +private fun OkHttpEngineConfig.buildClient(dispatcher: Dispatcher, metrics: HttpClientMetrics): OkHttpClient { val config = this return OkHttpClient.Builder().apply { @@ -100,10 +118,6 @@ private fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCl ) connectionPool(pool) - val dispatcher = Dispatcher().apply { - maxRequests = config.maxConcurrency.toInt() - maxRequestsPerHost = config.maxConcurrencyPerHost.toInt() - } dispatcher(dispatcher) // Log events coming from okhttp. Allocate a new listener per-call to facilitate dedicated trace spans. diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt index a193a81dc..c8c0c5cea 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt @@ -25,8 +25,13 @@ public object HttpClientMetricAttributes { public val AcquiredConnection: Attributes = attributesOf { "state" to "acquired" } public val QueuedRequest: Attributes = attributesOf { "state" to "queued" } public val InFlightRequest: Attributes = attributesOf { "state" to "in-flight" } + public val IdleThread: Attributes = attributesOf { "state" to "idle" } + public val ActiveThread: Attributes = attributesOf { "state" to "active" } } +@InternalApi +public data class ThreadState(val idle: Long, val active: Long) + /** * Container for common HTTP engine related metrics. Engine implementations can re-use this and update * the various fields in whatever manner fits best (increment/decrement vs current absolute value). @@ -39,6 +44,7 @@ public object HttpClientMetricAttributes { public class HttpClientMetrics( scope: String, public val provider: TelemetryProvider, + threadStateCallback: (() -> ThreadState?)? = null, ) : Closeable { private val meter = provider.meterProvider.getOrCreateMeter(scope) @@ -76,34 +82,6 @@ public class HttpClientMetrics( "The amount of time a connection has been open", ) - private val connectionLimitHandle = meter.createAsyncUpDownCounter( - "smithy.client.http.connections.limit", - { it.record(_connectionsLimit.value) }, - "{connection}", - "Max connections configured for the HTTP client", - ) - - private val connectionUsageHandle = meter.createAsyncUpDownCounter( - "smithy.client.http.connections.usage", - ::recordConnectionState, - "{connection}", - "Current state of connections (idle, acquired)", - ) - - private val requestsConcurrencyLimitHandle = meter.createAsyncUpDownCounter( - "smithy.client.http.requests.limit", - { it.record(_requestConcurrencyLimit.value) }, - "{request}", - "Max concurrent requests configured for the HTTP client", - ) - - private val requestsHandle = meter.createAsyncUpDownCounter( - "smithy.client.http.requests.usage", - ::recordRequestsState, - "{request}", - "The current state of HTTP client request concurrency (queued, in-flight)", - ) - public val bytesSent: MonotonicCounter = meter.createMonotonicCounter( "smithy.client.http.bytes_sent", "By", @@ -122,6 +100,41 @@ public class HttpClientMetrics( "The amount of time after a request has been sent spent waiting on a response from the remote server", ) + private val asyncHandles = listOfNotNull( + meter.createAsyncUpDownCounter( + "smithy.client.http.connections.limit", + { it.record(_connectionsLimit.value) }, + "{connection}", + "Max connections configured for the HTTP client", + ), + meter.createAsyncUpDownCounter( + "smithy.client.http.connections.usage", + ::recordConnectionState, + "{connection}", + "Current state of connections (idle, acquired)", + ), + meter.createAsyncUpDownCounter( + "smithy.client.http.requests.limit", + { it.record(_requestConcurrencyLimit.value) }, + "{request}", + "Max concurrent requests configured for the HTTP client", + ), + meter.createAsyncUpDownCounter( + "smithy.client.http.requests.usage", + ::recordRequestsState, + "{request}", + "The current state of HTTP client request concurrency (queued, in-flight)", + ), + threadStateCallback?.let { callback -> + meter.createAsyncUpDownCounter( + "smithy.client.http.threads.count", + { measurement -> recordThreadState(measurement, callback) }, + "{thread}", + "Current state of HTTP engine threads (idle, active)", + ) + }, + ) + /** * The maximum number of connections configured for the client */ @@ -186,13 +199,17 @@ public class HttpClientMetrics( measurement.record(acquiredConnections, HttpClientMetricAttributes.AcquiredConnection) } + private fun recordThreadState(measurement: LongAsyncMeasurement, callback: () -> ThreadState?) { + callback()?.let { threadState -> + measurement.record(threadState.idle, HttpClientMetricAttributes.IdleThread) + measurement.record(threadState.active, HttpClientMetricAttributes.ActiveThread) + } + } + override fun close() { - val exceptions = listOf( - runCatching(connectionLimitHandle::stop), - runCatching(connectionUsageHandle::stop), - runCatching(requestsHandle::stop), - runCatching(requestsConcurrencyLimitHandle::stop), - ).mapNotNull(Result<*>::exceptionOrNull) + val exceptions = asyncHandles + .map { handle -> runCatching { handle.stop() } } + .mapNotNull(Result<*>::exceptionOrNull) exceptions.firstOrNull()?.let { first -> exceptions.drop(1).forEach(first::addSuppressed)