diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java index 39089a1e801f..bd476883a85b 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java @@ -1377,9 +1377,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception // Check that we did not spin TimeUnit.MILLISECONDS.sleep(500); - assertThat(sslFills.get(), Matchers.lessThan(100)); + // The new HttpInput impl tends to call fill and parse more often than the previous one + // b/c HttpChannel.needContent() does a fill and parse before doing a fill interested; + // this runs the parser an goes to the OS more often but requires less rescheduling. + assertThat(sslFills.get(), Matchers.lessThan(150)); assertThat(sslFlushes.get(), Matchers.lessThan(50)); - assertThat(httpParses.get(), Matchers.lessThan(100)); + assertThat(httpParses.get(), Matchers.lessThan(150)); assertNull(request.get(5, TimeUnit.SECONDS)); @@ -1399,9 +1402,12 @@ public void testRequestWithBigContentWithSplitBoundary() throws Exception // Check that we did not spin TimeUnit.MILLISECONDS.sleep(500); - assertThat(sslFills.get(), Matchers.lessThan(100)); + // The new HttpInput impl tends to call fill and parse more often than the previous one + // b/c HttpChannel.needContent() does a fill and parse before doing a fill interested; + // this runs the parser an goes to the OS more often but requires less rescheduling. + assertThat(sslFills.get(), Matchers.lessThan(150)); assertThat(sslFlushes.get(), Matchers.lessThan(50)); - assertThat(httpParses.get(), Matchers.lessThan(100)); + assertThat(httpParses.get(), Matchers.lessThan(150)); closeClient(client); } @@ -1596,9 +1602,12 @@ record = proxy.readFromServer(); // Check that we did not spin TimeUnit.MILLISECONDS.sleep(500); - assertThat(sslFills.get(), Matchers.lessThan(50)); + // The new HttpInput impl tends to call fill and parse more often than the previous one + // b/c HttpChannel.needContent() does a fill and parse before doing a fill interested; + // this runs the parser and goes to the OS more often but requires less rescheduling. + assertThat(sslFills.get(), Matchers.lessThan(70)); assertThat(sslFlushes.get(), Matchers.lessThan(20)); - assertThat(httpParses.get(), Matchers.lessThan(50)); + assertThat(httpParses.get(), Matchers.lessThan(70)); closeClient(client); } @@ -1743,9 +1752,12 @@ record = proxy.readFromServer(); // Check that we did not spin TimeUnit.MILLISECONDS.sleep(500); - assertThat(sslFills.get(), Matchers.lessThan(50)); + // The new HttpInput impl tends to call fill and parse more often than the previous one + // b/c HttpChannel.needContent() does a fill and parse before doing a fill interested; + // this runs the parser and goes to the OS more often but requires less rescheduling. + assertThat(sslFills.get(), Matchers.lessThan(80)); assertThat(sslFlushes.get(), Matchers.lessThan(20)); - assertThat(httpParses.get(), Matchers.lessThan(100)); + assertThat(httpParses.get(), Matchers.lessThan(120)); closeClient(client); } diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java index ba279a9aabf8..5759121ca556 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,8 @@ public class HttpChannelOverFCGI extends HttpChannel private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class); private final Queue _contentQueue = new LinkedList<>(); - private Throwable _contentFailure; + private final AutoLock _lock = new AutoLock(); + private HttpInput.Content _specialContent; private final HttpFields.Mutable fields = HttpFields.build(); private final Dispatcher dispatcher; private String method; @@ -64,49 +66,99 @@ public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this); } - void enqueueContent(HttpInput.Content content) + @Override + public boolean onContent(HttpInput.Content content) { + boolean b = super.onContent(content); + Throwable failure; - synchronized (_contentQueue) + try (AutoLock l = _lock.lock()) { - failure = _contentFailure; + failure = _specialContent == null ? null : _specialContent.getError(); if (failure == null) _contentQueue.offer(content); } if (failure != null) content.failed(failure); + + return b; } @Override - public void produceContent() + public boolean needContent() + { + try (AutoLock l = _lock.lock()) + { + boolean hasContent = _specialContent != null || !_contentQueue.isEmpty(); + if (LOG.isDebugEnabled()) + LOG.debug("needContent has content? {}", hasContent); + return hasContent; + } + } + + @Override + public HttpInput.Content produceContent() { HttpInput.Content content; - synchronized (_contentQueue) + try (AutoLock l = _lock.lock()) { - if (_contentFailure != null) - content = null; - else - content = _contentQueue.poll(); + content = _contentQueue.poll(); + if (content == null) + content = _specialContent; } - if (content != null) - onContent(content); + if (LOG.isDebugEnabled()) + LOG.debug("produceContent has produced {}", content); + return content; } @Override - public void failContent(Throwable failure) + public boolean failAllContent(Throwable failure) { + if (LOG.isDebugEnabled()) + LOG.debug("failing all content with {}", (Object)failure); List copy; - synchronized (_contentQueue) + try (AutoLock l = _lock.lock()) { - if (_contentFailure == null) - _contentFailure = failure; - else if (_contentFailure != failure) - _contentFailure.addSuppressed(failure); - copy = new ArrayList<>(_contentQueue); _contentQueue.clear(); } - copy.forEach(content -> content.failed(failure)); + copy.forEach(c -> c.failed(failure)); + HttpInput.Content lastContent = copy.isEmpty() ? null : copy.get(copy.size() - 1); + boolean atEof = lastContent != null && lastContent.isEof(); + if (LOG.isDebugEnabled()) + LOG.debug("failed all content, EOF = {}", atEof); + return atEof; + } + + @Override + public boolean failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed " + x); + + try (AutoLock l = _lock.lock()) + { + Throwable error = _specialContent == null ? null : _specialContent.getError(); + + if (error != null && error != x) + error.addSuppressed(x); + else + _specialContent = new HttpInput.ErrorContent(x); + } + + return getRequest().getHttpInput().onContentProducible(); + } + + @Override + protected boolean eof() + { + if (LOG.isDebugEnabled()) + LOG.debug("received EOF"); + try (AutoLock l = _lock.lock()) + { + _specialContent = new HttpInput.EofContent(); + } + return getRequest().getHttpInput().onContentProducible(); } protected void header(HttpField field) @@ -179,12 +231,46 @@ protected void dispatch() public boolean onIdleTimeout(Throwable timeout) { - boolean handle = getRequest().getHttpInput().onIdleTimeout(timeout); + boolean handle = doOnIdleTimeout(timeout); if (handle) execute(this); return !handle; } + private boolean doOnIdleTimeout(Throwable x) + { + boolean neverDispatched = getState().isIdle(); + boolean waitingForContent; + HttpInput.Content specialContent; + try (AutoLock l = _lock.lock()) + { + waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0; + specialContent = _specialContent; + } + if ((waitingForContent || neverDispatched) && specialContent == null) + { + x.addSuppressed(new Throwable("HttpInput idle timeout")); + try (AutoLock l = _lock.lock()) + { + _specialContent = new HttpInput.ErrorContent(x); + } + return getRequest().getHttpInput().onContentProducible(); + } + return false; + } + + @Override + public void recycle() + { + try (AutoLock l = _lock.lock()) + { + if (!_contentQueue.isEmpty()) + throw new AssertionError("unconsumed content: " + _contentQueue); + _specialContent = null; + } + super.recycle(); + } + private static class Dispatcher implements Runnable { private final AtomicReference state = new AtomicReference<>(State.IDLE); diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index 820dfc3e3e80..27b1b9ba4659 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -197,7 +197,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { ByteBuffer copy = ByteBuffer.allocate(buffer.remaining()); copy.put(buffer).flip(); - channel.enqueueContent(new HttpInput.Content(copy)); + channel.onContent(new HttpInput.Content(copy)); } return false; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index a24030c266ad..7040c82407ab 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.channels.WritePendingException; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -77,7 +78,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private Listener listener; private long dataLength; private long dataDemand; - private Throwable failure; private boolean dataInitial; private boolean dataProcess; @@ -237,20 +237,18 @@ public boolean isRemotelyClosed() } @Override - public void fail(Throwable x) + public boolean failAllData(Throwable x) { + List copy; try (AutoLock l = lock.lock()) { dataDemand = 0; - failure = x; - while (true) - { - DataEntry dataEntry = dataQueue.poll(); - if (dataEntry == null) - break; - dataEntry.callback.failed(x); - } + copy = new ArrayList<>(dataQueue); + dataQueue.clear(); } + copy.forEach(dataEntry -> dataEntry.callback.failed(x)); + DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1); + return lastDataEntry != null && lastDataEntry.frame.isEndStream(); } public boolean isLocallyClosed() @@ -418,12 +416,6 @@ private void onData(DataFrame frame, Callback callback) DataEntry entry = new DataEntry(frame, callback); try (AutoLock l = lock.lock()) { - if (failure != null) - { - // stream has been failed - callback.failed(failure); - return; - } dataQueue.offer(entry); initial = dataInitial; if (initial) @@ -463,8 +455,6 @@ public void demand(long n) boolean proceed = false; try (AutoLock l = lock.lock()) { - if (failure != null) - return; // stream has been failed demand = dataDemand = MathUtils.cappedAdd(dataDemand, n); if (!dataProcess) dataProcess = proceed = !dataQueue.isEmpty(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java index 08e3fd78317b..3f6fefc937e0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java @@ -43,7 +43,6 @@ public abstract class HTTP2StreamEndPoint implements EndPoint { private static final Logger LOG = LoggerFactory.getLogger(HTTP2StreamEndPoint.class); - private static final Throwable EOF = new Throwable(); private final AutoLock lock = new AutoLock(); private final Deque dataQueue = new ArrayDeque<>(); @@ -217,6 +216,9 @@ public int fill(ByteBuffer sink) throws IOException else { entry.succeed(); + // WebSocket does not have a backpressure API so you must always demand + // the next frame after succeeding the previous one. + stream.demand(1); } return length; } @@ -531,7 +533,7 @@ protected void offerData(DataFrame frame, Callback callback) { if (buffer.hasRemaining()) offer(buffer, Callback.from(Callback.NOOP::succeeded, callback::failed), null); - offer(BufferUtil.EMPTY_BUFFER, callback, EOF); + offer(BufferUtil.EMPTY_BUFFER, callback, Entry.EOF); } else { @@ -582,8 +584,10 @@ public String toString() writeState); } - private class Entry + private static class Entry { + private static final Throwable EOF = new Throwable(); + private final ByteBuffer buffer; private final Callback callback; private final Throwable failure; @@ -610,7 +614,6 @@ private IOException ioFailure() private void succeed() { callback.succeeded(); - stream.demand(1); } private void fail(Throwable failure) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 4c3b470204db..6b97474febf1 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -119,6 +119,14 @@ public interface IStream extends Stream, Attachable, Closeable */ boolean isRemotelyClosed(); + /** + * Fail all data queued in the stream and reset + * demand to 0. + * @param x the exception to fail the data with. + * @return true if the end of the stream was reached, false otherwise. + */ + boolean failAllData(Throwable x); + /** * @return whether this stream has been reset (locally or remotely) or has been failed * @see #isReset() @@ -126,8 +134,6 @@ public interface IStream extends Stream, Attachable, Closeable */ boolean isResetOrFailed(); - void fail(Throwable x); - /** *

An ordered list of frames belonging to the same stream.

*/ diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ContentDemander_state.puml b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ContentDemander_state.puml new file mode 100644 index 000000000000..f9ee5b4af5a7 --- /dev/null +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ContentDemander_state.puml @@ -0,0 +1,26 @@ +@startuml + +null: +content: +DEMANDING: +EOF: + +[*] --> null + +null --> DEMANDING : demand() +null --> EOF : eof() +null -left-> null : onTimeout() + +DEMANDING --> DEMANDING : demand() +DEMANDING --> content : onContent()\n onTimeout() +DEMANDING --> EOF : eof() + +EOF --> EOF : eof()\n onTimeout() + +note bottom of content: content1 -> content2 is only\nvalid if content1 is special +note top of content: content -> null only happens\nwhen content is not special +content --> content : onContent()\n onTimeout() +content --> null: take() +content --> EOF: eof() + +@enduml diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index f5f60da88e56..f3b8100042ce 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.eclipse.jetty.http.BadMessageException; @@ -45,7 +46,6 @@ import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,68 +58,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ private boolean _expect100Continue; private boolean _delayedUntilContent; private boolean _useOutputDirectByteBuffers; - private final RequestContent _requestContent = new RequestContent(); - - private class RequestContent - { - private HttpInput.Content _content; - private boolean _endStream; - private boolean _producing; - private final AutoLock _lock = new AutoLock(); - - void setContent(boolean endStream, HttpInput.Content content) - { - try (AutoLock ignored = _lock.lock()) - { - if (_content != null) - throw new AssertionError("content cannot be queued; stream=" + getStream()); - _endStream = endStream; - _content = content; - _producing = false; - } - } - - private HttpInput.Content takeContent(boolean[] endStreamResult) - { - try (AutoLock ignored = _lock.lock()) - { - if (_content == null) - return null; - HttpInput.Content contentCopy = _content; - endStreamResult[0] = _endStream; - _content = null; - _endStream = false; - return contentCopy; - } - } - - HttpInput.Content takeContentOrDemand(boolean[] endStreamResult) - { - HttpInput.Content content = takeContent(endStreamResult); - if (content != null) - return content; - - boolean demand; - try (AutoLock ignored = _lock.lock()) - { - demand = !_producing; - if (demand) - { - if (_content != null) - throw new AssertionError("_content should be null"); - _producing = true; - } - } - if (demand) - getStream().demand(1); - - return takeContent(endStreamResult); - } - } + private final ContentDemander _contentDemander; public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) { super(connector, configuration, endPoint, transport); + _contentDemander = new ContentDemander(); } protected IStream getStream() @@ -192,14 +136,15 @@ public Runnable onRequest(HeadersFrame frame) // Delay the demand of DATA frames for CONNECT with :protocol // or for normal requests expecting 100 continue. - if (!connect) + if (connect) { - if (!_expect100Continue) - getStream().demand(1); + if (request.getProtocol() == null) + _contentDemander.demand(false); } - else if (request.getProtocol() == null) + else { - getStream().demand(1); + if (_delayedUntilContent) + _contentDemander.demand(false); } if (LOG.isDebugEnabled()) @@ -271,6 +216,7 @@ public void recycle() { _expect100Continue = false; _delayedUntilContent = false; + _contentDemander.recycle(); super.recycle(); getHttpTransport().recycle(); } @@ -291,38 +237,16 @@ protected void commit(MetaData.Response info) @Override public Runnable onData(DataFrame frame, Callback callback) { - return onRequestContent(frame, callback); - } - - public Runnable onRequestContent(DataFrame frame, final Callback callback) - { - Stream stream = getStream(); - if (stream.isReset()) - { - // Consume previously queued content to - // enlarge the session flow control window. - consumeInput(); - // Consume immediately this content. - callback.succeeded(); - return null; - } - ByteBuffer buffer = frame.getData(); int length = buffer.remaining(); - - if (LOG.isDebugEnabled()) + HttpInput.Content content = new HttpInput.Content(buffer) { - LOG.debug("HTTP2 Request #{}/{}: {} bytes of content", - stream.getId(), - Integer.toHexString(stream.getSession().hashCode()), - length); - } - - boolean wasDelayed = _delayedUntilContent; - _delayedUntilContent = false; + @Override + public boolean isEof() + { + return frame.isEndStream(); + } - _requestContent.setContent(frame.isEndStream(), new HttpInput.Content(buffer) - { @Override public void succeeded() { @@ -340,42 +264,357 @@ public InvocationType getInvocationType() { return callback.getInvocationType(); } - }); - if (getState().isAsync()) + }; + boolean needed = _contentDemander.onContent(content); + boolean handle = onContent(content); + + boolean endStream = frame.isEndStream(); + if (endStream) { - boolean handle = getState().onReadPossible(); - return handle || wasDelayed ? this : null; + boolean handleContent = onContentComplete(); + // This will generate EOF -> must happen before onContentProducible. + boolean handleRequest = onRequestComplete(); + handle |= handleContent | handleRequest; } - else + + boolean woken = needed && getRequest().getHttpInput().onContentProducible(); + handle |= woken; + if (LOG.isDebugEnabled()) { - getRequest().getHttpInput().unblock(); - return wasDelayed ? this : null; + Stream stream = getStream(); + LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, woken: {}, needed: {}, handle: {}", + stream.getId(), + Integer.toHexString(stream.getSession().hashCode()), + length, + endStream ? "last" : "some", + woken, + needed, + handle); } + + boolean wasDelayed = _delayedUntilContent; + _delayedUntilContent = false; + return handle || wasDelayed ? this : null; } - @Override - public void produceContent() + /** + * Demanding content is a marker content that is used to remember that a demand was + * registered into the stream. The {@code needed} flag indicates if the demand originated + * from a call to {@link #produceContent()} when false or {@link #needContent()} + * when true, as {@link HttpInput#onContentProducible()} must only be called + * only when {@link #needContent()} was called. + * Instances of this class must never escape the scope of this channel impl, + * so {@link #produceContent()} must never return one. + */ + private static final class DemandingContent extends HttpInput.SpecialContent { - // HttpInputOverHttp2 calls this method via produceRawContent(); - // this is the equivalent of Http1 parseAndFill(). + private final boolean needed; - boolean[] endStreamResult = new boolean[1]; - HttpInput.Content content = _requestContent.takeContentOrDemand(endStreamResult); - if (content != null) + private DemandingContent(boolean needed) { - onContent(content); - if (endStreamResult[0]) + this.needed = needed; + } + } + + private static final HttpInput.Content EOF = new HttpInput.EofContent(); + private static final HttpInput.Content DEMANDING_NEEDED = new DemandingContent(true); + private static final HttpInput.Content DEMANDING_NOT_NEEDED = new DemandingContent(false); + + private class ContentDemander + { + private final AtomicReference _content = new AtomicReference<>(); + + public void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycle {}", this); + HttpInput.Content c = _content.getAndSet(null); + if (c != null && !c.isSpecial()) + throw new AssertionError("unconsumed content: " + c); + } + + public HttpInput.Content poll() + { + while (true) { - onContentComplete(); - onRequestComplete(); + HttpInput.Content c = _content.get(); + if (LOG.isDebugEnabled()) + LOG.debug("poll, content = {}", c); + if (c == null || c.isSpecial() || _content.compareAndSet(c, c.isEof() ? EOF : null)) + { + if (LOG.isDebugEnabled()) + LOG.debug("returning current content"); + return c; + } + } + } + + public boolean demand(boolean needed) + { + while (true) + { + HttpInput.Content c = _content.get(); + if (LOG.isDebugEnabled()) + LOG.debug("demand({}), content = {}", needed, c); + if (c instanceof DemandingContent) + { + if (needed && !((DemandingContent)c).needed) + { + if (!_content.compareAndSet(c, DEMANDING_NEEDED)) + { + if (LOG.isDebugEnabled()) + LOG.debug("already demanding but switched needed flag to true"); + continue; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("already demanding, returning false"); + return false; + } + if (c != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("content available, returning true"); + return true; + } + if (_content.compareAndSet(null, needed ? DEMANDING_NEEDED : DEMANDING_NOT_NEEDED)) + { + IStream stream = getStream(); + if (stream == null) + { + _content.set(null); + if (LOG.isDebugEnabled()) + LOG.debug("no content available, switched to demanding but stream is now null"); + return false; + } + if (LOG.isDebugEnabled()) + LOG.debug("no content available, demanding stream {}", stream); + stream.demand(1); + c = _content.get(); + boolean hasContent = !(c instanceof DemandingContent) && c != null; + if (LOG.isDebugEnabled()) + LOG.debug("has content now? {}", hasContent); + return hasContent; + } + } + } + + public boolean onContent(HttpInput.Content content) + { + while (true) + { + HttpInput.Content c = _content.get(); + if (LOG.isDebugEnabled()) + LOG.debug("content delivered by stream: {}, current content: {}", content, c); + if (c instanceof DemandingContent) + { + if (_content.compareAndSet(c, content)) + { + boolean needed = ((DemandingContent)c).needed; + if (LOG.isDebugEnabled()) + LOG.debug("replacing demand content with {} succeeded; returning {}", content, needed); + return needed; + } + } + else if (c == null) + { + if (!content.isSpecial()) + { + // This should never happen, consider as a bug. + content.failed(new IllegalStateException("Non special content without demand : " + content)); + return false; + } + if (_content.compareAndSet(null, content)) + { + if (LOG.isDebugEnabled()) + LOG.debug("replacing null content with {} succeeded", content); + return false; + } + } + else if (c.isEof() && content.isEof() && content.isEmpty()) + { + content.succeeded(); + return true; + } + else if (content.getError() != null) + { + if (c.getError() != null) + { + if (c.getError() != content.getError()) + c.getError().addSuppressed(content.getError()); + return true; + } + if (_content.compareAndSet(c, content)) + { + c.failed(content.getError()); + if (LOG.isDebugEnabled()) + LOG.debug("replacing current content with {} succeeded", content); + return true; + } + } + else if (c.getError() != null && content.remaining() == 0) + { + content.succeeded(); + return true; + } + else + { + // This should never happen, consider as a bug. + content.failed(new IllegalStateException("Cannot overwrite exiting content " + c + " with " + content)); + return false; + } } } + + public boolean onTimeout(Throwable failure) + { + while (true) + { + HttpInput.Content c = _content.get(); + if (LOG.isDebugEnabled()) + LOG.debug("onTimeout with current content: {} and failure = {}", c, failure); + if (!(c instanceof DemandingContent)) + return false; + if (_content.compareAndSet(c, new HttpInput.ErrorContent(failure))) + { + if (LOG.isDebugEnabled()) + LOG.debug("replacing current content with error succeeded"); + return true; + } + } + } + + public void eof() + { + while (true) + { + HttpInput.Content c = _content.get(); + if (LOG.isDebugEnabled()) + LOG.debug("eof with current content: {}", c); + if (c instanceof DemandingContent) + { + if (_content.compareAndSet(c, EOF)) + { + if (LOG.isDebugEnabled()) + LOG.debug("replacing current content with special EOF succeeded"); + return; + } + } + else if (c == null) + { + if (_content.compareAndSet(null, EOF)) + { + if (LOG.isDebugEnabled()) + LOG.debug("replacing null content with special EOF succeeded"); + return; + } + } + else if (c.isEof()) + { + if (LOG.isDebugEnabled()) + LOG.debug("current content already is EOF"); + return; + } + else if (c.remaining() == 0) + { + if (_content.compareAndSet(c, EOF)) + { + if (LOG.isDebugEnabled()) + LOG.debug("replacing current content with special EOF succeeded"); + return; + } + } + else + { + // EOF may arrive with HEADERS frame (e.g. a trailer) that is not flow controlled, so we need to wrap the existing content. + // Covered by HttpTrailersTest.testRequestTrailersWithContent. + HttpInput.Content content = new HttpInput.WrappingContent(c, true); + if (_content.compareAndSet(c, content)) + { + if (LOG.isDebugEnabled()) + LOG.debug("replacing current content with {} succeeded", content); + return; + } + } + } + } + + public boolean failContent(Throwable failure) + { + while (true) + { + HttpInput.Content c = _content.get(); + if (LOG.isDebugEnabled()) + LOG.debug("failing current content: {} with failure: {}", c, failure); + if (c == null) + return false; + if (c.isSpecial()) + return c.isEof(); + if (_content.compareAndSet(c, null)) + { + c.failed(failure); + if (LOG.isDebugEnabled()) + LOG.debug("replacing current content with null succeeded"); + return false; + } + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "@" + hashCode() + " _content=" + _content; + } + } + + @Override + public boolean needContent() + { + boolean hasContent = _contentDemander.demand(true); + if (LOG.isDebugEnabled()) + LOG.debug("needContent has content? {}", hasContent); + return hasContent; } @Override - public void failContent(Throwable failure) + public HttpInput.Content produceContent() { - getStream().fail(failure); + HttpInput.Content content = null; + if (_contentDemander.demand(false)) + content = _contentDemander.poll(); + if (LOG.isDebugEnabled()) + LOG.debug("produceContent produced {}", content); + return content; + } + + @Override + public boolean failAllContent(Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug("failing all content with {}", (Object)failure); + boolean atEof = getStream().failAllData(failure); + atEof |= _contentDemander.failContent(failure); + if (LOG.isDebugEnabled()) + LOG.debug("failed all content, reached EOF? {}", atEof); + return atEof; + } + + @Override + public boolean failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed " + x); + + _contentDemander.onContent(new HttpInput.ErrorContent(x)); + + return getRequest().getHttpInput().onContentProducible(); + } + + @Override + protected boolean eof() + { + _contentDemander.eof(); + return false; } @Override @@ -393,7 +632,10 @@ public Runnable onTrailer(HeadersFrame frame) System.lineSeparator(), trailers); } + // This will generate EOF -> need to call onContentProducible. boolean handle = onRequestComplete(); + boolean woken = getRequest().getHttpInput().onContentProducible(); + handle |= woken; boolean wasDelayed = _delayedUntilContent; _delayedUntilContent = false; @@ -412,25 +654,30 @@ public boolean onTimeout(Throwable failure, Consumer consumer) final boolean delayed = _delayedUntilContent; _delayedUntilContent = false; - boolean result = isIdle(); - if (result) + boolean reset = isIdle(); + if (reset) consumeInput(); getHttpTransport().onStreamTimeout(failure); - if (getRequest().getHttpInput().onIdleTimeout(failure) || delayed) + + failure.addSuppressed(new Throwable("HttpInput idle timeout")); + _contentDemander.onTimeout(failure); + boolean needed = getRequest().getHttpInput().onContentProducible(); + + if (needed || delayed) { consumer.accept(this::handleWithContext); - result = false; + reset = false; } - return result; + return reset; } @Override public Runnable onFailure(Throwable failure, Callback callback) { getHttpTransport().onStreamFailure(failure); - boolean handle = getRequest().getHttpInput().failed(failure); + boolean handle = failed(failure); consumeInput(); return new FailureTask(failure, callback, handle); } diff --git a/jetty-security/src/test/java/org/eclipse/jetty/security/authentication/SpnegoAuthenticatorTest.java b/jetty-security/src/test/java/org/eclipse/jetty/security/authentication/SpnegoAuthenticatorTest.java index 218769c25191..22d7c0e8950a 100644 --- a/jetty-security/src/test/java/org/eclipse/jetty/security/authentication/SpnegoAuthenticatorTest.java +++ b/jetty-security/src/test/java/org/eclipse/jetty/security/authentication/SpnegoAuthenticatorTest.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; @@ -63,13 +64,33 @@ public Server getServer() } @Override - public void produceContent() + public boolean failed(Throwable x) { + return false; } @Override - public void failContent(Throwable failure) + protected boolean eof() { + return false; + } + + @Override + public boolean needContent() + { + return false; + } + + @Override + public HttpInput.Content produceContent() + { + return null; + } + + @Override + public boolean failAllContent(Throwable failure) + { + return false; } @Override @@ -108,13 +129,33 @@ public Server getServer() } @Override - public void produceContent() + public boolean failed(Throwable x) { + return false; + } + + @Override + protected boolean eof() + { + return false; + } + + @Override + public boolean needContent() + { + return false; + } + + @Override + public HttpInput.Content produceContent() + { + return null; } @Override - public void failContent(Throwable failure) + public boolean failAllContent(Throwable failure) { + return false; } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java new file mode 100644 index 000000000000..253097a40be8 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java @@ -0,0 +1,354 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Non-blocking {@link ContentProducer} implementation. Calling {@link #nextContent()} will never block + * but will return null when there is no available content. + */ +class AsyncContentProducer implements ContentProducer +{ + private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class); + + private final HttpChannel _httpChannel; + private HttpInput.Interceptor _interceptor; + private HttpInput.Content _rawContent; + private HttpInput.Content _transformedContent; + private boolean _error; + private long _firstByteTimeStamp = Long.MIN_VALUE; + private long _rawContentArrived; + + AsyncContentProducer(HttpChannel httpChannel) + { + _httpChannel = httpChannel; + } + + @Override + public void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycling {}", this); + _interceptor = null; + _rawContent = null; + _transformedContent = null; + _error = false; + _firstByteTimeStamp = Long.MIN_VALUE; + _rawContentArrived = 0L; + } + + @Override + public HttpInput.Interceptor getInterceptor() + { + return _interceptor; + } + + @Override + public void setInterceptor(HttpInput.Interceptor interceptor) + { + this._interceptor = interceptor; + } + + @Override + public int available() + { + HttpInput.Content content = nextTransformedContent(); + int available = content == null ? 0 : content.remaining(); + if (LOG.isDebugEnabled()) + LOG.debug("available = {}", available); + return available; + } + + @Override + public boolean hasContent() + { + boolean hasContent = _rawContent != null; + if (LOG.isDebugEnabled()) + LOG.debug("hasContent = {}", hasContent); + return hasContent; + } + + @Override + public boolean isError() + { + if (LOG.isDebugEnabled()) + LOG.debug("isError = {}", _error); + return _error; + } + + @Override + public void checkMinDataRate() + { + long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate(); + if (LOG.isDebugEnabled()) + LOG.debug("checkMinDataRate [m={},t={}]", minRequestDataRate, _firstByteTimeStamp); + if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE) + { + long period = System.nanoTime() - _firstByteTimeStamp; + if (period > 0) + { + long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); + if (getRawContentArrived() < minimumData) + { + if (LOG.isDebugEnabled()) + LOG.debug("checkMinDataRate check failed"); + BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, + String.format("Request content data rate < %d B/s", minRequestDataRate)); + if (_httpChannel.getState().isResponseCommitted()) + { + if (LOG.isDebugEnabled()) + LOG.debug("checkMinDataRate aborting channel"); + _httpChannel.abort(bad); + } + failCurrentContent(bad); + throw bad; + } + } + } + } + + @Override + public long getRawContentArrived() + { + if (LOG.isDebugEnabled()) + LOG.debug("getRawContentArrived = {}", _rawContentArrived); + return _rawContentArrived; + } + + @Override + public boolean consumeAll(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("consumeAll [e={}]", (Object)x); + failCurrentContent(x); + // A specific HttpChannel mechanism must be used as the following code + // does not guarantee that the channel will synchronously deliver all + // content it already contains: + // while (true) + // { + // HttpInput.Content content = _httpChannel.produceContent(); + // ... + // } + // as the HttpChannel's produceContent() contract makes no such promise; + // for instance the H2 implementation calls Stream.demand() that may + // deliver the content asynchronously. Tests in StreamResetTest cover this. + boolean atEof = _httpChannel.failAllContent(x); + if (LOG.isDebugEnabled()) + LOG.debug("failed all content of http channel; at EOF? {}", atEof); + return atEof; + } + + private void failCurrentContent(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failing currently held content [r={},t={}]", _rawContent, _transformedContent, x); + if (_transformedContent != null && !_transformedContent.isSpecial()) + { + if (_transformedContent != _rawContent) + { + _transformedContent.skip(_transformedContent.remaining()); + _transformedContent.failed(x); + } + _transformedContent = null; + } + + if (_rawContent != null && !_rawContent.isSpecial()) + { + _rawContent.skip(_rawContent.remaining()); + _rawContent.failed(x); + _rawContent = null; + } + } + + @Override + public boolean onContentProducible() + { + if (LOG.isDebugEnabled()) + LOG.debug("onContentProducible"); + return _httpChannel.getState().onReadReady(); + } + + @Override + public HttpInput.Content nextContent() + { + HttpInput.Content content = nextTransformedContent(); + if (LOG.isDebugEnabled()) + LOG.debug("nextContent = {}", content); + if (content != null) + _httpChannel.getState().onReadIdle(); + return content; + } + + @Override + public void reclaim(HttpInput.Content content) + { + if (LOG.isDebugEnabled()) + LOG.debug("reclaim {} [t={}]", content, _transformedContent); + if (_transformedContent == content) + { + content.succeeded(); + if (_transformedContent == _rawContent) + _rawContent = null; + _transformedContent = null; + } + } + + @Override + public boolean isReady() + { + HttpInput.Content content = nextTransformedContent(); + if (content == null) + { + _httpChannel.getState().onReadUnready(); + if (_httpChannel.needContent()) + { + content = nextTransformedContent(); + if (LOG.isDebugEnabled()) + LOG.debug("isReady got transformed content after needContent retry {}", content); + if (content != null) + _httpChannel.getState().onContentAdded(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("isReady has no transformed content after needContent"); + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("isReady got transformed content {}", content); + _httpChannel.getState().onContentAdded(); + } + boolean ready = content != null; + if (LOG.isDebugEnabled()) + LOG.debug("isReady = {}", ready); + return ready; + } + + private HttpInput.Content nextTransformedContent() + { + if (LOG.isDebugEnabled()) + LOG.debug("nextTransformedContent [r={},t={}]", _rawContent, _transformedContent); + if (_rawContent == null) + { + _rawContent = produceRawContent(); + if (_rawContent == null) + return null; + } + + if (_transformedContent != null && _transformedContent.isEmpty()) + { + if (_transformedContent != _rawContent) + _transformedContent.succeeded(); + if (LOG.isDebugEnabled()) + LOG.debug("nulling depleted transformed content"); + _transformedContent = null; + } + + while (_transformedContent == null) + { + if (_rawContent.isSpecial()) + { + // TODO does EOF need to be passed to the interceptors? + + _error = _rawContent.getError() != null; + if (LOG.isDebugEnabled()) + LOG.debug("raw content is special (with error = {}), returning it", _error); + return _rawContent; + } + + if (_interceptor != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("using interceptor {} to transform raw content", _interceptor); + _transformedContent = _interceptor.readFrom(_rawContent); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("null interceptor, transformed content = raw content"); + _transformedContent = _rawContent; + } + + if (_transformedContent != null && _transformedContent.isEmpty()) + { + if (_transformedContent != _rawContent) + _transformedContent.succeeded(); + if (LOG.isDebugEnabled()) + LOG.debug("nulling depleted transformed content"); + _transformedContent = null; + } + + if (_transformedContent == null) + { + if (_rawContent.isEmpty()) + { + _rawContent.succeeded(); + _rawContent = null; + if (LOG.isDebugEnabled()) + LOG.debug("nulling depleted raw content"); + _rawContent = produceRawContent(); + if (_rawContent == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("produced null raw content, returning null"); + return null; + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("raw content is not empty"); + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("transformed content is not empty"); + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("returning transformed content {}", _transformedContent); + return _transformedContent; + } + + private HttpInput.Content produceRawContent() + { + HttpInput.Content content = _httpChannel.produceContent(); + if (content != null) + { + _rawContentArrived += content.remaining(); + if (_firstByteTimeStamp == Long.MIN_VALUE) + _firstByteTimeStamp = System.nanoTime(); + if (LOG.isDebugEnabled()) + LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {}", _rawContentArrived, _firstByteTimeStamp); + } + if (LOG.isDebugEnabled()) + LOG.debug("produceRawContent produced {}", content); + return content; + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java new file mode 100644 index 000000000000..3dff5c885946 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/BlockingContentProducer.java @@ -0,0 +1,164 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.util.concurrent.Semaphore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when + * there is no available content but will never return null. + */ +class BlockingContentProducer implements ContentProducer +{ + private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class); + + private final Semaphore _semaphore = new Semaphore(0); + private final AsyncContentProducer _asyncContentProducer; + + BlockingContentProducer(AsyncContentProducer delegate) + { + _asyncContentProducer = delegate; + } + + @Override + public void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycling {}", this); + _asyncContentProducer.recycle(); + _semaphore.drainPermits(); + } + + @Override + public int available() + { + return _asyncContentProducer.available(); + } + + @Override + public boolean hasContent() + { + return _asyncContentProducer.hasContent(); + } + + @Override + public boolean isError() + { + return _asyncContentProducer.isError(); + } + + @Override + public void checkMinDataRate() + { + _asyncContentProducer.checkMinDataRate(); + } + + @Override + public long getRawContentArrived() + { + return _asyncContentProducer.getRawContentArrived(); + } + + @Override + public boolean consumeAll(Throwable x) + { + return _asyncContentProducer.consumeAll(x); + } + + @Override + public HttpInput.Content nextContent() + { + while (true) + { + HttpInput.Content content = _asyncContentProducer.nextContent(); + if (LOG.isDebugEnabled()) + LOG.debug("nextContent async producer returned {}", content); + if (content != null) + return content; + + // IFF isReady() returns false then HttpChannel.needContent() has been called, + // thus we know that eventually a call to onContentProducible will come. + if (_asyncContentProducer.isReady()) + { + if (LOG.isDebugEnabled()) + LOG.debug("nextContent async producer is ready, retrying"); + continue; + } + if (LOG.isDebugEnabled()) + LOG.debug("nextContent async producer is not ready, waiting on semaphore {}", _semaphore); + + try + { + _semaphore.acquire(); + } + catch (InterruptedException e) + { + return new HttpInput.ErrorContent(e); + } + } + } + + @Override + public void reclaim(HttpInput.Content content) + { + _asyncContentProducer.reclaim(content); + } + + @Override + public boolean isReady() + { + boolean ready = available() > 0; + if (LOG.isDebugEnabled()) + LOG.debug("isReady = {}", ready); + return ready; + } + + @Override + public HttpInput.Interceptor getInterceptor() + { + return _asyncContentProducer.getInterceptor(); + } + + @Override + public void setInterceptor(HttpInput.Interceptor interceptor) + { + _asyncContentProducer.setInterceptor(interceptor); + } + + @Override + public boolean onContentProducible() + { + // In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state + // DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why + // this method always returns false. + // But async errors can occur while the dispatched thread is NOT blocked reading (i.e.: in state WAITING), + // so the WAITING to WOKEN transition must be done by the error-notifying thread which then has to reschedule the + // dispatched thread after HttpChannelState.asyncError() is called. + // Calling _asyncContentProducer.wakeup() changes the channel state from WAITING to WOKEN which would prevent the + // subsequent call to HttpChannelState.asyncError() from rescheduling the thread. + // AsyncServletTest.testStartAsyncThenClientStreamIdleTimeout() tests this. + if (LOG.isDebugEnabled()) + LOG.debug("onContentProducible releasing semaphore {}", _semaphore); + _semaphore.release(); + return false; + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java new file mode 100644 index 000000000000..1a2a477001e9 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ContentProducer.java @@ -0,0 +1,141 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +/** + * ContentProducer is the bridge between {@link HttpInput} and {@link HttpChannel}. + * It wraps a {@link HttpChannel} and uses the {@link HttpChannel#needContent()}, + * {@link HttpChannel#produceContent()} and {@link HttpChannel#failAllContent(Throwable)} + * methods, tracks the current state of the channel's input by updating the + * {@link HttpChannelState} and provides the necessary mechanism to unblock + * the reader thread when using a blocking implementation or to know if the reader thread + * has to be rescheduled when using an async implementation. + */ +public interface ContentProducer +{ + /** + * Reset all internal state and clear any held resources. + */ + void recycle(); + + /** + * Fail all content currently available in this {@link ContentProducer} instance + * as well as in the underlying {@link HttpChannel}. + * + * This call is always non-blocking. + * Doesn't change state. + * @return true if EOF was reached. + */ + boolean consumeAll(Throwable x); + + /** + * Check if the current data rate consumption is above the minimal rate. + * Abort the channel, fail the content currently available and throw a + * BadMessageException(REQUEST_TIMEOUT_408) if the check fails. + */ + void checkMinDataRate(); + + /** + * Get the byte count produced by the underlying {@link HttpChannel}. + * + * This call is always non-blocking. + * Doesn't change state. + * @return the byte count produced by the underlying {@link HttpChannel}. + */ + long getRawContentArrived(); + + /** + * Get the byte count that can immediately be read from this + * {@link ContentProducer} instance or the underlying {@link HttpChannel}. + * + * This call is always non-blocking. + * Doesn't change state. + * @return the available byte count. + */ + int available(); + + /** + * Check if this {@link ContentProducer} instance contains some + * content without querying the underlying {@link HttpChannel}. + * + * This call is always non-blocking. + * Doesn't change state. + * Doesn't query the HttpChannel. + * @return true if this {@link ContentProducer} instance contains content, false otherwise. + */ + boolean hasContent(); + + /** + * Check if the underlying {@link HttpChannel} reached an error content. + * This call is always non-blocking. + * Doesn't change state. + * Doesn't query the HttpChannel. + * @return true if the underlying {@link HttpChannel} reached an error content, false otherwise. + */ + boolean isError(); + + /** + * Get the next content that can be read from or that describes the special condition + * that was reached (error, eof). + * This call may or may not block until some content is available, depending on the implementation. + * The returned content is decoded by the interceptor set with {@link #setInterceptor(HttpInput.Interceptor)} + * or left as-is if no intercept is set. + * After this call, state can be either of UNREADY or IDLE. + * @return the next content that can be read from or null if the implementation does not block + * and has no available content. + */ + HttpInput.Content nextContent(); + + /** + * Free up the content by calling {@link HttpInput.Content#succeeded()} on it + * and updating this instance' internal state. + */ + void reclaim(HttpInput.Content content); + + /** + * Check if this {@link ContentProducer} instance has some content that can be read without blocking. + * If there is some, the next call to {@link #nextContent()} will not block. + * If there isn't any and the implementation does not block, this method will trigger a + * {@link javax.servlet.ReadListener} callback once some content is available. + * This call is always non-blocking. + * After this call, state can be either of UNREADY or READY. + * @return true if some content is immediately available, false otherwise. + */ + boolean isReady(); + + /** + * Get the {@link org.eclipse.jetty.server.HttpInput.Interceptor}. + * @return The {@link org.eclipse.jetty.server.HttpInput.Interceptor}, or null if none set. + */ + HttpInput.Interceptor getInterceptor(); + + /** + * Set the interceptor. + * @param interceptor The interceptor to use. + */ + void setInterceptor(HttpInput.Interceptor interceptor); + + /** + * Wake up the thread that is waiting for the next content. + * After this call, state can be READY. + * @return true if the thread has to be rescheduled, false otherwise. + */ + boolean onContentProducible(); +} + diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 9b9b460fd78e..ca40821980a4 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -124,9 +124,47 @@ private HttpInput newHttpInput(HttpChannelState state) return new HttpInput(state); } - public abstract void produceContent(); + /** + * Notify the channel that content is needed. If some content is immediately available, true is returned and + * {@link #produceContent()} has to be called and will return a non-null object. + * If no content is immediately available, {@link HttpInput#onContentProducible()} is called once some content arrives + * and {@link #produceContent()} can be called without returning null. + * If a failure happens, then {@link HttpInput#onContentProducible()} will be called and an error content will return the + * error on the next call to {@link #produceContent()}. + * @return true if content is immediately available. + */ + public abstract boolean needContent(); - public abstract void failContent(Throwable failure); + /** + * Produce a {@link HttpInput.Content} object with data currently stored within the channel. The produced content + * can be special (meaning calling {@link HttpInput.Content#isSpecial()} returns true) if the channel reached a special + * state, like EOF or an error. + * Once a special content has been returned, all subsequent calls to this method will always return a special content + * of the same kind and {@link #needContent()} will always return true. + * The returned content is "raw", i.e.: not decoded. + * @return a {@link HttpInput.Content} object if one is immediately available without blocking, null otherwise. + */ + public abstract HttpInput.Content produceContent(); + + /** + * Fail all content that is currently stored within the channel. + * @param failure the failure to fail the content with. + * @return true if EOF was reached while failing all content, false otherwise. + */ + public abstract boolean failAllContent(Throwable failure); + + /** + * Fail the channel's input. + * @param failure the failure. + * @return true if the channel needs to be rescheduled. + */ + public abstract boolean failed(Throwable failure); + + /** + * Mark the channel's input as EOF. + * @return true if the channel needs to be rescheduled. + */ + protected abstract boolean eof(); protected HttpOutput newHttpOutput() { @@ -307,19 +345,6 @@ public void recycle() _transientListeners.clear(); } - public void onAsyncWaitForContent() - { - } - - public void onBlockWaitForContent() - { - } - - public void onBlockWaitForContentFailure(Throwable failure) - { - getRequest().getHttpInput().failed(failure); - } - @Override public void run() { @@ -449,18 +474,6 @@ public boolean handle() throw _state.getAsyncContextEvent().getThrowable(); } - case READ_REGISTER: - { - onAsyncWaitForContent(); - break; - } - - case READ_PRODUCE: - { - _request.getHttpInput().asyncReadProduce(); - break; - } - case READ_CALLBACK: { ContextHandler handler = _state.getContextHandler(); @@ -705,12 +718,12 @@ public void onRequest(MetaData.Request request) request.getFields()); } - public void onContent(HttpInput.Content content) + public boolean onContent(HttpInput.Content content) { if (LOG.isDebugEnabled()) LOG.debug("onContent {} {}", this, content); _combinedListener.onRequestContent(_request, content.getByteBuffer()); - _request.getHttpInput().addContent(content); + return false; } public boolean onContentComplete() @@ -733,7 +746,7 @@ public boolean onRequestComplete() { if (LOG.isDebugEnabled()) LOG.debug("onRequestComplete {}", this); - boolean result = _request.getHttpInput().eof(); + boolean result = eof(); _combinedListener.onRequestEnd(_request); return result; } @@ -769,11 +782,6 @@ public void onCompleted() _transport.onCompleted(); } - public boolean onEarlyEOF() - { - return _request.getHttpInput().earlyEOF(); - } - public void onBadMessage(BadMessageException failure) { int status = failure.getCode(); @@ -950,7 +958,7 @@ public HttpOutput.Interceptor getNextInterceptor() return null; } - public void execute(Runnable task) + protected void execute(Runnable task) { _executor.execute(task); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java index 0dee629f9529..f580e325d2bd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.EofException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,7 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque { private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHttp.class); private static final HttpField PREAMBLE_UPGRADE_H2C = new HttpField(HttpHeader.UPGRADE, "h2c"); + private static final HttpInput.Content EOF = new HttpInput.EofContent(); private final HttpConnection _httpConnection; private final RequestBuilder _requestBuilder = new RequestBuilder(); private MetaData.Request _metadata; @@ -61,6 +63,14 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque private boolean _expect102Processing = false; private List _complianceViolations; private HttpFields.Mutable _trailers; + // Field _content doesn't need to be volatile nor protected by a lock + // as it is always accessed by the same thread, i.e.: we get notified by onFillable + // that the socket contains new bytes and either schedule an onDataAvailable + // call that is going to read the socket or release the blocking semaphore to wake up + // the blocked reader and make it read the socket. The same logic is true for async + // events like timeout: we get notified and either schedule onError or release the + // blocking semaphore. + private HttpInput.Content _content; public HttpChannelOverHttp(HttpConnection httpConnection, Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport) { @@ -76,15 +86,76 @@ public void abort(Throwable failure) } @Override - public void produceContent() + public boolean needContent() { - ((HttpConnection)getEndPoint().getConnection()).parseAndFillForContent(); + if (_content != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("needContent has content immediately available: {}", _content); + return true; + } + _httpConnection.parseAndFillForContent(); + if (_content != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("needContent has content after parseAndFillForContent: {}", _content); + return true; + } + + if (LOG.isDebugEnabled()) + LOG.debug("needContent has no content"); + _httpConnection.asyncReadFillInterested(); + return false; } @Override - public void failContent(Throwable failure) + public HttpInput.Content produceContent() { - ((HttpConnection)getEndPoint().getConnection()).failContent(failure); + if (_content == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("produceContent has no content, parsing and filling"); + _httpConnection.parseAndFillForContent(); + } + HttpInput.Content result = _content; + if (result != null && !result.isSpecial()) + _content = result.isEof() ? EOF : null; + if (LOG.isDebugEnabled()) + LOG.debug("produceContent produced {}", result); + return result; + } + + @Override + public boolean failAllContent(Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug("failing all content with {}", (Object)failure); + if (_content != null && !_content.isSpecial()) + { + _content.failed(failure); + _content = _content.isEof() ? EOF : null; + if (_content == EOF) + return true; + } + while (true) + { + HttpInput.Content c = produceContent(); + if (c == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed all content, EOF was not reached"); + return false; + } + c.skip(c.remaining()); + c.failed(failure); + if (c.isSpecial()) + { + boolean atEof = c.isEof(); + if (LOG.isDebugEnabled()) + LOG.debug("failed all content, EOF = {}", atEof); + return atEof; + } + } } @Override @@ -97,7 +168,7 @@ public void badMessage(BadMessageException failure) if (_metadata == null) _metadata = _requestBuilder.build(); onRequest(_metadata); - getRequest().getHttpInput().earlyEOF(); + markEarlyEOF(); } catch (Exception e) { @@ -108,10 +179,22 @@ public void badMessage(BadMessageException failure) } @Override - public boolean content(ByteBuffer content) + public boolean content(ByteBuffer buffer) { - onContent(_httpConnection.newContent(content)); - _delayedForContent = false; + HttpInput.Content content = _httpConnection.newContent(buffer); + if (_content != null) + { + if (_content.isSpecial()) + content.failed(_content.getError()); + else + throw new AssertionError("Cannot overwrite exiting content " + _content + " with " + content); + } + else + { + _content = content; + onContent(_content); + _delayedForContent = false; + } return true; } @@ -158,14 +241,71 @@ public void earlyEOF() _httpConnection.getGenerator().setPersistent(false); // If we have no request yet, just close if (_metadata == null) + { _httpConnection.close(); - else if (onEarlyEOF() || _delayedForContent) + } + else { - _delayedForContent = false; - handle(); + markEarlyEOF(); + if (_delayedForContent) + { + _delayedForContent = false; + handle(); + } } } + private void markEarlyEOF() + { + if (LOG.isDebugEnabled()) + LOG.debug("received early EOF, content = {}", _content); + EofException failure = new EofException("Early EOF"); + if (_content != null) + _content.failed(failure); + _content = new HttpInput.ErrorContent(failure); + } + + @Override + protected boolean eof() + { + if (LOG.isDebugEnabled()) + LOG.debug("received EOF, content = {}", _content); + if (_content == null) + { + _content = EOF; + } + else + { + HttpInput.Content c = _content; + _content = new HttpInput.WrappingContent(c, true); + } + return false; + } + + @Override + public boolean failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed {}, content = {}", x, _content); + + Throwable error = null; + if (_content != null && _content.isSpecial()) + error = _content.getError(); + + if (error != null && error != x) + { + error.addSuppressed(x); + } + else + { + if (_content != null) + _content.failed(x); + _content = new HttpInput.ErrorContent(x); + } + + return getRequest().getHttpInput().onContentProducible(); + } + @Override public EndPoint getTunnellingEndPoint() { @@ -320,24 +460,6 @@ public boolean messageComplete() return onRequestComplete(); } - @Override - public void onAsyncWaitForContent() - { - _httpConnection.asyncReadFillInterested(); - } - - @Override - public void onBlockWaitForContent() - { - _httpConnection.blockingReadFillInterested(); - } - - @Override - public void onBlockWaitForContentFailure(Throwable failure) - { - _httpConnection.blockingReadFailure(failure); - } - @Override public void onComplianceViolation(ComplianceViolation.Mode mode, ComplianceViolation violation, String details) { @@ -445,6 +567,9 @@ public void recycle() _upgrade = null; _trailers = null; _metadata = null; + if (_content != null && !_content.isSpecial()) + throw new AssertionError("unconsumed content: " + _content); + _content = null; } @Override @@ -539,13 +664,24 @@ boolean onIdleTimeout(Throwable timeout) if (_delayedForContent) { _delayedForContent = false; - getRequest().getHttpInput().onIdleTimeout(timeout); + doOnIdleTimeout(timeout); execute(this); return false; } return true; } + private void doOnIdleTimeout(Throwable x) + { + boolean neverDispatched = getState().isIdle(); + boolean waitingForContent = _content == null || _content.remaining() == 0; + if ((waitingForContent || neverDispatched) && (_content == null || !_content.isSpecial())) + { + x.addSuppressed(new Throwable("HttpInput idle timeout")); + _content = new HttpInput.ErrorContent(x); + } + } + private static class RequestBuilder { private final HttpFields.Mutable _fieldsBuilder = HttpFields.build(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 6798ce18bfd3..c08291fbe071 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -107,12 +107,9 @@ private enum RequestState */ private enum InputState { - IDLE, // No isReady; No data - REGISTER, // isReady()==false handling; No data - REGISTERED, // isReady()==false !handling; No data - POSSIBLE, // isReady()==false async read callback called (http/1 only) - PRODUCING, // isReady()==false READ_PRODUCE action is being handled (http/1 only) - READY // isReady() was false, onContentAdded has been called + IDLE, // No isReady; No data + UNREADY, // isReady()==false; No data + READY // isReady() was false; data is available } /* @@ -137,8 +134,6 @@ public enum Action ASYNC_ERROR, // handle an async error ASYNC_TIMEOUT, // call asyncContext onTimeout WRITE_CALLBACK, // handle an IO write callback - READ_REGISTER, // Register for fill interest - READ_PRODUCE, // Check is a read is possible by parsing/filling READ_CALLBACK, // handle an IO read callback COMPLETE, // Complete the response by closing output TERMINATED, // No further actions @@ -465,19 +460,12 @@ private Action nextAction(boolean handling) case ASYNC: switch (_inputState) { - case POSSIBLE: - _inputState = InputState.PRODUCING; - return Action.READ_PRODUCE; + case IDLE: + case UNREADY: + break; case READY: _inputState = InputState.IDLE; return Action.READ_CALLBACK; - case REGISTER: - case PRODUCING: - _inputState = InputState.REGISTERED; - return Action.READ_REGISTER; - case IDLE: - case REGISTERED: - break; default: throw new IllegalStateException(getStatusStringLocked()); @@ -1223,78 +1211,53 @@ public void setAttribute(String name, Object attribute) } /** - * Called to signal async read isReady() has returned false. - * This indicates that there is no content available to be consumed - * and that once the channel enters the ASYNC_WAIT state it will - * register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()} - * either from this method or from a subsequent call to {@link #unhandle()}. + * Called to signal that the channel is ready for a callback. + * + * @return true if woken */ - public void onReadUnready() + public boolean onReadReady() { - boolean interested = false; + boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onReadUnready {}", toStringLocked()); + LOG.debug("onReadReady {}", toStringLocked()); switch (_inputState) { - case IDLE: case READY: + _inputState = InputState.READY; + break; + case IDLE: + case UNREADY: + _inputState = InputState.READY; if (_state == State.WAITING) { - interested = true; - _inputState = InputState.REGISTERED; - } - else - { - _inputState = InputState.REGISTER; + woken = true; + _state = State.WOKEN; } break; - case REGISTER: - case REGISTERED: - case POSSIBLE: - case PRODUCING: - break; - default: throw new IllegalStateException(toStringLocked()); } } - - if (interested) - _channel.onAsyncWaitForContent(); + return woken; } - /** - * Called to signal that content is now available to read. - * If the channel is in ASYNC_WAIT state and unready (ie isReady() has - * returned false), then the state is changed to ASYNC_WOKEN and true - * is returned. - * - * @return True IFF the channel was unready and in ASYNC_WAIT state - */ - public boolean onContentAdded() + public boolean onReadEof() { boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onContentAdded {}", toStringLocked()); + LOG.debug("onReadEof {}", toStringLocked()); switch (_inputState) { case IDLE: case READY: - break; - - case PRODUCING: - _inputState = InputState.READY; - break; - - case REGISTER: - case REGISTERED: + case UNREADY: _inputState = InputState.READY; if (_state == State.WAITING) { @@ -1310,101 +1273,72 @@ public boolean onContentAdded() return woken; } - /** - * Called to signal that the channel is ready for a callback. - * This is similar to calling {@link #onReadUnready()} followed by - * {@link #onContentAdded()}, except that as content is already - * available, read interest is never set. - * - * @return true if woken - */ - public boolean onReadReady() + public void onContentAdded() { - boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onReadReady {}", toStringLocked()); + LOG.debug("onContentAdded {}", toStringLocked()); switch (_inputState) { case IDLE: + case UNREADY: + case READY: _inputState = InputState.READY; - if (_state == State.WAITING) - { - woken = true; - _state = State.WOKEN; - } break; default: throw new IllegalStateException(toStringLocked()); } } - return woken; } - /** - * Called to indicate that more content may be available, - * but that a handling thread may need to produce (fill/parse) - * it. Typically called by the async read success callback. - * - * @return {@code true} if more content may be available - */ - public boolean onReadPossible() + public void onReadIdle() { - boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onReadPossible {}", toStringLocked()); + LOG.debug("onReadIdle {}", toStringLocked()); switch (_inputState) { - case REGISTERED: - _inputState = InputState.POSSIBLE; - if (_state == State.WAITING) - { - woken = true; - _state = State.WOKEN; - } - break; - - case IDLE: + case UNREADY: case READY: - case REGISTER: + case IDLE: + _inputState = InputState.IDLE; break; default: throw new IllegalStateException(toStringLocked()); } } - return woken; } /** - * Called to signal that a read has read -1. - * Will wake if the read was called while in ASYNC_WAIT state - * - * @return {@code true} if woken + * Called to indicate that more content may be available, + * but that a handling thread may need to produce (fill/parse) + * it. Typically called by the async read success callback. */ - public boolean onReadEof() + public void onReadUnready() { - boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onEof {}", toStringLocked()); + LOG.debug("onReadUnready {}", toStringLocked()); - // Force read ready so onAllDataRead can be called - _inputState = InputState.READY; - if (_state == State.WAITING) + switch (_inputState) { - woken = true; - _state = State.WOKEN; + case IDLE: + case UNREADY: + case READY: // READY->UNREADY is needed by AsyncServletIOTest.testStolenAsyncRead + _inputState = InputState.UNREADY; + break; + + default: + throw new IllegalStateException(toStringLocked()); } } - return woken; } public boolean onWritePossible() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState_input.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState_input.puml new file mode 100644 index 000000000000..13eb5dc325ba --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState_input.puml @@ -0,0 +1,84 @@ +@startuml +title HttpChannelState + +note top of onReadReady_inputState: onReadReady + +state "input state" as onReadReady_inputState { + state "IDLE" as onReadReady_IDLE + state "UNREADY" as onReadReady_UNREADY + state "READY" as onReadReady_READY + + state "channel state" as onReadReady_channelState { + state "WAITING" as onReadReady_WAITING + state "WOKEN" as onReadReady_WOKEN + onReadReady_WAITING --> onReadReady_WOKEN + } + + onReadReady_IDLE --> onReadReady_channelState + onReadReady_UNREADY --> onReadReady_channelState + + onReadReady_channelState --> onReadReady_READY + onReadReady_READY --> onReadReady_READY +} + + +note top of onReadEof_inputState: onReadEof + +state "input state" as onReadEof_inputState { + state "IDLE" as onReadEof_IDLE + state "UNREADY" as onReadEof_UNREADY + state "READY" as onReadEof_READY + + state "channel state" as onReadEof_channelState { + state "WAITING" as onReadEof_WAITING + state "WOKEN" as onReadEof_WOKEN + onReadEof_WAITING --> onReadEof_WOKEN + } + + onReadEof_IDLE --> onReadEof_channelState + onReadEof_UNREADY --> onReadEof_channelState + onReadEof_READY --> onReadEof_channelState + + onReadEof_channelState --> onReadEof_READY +} + + +note top of onReadIdle_inputState: onReadIdle + +state "input state" as onReadIdle_inputState { + state "IDLE" as onReadIdle_IDLE + state "UNREADY" as onReadIdle_UNREADY + state "READY" as onReadIdle_READY + + onReadIdle_IDLE --> onReadIdle_IDLE + onReadIdle_UNREADY --> onReadIdle_IDLE + onReadIdle_READY --> onReadIdle_IDLE +} + + +note top of onReadUnready_inputState: onReadUnready + +state "input state" as onReadUnready_inputState { + state "IDLE" as onReadUnready_IDLE + state "UNREADY" as onReadUnready_UNREADY + state "READY" as onReadUnready_READY + + onReadUnready_IDLE --> onReadUnready_UNREADY + onReadUnready_UNREADY --> onReadUnready_UNREADY + onReadUnready_READY --> onReadUnready_UNREADY +} + + +note top of onContentAdded_inputState: onContentAdded + +state "input state" as onContentAdded_inputState { + state "IDLE" as onContentAdded_IDLE + state "UNREADY" as onContentAdded_UNREADY + state "READY" as onContentAdded_READY + + onContentAdded_IDLE --> onContentAdded_READY + onContentAdded_UNREADY --> onContentAdded_READY + onContentAdded_READY --> onContentAdded_READY +} + +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 5d0cc2755f81..add0c44b9d37 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -33,8 +33,6 @@ import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpParser; -import org.eclipse.jetty.http.HttpParser.RequestHandler; -import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.io.AbstractConnection; @@ -69,7 +67,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private final HttpParser _parser; private final AtomicInteger _contentBufferReferences = new AtomicInteger(); private volatile ByteBuffer _requestBuffer = null; - private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback(); private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback(); private final SendCallback _sendCallback = new SendCallback(); private final boolean _recordHttpComplianceViolations; @@ -321,27 +318,13 @@ else if (filled < 0) */ void parseAndFillForContent() { - // parseRequestBuffer() must always be called after fillRequestBuffer() otherwise this method doesn't trigger EOF/earlyEOF - // which breaks AsyncRequestReadTest.testPartialReadThenShutdown() + // When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method + // doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown() int filled = Integer.MAX_VALUE; while (_parser.inContentState()) { boolean handled = parseRequestBuffer(); - if (handled || filled <= 0 || _input.hasContent()) - break; - filled = fillRequestBuffer(); - } - } - - void failContent(Throwable failure) - { - int filled = Integer.MAX_VALUE; - while (_parser.inContentState()) - { - // The parser is going generate and forward contents to the HttpInput - // so it's up to it to fail them individually. - parseRequestBuffer(); - if (filled <= 0 || _input.hasContent()) + if (handled || filled <= 0) break; filled = fillRequestBuffer(); } @@ -614,25 +597,7 @@ public void push(org.eclipse.jetty.http.MetaData.Request request) public void asyncReadFillInterested() { - getEndPoint().fillInterested(_asyncReadCallback); - } - - public void blockingReadFillInterested() - { - // We try fillInterested here because of SSL and - // spurious wakeups. With blocking reads, we read in a loop - // that tries to read/parse content and blocks waiting if there is - // none available. The loop can be woken up by incoming encrypted - // bytes, which due to SSL might not produce any decrypted bytes. - // Thus the loop needs to register fill interest again. However if - // the loop is woken up spuriously, then the register interest again - // can result in a pending read exception, unless we use tryFillInterested. - getEndPoint().tryFillInterested(_blockingReadCallback); - } - - public void blockingReadFailure(Throwable e) - { - _blockingReadCallback.failed(e); + getEndPoint().tryFillInterested(_asyncReadCallback); } @Override @@ -687,43 +652,29 @@ public void failed(Throwable x) } } - private class BlockingReadCallback implements Callback + private class AsyncReadCallback implements Callback { @Override public void succeeded() { - _input.unblock(); + if (_channel.getRequest().getHttpInput().onContentProducible()) + _channel.handle(); } @Override public void failed(Throwable x) { - _input.failed(x); - } - - @Override - public InvocationType getInvocationType() - { - // This callback does not block, rather it wakes up the - // thread that is blocked waiting on the read. - return InvocationType.NON_BLOCKING; - } - } - - private class AsyncReadCallback implements Callback - { - @Override - public void succeeded() - { - if (_channel.getState().onReadPossible()) + if (_channel.failed(x)) _channel.handle(); } @Override - public void failed(Throwable x) + public InvocationType getInvocationType() { - if (_input.failed(x)) - _channel.handle(); + // This callback does not block when the HttpInput is in blocking mode, + // rather it wakes up the thread that is blocked waiting on the read; + // but it can if it is in async mode, hence the varying InvocationType. + return _channel.getRequest().getHttpInput().isAsync() ? InvocationType.BLOCKING : InvocationType.NON_BLOCKING; } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index e9dceaf08f7c..d1aaa1d5c8f2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -21,21 +21,15 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; -import org.eclipse.jetty.http.BadMessageException; -import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.Destroyable; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link @@ -43,28 +37,23 @@ */ public class HttpInput extends ServletInputStream implements Runnable { - private static final Logger LOG = Log.getLogger(HttpInput.class); + private static final Logger LOG = LoggerFactory.getLogger(HttpInput.class); private final byte[] _oneByteBuffer = new byte[1]; + private final BlockingContentProducer _blockingContentProducer; + private final AsyncContentProducer _asyncContentProducer; private final HttpChannelState _channelState; - private final ContentProducer _contentProducer = new ContentProducer(); - // This semaphore is only used in blocking mode, and a standard lock with a condition variable - // cannot work here because there is a race condition between the _contentProducer.read() call - // and the blockForContent() call: content can be produced any time between these two calls so - // the call to unblock() done by the content-producing thread to wake up the user thread executing read() - // must 'remember' the unblock() call, such as if it happens before the thread executing read() reaches the - // blockForContent() method, it will not get stuck in it forever waiting for an unblock() call it missed. - private final Semaphore _semaphore = new Semaphore(0); - - private Eof _eof = Eof.NOT_YET; - private Throwable _error; + private ContentProducer _contentProducer; + private boolean _consumedEof; private ReadListener _readListener; - private long _firstByteTimeStamp = Long.MIN_VALUE; public HttpInput(HttpChannelState state) { _channelState = state; + _asyncContentProducer = new AsyncContentProducer(state.getHttpChannel()); + _blockingContentProducer = new BlockingContentProducer(_asyncContentProducer); + _contentProducer = _blockingContentProducer; } /* HttpInput */ @@ -72,12 +61,11 @@ public HttpInput(HttpChannelState state) public void recycle() { if (LOG.isDebugEnabled()) - LOG.debug("recycle"); - _contentProducer.recycle(); - _eof = Eof.NOT_YET; - _error = null; + LOG.debug("recycle {}", this); + _blockingContentProducer.recycle(); + _contentProducer = _blockingContentProducer; + _consumedEof = false; _readListener = null; - _firstByteTimeStamp = Long.MIN_VALUE; } /** @@ -95,6 +83,8 @@ public Interceptor getInterceptor() */ public void setInterceptor(Interceptor interceptor) { + if (LOG.isDebugEnabled()) + LOG.debug("setting interceptor to {}", interceptor); _contentProducer.setInterceptor(interceptor); } @@ -108,157 +98,52 @@ public void addInterceptor(Interceptor interceptor) { Interceptor currentInterceptor = _contentProducer.getInterceptor(); if (currentInterceptor == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("adding single interceptor: {}", interceptor); _contentProducer.setInterceptor(interceptor); + } else - _contentProducer.setInterceptor(new ChainedInterceptor(currentInterceptor, interceptor)); - } - - /** - * Called by channel when asynchronous IO needs to produce more content - */ - public void asyncReadProduce() - { - if (LOG.isDebugEnabled()) - LOG.debug("asyncReadProduce {}", _contentProducer); - produceContent(); - } - - /** - * Adds some content to this input stream. - * - * @param content the content to add - */ - public void addContent(Content content) - { - if (LOG.isDebugEnabled()) - LOG.debug("addContent {} {}", content, _contentProducer); - if (_firstByteTimeStamp == Long.MIN_VALUE) { - _firstByteTimeStamp = System.nanoTime(); - if (_firstByteTimeStamp == Long.MIN_VALUE) - _firstByteTimeStamp++; + ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); + if (LOG.isDebugEnabled()) + LOG.debug("adding chained interceptor: {}", chainedInterceptor); + _contentProducer.setInterceptor(chainedInterceptor); } - _contentProducer.addContent(content); - if (isAsync() && _contentProducer.available(this::produceContent) > 0) - _channelState.onContentAdded(); - } - - public boolean hasContent() - { - return _contentProducer.hasRawContent(); - } - - // There are 3 sources which can call this method in parallel: - // 1) HTTP2 read() that has a demand served on the app thread; - // 2) HTTP2 read() that has a demand served by a server thread; - // 3) onIdleTimeout called by a server thread; - // which means the semaphore can have up to 2 permits. - public void unblock() - { - if (LOG.isDebugEnabled()) - LOG.debug("signalling blocked thread to wake up"); - if (!isError() && !_eof.isEof() && _semaphore.availablePermits() > 1) - throw new AssertionError("Only one thread should call unblock and only if we are blocked"); - _semaphore.release(); - } - - public long getContentLength() - { - return _contentProducer.getRawContentArrived(); } public long getContentReceived() { - return getContentLength(); - } - - /** - * This method should be called to signal that an EOF has been detected before all the expected content arrived. - *

- * Typically this will result in an EOFException being thrown from a subsequent read rather than a -1 return. - * - * @return true if content channel woken for read - */ - public boolean earlyEOF() - { - if (LOG.isDebugEnabled()) - LOG.debug("received early EOF"); - _eof = Eof.EARLY_EOF; - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; - } - - /** - * This method should be called to signal that all the expected content arrived. - * - * @return true if content channel woken for read - */ - public boolean eof() - { - if (LOG.isDebugEnabled()) - LOG.debug("received EOF"); - _eof = Eof.EOF; - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; + return _contentProducer.getRawContentArrived(); } public boolean consumeAll() { if (LOG.isDebugEnabled()) LOG.debug("consume all"); - _contentProducer.consumeTransformedContent(this::failContent, new IOException("Unconsumed content")); - if (_eof.isEof()) - _eof = Eof.CONSUMED_EOF; + boolean atEof = _contentProducer.consumeAll(new IOException("Unconsumed content")); + if (atEof) + _consumedEof = true; if (isFinished()) return !isError(); - _eof = Eof.EARLY_EOF; return false; } public boolean isError() { - return _error != null; + boolean error = _contentProducer.isError(); + if (LOG.isDebugEnabled()) + LOG.debug("isError = {}", error); + return error; } public boolean isAsync() - { - return _readListener != null; - } - - public boolean onIdleTimeout(Throwable x) - { - boolean neverDispatched = _channelState.isIdle(); - boolean waitingForContent = _contentProducer.available(this::produceContent) == 0 && !_eof.isEof(); - if ((waitingForContent || neverDispatched) && !isError()) - { - x.addSuppressed(new Throwable("HttpInput idle timeout")); - _error = x; - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - } - return false; - } - - public boolean failed(Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("failed " + x); - if (_error != null && _error != x) - _error.addSuppressed(x); - else - _error = x; - - if (isAsync()) - return _channelState.onContentAdded(); - unblock(); - return false; + LOG.debug("isAsync read listener = " + _readListener); + return _readListener != null; } /* ServletInputStream */ @@ -266,7 +151,7 @@ public boolean failed(Throwable x) @Override public boolean isFinished() { - boolean finished = !_contentProducer.hasRawContent() && _eof.isConsumed(); + boolean finished = _consumedEof; if (LOG.isDebugEnabled()) LOG.debug("isFinished? {}", finished); return finished; @@ -275,22 +160,24 @@ public boolean isFinished() @Override public boolean isReady() { - // calling _contentProducer.available() might change the _eof state, so the following test order matters - if (_contentProducer.available(this::produceContent) > 0 || _eof.isEof()) + boolean ready = _contentProducer.isReady(); + if (!ready) { if (LOG.isDebugEnabled()) - LOG.debug("isReady? true"); - return true; + LOG.debug("isReady? false"); + return false; } + if (LOG.isDebugEnabled()) - LOG.debug("isReady? false"); - _channelState.onReadUnready(); - return false; + LOG.debug("isReady? true"); + return true; } @Override public void setReadListener(ReadListener readListener) { + if (LOG.isDebugEnabled()) + LOG.debug("setting read listener to {}", readListener); if (_readListener != null) throw new IllegalStateException("ReadListener already set"); _readListener = Objects.requireNonNull(readListener); @@ -298,40 +185,15 @@ public void setReadListener(ReadListener readListener) if (!_channelState.isAsyncStarted()) throw new IllegalStateException("Async not started"); - if (LOG.isDebugEnabled()) - LOG.debug("setReadListener error=" + _error + " eof=" + _eof + " " + _contentProducer); - boolean woken; - if (isError()) - { - woken = _channelState.onReadReady(); - } - else - { - if (_contentProducer.available(this::produceContent) > 0) - { - woken = _channelState.onReadReady(); - } - else if (_eof.isEof()) - { - woken = _channelState.onReadEof(); - } - else - { - _channelState.onReadUnready(); - woken = false; - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("setReadListener woken=" + woken); - if (woken) - scheduleReadListenerNotification(); + _contentProducer = _asyncContentProducer; + // trigger content production + if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN + scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead } - private void scheduleReadListenerNotification() + public boolean onContentProducible() { - HttpChannel channel = _channelState.getHttpChannel(); - channel.execute(channel); + return _contentProducer.onContentProducible(); } @Override @@ -346,117 +208,76 @@ public int read() throws IOException @Override public int read(byte[] b, int off, int len) throws IOException { - // Calculate minimum request rate for DOS protection - long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); - if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE) - { - long period = System.nanoTime() - _firstByteTimeStamp; - if (period > 0) - { - long minimumData = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); - if (_contentProducer.getRawContentArrived() < minimumData) - { - BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, - String.format("Request content data rate < %d B/s", minRequestDataRate)); - if (_channelState.isResponseCommitted()) - _channelState.getHttpChannel().abort(bad); - throw bad; - } - } - } + // Calculate minimum request rate for DoS protection + _contentProducer.checkMinDataRate(); - while (true) + Content content = _contentProducer.nextContent(); + if (content == null) + throw new IllegalStateException("read on unready input"); + if (!content.isSpecial()) { - // The semaphore's permits must be drained before we call read() because: - // 1) _contentProducer.read() may call unblock() which enqueues a permit even if the content was produced - // by the exact thread that called HttpInput.read(), hence leaving around an unconsumed permit that would - // be consumed the next time HttpInput.read() is called, mistakenly believing that content was produced. - // 2) HTTP2 demand served asynchronously does call unblock which does enqueue a permit in the semaphore; - // this permit would then be mistakenly consumed by the next call to blockForContent() once all the produced - // content got consumed. - if (!isAsync()) - _semaphore.drainPermits(); - int read = _contentProducer.read(this::produceContent, b, off, len); + int read = content.get(b, off, len); if (LOG.isDebugEnabled()) LOG.debug("read produced {} byte(s)", read); - if (read > 0) - return read; - - if (LOG.isDebugEnabled()) - LOG.debug("read error = " + _error); - if (_error != null) - throw new IOException(_error); + if (content.isEmpty()) + _contentProducer.reclaim(content); + return read; + } - if (LOG.isDebugEnabled()) - LOG.debug("read EOF = {}", _eof); - if (_eof.isEarly()) - throw new EofException("Early EOF"); + Throwable error = content.getError(); + if (LOG.isDebugEnabled()) + LOG.debug("read error = " + error); + if (error != null) + { + if (error instanceof IOException) + throw (IOException)error; + throw new IOException(error); + } + if (content.isEof()) + { if (LOG.isDebugEnabled()) - LOG.debug("read async = {}", isAsync()); - if (!isAsync()) - { - if (_eof.isEof()) - { - _eof = Eof.CONSUMED_EOF; - if (LOG.isDebugEnabled()) - LOG.debug("read on EOF, switching to CONSUMED_EOF and returning"); - return -1; - } - if (LOG.isDebugEnabled()) - LOG.debug("read blocked"); - blockForContent(); - if (LOG.isDebugEnabled()) - LOG.debug("read unblocked"); - } - else - { - if (_eof.isEof()) - { - _eof = Eof.CONSUMED_EOF; - boolean wasInAsyncWait = _channelState.onReadEof(); - if (LOG.isDebugEnabled()) - LOG.debug("async read on EOF (was in async wait? {}), switching to CONSUMED_EOF and returning", wasInAsyncWait); - if (wasInAsyncWait) - scheduleReadListenerNotification(); - return -1; - } - else - { - //TODO returning 0 breaks the InputStream contract. Shouldn't IOException be thrown instead? - _channelState.getHttpChannel().onAsyncWaitForContent(); // switches on fill interested - return 0; - } - } + LOG.debug("read at EOF, setting consumed EOF to true"); + _consumedEof = true; + // If EOF do we need to wake for allDataRead callback? + if (onContentProducible()) + scheduleReadListenerNotification(); + return -1; } + + throw new AssertionError("no data, no error and not EOF"); + } + + private void scheduleReadListenerNotification() + { + HttpChannel channel = _channelState.getHttpChannel(); + channel.execute(channel); + } + + /** + * Check if this HttpInput instance has content stored internally, without fetching/parsing + * anything from the underlying channel. + * @return true if the input contains content, false otherwise. + */ + public boolean hasContent() + { + // Do not call _contentProducer.available() as it calls HttpChannel.produceContent() + // which is forbidden by this method's contract. + boolean hasContent = _contentProducer.hasContent(); + if (LOG.isDebugEnabled()) + LOG.debug("hasContent = {}", hasContent); + return hasContent; } @Override public int available() { - int available = _contentProducer.available(this::produceContent); + int available = _contentProducer.available(); if (LOG.isDebugEnabled()) LOG.debug("available = {}", available); return available; } - private void blockForContent() - { - try - { - _channelState.getHttpChannel().onBlockWaitForContent(); // switches on fill interested - if (LOG.isDebugEnabled()) - LOG.debug("waiting for signal to wake up"); - _semaphore.acquire(); - if (LOG.isDebugEnabled()) - LOG.debug("signalled to wake up"); - } - catch (Throwable x) - { - _channelState.getHttpChannel().onBlockWaitForContentFailure(x); - } - } - /* Runnable */ /* @@ -466,23 +287,40 @@ private void blockForContent() @Override public void run() { - if (!_contentProducer.hasRawContent()) + Content content = _contentProducer.nextContent(); + if (LOG.isDebugEnabled()) + LOG.debug("running on content {}", content); + // The nextContent() call could return null if the transformer ate all + // the raw bytes without producing any transformed content. + if (content == null) + return; + + // This check is needed when a request is started async but no read listener is registered. + if (_readListener == null) { if (LOG.isDebugEnabled()) - LOG.debug("running has no raw content; error: {}, EOF = {}", _error, _eof); - if (_error != null || _eof.isEarly()) + LOG.debug("running without a read listener"); + onContentProducible(); + return; + } + + if (content.isSpecial()) + { + Throwable error = content.getError(); + if (error != null) { + if (LOG.isDebugEnabled()) + LOG.debug("running has error: {}", (Object)error); // TODO is this necessary to add here? _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - if (_error != null) - _readListener.onError(_error); - else - _readListener.onError(new EofException("Early EOF")); + _readListener.onError(error); } - else if (_eof.isEof()) + else if (content.isEof()) { try { + if (LOG.isDebugEnabled()) + LOG.debug("running at EOF"); _readListener.onAllDataRead(); } catch (Throwable x) @@ -492,12 +330,11 @@ else if (_eof.isEof()) _readListener.onError(x); } } - // else: !hasContent() && !error && !EOF -> no-op } else { if (LOG.isDebugEnabled()) - LOG.debug("running has raw content"); + LOG.debug("running has content"); try { _readListener.onDataAvailable(); @@ -511,357 +348,337 @@ else if (_eof.isEof()) } } - private void produceContent() + @Override + public String toString() { - if (LOG.isDebugEnabled()) - LOG.debug("produceContent {}", _contentProducer); - _channelState.getHttpChannel().produceContent(); + return getClass().getSimpleName() + "@" + hashCode() + + " cs=" + _channelState + + " cp=" + _contentProducer + + " eof=" + _consumedEof; } - private void failContent(Throwable failure) + public interface Interceptor { - if (LOG.isDebugEnabled()) - LOG.debug("failContent {} - " + failure, _contentProducer); - _channelState.getHttpChannel().failContent(failure); + /** + * @param content The content to be intercepted. + * The content will be modified with any data the interceptor consumes, but there is no requirement + * that all the data is consumed by the interceptor. + * @return The intercepted content or null if interception is completed for that content. + */ + Content readFrom(Content content); } - private enum Eof + /** + * An {@link Interceptor} that chains two other {@link Interceptor}s together. + * The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s + * {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned + * to the next {@link Interceptor}. + */ + private static class ChainedInterceptor implements Interceptor, Destroyable { - NOT_YET(false, false, false), - EOF(true, false, false), - CONSUMED_EOF(true, true, false), - EARLY_EOF(true, false, true), - ; + private final Interceptor _prev; + private final Interceptor _next; - private final boolean _eof; - private final boolean _consumed; - private final boolean _early; + ChainedInterceptor(Interceptor prev, Interceptor next) + { + _prev = prev; + _next = next; + } - Eof(boolean eof, boolean consumed, boolean early) + Interceptor getPrev() { - _eof = eof; - _consumed = consumed; - _early = early; + return _prev; } - boolean isEof() + Interceptor getNext() { - return _eof; + return _next; } - boolean isConsumed() + @Override + public Content readFrom(Content content) { - return _consumed; + Content c = getPrev().readFrom(content); + if (c == null) + return null; + return getNext().readFrom(c); } - boolean isEarly() + @Override + public void destroy() { - return _early; + if (_prev instanceof Destroyable) + ((Destroyable)_prev).destroy(); + if (_next instanceof Destroyable) + ((Destroyable)_next).destroy(); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "@" + hashCode() + " [p=" + _prev + ",n=" + _next + "]"; } } - // All methods of this class have to be synchronized because a HTTP2 reset can call consumeTransformedContent() - // while nextNonEmptyContent() is executing, hence all accesses to _rawContent and _transformedContent must be - // mutually excluded. - // TODO: maybe the locking could be more fine grained, by only protecting the if (null|!null) blocks? - private static class ContentProducer + /** + * A content represents the production of a {@link HttpChannel} returned by {@link HttpChannel#produceContent()}. + * There are two fundamental types of content: special and non-special. + * Non-special content always wraps a byte buffer that can be consumed and must be recycled once it is empty, either + * via {@link #succeeded()} or {@link #failed(Throwable)}. + * Special content indicates a special event, like EOF or an error and never wraps a byte buffer. Calling + * {@link #succeeded()} or {@link #failed(Throwable)} on those have no effect. + */ + public static class Content implements Callback { - // Note: _rawContent can never be null for as long as _transformedContent is not null. - private Content _rawContent; - private Content _transformedContent; - private long _rawContentArrived; - private Interceptor _interceptor; - private Throwable _consumeFailure; + protected final ByteBuffer _content; - void recycle() + public Content(ByteBuffer content) { - synchronized (this) - { - if (LOG.isDebugEnabled()) - LOG.debug("recycle {}", this); - if (_transformedContent == _rawContent) - _transformedContent = null; - if (_transformedContent != null) - _transformedContent.failed(null); - _transformedContent = null; - if (_rawContent != null) - _rawContent.failed(null); - _rawContent = null; - _rawContentArrived = 0L; - if (_interceptor instanceof Destroyable) - ((Destroyable)_interceptor).destroy(); - _interceptor = null; - _consumeFailure = null; - } + _content = content; } - long getRawContentArrived() + /** + * Get the wrapped byte buffer. Throws {@link IllegalStateException} if the content is special. + * @return the wrapped byte buffer. + */ + public ByteBuffer getByteBuffer() { - synchronized (this) - { - return _rawContentArrived; - } + return _content; } - boolean hasRawContent() + @Override + public InvocationType getInvocationType() { - synchronized (this) - { - return _rawContent != null; - } + return InvocationType.NON_BLOCKING; } - Interceptor getInterceptor() + /** + * Read the wrapped byte buffer. Throws {@link IllegalStateException} if the content is special. + * @param buffer The array into which bytes are to be written. + * @param offset The offset within the array of the first byte to be written. + * @param length The maximum number of bytes to be written to the given array. + * @return The amount of bytes read from the buffer. + */ + public int get(byte[] buffer, int offset, int length) { - synchronized (this) - { - return _interceptor; - } + length = Math.min(_content.remaining(), length); + _content.get(buffer, offset, length); + return length; } - void setInterceptor(Interceptor interceptor) + /** + * Skip some bytes from the buffer. Has no effect on a special content. + * @param length How many bytes to skip. + * @return How many bytes were skipped. + */ + public int skip(int length) { - synchronized (this) - { - this._interceptor = interceptor; - } + length = Math.min(_content.remaining(), length); + _content.position(_content.position() + length); + return length; } - void addContent(Content content) + /** + * Check if there is at least one byte left in the buffer. + * Always false on a special content. + * @return true if there is at least one byte left in the buffer. + */ + public boolean hasContent() { - synchronized (this) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} addContent {}", this, content); - if (content == null) - throw new AssertionError("Cannot add null content"); - if (_consumeFailure != null) - { - content.failed(_consumeFailure); - return; - } - if (_rawContent != null) - throw new AssertionError("Cannot add new content while current one hasn't been processed"); - - _rawContent = content; - _rawContentArrived += content.remaining(); - } + return _content.hasRemaining(); } - void consumeTransformedContent(Consumer failRawContent, Throwable failure) + /** + * Get the number of bytes remaining in the buffer. + * Always 0 on a special content. + * @return the number of bytes remaining in the buffer. + */ + public int remaining() { - synchronized (this) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} consumeTransformedContent", this); - // start by depleting the current _transformedContent - if (_transformedContent != null) - { - _transformedContent.skip(_transformedContent.remaining()); - if (_transformedContent != _rawContent) - _transformedContent.failed(failure); - _transformedContent = null; - } - - // don't bother transforming content, directly deplete the raw one - if (_rawContent != null) - { - _rawContent.skip(_rawContent.remaining()); - _rawContent.failed(failure); - _rawContent = null; - } - - // fail whatever other content the producer may have - _consumeFailure = failure; - failRawContent.accept(failure); - } + return _content.remaining(); } - int available(Runnable rawContentProducer) + /** + * Check if the buffer is empty. + * Always true on a special content. + * @return true if there is 0 byte left in the buffer. + */ + public boolean isEmpty() { - synchronized (this) - { - Content content = nextNonEmptyContent(rawContentProducer); - return content == null ? 0 : content.remaining(); - } + return !_content.hasRemaining(); } - int read(Runnable rawContentProducer, byte[] b, int off, int len) + /** + * Check if the content is special. + * @return true if the content is special, false otherwise. + */ + public boolean isSpecial() { - synchronized (this) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} read", this); - Content content = nextNonEmptyContent(rawContentProducer); - return content == null ? 0 : content.get(b, off, len); - } + return false; } - private Content nextNonEmptyContent(Runnable rawContentProducer) + /** + * Check if EOF was reached. Both special and non-special content + * can have this flag set to true but in the case of non-special content, + * this can be interpreted as a hint as it is always going to be followed + * by another content that is both special and EOF. + * @return true if EOF was reached, false otherwise. + */ + public boolean isEof() { - if (_rawContent == null) - { - rawContentProducer.run(); - if (_rawContent == null) - return null; - } - - if (_transformedContent != null && _transformedContent.isEmpty()) - { - if (_transformedContent != _rawContent) - _transformedContent.succeeded(); - _transformedContent = null; - } - - while (_transformedContent == null) - { - if (_interceptor != null) - _transformedContent = _interceptor.readFrom(_rawContent); - else - _transformedContent = _rawContent; - - if (_transformedContent != null && _transformedContent.isEmpty()) - { - if (_transformedContent != _rawContent) - _transformedContent.succeeded(); - _transformedContent = null; - } - - if (_transformedContent == null) - { - if (_rawContent.isEmpty()) - { - _rawContent.succeeded(); - _rawContent = null; - rawContentProducer.run(); - if (_rawContent == null) - return null; - } - } - } + return false; + } - return _transformedContent; + /** + * Get the reported error. Only special contents can have an error. + * @return the error or null if there is none. + */ + public Throwable getError() + { + return null; } @Override public String toString() { - return getClass().getSimpleName() + "[i=" + _interceptor + ",b=" + _rawContentArrived + - ",r=" + _rawContent + ",t=" + _transformedContent + "]"; + return String.format("%s@%x{%s,spc=%s,eof=%s,err=%s}", getClass().getSimpleName(), hashCode(), + BufferUtil.toDetailString(_content), isSpecial(), isEof(), getError()); } } /** - * An {@link Interceptor} that chains two other {@link Interceptor}s together. - * The {@link Interceptor#readFrom(Content)} calls the previous {@link Interceptor}'s - * {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned - * to the next {@link Interceptor}. + * Simple non-special content wrapper allow overriding the EOF flag. */ - private static class ChainedInterceptor implements Interceptor, Destroyable + public static class WrappingContent extends Content { - private final Interceptor _prev; - private final Interceptor _next; + private final Content _delegate; + private final boolean _eof; - ChainedInterceptor(Interceptor prev, Interceptor next) + public WrappingContent(Content delegate, boolean eof) { - _prev = prev; - _next = next; + super(delegate.getByteBuffer()); + _delegate = delegate; + _eof = eof; } - Interceptor getPrev() + @Override + public boolean isEof() { - return _prev; + return _eof; } - Interceptor getNext() + @Override + public void succeeded() { - return _next; + _delegate.succeeded(); } @Override - public Content readFrom(Content content) + public void failed(Throwable x) { - Content c = getPrev().readFrom(content); - if (c == null) - return null; - return getNext().readFrom(c); + _delegate.failed(x); } @Override - public void destroy() + public InvocationType getInvocationType() { - if (_prev instanceof Destroyable) - ((Destroyable)_prev).destroy(); - if (_next instanceof Destroyable) - ((Destroyable)_next).destroy(); + return _delegate.getInvocationType(); } } - public interface Interceptor - { - /** - * @param content The content to be intercepted. - * The content will be modified with any data the interceptor consumes, but there is no requirement - * that all the data is consumed by the interceptor. - * @return The intercepted content or null if interception is completed for that content. - */ - Content readFrom(Content content); - } - - public static class Content implements Callback + /** + * Abstract class that implements the standard special content behavior. + */ + public abstract static class SpecialContent extends Content { - protected final ByteBuffer _content; + public SpecialContent() + { + super(null); + } - public Content(ByteBuffer content) + @Override + public final ByteBuffer getByteBuffer() { - _content = content; + throw new IllegalStateException(this + " has no buffer"); } - public ByteBuffer getByteBuffer() + @Override + public final int get(byte[] buffer, int offset, int length) { - return _content; + throw new IllegalStateException(this + " has no buffer"); } @Override - public InvocationType getInvocationType() + public final int skip(int length) { - return InvocationType.NON_BLOCKING; + return 0; } - public int get(byte[] buffer, int offset, int length) + @Override + public final boolean hasContent() { - length = Math.min(_content.remaining(), length); - _content.get(buffer, offset, length); - return length; + return false; } - public int skip(int length) + @Override + public final int remaining() { - length = Math.min(_content.remaining(), length); - _content.position(_content.position() + length); - return length; + return 0; } - public boolean hasContent() + @Override + public final boolean isEmpty() { - return _content.hasRemaining(); + return true; } - public int remaining() + @Override + public final boolean isSpecial() { - return _content.remaining(); + return true; } + } - public boolean isEmpty() + /** + * EOF special content. + */ + public static final class EofContent extends SpecialContent + { + @Override + public boolean isEof() { - return !_content.hasRemaining(); + return true; } @Override public String toString() { - return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content)); + return getClass().getSimpleName(); } } + /** + * Error special content. + */ + public static final class ErrorContent extends SpecialContent + { + private final Throwable _error; + + public ErrorContent(Throwable error) + { + _error = error; + } + + @Override + public Throwable getError() + { + return _error; + } + + @Override + public String toString() + { + return getClass().getSimpleName() + " [" + _error + "]"; + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputState.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputState.puml new file mode 100644 index 000000000000..0ef1896b5fe2 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputState.puml @@ -0,0 +1,16 @@ +@startuml + +IDLE: +READY: +UNREADY: + +[*] --> IDLE + +IDLE --> UNREADY : isReady +IDLE -right->READY : isReady + +UNREADY -up-> READY : ASYNC onContentProducible + +READY -left->IDLE : nextContent + +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_async.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_async.puml new file mode 100644 index 000000000000..c520361faef6 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_async.puml @@ -0,0 +1,114 @@ +@startuml +title "HttpInput" + +participant AsyncContentDelivery as "[async\ncontent\ndelivery]" +participant HttpChannel as "Http\nChannel\n" +participant HttpChannelState as "Http\nChannel\nState" +participant HttpInputInterceptor as "Http\nInput.\nInterceptor" +participant AsyncContentProducer as "Async\nContent\nProducer" +participant HttpInput as "Http\nInput\n" +participant Application as "\nApplication\n" + +autoactivate on + +== Async Read == + +Application->HttpInput: read +activate Application + HttpInput->AsyncContentProducer: nextContent + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + AsyncContentProducer->HttpChannel: produceContent + return raw content or null + alt if raw content is not null + AsyncContentProducer->HttpInputInterceptor: readFrom + return transformed content + end + return + alt if transformed content is not null + AsyncContentProducer->HttpChannelState: onReadIdle + return + end + return content or null + note over HttpInput + throw ISE + if content + is null + end note + HttpInput->AsyncContentProducer: reclaim + return +return +deactivate Application + +== isReady == + +Application->HttpInput: isReady +activate Application + HttpInput->AsyncContentProducer: isReady + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + AsyncContentProducer->HttpChannel: produceContent + return raw content or null + alt if raw content is not null + AsyncContentProducer->HttpInputInterceptor: readFrom + return transformed content + end + return + alt if transformed content is not null + AsyncContentProducer->HttpChannelState: onContentAdded + return + else transformed content is null + AsyncContentProducer->HttpChannelState: onReadUnready + return + AsyncContentProducer->HttpChannel: needContent + return + alt if needContent returns true + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + return + alt if transformed content is not null + AsyncContentProducer->HttpChannelState: onContentAdded + return + end + end + end + return boolean\n[transformed\ncontent is not null] +return +deactivate Application + +alt if content arrives + AsyncContentDelivery->HttpInput: onContentProducible + HttpInput->AsyncContentProducer: onContentProducible + alt if not at EOF + AsyncContentProducer->HttpChannelState: onReadReady + return true if woken + else if at EOF + AsyncContentProducer->HttpChannelState: onReadEof + return true if woken + end + return true if woken + return true if woken + alt onContentProducible returns true + AsyncContentDelivery->HttpChannel: execute(HttpChannel) + return + end +end + +||| + +== available == + +Application->HttpInput: available +activate Application + HttpInput->AsyncContentProducer: available + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + AsyncContentProducer->HttpChannel: produceContent + return raw content or null + alt if raw content is not null + AsyncContentProducer->HttpInputInterceptor: readFrom + return transformed content + end + return + return content size or\n0 if content is null +return +deactivate Application + +||| +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_blocking.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_blocking.puml new file mode 100644 index 000000000000..06cb82c4cfd9 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_blocking.puml @@ -0,0 +1,64 @@ +@startuml +title "HttpInput" + +participant AsyncContentDelivery as "[async\ncontent\ndelivery]" +participant HttpChannel as "Http\nChannel\n" +participant HttpChannelState as "Http\nChannel\nState" +participant AsyncContentProducer as "Async\nContent\nProducer" +participant Semaphore as "\nSemaphore\n" +participant BlockingContentProducer as "Blocking\nContent\nProducer" +participant HttpInput as "Http\nInput\n" +participant Application as "\nApplication\n" + +autoactivate on + +== Blocking Read == + +Application->HttpInput: read +activate Application + HttpInput->BlockingContentProducer: nextContent + loop + BlockingContentProducer->AsyncContentProducer: nextContent + AsyncContentProducer->AsyncContentProducer: nextTransformedContent + AsyncContentProducer->HttpChannel: produceContent + return + return + alt content is not null + AsyncContentProducer->HttpChannelState: onReadIdle + return + end + return content or null + alt content is null + BlockingContentProducer->HttpChannelState: onReadUnready + return + BlockingContentProducer->HttpChannel: needContent + return + alt needContent returns false + BlockingContentProducer->Semaphore: acquire + return + else needContent returns true + note over BlockingContentProducer + continue loop + end note + end + else content is not null + return non-null content + end + end + ' return from BlockingContentProducer: nextContent + HttpInput->BlockingContentProducer: reclaim + BlockingContentProducer->AsyncContentProducer: reclaim + return + return +return +deactivate Application + +alt if content arrives + AsyncContentDelivery->HttpInput: wakeup + HttpInput->BlockingContentProducer: wakeup + BlockingContentProducer->Semaphore: release + return + return false + return false +end +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 5e3908048fb0..934cd2e6a37a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -749,7 +749,7 @@ public long getContentLengthLong() public long getContentRead() { - return _input.getContentLength(); + return _input.getContentReceived(); } @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java new file mode 100644 index 000000000000..379a93864314 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java @@ -0,0 +1,340 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPOutputStream; + +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; +import org.eclipse.jetty.util.compression.CompressionPool; +import org.eclipse.jetty.util.compression.InflaterPool; +import org.hamcrest.core.Is; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class AsyncContentProducerTest +{ + private ScheduledExecutorService scheduledExecutorService; + private InflaterPool inflaterPool; + + @BeforeEach + public void setUp() + { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + inflaterPool = new InflaterPool(-1, true); + } + + @AfterEach + public void tearDown() + { + scheduledExecutorService.shutdownNow(); + } + + @Test + public void testAsyncContentProducerNoInterceptor() throws Exception + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, nullValue()); + } + + @Test + public void testAsyncContentProducerNoInterceptorWithError() throws Exception + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + final Throwable expectedError = new EofException("Early EOF"); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, Is.is(expectedError)); + } + + @Test + public void testAsyncContentProducerGzipInterceptor() throws Exception + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, nullValue()); + } + + @Test + public void testAsyncContentProducerGzipInterceptorWithTinyBuffers() throws Exception + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier); + assertThat(error, nullValue()); + } + + @Test + public void testBlockingContentProducerGzipInterceptorWithError() throws Exception + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + final Throwable expectedError = new Throwable("HttpInput idle timeout"); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, Is.is(expectedError)); + } + + private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException + { + int readBytes = 0; + String consumedString = ""; + int nextContentCount = 0; + int isReadyFalseCount = 0; + int isReadyTrueCount = 0; + Throwable error = null; + + while (true) + { + if (contentProducer.isReady()) + isReadyTrueCount++; + else + isReadyFalseCount++; + + HttpInput.Content content = contentProducer.nextContent(); + nextContentCount++; + if (content == null) + { + barrier.await(5, TimeUnit.SECONDS); + content = contentProducer.nextContent(); + nextContentCount++; + } + assertThat(content, notNullValue()); + + if (content.isSpecial()) + { + if (content.isEof()) + break; + error = content.getError(); + break; + } + + byte[] b = new byte[content.remaining()]; + readBytes += b.length; + content.getByteBuffer().get(b); + consumedString += new String(b, StandardCharsets.ISO_8859_1); + content.skip(content.remaining()); + } + + assertThat(nextContentCount, is(totalContentCount)); + assertThat(readBytes, is(totalContentBytesCount)); + assertThat(consumedString, is(originalContentString)); + assertThat(isReadyFalseCount, is(notReadyCount)); + assertThat(isReadyTrueCount, is(readyCount)); + return error; + } + + private static int countRemaining(ByteBuffer[] byteBuffers) + { + int total = 0; + for (ByteBuffer byteBuffer : byteBuffers) + { + total += byteBuffer.remaining(); + } + return total; + } + + private static String asString(ByteBuffer[] buffers) + { + StringBuilder sb = new StringBuilder(); + for (ByteBuffer buffer : buffers) + { + byte[] b = new byte[buffer.remaining()]; + buffer.duplicate().get(b); + sb.append(new String(b, StandardCharsets.ISO_8859_1)); + } + return sb.toString(); + } + + private static ByteBuffer gzipByteBuffer(ByteBuffer uncompressedBuffer) + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + + byte[] b = new byte[uncompressedBuffer.remaining()]; + uncompressedBuffer.get(b); + output.write(b); + + output.close(); + return ByteBuffer.wrap(baos.toByteArray()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static class ArrayDelayedHttpChannel extends HttpChannel + { + private final ByteBuffer[] byteBuffers; + private final HttpInput.Content finalContent; + private final ScheduledExecutorService scheduledExecutorService; + private final CyclicBarrier barrier; + private int counter; + private volatile HttpInput.Content nextContent; + + public ArrayDelayedHttpChannel(ByteBuffer[] byteBuffers, HttpInput.Content finalContent, ScheduledExecutorService scheduledExecutorService, CyclicBarrier barrier) + { + super(new MockConnector(), new HttpConfiguration(), null, null); + this.byteBuffers = new ByteBuffer[byteBuffers.length]; + this.finalContent = finalContent; + this.scheduledExecutorService = scheduledExecutorService; + this.barrier = barrier; + for (int i = 0; i < byteBuffers.length; i++) + { + this.byteBuffers[i] = byteBuffers[i].duplicate(); + } + } + + @Override + public boolean needContent() + { + if (nextContent != null) + return true; + scheduledExecutorService.schedule(() -> + { + if (byteBuffers.length > counter) + nextContent = new HttpInput.Content(byteBuffers[counter++]); + else + nextContent = finalContent; + try + { + barrier.await(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + throw new AssertionError(e); + } + }, 50, TimeUnit.MILLISECONDS); + return false; + } + + @Override + public HttpInput.Content produceContent() + { + HttpInput.Content result = nextContent; + nextContent = null; + return result; + } + + @Override + public boolean failAllContent(Throwable failure) + { + nextContent = null; + counter = byteBuffers.length; + return false; + } + + @Override + public boolean failed(Throwable x) + { + return false; + } + + @Override + protected boolean eof() + { + return false; + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java new file mode 100644 index 000000000000..a8f8fdb7dfc9 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java @@ -0,0 +1,320 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.GZIPOutputStream; + +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; +import org.eclipse.jetty.util.compression.InflaterPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; + +public class BlockingContentProducerTest +{ + private ScheduledExecutorService scheduledExecutorService; + private InflaterPool inflaterPool; + + @BeforeEach + public void setUp() + { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + inflaterPool = new InflaterPool(-1, true); + } + + @AfterEach + public void tearDown() + { + scheduledExecutorService.shutdownNow(); + } + + @Test + public void testBlockingContentProducerNoInterceptor() + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + + AtomicReference ref = new AtomicReference<>(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); + ref.set(contentProducer); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, nullValue()); + } + + @Test + public void testBlockingContentProducerNoInterceptorWithError() + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + final Throwable expectedError = new EofException("Early EOF"); + + AtomicReference ref = new AtomicReference<>(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); + ref.set(contentProducer); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, is(expectedError)); + } + + @Test + public void testBlockingContentProducerGzipInterceptor() + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + AtomicReference ref = new AtomicReference<>(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); + ref.set(contentProducer); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, nullValue()); + } + + @Test + public void testBlockingContentProducerGzipInterceptorWithTinyBuffers() + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + AtomicReference ref = new AtomicReference<>(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); + ref.set(contentProducer); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, totalContentBytesCount + 1, contentProducer); + assertThat(error, nullValue()); + } + + @Test + public void testBlockingContentProducerGzipInterceptorWithError() + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + final Throwable expectedError = new Throwable("HttpInput idle timeout"); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + AtomicReference ref = new AtomicReference<>(); + ArrayDelayedHttpChannel httpChannel = new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, () -> ref.get().onContentProducible()); + ContentProducer contentProducer = new BlockingContentProducer(new AsyncContentProducer(httpChannel)); + ref.set(contentProducer); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, buffers.length + 1, contentProducer); + assertThat(error, is(expectedError)); + } + + private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, int totalContentCount, ContentProducer contentProducer) + { + int readBytes = 0; + int nextContentCount = 0; + String consumedString = ""; + Throwable error = null; + while (true) + { + HttpInput.Content content = contentProducer.nextContent(); + nextContentCount++; + + if (content.isSpecial()) + { + if (content.isEof()) + break; + error = content.getError(); + break; + } + + byte[] b = new byte[content.remaining()]; + content.getByteBuffer().get(b); + consumedString += new String(b, StandardCharsets.ISO_8859_1); + + readBytes += b.length; + } + assertThat(readBytes, is(totalContentBytesCount)); + assertThat(nextContentCount, is(totalContentCount)); + assertThat(consumedString, is(originalContentString)); + return error; + } + + private static int countRemaining(ByteBuffer[] byteBuffers) + { + int total = 0; + for (ByteBuffer byteBuffer : byteBuffers) + { + total += byteBuffer.remaining(); + } + return total; + } + + private static String asString(ByteBuffer[] buffers) + { + StringBuilder sb = new StringBuilder(); + for (ByteBuffer buffer : buffers) + { + byte[] b = new byte[buffer.remaining()]; + buffer.duplicate().get(b); + sb.append(new String(b, StandardCharsets.ISO_8859_1)); + } + return sb.toString(); + } + + private static ByteBuffer gzipByteBuffer(ByteBuffer uncompressedBuffer) + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + + byte[] b = new byte[uncompressedBuffer.remaining()]; + uncompressedBuffer.get(b); + output.write(b); + + output.close(); + return ByteBuffer.wrap(baos.toByteArray()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private interface ContentListener + { + void onContent(); + } + + private static class ArrayDelayedHttpChannel extends HttpChannel + { + private final ByteBuffer[] byteBuffers; + private final HttpInput.Content finalContent; + private final ScheduledExecutorService scheduledExecutorService; + private final ContentListener contentListener; + private int counter; + private volatile HttpInput.Content nextContent; + + public ArrayDelayedHttpChannel(ByteBuffer[] byteBuffers, HttpInput.Content finalContent, ScheduledExecutorService scheduledExecutorService, ContentListener contentListener) + { + super(new MockConnector(), new HttpConfiguration(), null, null); + this.byteBuffers = new ByteBuffer[byteBuffers.length]; + this.finalContent = finalContent; + this.scheduledExecutorService = scheduledExecutorService; + this.contentListener = contentListener; + for (int i = 0; i < byteBuffers.length; i++) + { + this.byteBuffers[i] = byteBuffers[i].duplicate(); + } + } + + @Override + public boolean needContent() + { + if (nextContent != null) + return true; + scheduledExecutorService.schedule(() -> + { + if (byteBuffers.length > counter) + nextContent = new HttpInput.Content(byteBuffers[counter++]); + else + nextContent = finalContent; + contentListener.onContent(); + }, 50, TimeUnit.MILLISECONDS); + return false; + } + + @Override + public HttpInput.Content produceContent() + { + HttpInput.Content result = nextContent; + nextContent = null; + return result; + } + + @Override + public boolean failAllContent(Throwable failure) + { + nextContent = null; + counter = byteBuffers.length; + return false; + } + + @Override + public boolean failed(Throwable x) + { + return false; + } + + @Override + protected boolean eof() + { + return false; + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java index d584037a1c29..c40e3d89109e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java @@ -49,13 +49,21 @@ public void init() throws Exception HttpChannel channel = new HttpChannel(new MockConnector(), new HttpConfiguration(), null, null) { @Override - public void produceContent() + public boolean needContent() { + return false; } @Override - public void failContent(Throwable failure) + public HttpInput.Content produceContent() { + return null; + } + + @Override + public boolean failAllContent(Throwable failure) + { + return false; } @Override @@ -63,6 +71,18 @@ public ByteBufferPool getByteBufferPool() { return pool; } + + @Override + public boolean failed(Throwable x) + { + return false; + } + + @Override + protected boolean eof() + { + return false; + } }; _httpOut = new HttpOutput(channel) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index fb17e86da8e7..afb9d8210adc 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -178,13 +178,33 @@ public void abort(Throwable failure) }) { @Override - public void produceContent() + public boolean needContent() { + return false; + } + + @Override + public HttpInput.Content produceContent() + { + return null; + } + + @Override + public boolean failAllContent(Throwable failure) + { + return false; + } + + @Override + public boolean failed(Throwable x) + { + return false; } @Override - public void failContent(Throwable failure) + protected boolean eof() { + return false; } }; } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index cbc3bd800a69..098cf5bd7291 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.zip.GZIPOutputStream; import javax.servlet.AsyncContext; import javax.servlet.DispatcherType; @@ -75,6 +76,8 @@ import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.compression.CompressionPool; +import org.eclipse.jetty.util.compression.InflaterPool; import org.hamcrest.Matchers; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Disabled; @@ -89,6 +92,7 @@ import static org.eclipse.jetty.util.BufferUtil.toArray; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -778,10 +782,18 @@ public void onDataAvailable() throw new IllegalStateException(); if (input.read() != 'X') throw new IllegalStateException(); - if (!input.isReady()) - throw new IllegalStateException(); - if (input.read() != -1) - throw new IllegalStateException(); + if (input.isReady()) + { + try + { + if (input.read() != -1) + throw new IllegalStateException(); + } + catch (IOException e) + { + // ignore + } + } } catch (IOException x) { @@ -1346,6 +1358,81 @@ public void onComplete(Result result) assertTrue(clientLatch.await(10, TimeUnit.SECONDS)); } + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testAsyncEcho(Transport transport) throws Exception + { + init(transport); + scenario.start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + System.err.println("Service " + request); + + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + while (input.isReady()) + { + int b = input.read(); + if (b >= 0) + { + // System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b)); + response.getOutputStream().write(b); + } + else + return; + } + } + + @Override + public void onAllDataRead() throws IOException + { + asyncContext.complete(); + } + + @Override + public void onError(Throwable x) + { + } + }); + } + }); + + AsyncRequestContent contentProvider = new AsyncRequestContent(); + CountDownLatch clientLatch = new CountDownLatch(1); + + AtomicReference resultRef = new AtomicReference<>(); + scenario.client.newRequest(scenario.newURI()) + .method(HttpMethod.POST) + .path(scenario.servletPath) + .body(contentProvider) + .send(new BufferingResponseListener(16 * 1024 * 1024) + { + @Override + public void onComplete(Result result) + { + resultRef.set(result); + clientLatch.countDown(); + } + }); + + for (int i = 0; i < 1_000_000; i++) + { + contentProvider.offer(BufferUtil.toBuffer("S" + i)); + } + contentProvider.close(); + + assertTrue(clientLatch.await(30, TimeUnit.SECONDS)); + assertThat(resultRef.get().isSucceeded(), Matchers.is(true)); + assertThat(resultRef.get().getResponse().getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + } + @ParameterizedTest @ArgumentsSource(TransportProvider.class) public void testAsyncInterceptedTwice(Transport transport) throws Exception @@ -1359,7 +1446,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) System.err.println("Service " + request); final HttpInput httpInput = ((Request)request).getHttpInput(); - httpInput.addInterceptor(new GzipHttpInputInterceptor(((Request)request).getHttpChannel().getByteBufferPool(), 1024)); + httpInput.addInterceptor(new GzipHttpInputInterceptor(new InflaterPool(-1, true), ((Request)request).getHttpChannel().getByteBufferPool(), 1024)); httpInput.addInterceptor(content -> { ByteBuffer byteBuffer = content.getByteBuffer(); @@ -1406,7 +1493,7 @@ public void onError(Throwable x) } }); - DeferredContentProvider contentProvider = new DeferredContentProvider(); + AsyncRequestContent contentProvider = new AsyncRequestContent(); CountDownLatch clientLatch = new CountDownLatch(1); String expected = @@ -1421,7 +1508,7 @@ public void onError(Throwable x) scenario.client.newRequest(scenario.newURI()) .method(HttpMethod.POST) .path(scenario.servletPath) - .content(contentProvider) + .body(contentProvider) .send(new BufferingResponseListener() { @Override @@ -1437,19 +1524,11 @@ public void onComplete(Result result) } }); - contentProvider.offer(gzipToBuffer("S0")); - contentProvider.flush(); - contentProvider.offer(gzipToBuffer("S1")); - contentProvider.flush(); - contentProvider.offer(gzipToBuffer("S2")); - contentProvider.flush(); - contentProvider.offer(gzipToBuffer("S3")); - contentProvider.flush(); - contentProvider.offer(gzipToBuffer("S4")); - contentProvider.flush(); - contentProvider.offer(gzipToBuffer("S5")); - contentProvider.flush(); - contentProvider.offer(gzipToBuffer("S6")); + for (int i = 0; i < 7; i++) + { + contentProvider.offer(gzipToBuffer("S" + i)); + contentProvider.flush(); + } contentProvider.close(); assertTrue(clientLatch.await(10, TimeUnit.SECONDS)); @@ -1534,7 +1613,7 @@ public void onError(Throwable x) } }); - DeferredContentProvider contentProvider = new DeferredContentProvider(); + AsyncRequestContent contentProvider = new AsyncRequestContent(); CountDownLatch clientLatch = new CountDownLatch(1); String expected = @@ -1546,7 +1625,7 @@ public void onError(Throwable x) scenario.client.newRequest(scenario.newURI()) .method(HttpMethod.POST) .path(scenario.servletPath) - .content(contentProvider) + .body(contentProvider) .send(new BufferingResponseListener() { @Override @@ -1631,18 +1710,21 @@ public int read(byte[] b, int off, int len) })) .send(); assertEquals(HttpStatus.OK_200, response.getStatus()); - latch.countDown(); } catch (Throwable x) { failures.offer(x); } + finally + { + latch.countDown(); + } } }); } assertTrue(latch.await(30, TimeUnit.SECONDS)); - assertTrue(failures.isEmpty()); + assertThat(failures, empty()); } private static class Listener implements ReadListener, WriteListener @@ -1771,10 +1853,11 @@ private void checkScope() } @Override - public void stopServer() + public void stopServer() throws Exception { checkScope(); scope.set(null); + super.stopServer(); } } }