diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java index bb41cd8491b..b734e7fd20e 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java @@ -81,6 +81,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons if (response != null) { try { future.complete(new WebSocketResponse(null, + // passing null as the type ensures that the response body is closed new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t))); } catch (IOException e) { // can't happen diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java index f593e142bd9..0aef9500437 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java @@ -63,6 +63,8 @@ public CompletableFuture> consumeBytes(HttpRequest reque .afterFailure(copy, response) .thenCompose(b -> { if (Boolean.TRUE.equals(b)) { + // before starting another request, make sure the old one is cancelled / closed + response.body().cancel(); return consumeBytesDirect(copy.build(), consumer); } return CompletableFuture.completedFuture(response); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index c4b7f2910df..9edbcdffe08 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -27,8 +27,10 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; public abstract class AbstractInterceptorTest { @@ -146,11 +148,13 @@ public void before(BasicBuilder builder, HttpHeaders headers) { public void afterHttpFailureReplacesResponseInSendAsync() throws Exception { // Given server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); + AtomicReference> responseRef = new AtomicReference<>(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override public CompletableFuture afterFailure(BasicBuilder builder, HttpResponse response) { builder.uri(URI.create(server.url("/intercepted-url"))); + responseRef.set(response); return CompletableFuture.completedFuture(true); } }); @@ -163,6 +167,8 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons assertThat(result) .returns("This works", HttpResponse::body) .returns(200, HttpResponse::code); + + assertTrue(((AsyncBody) responseRef.get().body()).done().isDone()); } } @@ -171,11 +177,13 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception { // Given server.expect().withPath("/intercepted-url").andReturn(200, "This works").once(); + AtomicReference> responseRef = new AtomicReference<>(); final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("test", new Interceptor() { @Override public CompletableFuture afterFailure(BasicBuilder builder, HttpResponse response) { builder.uri(URI.create(server.url("/intercepted-url"))); + responseRef.set(response); return CompletableFuture.completedFuture(true); } }); @@ -193,6 +201,8 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons asyncR.body().done().get(10L, TimeUnit.SECONDS); // Then assertThat(result.get()).isEqualTo("This works"); + + assertTrue(((AsyncBody) responseRef.get().body()).done().isDone()); } }