Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use only one request timeout mechanism in JdkClientHttpRequest #33090

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,26 @@

package org.springframework.http.client;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -92,28 +94,46 @@ public URI getURI() {
@Override
@SuppressWarnings("NullAway")
protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body body) throws IOException {
HttpRequest request = buildRequest(headers, body);
CompletableFuture<HttpResponse<InputStream>> responsefuture =
this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream());
try {
HttpRequest request = buildRequest(headers, body);
HttpResponse<InputStream> response;
if (this.timeout != null) {
response = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
}
else {
response = this.httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
CompletableFuture<Void> timeoutFuture = new CompletableFuture<Void>()
.completeOnTimeout(null, this.timeout.toMillis(), TimeUnit.MILLISECONDS);
timeoutFuture.thenRun(() -> {
if (!responsefuture.cancel(true) && !responsefuture.isCompletedExceptionally()) {
Copy link
Contributor

@rstoyanchev rstoyanchev Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be checking if the task was cancelled successfully, i.e. that cancel() returns true?

try {
responsefuture.resultNow().body().close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we really be sure there is a body, I imagine the timeout could happen before the server responds with status and headers.

} catch (IOException ignored) {}
}
});
var response = responsefuture.get();
return new JdkClientHttpResponse(response.statusCode(), response.headers(), new FilterInputStream(response.body()) {

@Override
public void close() throws IOException {
timeoutFuture.cancel(false);
super.close();
}
});

} else {
var response = responsefuture.get();
return new JdkClientHttpResponse(response.statusCode(), response.headers(), response.body());
}
return new JdkClientHttpResponse(response);
}
catch (UncheckedIOException ex) {
throw ex.getCause();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
responsefuture.cancel(true);
throw new IOException("Request was interrupted: " + ex.getMessage(), ex);
}
catch (ExecutionException ex) {
Throwable cause = ex.getCause();

if (cause instanceof CancellationException caEx) {
throw new HttpTimeoutException("Request timed out");
}
if (cause instanceof UncheckedIOException uioEx) {
throw uioEx.getCause();
}
Expand All @@ -127,17 +147,11 @@ else if (cause instanceof IOException ioEx) {
throw new IOException(cause.getMessage(), cause);
}
}
catch (TimeoutException ex) {
throw new IOException("Request timed out: " + ex.getMessage(), ex);
}
}


private HttpRequest buildRequest(HttpHeaders headers, @Nullable Body body) {
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(this.uri);
if (this.timeout != null) {
builder.timeout(this.timeout);
}

headers.forEach((headerName, headerValues) -> {
if (!DISALLOWED_HEADERS.contains(headerName.toLowerCase())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -41,22 +40,21 @@
*/
class JdkClientHttpResponse implements ClientHttpResponse {

private final HttpResponse<InputStream> response;
private final int statusCode;

private final HttpHeaders headers;

private final InputStream body;


public JdkClientHttpResponse(HttpResponse<InputStream> response) {
this.response = response;
this.headers = adaptHeaders(response);
InputStream inputStream = response.body();
this.body = (inputStream != null ? inputStream : InputStream.nullInputStream());
public JdkClientHttpResponse(int statusCode, java.net.http.HttpHeaders headers, InputStream body) {
this.statusCode = statusCode;
this.headers = adaptHeaders(headers);
this.body = body != null ? body : InputStream.nullInputStream();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could have constructors like these:

public JdkClientHttpResponse(HttpResponse<InputStream> response) {
    this(response, response.getInputStream());
}

public JdkClientHttpResponse(HttpResponse<InputStream> response, InputStream body) {

}

That would keep the logic mostly as it was before and avoid the full qualified java.net.http.HttpHeaders type.

}

private static HttpHeaders adaptHeaders(HttpResponse<?> response) {
Map<String, List<String>> rawHeaders = response.headers().map();
private static HttpHeaders adaptHeaders(java.net.http.HttpHeaders headers) {
Map<String, List<String>> rawHeaders = headers.map();
Map<String, List<String>> map = new LinkedCaseInsensitiveMap<>(rawHeaders.size(), Locale.ENGLISH);
MultiValueMap<String, String> multiValueMap = CollectionUtils.toMultiValueMap(map);
multiValueMap.putAll(rawHeaders);
Expand All @@ -66,7 +64,7 @@ private static HttpHeaders adaptHeaders(HttpResponse<?> response) {

@Override
public HttpStatusCode getStatusCode() {
return HttpStatusCode.valueOf(this.response.statusCode());
return HttpStatusCode.valueOf(statusCode);
}

@Override
Expand Down