Skip to content

Commit

Permalink
Fix client backpressure for streaming requests (#11440)
Browse files Browse the repository at this point in the history
ProxyBackpressureTest takes quite long so I made it parallel. Wasn't able to do that with junit unfortunately.

Might fix #11428
  • Loading branch information
yawkat authored Dec 17, 2024
1 parent 68ad1b4 commit f6eb101
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 513 deletions.
3 changes: 3 additions & 0 deletions http-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api libs.managed.netty.handler.proxy

compileOnly libs.managed.netty.incubator.codec.http3
testImplementation libs.managed.netty.incubator.codec.http3

testAnnotationProcessor platform(libs.test.boms.micronaut.validation)
testAnnotationProcessor (libs.micronaut.validation.processor) {
Expand All @@ -46,6 +47,8 @@ dependencies {
testImplementation libs.wiremock
testImplementation libs.logback.classic
testImplementation libs.bcpkix
testImplementation libs.junit.jupiter.params
testImplementation libs.awaitility

testRuntimeOnly(libs.managed.netty.tcnative.boringssl.static) {
artifact {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ final void emitPoolHandle(Sinks.One<PoolHandle> sink, PoolHandle ph) {
}

@Override
public boolean dispatch(PoolSink<PoolHandle> sink) {
public final boolean dispatch(PoolSink<PoolHandle> sink) {
if (!tryEarmarkForRequest()) {
return false;
}
Expand Down Expand Up @@ -1602,7 +1602,7 @@ void fireReadTimeout(ChannelHandlerContext ctx) {
}

@Override
void dispatch0(PoolSink<PoolHandle> sink) {
final void dispatch0(PoolSink<PoolHandle> sink) {
if (!channel.isActive() || windDownConnection) {
// make sure the request isn't dispatched to this connection again
windDownConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2187,7 +2187,7 @@ private <E extends HttpClientException> E decorate(DefaultHttpClient ctx, E exc)
/**
* Used as a holder for the current SSE event.
*/
private static class CurrentEvent {
private static final class CurrentEvent {
byte[] data;
String id;
String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private void transitionToState(ChannelHandlerContext ctx, ReaderState<?> fromSta
if (state != fromState) {
throw new IllegalStateException("Wrong source state");
}
fromState.leave(ctx);
state = nextState;
}

Expand All @@ -110,6 +111,9 @@ void channelReadComplete(ChannelHandlerContext ctx) {
void channelInactive(ChannelHandlerContext ctx) {
exceptionCaught(ctx, new ResponseClosedException("Connection closed before response was received"));
}

void leave(ChannelHandlerContext ctx) {
}
}

/**
Expand Down Expand Up @@ -226,6 +230,7 @@ private final class UnbufferedContent extends ReaderState<HttpContent> implement
private final ResponseListener listener;
private final ChannelHandlerContext streamingContext;
private final StreamingNettyByteBody.SharedBuffer streaming;
private final boolean wasAutoRead;
private long demand;

UnbufferedContent(ResponseListener listener, ChannelHandlerContext ctx, HttpResponse response) {
Expand All @@ -235,6 +240,13 @@ private final class UnbufferedContent extends ReaderState<HttpContent> implement
streaming.setExpectedLengthFrom(response.headers());
}
streamingContext = ctx;
wasAutoRead = ctx.channel().config().isAutoRead();
ctx.channel().config().setAutoRead(false);
}

@Override
void leave(ChannelHandlerContext ctx) {
ctx.channel().config().setAutoRead(wasAutoRead);
}

void add(ByteBuf buf) {
Expand Down Expand Up @@ -278,14 +290,7 @@ public void start() {
}

private void start0() {
if (state != this) {
return;
}

demand++;
if (demand == 1) {
streamingContext.read();
}
onBytesConsumed0(1);
}

@Override
Expand All @@ -306,7 +311,7 @@ private void onBytesConsumed0(long bytesConsumed) {
long newDemand = oldDemand + bytesConsumed;
if (newDemand < oldDemand) {
// overflow
newDemand = oldDemand;
newDemand = Long.MAX_VALUE;
}
this.demand = newDemand;
if (oldDemand <= 0 && newDemand > 0) {
Expand Down

This file was deleted.

Loading

0 comments on commit f6eb101

Please sign in to comment.