diff --git a/httpclient-jdk/pom.xml b/httpclient-jdk/pom.xml index 48e68ecd5c8..51fda6328a2 100644 --- a/httpclient-jdk/pom.xml +++ b/httpclient-jdk/pom.xml @@ -79,6 +79,11 @@ assertj-core test + + org.awaitility + awaitility + test + diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientSimultaneousConnectionsTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientSimultaneousConnectionsTest.java index 740a053b826..6123b2f203e 100644 --- a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientSimultaneousConnectionsTest.java +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientSimultaneousConnectionsTest.java @@ -24,4 +24,12 @@ public class JdkHttpClientSimultaneousConnectionsTest extends AbstractSimultaneo protected HttpClient.Factory getHttpClientFactory() { return new JdkHttpClientFactory(); } + + @Override + public void http1Connections() { + // NO-OP + // This test will only pass when it's run in isolation, it seems that the JDK HttpClient eventually uses a shared thread + // pool that reaches a limit and this test will effectively block any further processing after a few connections are open. + // - jdk.internal.net.http.HttpClientImpl.ASYNC_POOL + } } diff --git a/httpclient-jetty/pom.xml b/httpclient-jetty/pom.xml index 6dfcbc1e053..382f64568c6 100644 --- a/httpclient-jetty/pom.xml +++ b/httpclient-jetty/pom.xml @@ -93,6 +93,11 @@ org.assertj assertj-core + + org.awaitility + awaitility + test + diff --git a/httpclient-okhttp/pom.xml b/httpclient-okhttp/pom.xml index 598d010e5ce..86df84c0dbd 100644 --- a/httpclient-okhttp/pom.xml +++ b/httpclient-okhttp/pom.xml @@ -93,6 +93,11 @@ assertj-core test + + org.awaitility + awaitility + test + diff --git a/httpclient-vertx/pom.xml b/httpclient-vertx/pom.xml index 2245051fd5b..0944e6a71ab 100644 --- a/httpclient-vertx/pom.xml +++ b/httpclient-vertx/pom.xml @@ -91,6 +91,11 @@ assertj-core test + + org.awaitility + awaitility + test + org.bouncycastle diff --git a/junit/mockwebserver/src/main/java/io/fabric8/mockwebserver/MockWebServer.java b/junit/mockwebserver/src/main/java/io/fabric8/mockwebserver/MockWebServer.java index 001dfec7e75..d48ff5e3e22 100644 --- a/junit/mockwebserver/src/main/java/io/fabric8/mockwebserver/MockWebServer.java +++ b/junit/mockwebserver/src/main/java/io/fabric8/mockwebserver/MockWebServer.java @@ -168,6 +168,7 @@ public synchronized void shutdown() { httpClose.onComplete(onComplete); await(httpClose, "Unable to close MockWebServer"); } + await(vertx.close(), "Unable to close Vertx"); } @Override diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java index 24b787801af..7fdf5a9533f 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java @@ -114,7 +114,6 @@ public void consumeBytesProcessesLargeBodies() throws Exception { asyncBodyResponse.body().consume(); asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); assertThat(responseText.toString()).isEqualTo(largeBody); - } } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest.java index 0b11046b1b1..2bbab2dc8c3 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractSimultaneousConnectionsTest.java @@ -15,9 +15,7 @@ */ package io.fabric8.kubernetes.client.http; -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; +import io.fabric8.kubernetes.client.RequestConfigBuilder; import io.fabric8.mockwebserver.MockWebServer; import io.fabric8.mockwebserver.MockWebServerListener; import io.fabric8.mockwebserver.http.MockResponse; @@ -25,6 +23,13 @@ import io.fabric8.mockwebserver.http.Response; import io.fabric8.mockwebserver.http.WebSocketListener; import io.fabric8.mockwebserver.vertx.Protocol; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.net.NetServerOptions; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -32,8 +37,6 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; -import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; import java.util.Collection; import java.util.Collections; @@ -42,8 +45,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -59,20 +60,16 @@ public abstract class AbstractSimultaneousConnectionsTest { private RegisteredConnections registeredConnections; private MockWebServer mockWebServer; - private ExecutorService httpExecutor; - private HttpServer httpServer; + private Vertx vertx; private HttpClient.Builder clientBuilder; @BeforeEach - void prepareServerAndBuilder() throws IOException { + void prepareServerAndBuilder() { registeredConnections = new RegisteredConnections(); mockWebServer = new MockWebServer(); mockWebServer.addListener(registeredConnections); - httpExecutor = Executors.newCachedThreadPool(); - httpServer = HttpServer.create(new InetSocketAddress(0), 0); - httpServer.setExecutor(httpExecutor); - httpServer.start(); + vertx = Vertx.vertx(); clientBuilder = getHttpClientFactory().newBuilder() .connectTimeout(60, TimeUnit.SECONDS); } @@ -80,8 +77,7 @@ void prepareServerAndBuilder() throws IOException { @AfterEach void stopServer() { mockWebServer.shutdown(); - httpServer.stop(0); - httpExecutor.shutdownNow(); + vertx.close(); } protected abstract HttpClient.Factory getHttpClientFactory(); @@ -95,20 +91,21 @@ private void withHttp1() { @DisplayName("Should be able to make 2048 simultaneous HTTP/1.x connections before processing the response") @DisabledOnOs(OS.WINDOWS) public void http1Connections() throws Exception { - final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_CONNECTIONS, - exchange -> { - exchange.sendResponseHeaders(204, -1); - exchange.close(); - }); - httpServer.createContext("/http", handler); - try (final HttpClient client = clientBuilder.build()) { - final Collection>> asyncResponses = ConcurrentHashMap.newKeySet(); - final HttpRequest request = client.newHttpRequestBuilder() - .uri(String.format("http://localhost:%s/http", httpServer.getAddress().getPort())) - .build(); + final Collection>> asyncResponses = ConcurrentHashMap.newKeySet(); + try ( + var server = new DelayedResponseHttp1Server(vertx, MAX_HTTP_1_CONNECTIONS); + var client = clientBuilder.tag(new RequestConfigBuilder().withRequestRetryBackoffLimit(0).build()).build()) { for (int it = 0; it < MAX_HTTP_1_CONNECTIONS; it++) { + final HttpRequest request = client.newHttpRequestBuilder() + .uri(server.uri() + "?" + it) + .build(); asyncResponses.add(client.consumeBytes(request, (value, asyncBody) -> asyncBody.consume())); - handler.await(); + } + server.await(); + assertThat(server.requests) + .hasSize(MAX_HTTP_1_CONNECTIONS); + for (HttpServerRequest serverRequest : server.requests) { + serverRequest.response().setStatusCode(204).end(); } CompletableFuture.allOf(asyncResponses.toArray(new CompletableFuture[0])).get(70, TimeUnit.SECONDS); assertThat(asyncResponses) @@ -126,19 +123,18 @@ public void http1Connections() throws Exception { @DisplayName("Should be able to make 1024 simultaneous HTTP connections before upgrading to WebSocket") @DisabledOnOs(OS.WINDOWS) public void http1WebSocketConnectionsBeforeUpgrade() throws Exception { - final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_WS_CONNECTIONS, - exchange -> exchange.sendResponseHeaders(404, -1)); - httpServer.createContext("/http", handler); - try (final HttpClient client = clientBuilder.build()) { + try (var server = new DelayedResponseHttp1Server(vertx, MAX_HTTP_1_WS_CONNECTIONS); var client = clientBuilder.build()) { for (int it = 0; it < MAX_HTTP_1_WS_CONNECTIONS; it++) { client.newWebSocketBuilder() - .uri(URI.create(String.format("http://localhost:%s/http", httpServer.getAddress().getPort()))) + .uri(URI.create(server.uri())) .buildAsync(new WebSocket.Listener() { }); - handler.await(); } + server.await(); + assertThat(server.requests) + .hasSize(MAX_HTTP_1_WS_CONNECTIONS); + server.requests.forEach(request -> request.response().setStatusCode(101).end()); } - assertThat(handler.connectionCount.get(60, TimeUnit.SECONDS)).isEqualTo(MAX_HTTP_1_WS_CONNECTIONS); } @Test @@ -192,47 +188,46 @@ public void onMessage(WebSocket webSocket, String text) { } } - private static class DelayedResponseHandler implements HttpHandler { - - private final int requestCount; - private final CyclicBarrier barrier; - private final Set exchanges; - private final CompletableFuture connectionCount; - private final ExecutorService executorService; - - private DelayedResponseHandler(int requestCount, HttpHandler handler) { - this.requestCount = requestCount; - this.barrier = new CyclicBarrier(2); - exchanges = ConcurrentHashMap.newKeySet(); - connectionCount = new CompletableFuture<>(); - executorService = Executors.newFixedThreadPool(1); - connectionCount.thenRunAsync(() -> { - for (HttpExchange exchange : exchanges) { - try { - handler.handle(exchange); - } catch (IOException ignore) { - // NO OP - } - } - }, executorService) - .whenComplete((unused, throwable) -> executorService.shutdownNow()); + private static class DelayedResponseHttp1Server implements AutoCloseable { + + private final int connections; + private final HttpServer httpServer; + private final Collection requests; + private final CountDownLatch connectionLatch; + + private DelayedResponseHttp1Server(Vertx vertx, int connections) throws Exception { + this.connections = connections; + requests = ConcurrentHashMap.newKeySet(); + connectionLatch = new CountDownLatch(connections); + httpServer = vertx.createHttpServer(new HttpServerOptions() + .setPort(NetServerOptions.DEFAULT_PORT) + .setAlpnVersions(Collections.singletonList(HttpVersion.HTTP_1_1))); + httpServer.connectionHandler(event -> connectionLatch.countDown()); + httpServer.requestHandler(requests::add); + httpServer.listen().toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); } @Override - public void handle(HttpExchange exchange) { - exchanges.add(exchange); - await(); - if (exchanges.size() == requestCount) { - connectionCount.complete(requestCount); - } + public void close() throws Exception { + requests.forEach(request -> request.connection().close()); + requests.clear(); + httpServer.close().toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS); + } + private String uri() { + return String.format("http://localhost:%s/http-1-connections", httpServer.actualPort()); } - public final void await() { + private void await() { try { - barrier.await(5, TimeUnit.SECONDS); - } catch (Exception ex) { - throw new RuntimeException("Failed to await the barrier"); + if (!connectionLatch.await(10, TimeUnit.SECONDS)) { + throw new AssertionError( + "Failed to await the connection latch, remaining connections to open: " + connectionLatch.getCount()); + } + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> requests.size() == connections); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Failed to await the connection latch (interrupted)", e); } } }