Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HttpInput may skip setting fill interest #5692

Merged
merged 4 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ protected void needsFillInterest() throws IOException
throw new ClosedChannelException();

ByteBuffer in = _inQ.peek();
if (LOG.isDebugEnabled())
LOG.debug("{} needsFillInterest EOF={} {}", this, in == EOF, BufferUtil.toDetailString(in));
if (BufferUtil.hasContent(in) || isEOF(in))
execute(_runFillable);
}
Expand All @@ -201,11 +203,15 @@ public void addInput(ByteBuffer in)
boolean wasEmpty = _inQ.isEmpty();
if (in == null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addEOFAndRun=true", this);
_inQ.add(EOF);
fillable = true;
}
if (BufferUtil.hasContent(in))
{
if (LOG.isDebugEnabled())
LOG.debug("{} addInputAndRun={} {}", this, wasEmpty, BufferUtil.toDetailString(in));
_inQ.add(in);
fillable = wasEmpty;
}
Expand Down Expand Up @@ -234,11 +240,15 @@ public void addInputAndExecute(ByteBuffer in)
boolean wasEmpty = _inQ.isEmpty();
if (in == null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addEOFAndExecute=true", this);
_inQ.add(EOF);
fillable = true;
}
if (BufferUtil.hasContent(in))
{
if (LOG.isDebugEnabled())
LOG.debug("{} addInputAndExecute={} {}", this, wasEmpty, BufferUtil.toDetailString(in));
_inQ.add(in);
fillable = wasEmpty;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public int available()
HttpInput.Content content = nextTransformedContent();
int available = content == null ? 0 : content.remaining();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
LOG.debug("available = {} {}", available, this);
return available;
}

Expand All @@ -86,15 +86,15 @@ public boolean hasContent()
{
boolean hasContent = _rawContent != null;
if (LOG.isDebugEnabled())
LOG.debug("hasContent = {}", hasContent);
LOG.debug("hasContent = {} {}", hasContent, this);
return hasContent;
}

@Override
public boolean isError()
{
if (LOG.isDebugEnabled())
LOG.debug("isError = {}", _error);
LOG.debug("isError = {} {}", _error, this);
return _error;
}

Expand All @@ -103,7 +103,7 @@ public void checkMinDataRate()
{
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate [m={},t={}]", minRequestDataRate, _firstByteTimeStamp);
LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this);
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
Expand All @@ -113,13 +113,13 @@ public void checkMinDataRate()
if (getRawContentArrived() < minimumData)
{
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate check failed");
LOG.debug("checkMinDataRate check failed {}", this);
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_httpChannel.getState().isResponseCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate aborting channel");
LOG.debug("checkMinDataRate aborting channel {}", this);
_httpChannel.abort(bad);
}
failCurrentContent(bad);
Expand All @@ -133,15 +133,15 @@ public void checkMinDataRate()
public long getRawContentArrived()
{
if (LOG.isDebugEnabled())
LOG.debug("getRawContentArrived = {}", _rawContentArrived);
LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this);
return _rawContentArrived;
}

@Override
public boolean consumeAll(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("consumeAll [e={}]", (Object)x);
LOG.debug("consumeAll [e={}] {}", x, this);
failCurrentContent(x);
// A specific HttpChannel mechanism must be used as the following code
// does not guarantee that the channel will synchronously deliver all
Expand All @@ -156,14 +156,14 @@ public boolean consumeAll(Throwable x)
// deliver the content asynchronously. Tests in StreamResetTest cover this.
boolean atEof = _httpChannel.failAllContent(x);
if (LOG.isDebugEnabled())
LOG.debug("failed all content of http channel; at EOF? {}", atEof);
LOG.debug("failed all content of http channel EOF={} {}", atEof, this);
return atEof;
}

private void failCurrentContent(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failing currently held content [r={},t={}]", _rawContent, _transformedContent, x);
LOG.debug("failing currently held content {}", this, x);
if (_transformedContent != null && !_transformedContent.isSpecial())
{
if (_transformedContent != _rawContent)
Expand All @@ -186,7 +186,7 @@ private void failCurrentContent(Throwable x)
public boolean onContentProducible()
{
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible");
LOG.debug("onContentProducible {}", this);
return _httpChannel.getState().onReadReady();
}

Expand All @@ -195,7 +195,7 @@ public HttpInput.Content nextContent()
{
HttpInput.Content content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent = {}", content);
LOG.debug("nextContent = {} {}", content, this);
if (content != null)
_httpChannel.getState().onReadIdle();
return content;
Expand All @@ -205,7 +205,7 @@ public HttpInput.Content nextContent()
public void reclaim(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("reclaim {} [t={}]", content, _transformedContent);
LOG.debug("reclaim {} {}", content, this);
if (_transformedContent == content)
{
content.succeeded();
Expand All @@ -222,24 +222,39 @@ public boolean isReady()
if (content == null)
{
_httpChannel.getState().onReadUnready();
if (_httpChannel.needContent())
while (true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method is correct, but I think it would read better as:

public boolean isReady()
{
    HttpInput.Content content = nextTransformedContent();
    if (content != null)
    {
        // TODO do we really need to call onContentAdded in this case??? I don't think so as we have not called onReadUnready
        return true;
    }
            
   _httpChannel.getState().onReadUnready();
    while (_httpChannel.needContent())
    {
        HttpInput.Content content = nextTransformedContent();
        if (content != null)
        {
            _httpChannel.getState().onContentAdded();
            return true;
        }
        _httpChannel.getState().onReadUnready();
    }

    return false;
}

Copy link
Contributor

@lorban lorban Nov 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gregw Regarding your TODO, yes you must call onContentAdded in this case. Imagine a servlet calling isReady twice. The first time false is returned but the 2nd time true is returned. If you do not call onContentAdded to switch the state to READY, it'll be stuck at UNREADY which will cause havoc.

Oh, and I also added in the javadoc of ContentProducer.isReady that After this call, state can be either of UNREADY or READY as the state cannot be left IDLE after isReady is called.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lorban But if isReady() has returned false, then regardless of it being called another time there is a scheduling action that will take place behind the scenes to make sure onDataAvailable is called when data becomes available. Surely it is that action that will call onContentAdded. Can you write a test case to demonstrate why this call is needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened #5704 to track this.

{
content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("isReady got transformed content after needContent retry {}", content);
if (content != null)
_httpChannel.getState().onContentAdded();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("isReady has no transformed content after needContent");
if (_httpChannel.needContent())
{
content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("isReady got transformed content after needContent retry {} {}", content, this);
if (content != null)
{
_httpChannel.getState().onContentAdded();
break;
}
else
{
// We could have read some rawContent but not enough to generate
// transformed content, so we need to call needContent() again
// to tell the channel that more content is needed.
if (LOG.isDebugEnabled())
LOG.debug("isReady could not transform content after needContent retry {}", this);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("isReady false needContent retry {}", this);
break;
}
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("isReady got transformed content {}", content);
LOG.debug("isReady got transformed content {} {}", content, this);
_httpChannel.getState().onContentAdded();
}
boolean ready = content != null;
Expand All @@ -251,7 +266,7 @@ public boolean isReady()
private HttpInput.Content nextTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("nextTransformedContent [r={},t={}]", _rawContent, _transformedContent);
LOG.debug("nextTransformedContent {}", this);
if (_rawContent == null)
{
_rawContent = produceRawContent();
Expand All @@ -264,7 +279,7 @@ private HttpInput.Content nextTransformedContent()
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted transformed content");
LOG.debug("nulling depleted transformed content {}", this);
_transformedContent = null;
}

Expand All @@ -276,20 +291,20 @@ private HttpInput.Content nextTransformedContent()

_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it", _error);
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
return _rawContent;
}

if (_interceptor != null)
{
if (LOG.isDebugEnabled())
LOG.debug("using interceptor {} to transform raw content", _interceptor);
LOG.debug("using interceptor to transform raw content {}", this);
_transformedContent = _interceptor.readFrom(_rawContent);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("null interceptor, transformed content = raw content");
LOG.debug("null interceptor, transformed content = raw content {}", this);
_transformedContent = _rawContent;
}

Expand All @@ -298,7 +313,7 @@ private HttpInput.Content nextTransformedContent()
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted transformed content");
LOG.debug("nulling depleted transformed content {}", this);
_transformedContent = null;
}

Expand All @@ -309,30 +324,30 @@ private HttpInput.Content nextTransformedContent()
_rawContent.succeeded();
_rawContent = null;
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted raw content");
LOG.debug("nulling depleted raw content {}", this);
_rawContent = produceRawContent();
if (_rawContent == null)
{
if (LOG.isDebugEnabled())
LOG.debug("produced null raw content, returning null");
LOG.debug("produced null raw content, returning null, {}", this);
return null;
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("raw content is not empty");
LOG.debug("raw content is not empty {}", this);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("transformed content is not empty");
LOG.debug("transformed content is not empty {}", this);
}
}

if (LOG.isDebugEnabled())
LOG.debug("returning transformed content {}", _transformedContent);
LOG.debug("returning transformed content {}", this);
return _transformedContent;
}

Expand All @@ -345,10 +360,24 @@ private HttpInput.Content produceRawContent()
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp = System.nanoTime();
if (LOG.isDebugEnabled())
LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {}", _rawContentArrived, _firstByteTimeStamp);
LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {} {}", _rawContentArrived, _firstByteTimeStamp, this);
}
if (LOG.isDebugEnabled())
LOG.debug("produceRawContent produced {}", content);
LOG.debug("produceRawContent produced {} {}", content, this);
return content;
}

@Override
public String toString()
{
return String.format("%s@%x[r=%s,t=%s,i=%s,error=%b,c=%s]",
getClass().getSimpleName(),
hashCode(),
_rawContent,
_transformedContent,
_interceptor,
_error,
_httpChannel
);
}
}
Loading