Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore ReactorContext into Imperative Processing #9259

Closed
syedyusufh opened this issue Jun 21, 2024 · 6 comments · Fixed by #9284
Closed

Restore ReactorContext into Imperative Processing #9259

syedyusufh opened this issue Jun 21, 2024 · 6 comments · Fixed by #9284

Comments

@syedyusufh
Copy link

ReactorContext is NOT carried onto the following Imperative processing and this results in a new ObservationContext (change in traceId)

SO reference - https://stackoverflow.com/questions/78653090/spring-integration-restore-reactorcontext-into-imperative-processing

Issue happening with Spring Boot v3.3.0

@syedyusufh syedyusufh added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Jun 21, 2024
@artembilan artembilan added in: core and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Jun 21, 2024
@artembilan artembilan added this to the 6.4.0-M1 milestone Jun 21, 2024
@artembilan
Copy link
Member

Was going to suggest another workaround:

.channel("notObserved")

and

@EnableIntegrationManagement(observationPatterns = "!notObserved")

or

@EnableIntegrationManagement(observationPatterns ={  "!notObserved", "*" })

But that does not work: the first one just ignores everything else. The second one ends up with a sort in HashSet, where * got lower hash, therefore it comes first in a matching loop 😄 .

Will raise separate issue and fix. Nothing related for this yet.

@artembilan
Copy link
Member

Here is that issue: #9260.

BTW, feel free to contribute fixes: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc

@artembilan
Copy link
Member

So, our logic for the reactive reply from this kind of components is like Mono.toFuture().
And looks like there is no Reactor context propagation in there down to the CompletableFuture consumer.

Thinking...

@artembilan
Copy link
Member

I have made the fix and now your sample application is OK:

2024-06-28T11:49:18.915-04:00  INFO 31944 --- [   scheduling-1] [667edb7e9be7aa6059b96f78bc67c011-ae124118f96ecbd9] o.s.integration.handler.LoggingHandler   : ----------------------START-------------------------
2024-06-28T11:49:18.919-04:00  INFO 31944 --- [   scheduling-1] [667edb7e9be7aa6059b96f78bc67c011-50473877464ebff9] o.s.integration.handler.LoggingHandler   : WebClient GET Request (start of reactorContext)
2024-06-28T11:49:18.978-04:00  INFO 31944 --- [   scheduling-1] [667edb7e9be7aa6059b96f78bc67c011-8ae480e6d77503b6] c.i.sample.config.TracingConfig          : Request Headers: [Content-Type:"application/json", traceparent:"00-667edb7e9be7aa6059b96f78bc67c011-8ae480e6d77503b6-01"]
2024-06-28T11:49:19.640-04:00  INFO 31944 --- [ctor-http-nio-3] [667edb7e9be7aa6059b96f78bc67c011-8ae480e6d77503b6] c.i.sample.config.TracingConfig          : Response Headers: [Access-Control-Allow-Origin:"*", Alt-Svc:"h3=":443"; ma=2592000", Content-Type:"application/json", Date:"Fri, 28 Jun 2024 15:49:19 GMT", Server:"Caddy", Vary:"Accept-Encoding", Transfer-Encoding:"chunked"]
2024-06-28T11:49:19.667-04:00 ERROR 31944 --- [oundedElastic-1] [667edb7e9be7aa6059b96f78bc67c011-f0b19daa83cd9fd5] .o.WebFluxRequestExecutingMessageHandler : Failed to send async reply: org.springframework.integration.support.MessageBuilder@21b720b4

Will issue Pull Request shortly.

@artembilan
Copy link
Member

Forgot to mention.
Now you don't need ec -> ec.customizeMonoReply((message, mono) -> mono.contextCapture()) since it is done in the framework by itself.

artembilan added a commit to artembilan/spring-integration that referenced this issue Jun 28, 2024
…eply

Fixes: spring-projects#9259

The `Mono.toFuture()` does not propagate context to thread locals of the `CompletableFuture` consumer.
See `MonoToCompletableFuture`

* Fix `AbstractMessageProducingHandler` to convert reply `Mono` to `CompletableFuture` manually.
Use `doOnEach()` and set thread locals from the Reactor context manually around `replyFuture.complete()/completeExceptionally()`
* Add respective unit test into `WebFluxObservationPropagationTests` to ensure that same trace is used in downstream endpoints after WebFlux client reply

**Auto-cherry-pick to `6.3.x` & `6.2.x`**
spring-builds pushed a commit that referenced this issue Jun 28, 2024
Fixes: #9259

The `Mono.toFuture()` does not propagate context to thread locals of the `CompletableFuture` consumer.
See `MonoToCompletableFuture`

* Fix `AbstractMessageProducingHandler` to convert reply `Mono` to `CompletableFuture` manually.
Use `doOnEach()` and set thread locals from the Reactor context manually around `replyFuture.complete()/completeExceptionally()`
* Add respective unit test into `WebFluxObservationPropagationTests` to ensure that same trace is used in downstream endpoints after WebFlux client reply

(cherry picked from commit 4a77dbc)
spring-builds pushed a commit that referenced this issue Jun 28, 2024
Fixes: #9259

The `Mono.toFuture()` does not propagate context to thread locals of the `CompletableFuture` consumer.
See `MonoToCompletableFuture`

* Fix `AbstractMessageProducingHandler` to convert reply `Mono` to `CompletableFuture` manually.
Use `doOnEach()` and set thread locals from the Reactor context manually around `replyFuture.complete()/completeExceptionally()`
* Add respective unit test into `WebFluxObservationPropagationTests` to ensure that same trace is used in downstream endpoints after WebFlux client reply

(cherry picked from commit 4a77dbc)
@syedyusufh
Copy link
Author

Thanks for the quick fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants