Skip to content

Commit

Permalink
The Http1xClientConnection does close the stream when the response en…
Browse files Browse the repository at this point in the history
…ds ignoring the status of the request. As consequence when a server sends a response and then close the connection, the connection gets recycled and reused to allocate a new stream. Since the connection is still in use, the stream is allocated and parked until the stream that recycled the connection has finished sending the request. When the connection is closed the allocated stream promise is not failed.

This modifies the stream close to happen when both the request and response are ended instead of the response ended only. As consequence a connection is recycled when the request is ended.
  • Loading branch information
vietj committed Mar 13, 2024
1 parent ee23278 commit 874a2ba
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 18 deletions.
37 changes: 19 additions & 18 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,12 @@ private void writeBuffer(Stream s, ByteBuf buff, boolean end, FutureListener<Voi

private void endRequest(Stream s) {
Stream next;
boolean checkLifecycle;
boolean responseEnded;
synchronized (this) {
s.requestEnded = true;
requests.pop();
next = requests.peek();
checkLifecycle = s.responseEnded;
responseEnded = s.responseEnded;
if (metrics != null) {
metrics.requestEnd(s.metric, s.bytesWritten);
}
Expand All @@ -297,7 +298,8 @@ private void endRequest(Stream s) {
if (next != null) {
next.promise.complete((HttpClientStream) next);
}
if (checkLifecycle) {
if (responseEnded) {
s.context.execute(null, s::handleClosed);
checkLifecycle();
}
}
Expand Down Expand Up @@ -354,6 +356,7 @@ private abstract static class Stream {
private Object metric;
private HttpRequestHead request;
private HttpResponseHead response;
private boolean requestEnded;
private boolean responseEnded;
private long bytesRead;
private long bytesWritten;
Expand All @@ -380,7 +383,7 @@ Object trace() {
abstract void handleEnd(LastHttpContent trailer);
abstract void handleWritabilityChanged(boolean writable);
abstract void handleException(Throwable cause);
abstract void handleClosed();
abstract void handleClosed(Throwable err);

}

Expand Down Expand Up @@ -648,9 +651,10 @@ public void reset(Throwable cause) {

private void _reset(Throwable cause) {
boolean removed = conn.reset(this);
context.execute(cause, this::handleException);
if (removed) {
context.execute(this::handleClosed);
context.execute(cause, this::handleClosed);
} else {
context.execute(cause, this::handleException);
}
}

Expand Down Expand Up @@ -703,7 +707,6 @@ void handleChunk(Buffer buff) {

void handleEnd(LastHttpContent trailer) {
queue.write(new HeadersAdaptor(trailer.trailingHeaders()));
tryClose();
}

void handleException(Throwable cause) {
Expand All @@ -713,15 +716,10 @@ void handleException(Throwable cause) {
}

@Override
void handleClosed() {
handleException(HttpUtils.CONNECTION_CLOSED_EXCEPTION);
tryClose();
}

/**
* Attempt to close the stream.
*/
private void tryClose() {
void handleClosed(Throwable err) {
if (err != null) {
handleException(err);
}
if (!closed) {
closed = true;
if (closeHandler != null) {
Expand Down Expand Up @@ -939,6 +937,9 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
}
lastResponseReceivedTimestamp = System.currentTimeMillis();
stream.context.execute(trailer, stream::handleEnd);
if (stream.requestEnded) {
stream.context.execute(null, stream::handleClosed);
}
}

public HttpClientMetrics metrics() {
Expand Down Expand Up @@ -1206,7 +1207,7 @@ protected void handleClosed() {
ws.handleConnectionClosed();
}
for (Stream stream : allocatedStreams) {
stream.context.execute(null, v -> stream.handleClosed());
stream.context.execute(HttpUtils.CONNECTION_CLOSED_EXCEPTION, stream::handleClosed);
}
for (Stream stream : sentStreams) {
if (metrics != null) {
Expand All @@ -1216,7 +1217,7 @@ protected void handleClosed() {
if (tracer != null && trace != null) {
tracer.receiveResponse(stream.context, null, trace, HttpUtils.CONNECTION_CLOSED_EXCEPTION, TagExtractor.empty());
}
stream.context.execute(null, v -> stream.handleClosed());
stream.context.execute(HttpUtils.CONNECTION_CLOSED_EXCEPTION, stream::handleClosed);
}
}

Expand Down
58 changes: 58 additions & 0 deletions src/test/java/io/vertx/core/http/Http1xTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5380,4 +5380,62 @@ private void doTestCanUpgradeToWebSocket(UnaryOperator<RequestOptions> config, b
}));
await();
}

@Test
public void testDoNotRecycleWhenRequestIsNotEnded() throws Exception {
waitFor(2);
server.requestHandler(request -> {
HttpServerResponse resp = request.response();
resp.end().onComplete(onSuccess(v -> {
request.connection().close();
}));
});
startServer(testAddress);
client.close();
client = vertx.createHttpClient(new PoolOptions().setHttp1MaxSize(1));
for (int i = 0;i < 2;i++) {
int val = i;
Future<HttpClientRequest> fut = client.request(requestOptions);
fut.onComplete(ar -> {
switch (val) {
case 0:
assertTrue(ar.succeeded());
HttpClientRequest req = ar.result();
req.sendHead();
req.response().onComplete(onSuccess(resp -> {
complete();
}));
break;
case 1:
assertTrue(ar.succeeded());
complete();
break;
}
});
}
await();
}

@Test
public void testRecycleConnectionOnRequestEnd() throws Exception {
int numRequests = 10;
waitFor(numRequests);
server.requestHandler(request -> {
request.response().end();
});
startServer(testAddress);
client.close();
client = vertx.createHttpClient(new PoolOptions().setHttp1MaxSize(1));
for (int i = 0;i < numRequests;i++) {
Future<HttpClientRequest> fut = client.request(requestOptions);
fut.onComplete(onSuccess(request -> {
request.setChunked(true).sendHead();
request.response().compose(HttpClientResponse::body).onComplete(onSuccess(v -> {
vertx.setTimer(10, id -> request.end());
complete();
}));
}));
}
await();
}
}

0 comments on commit 874a2ba

Please sign in to comment.