Skip to content

Commit

Permalink
Add Coroutines support for WebClient and WebFlux.fn
Browse files Browse the repository at this point in the history
This commit is the first part of a more complete Coroutines
support coming in Spring Framework 5.2. It introduces suspendable
Kotlin extensions for Mono based methods in WebFlux classes like
WebClient, ServerRequest, ServerResponse as well as a Coroutines
router usable via `coRouter { }`.

Coroutines extensions use `await` prefix or `AndAwait` suffix,
and most are using names close to their Reactive counterparts,
except `exchange` in `WebClient.RequestHeadersSpec`
which translates to `awaitResponse`.

Upcoming expected changes are:
 - Leverage `Dispatchers.Unconfined` (Kotlin/kotlinx.coroutines#972)
 - Expose extensions for `Flux` based API (Kotlin/kotlinx.coroutines#254)
 - Introduce interop with `CoroutineContext` (Kotlin/kotlinx.coroutines#284)
 - Support Coroutines in `ReactiveAdapterRegistry`
 - Support Coroutines for WebFlux annotated controllers
 - Fix return type of Kotlin suspending functions (gh-21058)

See gh-19975
  • Loading branch information
sdeleuze committed Feb 18, 2019
1 parent 04bb114 commit 19f792d
Show file tree
Hide file tree
Showing 16 changed files with 1,182 additions and 23 deletions.
2 changes: 2 additions & 0 deletions spring-webflux/spring-webflux.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ dependencies {
optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
optional("com.google.protobuf:protobuf-java-util:3.6.1")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.1.1")
testCompile("javax.xml.bind:jaxb-api:2.3.1")
testCompile("com.fasterxml:aalto-xml:1.1.1")
testCompile("org.hibernate:hibernate-validator:6.0.14.Final")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.web.reactive.function.client

import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.ResponseEntity
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -64,3 +65,30 @@ inline fun <reified T : Any> ClientResponse.toEntity(): Mono<ResponseEntity<T>>
*/
inline fun <reified T : Any> ClientResponse.toEntityList(): Mono<ResponseEntity<List<T>>> =
toEntityList(object : ParameterizedTypeReference<T>() {})

/**
* Coroutines variant of [ClientResponse.bodyToMono].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> ClientResponse.awaitBody(): T =
bodyToMono<T>().awaitSingle()

/**
* Coroutines variant of [ClientResponse.toEntity].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> ClientResponse.awaitEntity(): ResponseEntity<T> =
toEntity<T>().awaitSingle()

/**
* Coroutines variant of [ClientResponse.toEntityList].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> ClientResponse.awaitEntityList(): ResponseEntity<List<T>> =
toEntityList<T>().awaitSingle()
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,9 @@

package org.springframework.web.reactive.function.client

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec
Expand Down Expand Up @@ -57,3 +60,30 @@ inline fun <reified T : Any> WebClient.ResponseSpec.bodyToMono(): Mono<T> =
*/
inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlux(): Flux<T> =
bodyToFlux(object : ParameterizedTypeReference<T>() {})

/**
* Coroutines variant of [WebClient.RequestHeadersSpec.exchange].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun WebClient.RequestHeadersSpec<out WebClient.RequestHeadersSpec<*>>.awaitResponse(): ClientResponse =
exchange().awaitSingle()

/**
* Coroutines variant of [WebClient.RequestBodySpec.body].
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T: Any> WebClient.RequestBodySpec.body(crossinline supplier: suspend () -> T)
= body(GlobalScope.mono { supplier.invoke() })

/**
* Coroutines variant of [WebClient.ResponseSpec.bodyToMono].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> WebClient.ResponseSpec.awaitBody() : T =
bodyToMono<T>().awaitSingle()
Loading

0 comments on commit 19f792d

Please sign in to comment.