Skip to content

Commit

Permalink
Close channel on stream handler exception
Browse files Browse the repository at this point in the history
In case a stream handler throws uncaught exception, we should close the
channel and release associated resources to avoid the channel entering a
limbo state. This PR does that.

Resolves: #ES-9537
  • Loading branch information
ywangd committed Oct 24, 2024
1 parent 92ecd36 commit 4a34f28
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collection;
Expand Down Expand Up @@ -215,6 +216,29 @@ public void testServerCloseConnectionMidStream() throws Exception {
}
}

public void testServerExceptionMidStream() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);

// write half of http request
ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024));
ctx.clientChannel.writeAndFlush(randomContent(1024, false));

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertFalse(handler.streamClosed);

handler.shouldThrowInsideHandleChunk = true;
handler.stream.next();

assertBusy(() -> {
assertNull(handler.stream.buf());
assertTrue(handler.streamClosed);
});
}
}

// ensure that client's socket buffers data when server is not consuming data
public void testClientBackpressure() throws Exception {
try (var ctx = setupClientCtx()) {
Expand Down Expand Up @@ -598,6 +622,7 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
RestChannel channel;
boolean recvLast = false;
volatile boolean streamClosed = false;
volatile boolean shouldThrowInsideHandleChunk = false;

ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
this.opaqueId = opaqueId;
Expand All @@ -606,6 +631,10 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon

@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
Transports.assertTransportThread();
if (shouldThrowInsideHandleChunk) {
throw new RuntimeException("simulated exception inside handleChunk");
}
recvChunks.add(new Chunk(chunk, isLast));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.Future;

import org.elasticsearch.core.Releasables;
import org.elasticsearch.http.HttpBody;
Expand Down Expand Up @@ -66,14 +67,19 @@ public void addTracingHandler(ChunkHandler chunkHandler) {
public void next() {
assert closing == false : "cannot request next chunk on closing stream";
assert handler != null : "handler must be set before requesting next chunk";
channel.eventLoop().submit(() -> {
final Future<?> future = channel.eventLoop().submit(() -> {
requested = true;
if (buf == null) {
channel.read();
} else {
send();
}
});
future.addListener(f -> {
if (f.isSuccess() == false) {
channel.eventLoop().submit(() -> channel.pipeline().fireExceptionCaught(f.cause()));
}
});
}

public void handleNettyContent(HttpContent httpContent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,15 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl
request.contentStream().setHandler(new HttpBody.ChunkHandler() {
@Override
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
chunkConsumer.handleChunk(channel, chunk, isLast);
try {
chunkConsumer.handleChunk(channel, chunk, isLast);
} catch (Exception e) {
// Release the chunk if the handler fails before releasing it in exceptional cases
if (chunk.hasReferences()) {
chunk.decRef();
}
throw e;
}
}

@Override
Expand Down

0 comments on commit 4a34f28

Please sign in to comment.