Skip to content

Commit

Permalink
Http/2 Cached connection close detection
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Aug 23, 2023
1 parent 546a628 commit 55fae6f
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 18 deletions.
24 changes: 24 additions & 0 deletions http/http2/src/main/java/io/helidon/http/http2/Http2Ping.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Ping frame.
*/
public final class Http2Ping implements Http2Frame<Http2Flag.PingFlags> {
private static final byte[] EMPTY_PING_DATA = new byte[8];
private final BufferData data;

Http2Ping(BufferData data) {
Expand All @@ -38,6 +39,15 @@ public static Http2Ping create(BufferData data) {
return new Http2Ping(data);
}

/**
* Create ping.
*
* @return ping frame
*/
public static Http2Ping create() {
return new Http2Ping(BufferData.create(EMPTY_PING_DATA));
}

@Override
public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Flag.PingFlags flags) {
Http2FrameHeader header = Http2FrameHeader.create(data.available(),
Expand All @@ -48,6 +58,20 @@ public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Fla
return new Http2FrameData(header, data);
}

/**
* Representation of ping data.
*
* @return frame data crated from this ping
*/
public Http2FrameData toFrameData() {
Http2FrameHeader header = Http2FrameHeader.create(data.available(),
frameTypes(),
Http2Flag.PingFlags.create(0),
0);

return new Http2FrameData(header, data);
}

@Override
public String name() {
return Http2FrameType.PING.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -33,6 +34,7 @@
import io.helidon.common.buffers.DataWriter;
import io.helidon.common.socket.SocketContext;
import io.helidon.http.http2.ConnectionFlowControl;
import io.helidon.http.http2.FlowControl;
import io.helidon.http.http2.Http2ConnectionWriter;
import io.helidon.http.http2.Http2ErrorCode;
import io.helidon.http.http2.Http2Exception;
Expand Down Expand Up @@ -70,11 +72,13 @@ class Http2ClientConnection {
private final ConnectionFlowControl connectionFlowControl;
private final Http2Headers.DynamicTable inboundDynamicTable =
Http2Headers.DynamicTable.create(Http2Setting.HEADER_TABLE_SIZE.defaultValue());
private final Http2ClientProtocolConfig protocolConfig;
private final ClientConnection connection;
private final SocketContext ctx;
private final Http2ConnectionWriter writer;
private final DataReader reader;
private final DataWriter dataWriter;
private final Semaphore pingPongSemaphore = new Semaphore(0);
private volatile int lastStreamId;

private Http2Settings serverSettings = Http2Settings.builder()
Expand All @@ -93,6 +97,7 @@ public boolean closed(){
.initialWindowSize(protocolConfig.initialWindowSize())
.blockTimeout(protocolConfig.flowControlBlockTimeout())
.build();
this.protocolConfig = protocolConfig;
this.connection = connection;
this.ctx = connection.helidonSocket();
this.dataWriter = connection.writer();
Expand Down Expand Up @@ -172,6 +177,24 @@ Http2ClientStream tryStream(Http2StreamConfig config) {
}
}

boolean ping() {
Http2Ping ping = Http2Ping.create();
Http2FrameData frameData = ping.toFrameData();
sendListener.frameHeader(ctx, 0, frameData.header());
sendListener.frame(ctx, 0, ping);
try {
this.writer().writeData(frameData, FlowControl.Outbound.NOOP);
return pingPongSemaphore.tryAcquire(protocolConfig.pingTimeout().toMillis(), TimeUnit.MILLISECONDS);
} catch (UncheckedIOException | InterruptedException e) {
ctx.log(LOGGER, DEBUG, "Ping failed!", e);
return false;
}
}

void pong() {
pingPongSemaphore.release();
}

void updateLastStreamId(int lastStreamId){
this.lastStreamId = lastStreamId;
}
Expand Down Expand Up @@ -247,6 +270,7 @@ private void start(Http2ClientProtocolConfig protocolConfig,
}
ctx.log(LOGGER, TRACE, "Client listener interrupted");
} catch (Throwable t) {
closed = true;
ctx.log(LOGGER, DEBUG, "Failed to handle HTTP/2 client connection", t);
}
});
Expand Down Expand Up @@ -385,6 +409,8 @@ private boolean handle() {
Http2Flag.PingFlags.create(Http2Flag.ACK),
0);
writer.write(new Http2FrameData(header, frame));
} else {
pong();
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Http2ConnectionAttemptResult http2(Http2ClientImpl http2Client,
}
}

return new Http2ConnectionAttemptResult(Result.HTTP_2, stream, null);
return new Http2ConnectionAttemptResult(Result.HTTP_2, conn, stream, null);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -188,6 +188,7 @@ private Http2ConnectionAttemptResult httpX(Http2ClientImpl http2Client,
} else {
result.set(Result.HTTP_1);
return new Http2ConnectionAttemptResult(Result.HTTP_1,
null,
null,
upgradeResponse.response());
}
Expand All @@ -209,6 +210,7 @@ private Http2ConnectionAttemptResult http1(Http2ClientImpl http2Client,
Http2ClientRequestImpl request, ClientUri initialUri,
Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler) {
return new Http2ConnectionAttemptResult(Result.HTTP_1,
null,
null,
http1EntityHandler.apply(http1Request(
http2Client.webClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,12 @@ default String type() {
*/
@ConfiguredOption("PT0.1S")
Duration flowControlBlockTimeout();

/**
* Timeout for ping probe used for checking healthiness of cached connections.
*
* @return timeout
*/
@ConfiguredOption("PT0.5S")
Duration pingTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.helidon.webclient.api.HttpClientResponse;

record Http2ConnectionAttemptResult(Result result,
Http2ClientConnection connection,
Http2ClientStream stream,
HttpClientResponse response) {
enum Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import io.helidon.webclient.http1.Http1ClientRequest;
import io.helidon.webclient.http1.Http1ClientResponse;

import static java.lang.System.Logger.Level.DEBUG;

final class Http2ConnectionCache {
private static final System.Logger LOGGER = System.getLogger(Http2ConnectionCache.class.getName());
//todo Gracefully close connections in channel cache
private static final Http2ConnectionCache SHARED = create();
private final LruCache<ConnectionKey, Boolean> http2Supported = LruCache.<ConnectionKey, Boolean>builder()
Expand All @@ -47,7 +50,10 @@ boolean supports(ConnectionKey ck) {
}

void remove(ConnectionKey connectionKey) {
cache.remove(connectionKey);
Http2ClientConnectionHandler handler = cache.remove(connectionKey);
if (handler != null) {
handler.close();
}
http2Supported.remove(connectionKey);
}

Expand All @@ -57,17 +63,35 @@ Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client,
ClientUri initialUri,
Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler) {

// this statement locks all threads - must not do anything complicated (just create a new instance)
Http2ConnectionAttemptResult result =
cache.computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new)
// this statement may block a single connection key
.newStream(http2Client,
request,
initialUri,
http1EntityHandler);
if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) {
http2Supported.put(connectionKey, true);
for (int i = 0; i < 2; i++) {
// this statement locks all threads - must not do anything complicated (just create a new instance)
Http2ClientConnectionHandler connectionHandler =
cache.computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new);

// this statement may block a single connection key
Http2ConnectionAttemptResult result = connectionHandler.newStream(http2Client,
request,
initialUri,
http1EntityHandler);

if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) {
http2Supported.put(connectionKey, true);
}

if (result.connection() == null) {
// Upgrade failed, its HTTP/1.1
return result;
}

if (!result.connection().closed()
&& result.connection().ping()) {
return result;
}

LOGGER.log(DEBUG, "Cached connection is not healthy!");
remove(connectionKey);
}
return result;

throw new RuntimeException("Unable to create a stream from either cached or new connection!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ public static WebServer startServer(boolean ssl) {
.route(Http2Route.route(GET, "/versionspecific2", (req, res) -> res.send("HTTP/2.0 route\n")))
.route(Http.Method.predicate(GET, POST, PUT),
PathMatchers.create("/multi*"),
(req, res) -> res.send("HTTP/" + req.prologue().protocolVersion()
+ " route " + req.prologue().method() + "\n")))
(req, res) ->
{
req.content().consume(); // Workaround for #7427
res.send("HTTP/" + req.prologue().protocolVersion()
+ " route " + req.prologue().method() + "\n");
}))
.addRouting(WsRouting.builder()
.endpoint("/ws-echo", new EchoWsListener())
.build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.tests.upgrade.test;

import io.helidon.http.Http;
import io.helidon.logging.common.LogConfig;
import io.helidon.webclient.http2.Http2Client;
import io.helidon.webclient.http2.Http2ClientProtocolConfig;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.http.HttpRouting;
import io.helidon.webserver.http2.Http2Route;

import org.junit.jupiter.api.Test;

import static io.helidon.http.Http.Method.POST;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class SharedHttp2CacheTest {
@Test
void cacheHttp2WithServerRestart() {
LogConfig.configureRuntime();
WebServer webServer = null;
try {
HttpRouting routing = HttpRouting.builder()
.route(Http2Route.route(POST, "/versionspecific", (req, res) -> {
req.content().consume(); // Workaround for #7427
res.send();
}))
.build();

webServer = WebServer.builder()
.routing(routing)
.build()
.start();

int port = webServer.port();

Http2Client webClient = Http2Client.builder()
.protocolConfig(Http2ClientProtocolConfig.builder().priorKnowledge(true).build())
.keepAlive(true)
.baseUri("http://localhost:" + port + "/versionspecific")
.build();

try (var res = webClient.post().submit("WHATEVER")) {
assertThat(res.status(), is(Http.Status.OK_200));
}

webServer.stop();
webServer = WebServer.builder()
.port(port)
.routing(routing)
.build()
.start();

try (var res = webClient.post().submit("WHATEVER")) {
assertThat(res.status(), is(Http.Status.OK_200));
}
} finally {
if (webServer != null) {
webServer.stop();
}
}
}

@Test
void cacheHttp2() {
LogConfig.configureRuntime();
Http.HeaderName clientPortHeader = Http.HeaderNames.create("client-port");
WebServer webServer = null;
try {
HttpRouting routing = HttpRouting.builder()
.route(Http2Route.route(POST, "/versionspecific", (req, res) -> {
req.content().consume();
res.header(clientPortHeader, String.valueOf(req.remotePeer().port()))
.send();
}))
.build();

webServer = WebServer.builder()
.routing(routing)
.build()
.start();

int port = webServer.port();

Http2Client webClient = Http2Client.builder()
.protocolConfig(Http2ClientProtocolConfig.builder().priorKnowledge(true).build())
.keepAlive(true)
.baseUri("http://localhost:" + port + "/versionspecific")
.build();

Integer firstReqClientPort;
try (var res = webClient.post().submit("WHATEVER")) {
firstReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE);
assertThat(res.status(), is(Http.Status.OK_200));
}

Integer secondReqClientPort;
try (var res = webClient.post().submit("WHATEVER")) {
secondReqClientPort = res.headers().get(clientPortHeader).value(Integer.TYPE);
assertThat(res.status(), is(Http.Status.OK_200));
}

assertThat("In case of cached connection client port must be the same.",
secondReqClientPort,
is(firstReqClientPort));
} finally {
if (webServer != null) {
webServer.stop();
}
}
}
}
Loading

0 comments on commit 55fae6f

Please sign in to comment.