Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment with IteratingCallback #12040

Merged
merged 34 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6858307
Experiment with IteratingCallback
gregw Jul 15, 2024
2b60a6d
Experiment with IteratingCallback
gregw Jul 16, 2024
c2742cc
Experiment with IteratingCallback
gregw Jul 16, 2024
4c28464
Reduce capture of this:: on hot path
gregw Jul 16, 2024
a6fa5bf
Split the release operations in case of failures.
sbordet Jul 19, 2024
b56de27
Made onCompleted() private to address PR review.
sbordet Jul 19, 2024
3c7631f
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Jul 24, 2024
c50b4da
Fix
gregw Jul 24, 2024
60ed7d1
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Jul 25, 2024
14d3b08
reverted import changes from bad merge
gregw Jul 25, 2024
3299e02
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Jul 25, 2024
0b547f5
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Aug 3, 2024
5854e11
releaseForRemoval
gregw Aug 3, 2024
92b37b8
improved javadoc
gregw Aug 3, 2024
9d2ae7d
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Aug 4, 2024
703da3d
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Aug 4, 2024
5144e17
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Aug 5, 2024
c00f6f0
WIP
gregw Aug 5, 2024
d3157bf
WIP
gregw Aug 5, 2024
d65bc94
documentation
gregw Aug 5, 2024
700c69b
Merge remote-tracking branch 'origin/experiment/jetty-12.1.x/Iteratin…
gregw Aug 5, 2024
00b87f1
WIP
gregw Aug 6, 2024
e502db0
updates from review
gregw Aug 6, 2024
1530cd6
Fixed FCGI Flusher
gregw Aug 7, 2024
aa42201
Merge branch 'jetty-12.1.x' into experiment/jetty-12.1.x/IteratingCal…
gregw Aug 7, 2024
7c5c003
updates from review
gregw Aug 7, 2024
7f81e62
fix reported leak error messages
lorban Aug 12, 2024
9bce4fc
fix leak
lorban Aug 12, 2024
be283c4
fix missing calls to super
lorban Aug 13, 2024
ddf13d6
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
lorban Aug 13, 2024
6028353
Small tweaks to documentation and javadocs.
sbordet Aug 16, 2024
20392d2
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Aug 21, 2024
c19366e
Merge remote-tracking branch 'origin/jetty-12.1.x' into experiment/je…
gregw Aug 22, 2024
86b1442
deflake test
gregw Aug 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,28 +329,30 @@ protected Action process() throws Throwable
// Read a chunk.
chunk = source.read();

// No chunk, demand to be called back when there will be more chunks.
// If no chunk,
if (chunk == null)
{
source.demand(this::iterate);
return Action.IDLE;
// schedule a demand callback when there are more chunks.
source.demand(this::succeeded);
return Action.SCHEDULED;
}

// The read failed, re-throw the failure
// causing onCompleteFailure() to be invoked.
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();

// Copy the chunk.
// Copy the chunk by scheduling an async write
sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
return Action.SCHEDULED;
}

@Override
protected void onSuccess()
{
// After every successful write, release the chunk.
chunk.release();
// After every successful write, release the chunk
// and reset to the next chunk
chunk = Content.Chunk.releaseAndNext(chunk);
}

@Override
Expand All @@ -361,14 +363,20 @@ protected void onCompleteSuccess()
}

@Override
protected void onCompleteFailure(Throwable failure)
protected void onFailure(Throwable cause)
{
// In case of a failure, either on the
// read or on the write, release the chunk.
chunk.release();

// The copy is failed, fail the callback.
callback.failed(failure);
// This may occur before a {@code write} has completed (due to abort or close),
// so we cannot release the chunk here.
callback.failed(cause);
}

@Override
protected void onCompleteFailure(Throwable failure)
{
// In case of a failure, we wait until here, when the {@code write}
// has completed before releasing any chunk.
chunk = Content.Chunk.releaseAndNext(chunk);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
Expand Down Expand Up @@ -225,11 +228,13 @@ public void echoCorrect()
// tag::echo-correct[]
class EchoConnection extends AbstractConnection
{
private final ByteBufferPool.Sized pool;
private final IteratingCallback callback = new EchoIteratingCallback();

public EchoConnection(EndPoint endp, Executor executor)
public EchoConnection(EndPoint endp, ByteBufferPool.Sized pool, Executor executor)
{
super(endp, executor);
this.pool = pool;
}

@Override
Expand All @@ -250,20 +255,20 @@ public void onFillable()

class EchoIteratingCallback extends IteratingCallback
{
private ByteBuffer buffer;
private RetainableByteBuffer buffer;

@Override
protected Action process() throws Throwable
{
// Obtain a buffer if we don't already have one.
if (buffer == null)
buffer = BufferUtil.allocate(1024);
buffer = pool.acquire();

int filled = getEndPoint().fill(buffer);
int filled = getEndPoint().fill(buffer.getByteBuffer());
if (filled > 0)
{
// We have filled some bytes, echo them back.
getEndPoint().write(this, buffer);
getEndPoint().write(this, buffer.getByteBuffer());

// Signal that the iteration should resume
// when the write() operation is completed.
Expand All @@ -273,14 +278,15 @@ else if (filled == 0)
{
// We don't need the buffer anymore, so
// don't keep it around while we are idle.
buffer = null;
buffer = Retainable.release(buffer);

// No more bytes to read, declare
// again interest for fill events.
fillInterested();
fillInterested(this);

// Signal that the iteration is now IDLE.
return Action.IDLE;
// Signal that the iteration is now SCHEDULED
// for a fillable callback.
return Action.SCHEDULED;
}
else
{
Expand All @@ -291,17 +297,11 @@ else if (filled == 0)
}

@Override
protected void onCompleteSuccess()
{
// The iteration completed successfully.
getEndPoint().close();
}

@Override
protected void onCompleteFailure(Throwable cause)
protected void onCompleted(Throwable cause)
{
// The iteration completed with a failure.
// The iteration completed.
getEndPoint().close(cause);
buffer = Retainable.release(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,15 @@ protected Action process() throws Throwable // <2>
@Override
public void succeed()
{
// Map the o.e.j.websocket.api.Callback to o.e.jetty.util.Callback API
// When the send succeeds, succeed this IteratingCallback.
succeeded();
}

@Override
public void fail(Throwable x)
{
// Map the o.e.j.websocket.api.Callback to o.e.jetty.util.Callback API
// When the send fails, fail this IteratingCallback.
failed(x);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void onOpen()
@Override
public void onFillable()
{
// Called from the fill interest in onOpen() to start iteration
callback.iterate();
}

Expand Down Expand Up @@ -206,24 +207,20 @@ protected Action process() throws Throwable
// the application completed the request processing.
return Action.SCHEDULED;
}
else
{
// Did not receive enough JSON bytes,
// loop around to try to read more.
}
// Did not receive enough JSON bytes to complete the parser,
// loop around to try to read more.
}
else if (filled == 0)
{
// We don't need the buffer anymore, so
// don't keep it around while we are idle.
buffer = null;

// No more bytes to read, declare
// again interest for fill events.
fillInterested();
// No more bytes to read, declare again interest for fill events.
fillInterested(this);

// Signal that the iteration is now IDLE.
return Action.IDLE;
// Signal that the iteration is now SCHEDULED for fill interest callback.
return Action.SCHEDULED;
}
else
{
Expand Down
15 changes: 10 additions & 5 deletions documentation/jetty/modules/programming-guide/pages/arch/io.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ In turn, this calls `IteratingCallback.process()`, an abstract method that must
Method `process()` must return:

* `Action.SCHEDULED`, to indicate whether the loop has performed a non-blocking, possibly asynchronous, operation
* `Action.IDLE`, to indicate that the loop should temporarily be suspended to be resumed later
* `Action.IDLE`, to indicate that the loop should temporarily be suspended to be resumed later with another call to iterate
* `Action.SUCCEEDED` to indicate that the loop exited successfully

Any exception thrown within `process()` exits the loops with a failure.
Expand All @@ -209,13 +209,18 @@ If this was the only active network connection, the system would now be idle, wi

Eventually, the Jetty I/O system will notify that the `write()` completed; this notifies the `IteratingCallback` that can now resume the loop and call `process()` again.

When `process()` is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the `Connection` to read, or it may send them after a long pause -- in both cases we do not want to retain the memory allocated by the buffer; next, you want to call `fillInterested()` to declare again interest for read events, and return `Action.IDLE` since there is nothing to write back and therefore the loop may be suspended.
When more bytes are again available to be read from the network, `onFillable()` will be called again and that will start the iteration again.
When `process()` is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the `Connection` to read, or it may send them after a long pause -- in both cases we do not want to retain the memory allocated by the buffer; next, you want to call `fillInterested(this)` to declare again interest for read events, and return `Action.SCHEDULED` since a callback is scheduled to occur once filling is possible.

Another possibility is that during `process()` the read returns `-1` indicating that the other peer has closed the connection; this means that there will not be more bytes to read and the loop can be exited, so you return `Action.SUCCEEDED`; `IteratingCallback` will then call `onCompleteSuccess()` where you can close the `EndPoint`.

The last case is that during `process()` an exception is thrown, for example by `EndPoint.fill(ByteBuffer)` or, in more advanced implementations, by code that parses the bytes that have been read and finds them unacceptable; any exception thrown within `process()` will be caught by `IteratingCallback` that will exit the loop with a failure and call `onCompleteFailure(Throwable)` with the exception that has been thrown, where you can close the `EndPoint`, passing the exception that is the reason for closing prematurely the `EndPoint`.

Note that some failures may occur whilst a scheduled operation is in progress.
Such failures are notified immediately via the `onFailure(Throwable)` method, but care must be taken to not release any resources that may still be in use by the scheduled operation.
The `onCompleteFailure(Throwable)` method is called when both a failure has occurred and any scheduled operation has completed.
An example of this issue is that a buffer used for a write operation cannot be returned to a pool in `onFailure(Throwable)` as the write may still be progressing.
Either the buffer must be removed from the pool in `onFailure(Throwable)` or the release of the buffer deferred until `onCompleteFailure(Throwable)` is called.

[IMPORTANT]
====
Asynchronous programming is hard.
Expand Down Expand Up @@ -356,9 +361,9 @@ You must initiate a second write only when the first is finished, for example:
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java[tags=sinkMany]
----

When you need to perform an unknown number of writes, you must use an `IteratingCallback`, explained in <<echo,this section>>, to avoid ``StackOverFlowError``s.
When you need to perform an unknown number of writes, you may use an `IteratingCallback`, explained in <<echo,this section>>, to avoid ``StackOverFlowError``s.

For example, to copy from a `Content.Source` to a `Content.Sink` you should use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`.
For example, to copy from a `Content.Source` to a `Content.Sink` you could use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`.
For illustrative purposes, below you can find the implementation of `copy(Content.Source, Content.Sink, Callback)` that uses an `IteratingCallback`:

[,java,indent=0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,14 +617,8 @@ protected void onSuccess()
}

@Override
protected void onCompleteFailure(Throwable x)
protected void onFailure(Throwable x)
{
if (chunk != null)
{
chunk.release();
chunk = Content.Chunk.next(chunk);
}

failRequest(x);
internalAbort(exchange, x);

Expand All @@ -633,6 +627,14 @@ protected void onCompleteFailure(Throwable x)
promise.succeeded(true);
}

@Override
protected void onCompleteFailure(Throwable x)
{
if (chunk != null)
chunk.release();
chunk = Content.Chunk.next(chunk);
}

@Override
public InvocationType getInvocationType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -237,7 +238,9 @@ protected Action process() throws Exception
@Override
protected void onSuccess()
{
release();
headerBuffer = Retainable.release(headerBuffer);
chunkBuffer = Retainable.release(chunkBuffer);
contentByteBuffer = null;
}

@Override
Expand All @@ -248,21 +251,16 @@ protected void onCompleteSuccess()
}

@Override
protected void onCompleteFailure(Throwable cause)
protected void onFailure(Throwable cause)
{
super.onCompleteFailure(cause);
release();
callback.failed(cause);
}

private void release()
@Override
protected void onCompleteFailure(Throwable cause)
{
if (headerBuffer != null)
headerBuffer.release();
headerBuffer = null;
if (chunkBuffer != null)
chunkBuffer.release();
chunkBuffer = null;
headerBuffer = Retainable.release(headerBuffer);
chunkBuffer = Retainable.release(chunkBuffer);
contentByteBuffer = null;
}
}
Expand Down Expand Up @@ -334,11 +332,16 @@ protected Action process() throws Exception
}
}

@Override
protected void onFailure(Throwable cause)
{
callback.failed(cause);
}

@Override
protected void onCompleteFailure(Throwable cause)
{
release();
callback.failed(cause);
}

private void release()
lorban marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading
Loading