From 776811bdb882b7ed7e6b388f6433a5c1cf4c4938 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Wed, 18 Sep 2024 11:18:17 +0200 Subject: [PATCH] Defer ExchangeFilterFunction execution in WebClient Prior to this commit, the `DefaultWebClient` would execute the configured `ExchangeFilterFunction` as the reactive pipeline is assembled during subscription. This means that if imperative code is executed in a filter function, it won't be aware of the current observation through the local scope. For example, when automatic context propagation is enabled for Reactor operators, the logger MDC will not know about the current traceId/spanId. This commit ensures that client filter functions execution is deferred during the actual client exchange. Fixes gh-33559 --- spring-webflux/spring-webflux.gradle | 1 + .../function/client/DefaultWebClient.java | 5 +++-- .../client/WebClientObservationTests.java | 21 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index e0f98bcdc8fa..d0f4bda0c3b1 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -37,6 +37,7 @@ dependencies { testImplementation(testFixtures(project(":spring-web"))) testImplementation("com.fasterxml:aalto-xml") testImplementation("com.squareup.okhttp3:mockwebserver") + testImplementation("io.micrometer:context-propagation") testImplementation("io.micrometer:micrometer-observation-test") testImplementation("io.projectreactor:reactor-test") testImplementation("io.reactivex.rxjava3:rxjava") diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 296bacb41d9c..5076d7c35198 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -453,8 +453,9 @@ public Mono exchange() { ClientRequest request = requestBuilder.build(); observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null)); observationContext.setRequest(request); - Mono responseMono = filterFunction.apply(exchangeFunction) - .exchange(request) + final ExchangeFilterFunction finalFilterFunction = filterFunction; + Mono responseMono = Mono.defer( + () -> finalFilterFunction.apply(exchangeFunction).exchange(request)) .checkpoint("Request to " + WebClientUtils.getRequestDescription(request.method(), request.url()) + " [DefaultWebClient]") diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java index 73a35047e14d..ad662822f53f 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java @@ -27,10 +27,12 @@ import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -63,6 +65,7 @@ class WebClientObservationTests { @BeforeEach void setup() { + Hooks.enableAutomaticContextPropagation(); ClientResponse mockResponse = mock(); when(mockResponse.statusCode()).thenReturn(HttpStatus.OK); when(mockResponse.headers()).thenReturn(new MockClientHeaders()); @@ -74,6 +77,11 @@ void setup() { this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler()); } + @AfterEach + void cleanUp() { + Hooks.disableAutomaticContextPropagation(); + } + @Test void recordsObservationForSuccessfulExchange() { this.builder.build().get().uri("/resource/{id}", 42) @@ -148,6 +156,19 @@ void setsCurrentObservationInReactorContext() { verifyAndGetRequest(); } + @Test + void setsCurrentObservationInScope() { + ExchangeFilterFunction assertionFilter = (request, chain) -> { + Observation currentObservation = observationRegistry.getCurrentObservation(); + assertThat(currentObservation).isNotNull(); + assertThat(currentObservation.getContext()).isInstanceOf(ClientRequestObservationContext.class); + return chain.exchange(request); + }; + this.builder.filter(assertionFilter).build().get().uri("/resource/{id}", 42) + .retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(5)); + verifyAndGetRequest(); + } + @Test void recordsObservationWithResponseDetailsWhenFilterFunctionErrors() { ExchangeFilterFunction errorFunction = (req, next) -> next.exchange(req).then(Mono.error(new IllegalStateException()));