diff --git a/http/http2/src/main/java/io/helidon/http/http2/Http2Ping.java b/http/http2/src/main/java/io/helidon/http/http2/Http2Ping.java index 7518bfcf527..53b9a38452b 100644 --- a/http/http2/src/main/java/io/helidon/http/http2/Http2Ping.java +++ b/http/http2/src/main/java/io/helidon/http/http2/Http2Ping.java @@ -22,6 +22,7 @@ * Ping frame. */ public final class Http2Ping implements Http2Frame { + private static final byte[] EMPTY_PING_DATA = new byte[8]; private final BufferData data; Http2Ping(BufferData data) { @@ -38,6 +39,15 @@ public static Http2Ping create(BufferData data) { return new Http2Ping(data); } + /** + * Create ping. + * + * @return ping frame + */ + public static Http2Ping create() { + return new Http2Ping(BufferData.create(EMPTY_PING_DATA)); + } + @Override public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Flag.PingFlags flags) { Http2FrameHeader header = Http2FrameHeader.create(data.available(), @@ -48,6 +58,20 @@ public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Fla return new Http2FrameData(header, data); } + /** + * Representation of ping data. + * + * @return frame data crated from this ping + */ + public Http2FrameData toFrameData() { + Http2FrameHeader header = Http2FrameHeader.create(data.available(), + frameTypes(), + Http2Flag.PingFlags.create(0), + 0); + + return new Http2FrameData(header, data); + } + @Override public String name() { return Http2FrameType.PING.name(); diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java index 50de3c13ee0..8359c51ce23 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -33,6 +34,7 @@ import io.helidon.common.buffers.DataWriter; import io.helidon.common.socket.SocketContext; import io.helidon.http.http2.ConnectionFlowControl; +import io.helidon.http.http2.FlowControl; import io.helidon.http.http2.Http2ConnectionWriter; import io.helidon.http.http2.Http2ErrorCode; import io.helidon.http.http2.Http2Exception; @@ -70,11 +72,13 @@ class Http2ClientConnection { private final ConnectionFlowControl connectionFlowControl; private final Http2Headers.DynamicTable inboundDynamicTable = Http2Headers.DynamicTable.create(Http2Setting.HEADER_TABLE_SIZE.defaultValue()); + private final Http2ClientProtocolConfig protocolConfig; private final ClientConnection connection; private final SocketContext ctx; private final Http2ConnectionWriter writer; private final DataReader reader; private final DataWriter dataWriter; + private final Semaphore pingPongSemaphore = new Semaphore(0); private volatile int lastStreamId; private Http2Settings serverSettings = Http2Settings.builder() @@ -93,6 +97,7 @@ public boolean closed(){ .initialWindowSize(protocolConfig.initialWindowSize()) .blockTimeout(protocolConfig.flowControlBlockTimeout()) .build(); + this.protocolConfig = protocolConfig; this.connection = connection; this.ctx = connection.helidonSocket(); this.dataWriter = connection.writer(); @@ -172,6 +177,24 @@ Http2ClientStream tryStream(Http2StreamConfig config) { } } + boolean ping() { + Http2Ping ping = Http2Ping.create(); + Http2FrameData frameData = ping.toFrameData(); + sendListener.frameHeader(ctx, 0, frameData.header()); + sendListener.frame(ctx, 0, ping); + try { + this.writer().writeData(frameData, FlowControl.Outbound.NOOP); + return pingPongSemaphore.tryAcquire(protocolConfig.pingTimeout().toMillis(), TimeUnit.MILLISECONDS); + } catch (UncheckedIOException | InterruptedException e) { + ctx.log(LOGGER, DEBUG, "Ping failed!", e); + return false; + } + } + + void pong() { + pingPongSemaphore.release(); + } + void updateLastStreamId(int lastStreamId){ this.lastStreamId = lastStreamId; } @@ -247,6 +270,7 @@ private void start(Http2ClientProtocolConfig protocolConfig, } ctx.log(LOGGER, TRACE, "Client listener interrupted"); } catch (Throwable t) { + closed = true; ctx.log(LOGGER, DEBUG, "Failed to handle HTTP/2 client connection", t); } }); @@ -385,6 +409,8 @@ private boolean handle() { Http2Flag.PingFlags.create(Http2Flag.ACK), 0); writer.write(new Http2FrameData(header, frame)); + } else { + pong(); } break; diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnectionHandler.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnectionHandler.java index 386095dc6c7..dce1aa725f5 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnectionHandler.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnectionHandler.java @@ -115,7 +115,7 @@ Http2ConnectionAttemptResult http2(Http2ClientImpl http2Client, } } - return new Http2ConnectionAttemptResult(Result.HTTP_2, stream, null); + return new Http2ConnectionAttemptResult(Result.HTTP_2, conn, stream, null); } finally { lock.unlock(); } @@ -188,6 +188,7 @@ private Http2ConnectionAttemptResult httpX(Http2ClientImpl http2Client, } else { result.set(Result.HTTP_1); return new Http2ConnectionAttemptResult(Result.HTTP_1, + null, null, upgradeResponse.response()); } @@ -209,6 +210,7 @@ private Http2ConnectionAttemptResult http1(Http2ClientImpl http2Client, Http2ClientRequestImpl request, ClientUri initialUri, Function http1EntityHandler) { return new Http2ConnectionAttemptResult(Result.HTTP_1, + null, null, http1EntityHandler.apply(http1Request( http2Client.webClient(), diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientProtocolConfigBlueprint.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientProtocolConfigBlueprint.java index c66735b7bd4..0be3b431d38 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientProtocolConfigBlueprint.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientProtocolConfigBlueprint.java @@ -100,4 +100,12 @@ default String type() { */ @ConfiguredOption("PT0.1S") Duration flowControlBlockTimeout(); + + /** + * Timeout for ping probe used for checking healthiness of cached connections. + * + * @return timeout + */ + @ConfiguredOption("PT0.5S") + Duration pingTimeout(); } diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionAttemptResult.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionAttemptResult.java index 77dbe39c0fc..75277ce9e2e 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionAttemptResult.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionAttemptResult.java @@ -19,6 +19,7 @@ import io.helidon.webclient.api.HttpClientResponse; record Http2ConnectionAttemptResult(Result result, + Http2ClientConnection connection, Http2ClientStream stream, HttpClientResponse response) { enum Result { diff --git a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java index 8dd1538bd26..1e8d55aeefd 100644 --- a/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java +++ b/webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ConnectionCache.java @@ -26,7 +26,10 @@ import io.helidon.webclient.http1.Http1ClientRequest; import io.helidon.webclient.http1.Http1ClientResponse; +import static java.lang.System.Logger.Level.DEBUG; + final class Http2ConnectionCache { + private static final System.Logger LOGGER = System.getLogger(Http2ConnectionCache.class.getName()); //todo Gracefully close connections in channel cache private static final Http2ConnectionCache SHARED = create(); private final LruCache http2Supported = LruCache.builder() @@ -47,7 +50,10 @@ boolean supports(ConnectionKey ck) { } void remove(ConnectionKey connectionKey) { - cache.remove(connectionKey); + Http2ClientConnectionHandler handler = cache.remove(connectionKey); + if (handler != null) { + handler.close(); + } http2Supported.remove(connectionKey); } @@ -57,17 +63,35 @@ Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client, ClientUri initialUri, Function http1EntityHandler) { - // this statement locks all threads - must not do anything complicated (just create a new instance) - Http2ConnectionAttemptResult result = - cache.computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new) - // this statement may block a single connection key - .newStream(http2Client, - request, - initialUri, - http1EntityHandler); - if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) { - http2Supported.put(connectionKey, true); + for (int i = 0; i < 2; i++) { + // this statement locks all threads - must not do anything complicated (just create a new instance) + Http2ClientConnectionHandler connectionHandler = + cache.computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new); + + // this statement may block a single connection key + Http2ConnectionAttemptResult result = connectionHandler.newStream(http2Client, + request, + initialUri, + http1EntityHandler); + + if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) { + http2Supported.put(connectionKey, true); + } + + if (result.connection() == null) { + // Upgrade failed, its HTTP/1.1 + return result; + } + + if (!result.connection().closed() + && result.connection().ping()) { + return result; + } + + LOGGER.log(DEBUG, "Cached connection is not healthy!"); + remove(connectionKey); } - return result; + + throw new RuntimeException("Unable to create a stream from either cached or new connection!"); } } diff --git a/webserver/tests/upgrade/src/main/java/io/helidon/webserver/tests/upgrade/Main.java b/webserver/tests/upgrade/src/main/java/io/helidon/webserver/tests/upgrade/Main.java index 94c6dfa6e82..8ad98e8bd65 100644 --- a/webserver/tests/upgrade/src/main/java/io/helidon/webserver/tests/upgrade/Main.java +++ b/webserver/tests/upgrade/src/main/java/io/helidon/webserver/tests/upgrade/Main.java @@ -60,8 +60,12 @@ public static WebServer startServer(boolean ssl) { .route(Http2Route.route(GET, "/versionspecific2", (req, res) -> res.send("HTTP/2.0 route\n"))) .route(Http.Method.predicate(GET, POST, PUT), PathMatchers.create("/multi*"), - (req, res) -> res.send("HTTP/" + req.prologue().protocolVersion() - + " route " + req.prologue().method() + "\n"))) + (req, res) -> + { + req.content().consume(); // Workaround for #7427 + res.send("HTTP/" + req.prologue().protocolVersion() + + " route " + req.prologue().method() + "\n"); + })) .addRouting(WsRouting.builder() .endpoint("/ws-echo", new EchoWsListener()) .build()) diff --git a/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/SharedHttp2CacheTest.java b/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/SharedHttp2CacheTest.java new file mode 100644 index 00000000000..c6f0bc4fbe8 --- /dev/null +++ b/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/SharedHttp2CacheTest.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.tests.upgrade.test; + +import io.helidon.http.Http; +import io.helidon.logging.common.LogConfig; +import io.helidon.webclient.http2.Http2Client; +import io.helidon.webclient.http2.Http2ClientProtocolConfig; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.http.HttpRouting; +import io.helidon.webserver.http2.Http2Route; + +import org.junit.jupiter.api.Test; + +import static io.helidon.http.Http.Method.POST; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class SharedHttp2CacheTest { + @Test + void cacheHttp2WithServerRestart() { + LogConfig.configureRuntime(); + WebServer webServer = null; + try { + HttpRouting routing = HttpRouting.builder() + .route(Http2Route.route(POST, "/versionspecific", (req, res) -> { + req.content().consume(); // Workaround for #7427 + res.send(); + })) + .build(); + + webServer = WebServer.builder() + .routing(routing) + .build() + .start(); + + int port = webServer.port(); + + Http2Client webClient = Http2Client.builder() + .protocolConfig(Http2ClientProtocolConfig.builder().priorKnowledge(true).build()) + .keepAlive(true) + .baseUri("http://localhost:" + port + "/versionspecific") + .build(); + + try (var res = webClient.post().submit("WHATEVER")) { + assertThat(res.status(), is(Http.Status.OK_200)); + } + + webServer.stop(); + webServer = WebServer.builder() + .port(port) + .routing(routing) + .build() + .start(); + + try (var res = webClient.post().submit("WHATEVER")) { + assertThat(res.status(), is(Http.Status.OK_200)); + } + } finally { + if (webServer != null) { + webServer.stop(); + } + } + } + + @Test + void cacheHttp2() { + LogConfig.configureRuntime(); + Http.HeaderName clientPortHeader = Http.HeaderNames.create("client-port"); + WebServer webServer = null; + try { + HttpRouting routing = HttpRouting.builder() + .route(Http2Route.route(POST, "/versionspecific", (req, res) -> { + req.content().consume(); + res.header(clientPortHeader, String.valueOf(req.remotePeer().port())) + .send(); + })) + .build(); + + webServer = WebServer.builder() + .routing(routing) + .build() + .start(); + + int port = webServer.port(); + + Http2Client webClient = Http2Client.builder() + .protocolConfig(Http2ClientProtocolConfig.builder().priorKnowledge(true).build()) + .keepAlive(true) + .baseUri("http://localhost:" + port + "/versionspecific") + .build(); + + Integer firstReqClientPort; + try (var res = webClient.post().submit("WHATEVER")) { + firstReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE); + assertThat(res.status(), is(Http.Status.OK_200)); + } + + Integer secondReqClientPort; + try (var res = webClient.post().submit("WHATEVER")) { + secondReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE); + assertThat(res.status(), is(Http.Status.OK_200)); + } + + assertThat("In case of cached connection client port must be the same.", + secondReqClientPort, + is(firstReqClientPort)); + } finally { + if (webServer != null) { + webServer.stop(); + } + } + } +} diff --git a/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/UpgradeCodecsCompositionTest.java b/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/UpgradeCodecsCompositionTest.java index 89e7e6121b3..b2efaf7d03f 100644 --- a/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/UpgradeCodecsCompositionTest.java +++ b/webserver/tests/upgrade/src/test/java/io/helidon/webserver/tests/upgrade/test/UpgradeCodecsCompositionTest.java @@ -50,7 +50,6 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -208,11 +207,10 @@ void versionSpecificHttp20Negative(String url) throws IOException, InterruptedEx "HTTP/1.1 GET http://localhost:%d/multi-something", "HTTP/1.1 PUT https://localhost:%d/multi-something", "HTTP/1.1 POST https://localhost:%d/multi-something", - //"HTTP/2.0 GET http://localhost:%d/multi-something", + "HTTP/2.0 GET http://localhost:%d/multi-something", "HTTP/2.0 PUT https://localhost:%d/multi-something", "HTTP/2.0 POST https://localhost:%d/multi-something", }) - @Disabled("Fails on pipeline") void versionSpecificHttp20MultipleMethods(String param) throws IOException, InterruptedException { String[] split = param.split("\s"); String version = split[0];