Skip to content

Commit

Permalink
Fixes #9121 - Flaky BlockedWritesWithSmallThreadPoolTest.testServerTh…
Browse files Browse the repository at this point in the history
…readsBlockedInWrites().

The test uncovered a larger problem detailed in the issue: the Handler Callback should be non-blocking.

Since all implementations of HttpStream are non-blocking, overridden HttpStream.getInvocationType() to return NON_BLOCKING.

This guarantees that even in case of all server threads blocked, blocked/pending writes can be completed.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 20, 2024
1 parent 942e77c commit f3393be
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
Expand All @@ -45,7 +46,6 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -100,19 +100,19 @@ public void dispose()
}

@Test
@Tag("flaky")
public void testServerThreadsBlockedInWrites() throws Exception
{
int contentLength = 16 * 1024 * 1024;
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
// Write a large content to cause TCP congestion.
response.write(true, ByteBuffer.wrap(new byte[contentLength]), callback);
// Blocking write a large content to cause TCP congestion.
Content.Sink.write(response, true, ByteBuffer.wrap(new byte[contentLength]));
callback.succeeded();
return true;
}
});
Expand All @@ -138,21 +138,20 @@ public boolean handle(Request request, Response response, Callback callback)
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
clientDataLatch.countDown();
else
stream.demand();
}
catch (InterruptedException x)
catch (InterruptedException ignored)
{
data.release();
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ static Throwable consumeAvailable(HttpStream stream, HttpConfiguration httpConfi
return CONTENT_NOT_CONSUMED;
}

@Override
default InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

class Wrapper implements HttpStream
{
private final HttpStream _wrapped;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,6 @@ private boolean lockedCompleteCallback()
@Override
public InvocationType getInvocationType()
{
// TODO review this as it is probably not correct
return _request.getHttpStream().getInvocationType();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1610,12 +1610,6 @@ private void abort(Throwable failure)
{
getEndPoint().close(failure);
}

@Override
public InvocationType getInvocationType()
{
return HttpStream.super.getInvocationType();
}
}

private class TunnelSupportOverHTTP1 implements TunnelSupport
Expand Down

0 comments on commit f3393be

Please sign in to comment.