Skip to content

Commit

Permalink
Merge pull request #4556 from lorban/http-input
Browse files Browse the repository at this point in the history
HttpInput refactoring
  • Loading branch information
sbordet committed Nov 3, 2020
2 parents 167eded + 814dc69 commit ebea687
Show file tree
Hide file tree
Showing 31 changed files with 3,307 additions and 2,512 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1377,9 +1377,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));

assertNull(request.get(5, TimeUnit.SECONDS));

Expand All @@ -1399,9 +1402,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));

closeClient(client);
}
Expand Down Expand Up @@ -1596,9 +1602,12 @@ record = proxy.readFromServer();

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(70));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(70));

closeClient(client);
}
Expand Down Expand Up @@ -1743,9 +1752,12 @@ record = proxy.readFromServer();

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(80));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(120));

closeClient(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.eclipse.jetty.fcgi.server;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -34,15 +38,20 @@
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpChannelOverFCGI extends HttpChannel
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);

private final Queue<HttpInput.Content> _contentQueue = new LinkedList<>();
private final AutoLock _lock = new AutoLock();
private HttpInput.Content _specialContent;
private final HttpFields.Mutable fields = HttpFields.build();
private final Dispatcher dispatcher;
private String method;
Expand All @@ -57,6 +66,101 @@ public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration,
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}

@Override
public boolean onContent(HttpInput.Content content)
{
boolean b = super.onContent(content);

Throwable failure;
try (AutoLock l = _lock.lock())
{
failure = _specialContent == null ? null : _specialContent.getError();
if (failure == null)
_contentQueue.offer(content);
}
if (failure != null)
content.failed(failure);

return b;
}

@Override
public boolean needContent()
{
try (AutoLock l = _lock.lock())
{
boolean hasContent = _specialContent != null || !_contentQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("needContent has content? {}", hasContent);
return hasContent;
}
}

@Override
public HttpInput.Content produceContent()
{
HttpInput.Content content;
try (AutoLock l = _lock.lock())
{
content = _contentQueue.poll();
if (content == null)
content = _specialContent;
}
if (LOG.isDebugEnabled())
LOG.debug("produceContent has produced {}", content);
return content;
}

@Override
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {}", (Object)failure);
List<HttpInput.Content> copy;
try (AutoLock l = _lock.lock())
{
copy = new ArrayList<>(_contentQueue);
_contentQueue.clear();
}
copy.forEach(c -> c.failed(failure));
HttpInput.Content lastContent = copy.isEmpty() ? null : copy.get(copy.size() - 1);
boolean atEof = lastContent != null && lastContent.isEof();
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF = {}", atEof);
return atEof;
}

@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);

try (AutoLock l = _lock.lock())
{
Throwable error = _specialContent == null ? null : _specialContent.getError();

if (error != null && error != x)
error.addSuppressed(x);
else
_specialContent = new HttpInput.ErrorContent(x);
}

return getRequest().getHttpInput().onContentProducible();
}

@Override
protected boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.EofContent();
}
return getRequest().getHttpInput().onContentProducible();
}

protected void header(HttpField field)
{
String name = field.getName();
Expand Down Expand Up @@ -127,12 +231,46 @@ protected void dispatch()

public boolean onIdleTimeout(Throwable timeout)
{
boolean handle = getRequest().getHttpInput().onIdleTimeout(timeout);
boolean handle = doOnIdleTimeout(timeout);
if (handle)
execute(this);
return !handle;
}

private boolean doOnIdleTimeout(Throwable x)
{
boolean neverDispatched = getState().isIdle();
boolean waitingForContent;
HttpInput.Content specialContent;
try (AutoLock l = _lock.lock())
{
waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0;
specialContent = _specialContent;
}
if ((waitingForContent || neverDispatched) && specialContent == null)
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.ErrorContent(x);
}
return getRequest().getHttpInput().onContentProducible();
}
return false;
}

@Override
public void recycle()
{
try (AutoLock l = _lock.lock())
{
if (!_contentQueue.isEmpty())
throw new AssertionError("unconsumed content: " + _contentQueue);
_specialContent = null;
}
super.recycle();
}

private static class Dispatcher implements Runnable
{
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -235,6 +236,21 @@ public boolean isRemotelyClosed()
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}

@Override
public boolean failAllData(Throwable x)
{
List<DataEntry> copy;
try (AutoLock l = lock.lock())
{
dataDemand = 0;
copy = new ArrayList<>(dataQueue);
dataQueue.clear();
}
copy.forEach(dataEntry -> dataEntry.callback.failed(x));
DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1);
return lastDataEntry != null && lastDataEntry.frame.isEndStream();
}

public boolean isLocallyClosed()
{
return closeState.get() == CloseState.LOCALLY_CLOSED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ public int fill(ByteBuffer sink) throws IOException
else
{
entry.succeed();
// WebSocket does not have a backpressure API so you must always demand
// the next frame after succeeding the previous one.
stream.demand(1);
}
return length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isRemotelyClosed();

/**
* Fail all data queued in the stream and reset
* demand to 0.
* @param x the exception to fail the data with.
* @return true if the end of the stream was reached, false otherwise.
*/
boolean failAllData(Throwable x);

/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ public default void onBeforeData(Stream stream)
* @param callback the callback to complete when the bytes of the DATA frame have been consumed
* @see #onDataDemanded(Stream, DataFrame, Callback)
*/
public void onData(Stream stream, DataFrame frame, Callback callback);
public default void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
}

/**
* <p>Callback method invoked when a DATA frame has been demanded.</p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@startuml

null:
content:
DEMANDING:
EOF:

[*] --> null

null --> DEMANDING : demand()
null --> EOF : eof()
null -left-> null : onTimeout()

DEMANDING --> DEMANDING : demand()
DEMANDING --> content : onContent()\n onTimeout()
DEMANDING --> EOF : eof()

EOF --> EOF : eof()\n onTimeout()

note bottom of content: content1 -> content2 is only\nvalid if content1 is special
note top of content: content -> null only happens\nwhen content is not special
content --> content : onContent()\n onTimeout()
content --> null: take()
content --> EOF: eof()

@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
}

@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
getConnection().onData((IStream)stream, frame, callback);
}
Expand Down
Loading

0 comments on commit ebea687

Please sign in to comment.