Skip to content

Commit

Permalink
Refactor ReactorClientHttpRequestFactory timeouts
Browse files Browse the repository at this point in the history
Closes gh-33782
  • Loading branch information
rstoyanchev committed Oct 28, 2024
1 parent 044da79 commit 4749d81
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,50 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest

private final URI uri;

@Nullable
private final Duration exchangeTimeout;

private final Duration readTimeout;

/**
* Create an instance.
* @param httpClient the client to perform the request with
* @param method the HTTP method
* @param uri the URI for the request
* @since 6.2
*/
public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) {
this.httpClient = httpClient;
this.method = method;
this.uri = uri;
this.exchangeTimeout = null;
}

/**
* Package private constructor for use until exchangeTimeout is removed.
*/
ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) {
this.httpClient = httpClient;
this.method = method;
this.uri = uri;
this.exchangeTimeout = exchangeTimeout;
}

/**
* Original constructor with timeout values.
* @deprecated without a replacement; readTimeout is now applied to the
* underlying client via {@link HttpClient#responseTimeout(Duration)}, and the
* value passed here is not used; exchangeTimeout is deprecated and superseded
* by Reactor Netty timeout configuration, but applied if set.
*/
@Deprecated(since = "6.2", forRemoval = true)
public ReactorClientHttpRequest(
HttpClient httpClient, URI uri, HttpMethod method, Duration exchangeTimeout, Duration readTimeout) {
HttpClient httpClient, URI uri, HttpMethod method,
@Nullable Duration exchangeTimeout, @Nullable Duration readTimeout) {

this.httpClient = httpClient;
this.method = method;
this.uri = uri;
this.exchangeTimeout = exchangeTimeout;
this.readTimeout = readTimeout;
}


Expand All @@ -89,18 +120,19 @@ protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body
sender = (this.uri.isAbsolute() ? sender.uri(this.uri) : sender.uri(this.uri.toString()));

try {
ReactorClientHttpResponse result =
Mono<ReactorClientHttpResponse> mono =
sender.send((request, outbound) -> send(headers, body, request, outbound))
.responseConnection((response, conn) ->
Mono.just(new ReactorClientHttpResponse(response, conn, this.readTimeout)))
.next()
.block(this.exchangeTimeout);
.responseConnection((response, conn) -> Mono.just(new ReactorClientHttpResponse(response, conn)))
.next();

ReactorClientHttpResponse clientResponse =
(this.exchangeTimeout != null ? mono.block(this.exchangeTimeout) : mono.block());

if (result == null) {
if (clientResponse == null) {
throw new IOException("HTTP exchange resulted in no result");
}

return result;
return clientResponse;
}
catch (RuntimeException ex) {
throw convertException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory

private static final Log logger = LogFactory.getLog(ReactorClientHttpRequestFactory.class);

private static final Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);
private static final Function<HttpClient, HttpClient> defaultInitializer =
client -> client.compress(true).responseTimeout(Duration.ofSeconds(10));


@Nullable
Expand All @@ -60,9 +61,11 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
@Nullable
private Integer connectTimeout;

private Duration readTimeout = Duration.ofSeconds(10);
@Nullable
private Duration readTimeout;

private Duration exchangeTimeout = Duration.ofSeconds(5);
@Nullable
private Duration exchangeTimeout;

@Nullable
private volatile HttpClient httpClient;
Expand Down Expand Up @@ -120,12 +123,15 @@ private HttpClient createHttpClient(ReactorResourceFactory factory, Function<Htt
if (this.connectTimeout != null) {
client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout);
}
if (this.readTimeout != null) {
client = client.responseTimeout(this.readTimeout);
}
return client;
}


/**
* Set the underlying connect timeout value on the underlying client.
* Set the connect timeout value on the underlying client.
* Effectively, a shortcut for
* {@code httpClient.option(CONNECT_TIMEOUT_MILLIS, timeout)}.
* <p>By default, set to 30 seconds.
Expand All @@ -152,35 +158,53 @@ public void setConnectTimeout(Duration connectTimeout) {
}

/**
* Set the underlying read timeout in milliseconds.
* <p>Default is 10 seconds.
* Set the read timeout value on the underlying client.
* Effectively, a shortcut for {@link HttpClient#responseTimeout(Duration)}.
* <p>By default, set to 10 seconds.
* @param timeout the read timeout value in millis; must be > 0.
*/
public void setReadTimeout(long readTimeout) {
Assert.isTrue(readTimeout > 0, "Timeout must be a positive value");
this.readTimeout = Duration.ofMillis(readTimeout);
public void setReadTimeout(Duration timeout) {
Assert.notNull(timeout, "ReadTimeout must not be null");
Assert.isTrue(timeout.toMillis() > 0, "Timeout must be a positive value");
this.readTimeout = timeout;
HttpClient httpClient = this.httpClient;
if (httpClient != null) {
this.httpClient = httpClient.responseTimeout(timeout);
}
}

/**
* Variant of {@link #setConnectTimeout(int)} with a {@link Duration} value.
* Variant of {@link #setReadTimeout(Duration)} with a long value.
*/
public void setReadTimeout(Duration readTimeout) {
Assert.notNull(readTimeout, "ReadTimeout must not be null");
setReadTimeout((int) readTimeout.toMillis());
public void setReadTimeout(long readTimeout) {
setReadTimeout(Duration.ofMillis(readTimeout));
}

/**
* Set the timeout for the HTTP exchange in milliseconds.
* <p>Default is 5 seconds.
* <p>By default, as of 6.2 this is no longer set.
* @see #setConnectTimeout(int)
* @see #setReadTimeout(Duration)
* @see <a href="https://projectreactor.io/docs/netty/release/reference/index.html#timeout-configuration">Timeout Configuration</a>
* @deprecated as of 6.2 and no longer set by default (previously 5 seconds)
* in favor of using Reactor Netty HttpClient timeout configuration.
*/
@Deprecated(since = "6.2", forRemoval = true)
public void setExchangeTimeout(long exchangeTimeout) {
Assert.isTrue(exchangeTimeout > 0, "Timeout must be a positive value");
this.exchangeTimeout = Duration.ofMillis(exchangeTimeout);
}

/**
* Set the timeout for the HTTP exchange.
* <p>Default is 5 seconds.
* Variant of {@link #setExchangeTimeout(long)} with a Duration value.
* <p>By default, as of 6.2 this is no longer set.
* @see #setConnectTimeout(int)
* @see #setReadTimeout(Duration)
* @see <a href="https://projectreactor.io/docs/netty/release/reference/index.html#timeout-configuration">Timeout Configuration</a>
* @deprecated as of 6.2 and no longer set by default (previously 5 seconds)
* in favor of using Reactor Netty HttpClient timeout configuration.
*/
@Deprecated(since = "6.2", forRemoval = true)
public void setExchangeTimeout(Duration exchangeTimeout) {
Assert.notNull(exchangeTimeout, "ExchangeTimeout must not be null");
setExchangeTimeout((int) exchangeTimeout.toMillis());
Expand All @@ -195,8 +219,7 @@ public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IO
"Expected HttpClient or ResourceFactory and mapper");
client = createHttpClient(this.resourceFactory, this.mapper);
}
return new ReactorClientHttpRequest(
client, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
return new ReactorClientHttpRequest(client, httpMethod, uri, this.exchangeTimeout);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.ByteBuf;
import org.reactivestreams.FlowAdapters;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;

import org.springframework.http.HttpHeaders;
Expand All @@ -46,22 +47,38 @@ final class ReactorClientHttpResponse implements ClientHttpResponse {

private final HttpHeaders headers;

private final Duration readTimeout;

@Nullable
private volatile InputStream body;


public ReactorClientHttpResponse(
HttpClientResponse response, Connection connection, Duration readTimeout) {

/**
* Create a response instance.
* @param response the Reactor Netty response
* @param connection the connection for the exchange
* @since 6.2
*/
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
this.response = response;
this.connection = connection;
this.readTimeout = readTimeout;
this.headers = HttpHeaders.readOnlyHttpHeaders(
new Netty4HeadersAdapter(response.responseHeaders()));
}

/**
* Original constructor.
* @deprecated without a replacement; readTimeout is now applied to the
* underlying client via {@link HttpClient#responseTimeout(Duration)}, and the
* value passed here is not used.
*/
@Deprecated(since = "6.2", forRemoval = true)
public ReactorClientHttpResponse(
HttpClientResponse response, Connection connection, @Nullable Duration readTimeout) {

this.response = response;
this.connection = connection;
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
}


@Override
public HttpStatusCode getStatusCode() {
Expand Down

0 comments on commit 4749d81

Please sign in to comment.