Skip to content

Commit

Permalink
Get rid of addContent() by making produceContent() return Content ins…
Browse files Browse the repository at this point in the history
…tead.

Make EOF and errors be special content.
Transition to a much simplified FSM by using the needContent() / produceContent() model.
Implement blocking on top of async, this way there is only one FSM.
(Milestone 6)

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Nov 3, 2020
1 parent a4258ec commit 814dc69
Show file tree
Hide file tree
Showing 27 changed files with 2,951 additions and 974 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1377,9 +1377,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));

assertNull(request.get(5, TimeUnit.SECONDS));

Expand All @@ -1399,9 +1402,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));

closeClient(client);
}
Expand Down Expand Up @@ -1596,9 +1602,12 @@ record = proxy.readFromServer();

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(70));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(70));

closeClient(client);
}
Expand Down Expand Up @@ -1743,9 +1752,12 @@ record = proxy.readFromServer();

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(80));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(120));

closeClient(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,7 +50,8 @@ public class HttpChannelOverFCGI extends HttpChannel
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);

private final Queue<HttpInput.Content> _contentQueue = new LinkedList<>();
private Throwable _contentFailure;
private final AutoLock _lock = new AutoLock();
private HttpInput.Content _specialContent;
private final HttpFields.Mutable fields = HttpFields.build();
private final Dispatcher dispatcher;
private String method;
Expand All @@ -64,49 +66,99 @@ public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration,
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}

void enqueueContent(HttpInput.Content content)
@Override
public boolean onContent(HttpInput.Content content)
{
boolean b = super.onContent(content);

Throwable failure;
synchronized (_contentQueue)
try (AutoLock l = _lock.lock())
{
failure = _contentFailure;
failure = _specialContent == null ? null : _specialContent.getError();
if (failure == null)
_contentQueue.offer(content);
}
if (failure != null)
content.failed(failure);

return b;
}

@Override
public void produceContent()
public boolean needContent()
{
try (AutoLock l = _lock.lock())
{
boolean hasContent = _specialContent != null || !_contentQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("needContent has content? {}", hasContent);
return hasContent;
}
}

@Override
public HttpInput.Content produceContent()
{
HttpInput.Content content;
synchronized (_contentQueue)
try (AutoLock l = _lock.lock())
{
if (_contentFailure != null)
content = null;
else
content = _contentQueue.poll();
content = _contentQueue.poll();
if (content == null)
content = _specialContent;
}
if (content != null)
onContent(content);
if (LOG.isDebugEnabled())
LOG.debug("produceContent has produced {}", content);
return content;
}

@Override
public void failContent(Throwable failure)
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {}", (Object)failure);
List<HttpInput.Content> copy;
synchronized (_contentQueue)
try (AutoLock l = _lock.lock())
{
if (_contentFailure == null)
_contentFailure = failure;
else if (_contentFailure != failure)
_contentFailure.addSuppressed(failure);

copy = new ArrayList<>(_contentQueue);
_contentQueue.clear();
}
copy.forEach(content -> content.failed(failure));
copy.forEach(c -> c.failed(failure));
HttpInput.Content lastContent = copy.isEmpty() ? null : copy.get(copy.size() - 1);
boolean atEof = lastContent != null && lastContent.isEof();
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF = {}", atEof);
return atEof;
}

@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);

try (AutoLock l = _lock.lock())
{
Throwable error = _specialContent == null ? null : _specialContent.getError();

if (error != null && error != x)
error.addSuppressed(x);
else
_specialContent = new HttpInput.ErrorContent(x);
}

return getRequest().getHttpInput().onContentProducible();
}

@Override
protected boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.EofContent();
}
return getRequest().getHttpInput().onContentProducible();
}

protected void header(HttpField field)
Expand Down Expand Up @@ -179,12 +231,46 @@ protected void dispatch()

public boolean onIdleTimeout(Throwable timeout)
{
boolean handle = getRequest().getHttpInput().onIdleTimeout(timeout);
boolean handle = doOnIdleTimeout(timeout);
if (handle)
execute(this);
return !handle;
}

private boolean doOnIdleTimeout(Throwable x)
{
boolean neverDispatched = getState().isIdle();
boolean waitingForContent;
HttpInput.Content specialContent;
try (AutoLock l = _lock.lock())
{
waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0;
specialContent = _specialContent;
}
if ((waitingForContent || neverDispatched) && specialContent == null)
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.ErrorContent(x);
}
return getRequest().getHttpInput().onContentProducible();
}
return false;
}

@Override
public void recycle()
{
try (AutoLock l = _lock.lock())
{
if (!_contentQueue.isEmpty())
throw new AssertionError("unconsumed content: " + _contentQueue);
_specialContent = null;
}
super.recycle();
}

private static class Dispatcher implements Runnable
{
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
copy.put(buffer).flip();
channel.enqueueContent(new HttpInput.Content(copy));
channel.onContent(new HttpInput.Content(copy));
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -77,7 +78,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private Listener listener;
private long dataLength;
private long dataDemand;
private Throwable failure;
private boolean dataInitial;
private boolean dataProcess;

Expand Down Expand Up @@ -237,20 +237,18 @@ public boolean isRemotelyClosed()
}

@Override
public void fail(Throwable x)
public boolean failAllData(Throwable x)
{
List<DataEntry> copy;
try (AutoLock l = lock.lock())
{
dataDemand = 0;
failure = x;
while (true)
{
DataEntry dataEntry = dataQueue.poll();
if (dataEntry == null)
break;
dataEntry.callback.failed(x);
}
copy = new ArrayList<>(dataQueue);
dataQueue.clear();
}
copy.forEach(dataEntry -> dataEntry.callback.failed(x));
DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1);
return lastDataEntry != null && lastDataEntry.frame.isEndStream();
}

public boolean isLocallyClosed()
Expand Down Expand Up @@ -418,12 +416,6 @@ private void onData(DataFrame frame, Callback callback)
DataEntry entry = new DataEntry(frame, callback);
try (AutoLock l = lock.lock())
{
if (failure != null)
{
// stream has been failed
callback.failed(failure);
return;
}
dataQueue.offer(entry);
initial = dataInitial;
if (initial)
Expand Down Expand Up @@ -463,8 +455,6 @@ public void demand(long n)
boolean proceed = false;
try (AutoLock l = lock.lock())
{
if (failure != null)
return; // stream has been failed
demand = dataDemand = MathUtils.cappedAdd(dataDemand, n);
if (!dataProcess)
dataProcess = proceed = !dataQueue.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
public abstract class HTTP2StreamEndPoint implements EndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2StreamEndPoint.class);
private static final Throwable EOF = new Throwable();

private final AutoLock lock = new AutoLock();
private final Deque<Entry> dataQueue = new ArrayDeque<>();
Expand Down Expand Up @@ -217,6 +216,9 @@ public int fill(ByteBuffer sink) throws IOException
else
{
entry.succeed();
// WebSocket does not have a backpressure API so you must always demand
// the next frame after succeeding the previous one.
stream.demand(1);
}
return length;
}
Expand Down Expand Up @@ -531,7 +533,7 @@ protected void offerData(DataFrame frame, Callback callback)
{
if (buffer.hasRemaining())
offer(buffer, Callback.from(Callback.NOOP::succeeded, callback::failed), null);
offer(BufferUtil.EMPTY_BUFFER, callback, EOF);
offer(BufferUtil.EMPTY_BUFFER, callback, Entry.EOF);
}
else
{
Expand Down Expand Up @@ -582,8 +584,10 @@ public String toString()
writeState);
}

private class Entry
private static class Entry
{
private static final Throwable EOF = new Throwable();

private final ByteBuffer buffer;
private final Callback callback;
private final Throwable failure;
Expand All @@ -610,7 +614,6 @@ private IOException ioFailure()
private void succeed()
{
callback.succeeded();
stream.demand(1);
}

private void fail(Throwable failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,21 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isRemotelyClosed();

/**
* Fail all data queued in the stream and reset
* demand to 0.
* @param x the exception to fail the data with.
* @return true if the end of the stream was reached, false otherwise.
*/
boolean failAllData(Throwable x);

/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();

void fail(Throwable x);

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Loading

0 comments on commit 814dc69

Please sign in to comment.