Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HttpInput refactoring #4556

Merged
merged 6 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1377,9 +1377,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));

assertNull(request.get(5, TimeUnit.SECONDS));

Expand All @@ -1399,9 +1402,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(100));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser an goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(150));
assertThat(sslFlushes.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(150));

closeClient(client);
}
Expand Down Expand Up @@ -1596,9 +1602,12 @@ record = proxy.readFromServer();

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(70));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(50));
assertThat(httpParses.get(), Matchers.lessThan(70));

closeClient(client);
}
Expand Down Expand Up @@ -1743,9 +1752,12 @@ record = proxy.readFromServer();

// Check that we did not spin
TimeUnit.MILLISECONDS.sleep(500);
assertThat(sslFills.get(), Matchers.lessThan(50));
// The new HttpInput impl tends to call fill and parse more often than the previous one
// b/c HttpChannel.needContent() does a fill and parse before doing a fill interested;
// this runs the parser and goes to the OS more often but requires less rescheduling.
assertThat(sslFills.get(), Matchers.lessThan(80));
assertThat(sslFlushes.get(), Matchers.lessThan(20));
assertThat(httpParses.get(), Matchers.lessThan(100));
assertThat(httpParses.get(), Matchers.lessThan(120));

closeClient(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.eclipse.jetty.fcgi.server;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -34,15 +38,20 @@
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpChannelOverFCGI extends HttpChannel
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class);

private final Queue<HttpInput.Content> _contentQueue = new LinkedList<>();
lorban marked this conversation as resolved.
Show resolved Hide resolved
private final AutoLock _lock = new AutoLock();
private HttpInput.Content _specialContent;
private final HttpFields.Mutable fields = HttpFields.build();
private final Dispatcher dispatcher;
private String method;
Expand All @@ -57,6 +66,101 @@ public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration,
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}

@Override
public boolean onContent(HttpInput.Content content)
{
boolean b = super.onContent(content);

Throwable failure;
try (AutoLock l = _lock.lock())
{
failure = _specialContent == null ? null : _specialContent.getError();
if (failure == null)
_contentQueue.offer(content);
}
if (failure != null)
content.failed(failure);

return b;
}

@Override
public boolean needContent()
{
try (AutoLock l = _lock.lock())
{
boolean hasContent = _specialContent != null || !_contentQueue.isEmpty();
if (LOG.isDebugEnabled())
LOG.debug("needContent has content? {}", hasContent);
return hasContent;
}
}

@Override
public HttpInput.Content produceContent()
{
HttpInput.Content content;
try (AutoLock l = _lock.lock())
{
content = _contentQueue.poll();
if (content == null)
content = _specialContent;
}
if (LOG.isDebugEnabled())
LOG.debug("produceContent has produced {}", content);
return content;
}

@Override
public boolean failAllContent(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failing all content with {}", (Object)failure);
List<HttpInput.Content> copy;
try (AutoLock l = _lock.lock())
{
copy = new ArrayList<>(_contentQueue);
_contentQueue.clear();
}
lorban marked this conversation as resolved.
Show resolved Hide resolved
copy.forEach(c -> c.failed(failure));
HttpInput.Content lastContent = copy.isEmpty() ? null : copy.get(copy.size() - 1);
boolean atEof = lastContent != null && lastContent.isEof();
if (LOG.isDebugEnabled())
LOG.debug("failed all content, EOF = {}", atEof);
return atEof;
}

@Override
public boolean failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed " + x);

try (AutoLock l = _lock.lock())
{
Throwable error = _specialContent == null ? null : _specialContent.getError();

if (error != null && error != x)
error.addSuppressed(x);
else
_specialContent = new HttpInput.ErrorContent(x);
}

return getRequest().getHttpInput().onContentProducible();
}

@Override
protected boolean eof()
{
if (LOG.isDebugEnabled())
LOG.debug("received EOF");
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.EofContent();
}
return getRequest().getHttpInput().onContentProducible();
}

protected void header(HttpField field)
{
String name = field.getName();
Expand Down Expand Up @@ -127,12 +231,46 @@ protected void dispatch()

public boolean onIdleTimeout(Throwable timeout)
{
boolean handle = getRequest().getHttpInput().onIdleTimeout(timeout);
boolean handle = doOnIdleTimeout(timeout);
if (handle)
execute(this);
return !handle;
}

private boolean doOnIdleTimeout(Throwable x)
{
boolean neverDispatched = getState().isIdle();
boolean waitingForContent;
HttpInput.Content specialContent;
try (AutoLock l = _lock.lock())
{
waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0;
specialContent = _specialContent;
}
lorban marked this conversation as resolved.
Show resolved Hide resolved
if ((waitingForContent || neverDispatched) && specialContent == null)
{
x.addSuppressed(new Throwable("HttpInput idle timeout"));
try (AutoLock l = _lock.lock())
{
_specialContent = new HttpInput.ErrorContent(x);
}
return getRequest().getHttpInput().onContentProducible();
}
return false;
}

@Override
public void recycle()
{
try (AutoLock l = _lock.lock())
{
if (!_contentQueue.isEmpty())
throw new AssertionError("unconsumed content: " + _contentQueue);
lorban marked this conversation as resolved.
Show resolved Hide resolved
_specialContent = null;
}
super.recycle();
}

private static class Dispatcher implements Runnable
{
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -235,6 +236,21 @@ public boolean isRemotelyClosed()
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}

@Override
public boolean failAllData(Throwable x)
{
List<DataEntry> copy;
try (AutoLock l = lock.lock())
{
dataDemand = 0;
copy = new ArrayList<>(dataQueue);
dataQueue.clear();
}
copy.forEach(dataEntry -> dataEntry.callback.failed(x));
DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1);
return lastDataEntry != null && lastDataEntry.frame.isEndStream();
}

public boolean isLocallyClosed()
{
return closeState.get() == CloseState.LOCALLY_CLOSED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ public int fill(ByteBuffer sink) throws IOException
else
{
entry.succeed();
// WebSocket does not have a backpressure API so you must always demand
// the next frame after succeeding the previous one.
stream.demand(1);
lorban marked this conversation as resolved.
Show resolved Hide resolved
}
return length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isRemotelyClosed();

/**
* Fail all data queued in the stream and reset
* demand to 0.
* @param x the exception to fail the data with.
* @return true if the end of the stream was reached, false otherwise.
*/
boolean failAllData(Throwable x);

/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ public default void onBeforeData(Stream stream)
* @param callback the callback to complete when the bytes of the DATA frame have been consumed
* @see #onDataDemanded(Stream, DataFrame, Callback)
*/
public void onData(Stream stream, DataFrame frame, Callback callback);
public default void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
}

/**
* <p>Callback method invoked when a DATA frame has been demanded.</p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@startuml

null:
content:
DEMANDING:
EOF:

[*] --> null

null --> DEMANDING : demand()
null --> EOF : eof()
lorban marked this conversation as resolved.
Show resolved Hide resolved
null -left-> null : onTimeout()

DEMANDING --> DEMANDING : demand()
DEMANDING --> content : onContent()\n onTimeout()
DEMANDING --> EOF : eof()

EOF --> EOF : eof()\n onTimeout()

note bottom of content: content1 -> content2 is only\nvalid if content1 is special
note top of content: content -> null only happens\nwhen content is not special
content --> content : onContent()\n onTimeout()
content --> null: take()
content --> EOF: eof()

@enduml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
}

@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
public void onDataDemanded(Stream stream, DataFrame frame, Callback callback)
{
getConnection().onData((IStream)stream, frame, callback);
}
Expand Down
Loading