From 6858307760e40d087fcef1b661748d385a0f67d6 Mon Sep 17 00:00:00 2001 From: gregw Date: Mon, 15 Jul 2024 18:18:09 +1000 Subject: [PATCH] Experiment with IteratingCallback The previous semantic of `onCompleteFailure` has been renamed to `onFailure(Throwable)`, which is called immediately (but serialized) on either an abort or a failure. A new `onCompleteFailure(Throwable)` method has been added that is called only after a `failed(throwable)` or a `abort(Throwable)` followed by `succeeded()` or `failed(Throwable)`` No usage has yet been made of the new `onCompleteFailure`, but the ICB implementation has been completely replaced by the one developed in #11876 --- .../jetty/docs/programming/ContentDocs.java | 2 +- .../docs/programming/SelectorManagerDocs.java | 2 +- .../jetty/docs/programming/WebSocketDocs.java | 2 +- .../docs/programming/server/ServerDocs.java | 2 +- .../jetty/client/transport/HttpSender.java | 2 +- .../internal/HttpSenderOverHTTP.java | 6 +- .../eclipse/jetty/fcgi/generator/Flusher.java | 2 +- .../jetty/http2/internal/HTTP2Flusher.java | 2 +- .../jetty/http2/tests/RawHTTP2ProxyTest.java | 4 +- .../eclipse/jetty/http3/ControlFlusher.java | 2 +- .../jetty/http3/InstructionFlusher.java | 2 +- .../eclipse/jetty/http3/MessageFlusher.java | 2 +- .../org/eclipse/jetty/io/IOResources.java | 4 +- .../jetty/io/internal/ContentCopier.java | 4 +- .../jetty/quic/common/QuicConnection.java | 2 +- .../jetty/quic/common/QuicSession.java | 2 +- .../jetty/server/handler/ConnectHandler.java | 2 +- .../handler/gzip/GzipResponseAndCallback.java | 4 +- .../jetty/server/internal/HttpConnection.java | 2 +- .../client/transport/CustomTransportTest.java | 4 +- .../eclipse/jetty/util/IteratingCallback.java | 611 +++++++++++----- .../jetty/util/IteratingNestedCallback.java | 2 +- .../jetty/util/IteratingCallbackTest.java | 665 ++++++++++++++++-- .../websocket/core/WebSocketConnection.java | 4 +- .../websocket/core/internal/FrameFlusher.java | 2 +- .../internal/PerMessageDeflateExtension.java | 4 +- .../websocket/core/util/DemandingFlusher.java | 2 +- .../core/util/TransformingFlusher.java | 4 +- .../core/internal/FrameFlusherTest.java | 4 +- .../ee10/proxy/AsyncMiddleManServlet.java | 2 +- .../jetty/ee10/proxy/AsyncProxyServlet.java | 2 +- .../jetty/ee10/servlet/HttpOutput.java | 14 +- .../ee11/proxy/AsyncMiddleManServlet.java | 2 +- .../jetty/ee11/servlet/HttpOutput.java | 14 +- .../nested/FileBufferedResponseHandler.java | 2 +- .../eclipse/jetty/ee9/nested/HttpOutput.java | 14 +- .../ee9/proxy/AsyncMiddleManServlet.java | 2 +- .../jetty/ee9/proxy/AsyncProxyServlet.java | 2 +- 38 files changed, 1089 insertions(+), 317 deletions(-) diff --git a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java index d406ee1d3bd9..4b932825b6d7 100644 --- a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java +++ b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java @@ -361,7 +361,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { // In case of a failure, either on the // read or on the write, release the chunk. diff --git a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java index cc60b78e7f92..4f31cc613301 100644 --- a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java +++ b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java @@ -298,7 +298,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { // The iteration completed with a failure. getEndPoint().close(cause); diff --git a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java index b0a65e64e336..84939d8957d2 100644 --- a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java +++ b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java @@ -541,7 +541,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { x.printStackTrace(); } diff --git a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java index a109be5ed6d3..deb03d16822c 100644 --- a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java +++ b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java @@ -241,7 +241,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { getEndPoint().close(cause); } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java index 551a210ae0e7..3b505886ebe1 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java @@ -591,7 +591,7 @@ else if (expect100) } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { if (chunk != null) { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java index 1f6bc4a3f756..cdc0b150df65 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java @@ -248,9 +248,9 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { - super.onCompleteFailure(cause); + super.onFailure(cause); release(); callback.failed(cause); } @@ -335,7 +335,7 @@ protected Action process() throws Exception } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { release(); callback.failed(cause); diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java index 9c64ffb77943..f5431cf9061d 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java @@ -109,7 +109,7 @@ protected void onSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { if (active != null) active.failed(x); diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index 1d3849f066f5..68f9521459ec 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -344,7 +344,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { release(); diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java index 59827ebe1958..77a4a1d0b722 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java @@ -519,7 +519,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { frameInfo.callback.failed(cause); } @@ -673,7 +673,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { frameInfo.callback.failed(cause); } diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java index deee66c03b7f..7f4cb53c7db2 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java @@ -122,7 +122,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("failed to write {} on {}", entries, this, failure); diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java index 7a41ba177c6b..7b73c228179a 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java @@ -118,7 +118,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("failed to write buffers on {}", this, failure); diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java index f450bfaf0ec2..a0073fa2c317 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java @@ -118,7 +118,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { if (LOG.isDebugEnabled()) LOG.debug("failed to write {} on {}", entry, this, cause); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java index 41a59c0330a7..0e2fc7f481e3 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java @@ -413,12 +413,12 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { if (retainableByteBuffer != null) retainableByteBuffer.release(); IO.close(channel); - super.onCompleteFailure(x); + super.onFailure(x); } } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java index b4b59f851d4b..bff4b267dea2 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java @@ -77,13 +77,13 @@ protected Action process() throws Throwable } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { if (current != null) { current.release(); current = Content.Chunk.next(current); } - ExceptionUtil.callAndThen(x, source::fail, super::onCompleteFailure); + ExceptionUtil.callAndThen(x, source::fail, super::onFailure); } } diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index e5c1d340b659..3dba959a0d6d 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -377,7 +377,7 @@ public InvocationType getInvocationType() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { entry.callback.failed(cause); QuicConnection.this.close(); diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 75ee111e659a..aeb09e59ff09 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -543,7 +543,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("failed to write cipher bytes, closing session on {}", QuicSession.this, failure); diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 51fdb7e10117..9a7a6183e3da 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -768,7 +768,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("Failed to write {} bytes {}", filled, TunnelConnection.this, x); diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java index d14d2c68d015..702f765f082d 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java @@ -322,14 +322,14 @@ public GzipBufferCB(boolean complete, Callback callback, ByteBuffer content) } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { if (_deflaterEntry != null) { _deflaterEntry.release(); _deflaterEntry = null; } - super.onCompleteFailure(x); + super.onFailure(x); } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 2027b6542fc0..57814be4f816 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -910,7 +910,7 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(final Throwable x) + public void onFailure(final Throwable x) { failedCallback(release(), x); } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java index a3a7ecf9a464..7b535efcbfe9 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java @@ -335,7 +335,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { // There was a write error, close the Gateway Channel. channel.close(cause); @@ -378,7 +378,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { // There was a write error, close the Gateway Channel. channel.close(cause); diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 155577f58e24..5f4169f4b720 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -14,8 +14,11 @@ package org.eclipse.jetty.util; import java.io.IOException; +import java.util.Objects; import org.eclipse.jetty.util.thread.AutoLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This specialized callback implements a pattern that allows @@ -41,20 +44,27 @@ *

* Subclasses must implement method {@link #process()} where the * asynchronous sub-task is initiated and a suitable {@link Action} - * is returned to this callback to indicate the overall progress of + * is returned to this callback to indicate the overall progress ofk * the large asynchronous task. * This callback is passed to the asynchronous sub-task, and a call * to {@link #succeeded()} on this callback represents the successful * completion of the asynchronous sub-task, while a call to * {@link #failed(Throwable)} on this callback represents the * completion with a failure of the large asynchronous task. + *

+ * For most purposes, the {@link #succeeded()} and {@link #failed(Throwable)} + * methods of this class should be considered final, and only overridden in + * extraordinary circumstances. Any action taken in such extensions are not + * serialized. */ public abstract class IteratingCallback implements Callback { + private static final Logger LOG = LoggerFactory.getLogger(IteratingCallback.class); + /** * The internal states of this callback. */ - private enum State + enum State { /** * This callback is idle, ready to iterate. @@ -64,48 +74,35 @@ private enum State /** * This callback is just about to call {@link #process()}, * or within it, or just exited from it, either normally - * or by throwing. + * or by throwing. Further actions are waiting for the + * {@link #process()} method to return. */ PROCESSING, /** - * Method {@link #process()} returned {@link Action#SCHEDULED} - * and this callback is waiting for the asynchronous sub-task - * to complete. - */ - PENDING, - - /** - * The asynchronous sub-task was completed successfully - * via a call to {@link #succeeded()} while in - * {@link #PROCESSING} state. - */ - CALLED, - - /** - * The iteration terminated successfully as indicated by - * {@link Action#SUCCEEDED} returned from - * {@link IteratingCallback#process()}. + * The asynchronous sub-task was completed either with + * a call to {@link #succeeded()} or {@link #failed(Throwable)}, whilst in + * {@link #PROCESSING} state. Further actions are waiting for the + * {@link #process()} method to return. */ - SUCCEEDED, + PROCESSING_CALLED, /** - * The iteration terminated with a failure via a call - * to {@link IteratingCallback#failed(Throwable)}. + * Method {@link #process()} returned {@link Action#SCHEDULED} + * and this callback is waiting for the asynchronous sub-task + * to complete via a callback to {@link #succeeded()} or {@link #failed(Throwable)} */ - FAILED, + PENDING, /** - * This callback has been {@link #close() closed} and - * cannot be {@link #reset() reset}. + * This callback is complete. */ - CLOSED, + COMPLETE, /** - * This callback has been {@link #abort(Throwable) aborted}, - * and cannot be {@link #reset() reset}. + * Complete and can't be reset. */ - ABORTED + CLOSED } /** @@ -120,6 +117,7 @@ protected enum Action * for additional events to trigger more work. */ IDLE, + /** * Indicates that {@link #process()} has initiated an asynchronous * sub-task, where the execution has started but the callback @@ -127,6 +125,7 @@ protected enum Action * may have not yet been invoked. */ SCHEDULED, + /** * Indicates that {@link #process()} has completed the whole * iteration successfully. @@ -137,7 +136,8 @@ protected enum Action private final AutoLock _lock = new AutoLock(); private State _state; private Throwable _failure; - private boolean _iterate; + private boolean _reprocess; + private boolean _aborted; protected IteratingCallback() { @@ -146,7 +146,7 @@ protected IteratingCallback() protected IteratingCallback(boolean needReset) { - _state = needReset ? State.SUCCEEDED : State.IDLE; + _state = needReset ? State.COMPLETE : State.IDLE; } /** @@ -179,8 +179,24 @@ protected void onSuccess() { } + /** + * Invoked when the overall task has been {@link #abort(Throwable) aborted} or {@link #failed(Throwable) failed}. + *

+ * Calls to this method are serialized with respect to {@link #onAborted(Throwable)}, {@link #process()}, + * {@link #onCompleteFailure(Throwable)} and {@link #onCompleted(Throwable)}. + * + * @param cause The cause of the failure or abort + */ + protected void onFailure(Throwable cause) + { + } + /** * Invoked when the overall task has completed successfully. + *

+ * Calls to this method are serialized with respect to {@link #process()}, {@link #onAborted(Throwable)} + * and {@link #onCompleted(Throwable)}. + * If this method is called, then {@link #onCompleteFailure(Throwable)} ()} will never be called. * * @see #onCompleteFailure(Throwable) */ @@ -190,6 +206,10 @@ protected void onCompleteSuccess() /** * Invoked when the overall task has completed with a failure. + *

+ * Calls to this method are serialized with respect to {@link #process()}, {@link #onAborted(Throwable)} + * and {@link #onCompleted(Throwable)}. + * If this method is called, then {@link #onCompleteSuccess()} will never be called. * * @param cause the throwable to indicate cause of failure * @see #onCompleteSuccess() @@ -198,6 +218,50 @@ protected void onCompleteFailure(Throwable cause) { } + /** + * Invoked when the overall task has been aborted. + *

+ * Calls to this method are serialized with respect to {@link #process()}, {@link #onCompleteFailure(Throwable)} + * and {@link #onCompleted(Throwable)}. + * If this method is called, then {@link #onCompleteSuccess()} will never be called. + *

+ * The default implementation of this method calls {@link #failed(Throwable)}. Overridden implementations of + * this method SHOULD NOT call {@code super.onAborted(Throwable)}. + * + * @param cause The cause of the abort + */ + protected void onAborted(Throwable cause) + { + } + + /** + * Invoked when the overall task has completed. + *

+ * Calls to this method are serialized with respect to {@link #process()} and {@link #onAborted(Throwable)}. + * The default implementation of this method will call either {@link #onCompleteSuccess()} or {@link #onCompleteFailure(Throwable)} + * thus implementations of this method should always call {@code super.onCompleted(Throwable)}. + * + * @param causeOrNull the cause of any {@link #abort(Throwable) abort} or {@link #failed(Throwable) failure}, + * else {@code null} for {@link #succeeded() success}. + */ + protected void onCompleted(Throwable causeOrNull) + { + if (causeOrNull == null) + onCompleteSuccess(); + else + onCompleteFailure(causeOrNull); + } + + private void doCompleteSuccess() + { + onCompleted(null); + } + + private void doCompleteFailure(Throwable cause) + { + onCompleted(cause); + } + /** * This method must be invoked by applications to start the processing * of asynchronous sub-tasks. @@ -215,28 +279,18 @@ public void iterate() { switch (_state) { - case PENDING: - case CALLED: - // process will be called when callback is handled - break; - case IDLE: _state = State.PROCESSING; process = true; break; case PROCESSING: - _iterate = true; - break; - - case FAILED: - case SUCCEEDED: + case PROCESSING_CALLED: + _reprocess = true; break; - case CLOSED: - case ABORTED: default: - throw new IllegalStateException(toString()); + break; } } if (process) @@ -248,21 +302,24 @@ private void processing() // This should only ever be called when in processing state, however a failed or close call // may happen concurrently, so state is not assumed. - boolean notifyCompleteSuccess = false; - Throwable notifyCompleteFailure = null; + boolean completeSuccess = false; + Throwable abortDoCompleteFailure = null; + Throwable completeFailure = null; + Throwable onAbort = null; // While we are processing processing: while (true) { // Call process to get the action that we have to take. - Action action = null; + Action action; try { action = process(); } catch (Throwable x) { + action = null; failed(x); // Fall through to possibly invoke onCompleteFailure(). } @@ -271,72 +328,104 @@ private void processing() // acted on the action we have just received try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("processing {} {}", action, this); + switch (_state) { case PROCESSING: { - if (action != null) + if (action == null) + break processing; + switch (action) { - switch (action) + case IDLE: { - case IDLE: + if (_aborted) { - // Has iterate been called while we were processing? - if (_iterate) - { - // yes, so skip idle and keep processing - _iterate = false; - continue; - } - - // No, so we can go idle - _state = State.IDLE; + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + abortDoCompleteFailure = _failure; break processing; } - case SCHEDULED: + + // Has iterate been called while we were processing? + if (_reprocess) { - // we won the race against the callback, so the callback has to process and we can break processing - _state = State.PENDING; - break processing; + // yes, so skip idle and keep processing + _reprocess = false; + continue; + } + + // No, so we can go idle + _state = State.IDLE; + break processing; + } + case SCHEDULED: + { + // we won the race against the callback, so the callback has to process and we can break processing + _state = State.PENDING; + if (_aborted) + { + onAbort = _failure; + _failure = new AbortingException(onAbort); } - case SUCCEEDED: + break processing; + } + case SUCCEEDED: + { + // we lost the race against the callback, + _reprocess = false; + if (_aborted) { - // we lost the race against the callback, - _iterate = false; - _state = State.SUCCEEDED; - notifyCompleteSuccess = true; - break processing; + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + abortDoCompleteFailure = _failure; } - default: + else { - break; + _state = State.COMPLETE; + completeSuccess = true; } + break processing; + } + default: + { + break; } } throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } - case CALLED: + case PROCESSING_CALLED: { + if (action != Action.SCHEDULED && action != null) + { + _state = State.CLOSED; + abortDoCompleteFailure = new IllegalStateException("Action not scheduled"); + if (_failure == null) + { + _failure = abortDoCompleteFailure; + } + else + { + ExceptionUtil.addSuppressedIfNotAssociated(_failure, onAbort); + abortDoCompleteFailure = _failure; + } + break processing; + } + if (_failure != null) + { + if (_aborted) + abortDoCompleteFailure = _failure; + else + completeFailure = _failure; + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + break processing; + } callOnSuccess = true; - if (action != Action.SCHEDULED) - throw new IllegalStateException(String.format("%s[action=%s]", this, action)); - // we lost the race, so we have to keep processing _state = State.PROCESSING; - continue; + break; } - case FAILED: - case CLOSED: - case ABORTED: - notifyCompleteFailure = _failure; - break processing; - - case SUCCEEDED: - break processing; - - case IDLE: - case PENDING: default: throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } @@ -347,47 +436,74 @@ private void processing() onSuccess(); } } - - if (notifyCompleteSuccess) - onCompleteSuccess(); - else if (notifyCompleteFailure != null) - onCompleteFailure(notifyCompleteFailure); + if (abortDoCompleteFailure != null) + ExceptionUtil.callAndThen(abortDoCompleteFailure, this::doOnAbortedOnFailure, this::doCompleteFailure); + else if (completeSuccess) + doCompleteSuccess(); + else if (completeFailure != null) + ExceptionUtil.callAndThen(completeFailure, this::onFailure, this::doCompleteFailure); + else if (onAbort != null) + ExceptionUtil.callAndThen(onAbort, this::doOnAbortedOnFailure, this::doAbortPendingCompletion); } /** * Method to invoke when the asynchronous sub-task succeeds. *

- * This method should be considered final for all practical purposes. - *

+ * For most purposes, this method should be considered {@code final} and should only be + * overridden in extraordinary circumstances. + * Subclasses that override this method must always call {@code super.succeeded()}. + * Such overridden methods are not serialized with respect to {@link #process()}, {@link #onCompleteSuccess()}, + * {@link #onCompleteFailure(Throwable)}, nor {@link #onAborted(Throwable)}. They should not act on nor change any + * fields that may be used by those methods. * Eventually, {@link #onSuccess()} is * called, either by the caller thread or by the processing * thread. */ @Override - public void succeeded() + public final void succeeded() { boolean process = false; + Throwable completeFailure = null; try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("succeeded {}", this); switch (_state) { case PROCESSING: { - _state = State.CALLED; + // Another thread is processing, so we just tell it the state + _state = State.PROCESSING_CALLED; break; } case PENDING: { - _state = State.PROCESSING; - process = true; + if (_aborted) + { + if (_failure instanceof AbortingException) + { + // Another thread is still calling onAborted, so we will let it do the completion + _state = _failure.getCause() instanceof ClosedException ? State.CLOSED : State.COMPLETE; + } + else + { + // The onAborted call is complete, so we must do the completion + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + completeFailure = _failure; + } + } + else + { + // No other thread is processing, so we will do the processing + _state = State.PROCESSING; + process = true; + } break; } - case FAILED: - case CLOSED: - case ABORTED: + case COMPLETE, CLOSED: { - // Too late! - break; + // Too late + return; } default: { @@ -397,8 +513,11 @@ public void succeeded() } if (process) { - onSuccess(); - processing(); + ExceptionUtil.callAndThen(this::onSuccess, this::processing); + } + else if (completeFailure != null) + { + doCompleteFailure(completeFailure); } } @@ -407,47 +526,84 @@ public void succeeded() * or to fail the overall asynchronous task and therefore * terminate the iteration. *

- * This method should be considered final for all practical purposes. - *

* Eventually, {@link #onCompleteFailure(Throwable)} is * called, either by the caller thread or by the processing * thread. - * + *

+ * For most purposes, this method should be considered {@code final} and should only be + * overridden in extraordinary circumstances. + * Subclasses that override this method must always call {@code super.succeeded()}. + * Such overridden methods are not serialized with respect to {@link #process()}, {@link #onCompleteSuccess()}, + * {@link #onCompleteFailure(Throwable)}, nor {@link #onAborted(Throwable)}. They should not act on nor change any + * fields that may be used by those methods. * @see #isFailed() */ @Override - public void failed(Throwable x) + public final void failed(Throwable cause) { - boolean failure = false; + cause = Objects.requireNonNullElseGet(cause, IOException::new); + + Throwable completeFailure = null; + Throwable abortCompletion = null; try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("failed {}", this, cause); switch (_state) { - case CALLED: - case SUCCEEDED: - case FAILED: - case CLOSED: - case ABORTED: - // Too late! + case PROCESSING: + { + // Another thread is processing, so we just tell it the state + _state = State.PROCESSING_CALLED; + if (_failure == null) + _failure = cause; + else + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); break; + } case PENDING: { - _state = State.FAILED; - failure = true; + if (_aborted) + { + if (_failure instanceof AbortingException) + { + // Another thread is still calling onAborted, so we will let it do the completion + ExceptionUtil.addSuppressedIfNotAssociated(_failure.getCause(), cause); + _state = _failure.getCause() instanceof ClosedException ? State.CLOSED : State.COMPLETE; + } + else + { + // The onAborted call is complete, so we must do the completion + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + abortCompletion = _failure; + } + } + else + { + // No other thread is processing, so we will do the processing + _state = State.COMPLETE; + _failure = cause; + completeFailure = _failure; + } break; } - case PROCESSING: + case COMPLETE, CLOSED: { - _state = State.FAILED; - _failure = x; - break; + // Too late + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + return; } default: + { throw new IllegalStateException(toString()); + } } } - if (failure) - onCompleteFailure(x); + if (completeFailure != null) + ExceptionUtil.callAndThen(completeFailure, this::onFailure, this::doCompleteFailure); + else if (abortCompletion != null) + doCompleteFailure(abortCompletion); } /** @@ -459,37 +615,63 @@ public void failed(Throwable x) * * @see #isClosed() */ - public void close() + public final void close() { - String failure = null; + Throwable onAbort = null; + Throwable onAbortDoCompleteFailure = null; + try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("close {}", this); switch (_state) { - case IDLE: - case SUCCEEDED: - case FAILED: - _state = State.CLOSED; - break; - - case PROCESSING: - _failure = new IOException(String.format("Close %s in state %s", this, _state)); + case IDLE -> + { + // Nothing happening so we can abort and complete _state = State.CLOSED; - break; + _failure = new ClosedException(); + onAbortDoCompleteFailure = _failure; + } + case PROCESSING, PROCESSING_CALLED -> + { + // Another thread is processing, so we just tell it the state and let it handle it + if (_aborted) + { + ExceptionUtil.addSuppressedIfNotAssociated(_failure, new ClosedException()); + } + else + { + _aborted = true; + _failure = new ClosedException(); + } + } - case CLOSED: - case ABORTED: - break; + case PENDING -> + { + // We are waiting for the callback, so we can only call onAbort and then keep waiting + onAbort = new ClosedException(); + _failure = new AbortingException(onAbort); + _aborted = true; + } - default: - failure = String.format("Close %s in state %s", this, _state); + case COMPLETE -> + { _state = State.CLOSED; - break; + } + + case CLOSED -> + { + // too late + return; + } } } - if (failure != null) - onCompleteFailure(new IOException(failure)); + if (onAbort != null) + ExceptionUtil.callAndThen(onAbort, this::doOnAbortedOnFailure, this::doAbortPendingCompletion); + else if (onAbortDoCompleteFailure != null) + ExceptionUtil.callAndThen(onAbortDoCompleteFailure, this::doOnAbortedOnFailure, this::doCompleteFailure); } /** @@ -498,49 +680,106 @@ public void close() * ultimately be invoked, either during this call or later after * any call to {@link #process()} has returned.

* - * @param failure the cause of the abort + * @param cause the cause of the abort + * @return {@code true} if abort was called before the callback was complete. * @see #isAborted() */ - public void abort(Throwable failure) + public final boolean abort(Throwable cause) { - boolean abort = false; + cause = Objects.requireNonNullElseGet(cause, Throwable::new); + + boolean onAbort = false; + boolean onAbortDoCompleteFailure = false; try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("abort {}", this, cause); + + // Are we already aborted? + if (_aborted) + { + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + return false; + } + switch (_state) { - case SUCCEEDED: - case FAILED: - case CLOSED: - case ABORTED: + case IDLE: { - // Too late. + // Nothing happening so we can abort and complete + _state = State.COMPLETE; + _failure = cause; + _aborted = true; + onAbortDoCompleteFailure = true; break; } - case IDLE: - case PENDING: + case PROCESSING: { - _failure = failure; - _state = State.ABORTED; - abort = true; + // Another thread is processing, so we just tell it the state and let it handle everything + _failure = cause; + _aborted = true; break; } - case PROCESSING: - case CALLED: + case PROCESSING_CALLED: { - _failure = failure; - _state = State.ABORTED; + // Another thread is processing, but we have already succeeded or failed. + if (_failure == null) + _failure = cause; + else + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + _aborted = true; break; } - default: - throw new IllegalStateException(toString()); + case PENDING: + { + // We are waiting for the callback, so we can only call onAbort and then keep waiting + onAbort = true; + _failure = new AbortingException(cause); + _aborted = true; + break; + } + + case COMPLETE, CLOSED: + { + // too late + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + return false; + } } } - if (abort) - onCompleteFailure(failure); + if (onAbortDoCompleteFailure) + ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::doCompleteFailure); + else if (onAbort) + ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::doAbortPendingCompletion); + + return true; + } + + private void doOnAbortedOnFailure(Throwable cause) + { + ExceptionUtil.callAndThen(cause, this::onAborted, this::onFailure); + } + + private void doAbortPendingCompletion() + { + Throwable doCompleteFailure = null; + try (AutoLock ignored = _lock.lock()) + { + _failure = _failure.getCause(); + + if (Objects.requireNonNull(_state) != State.PENDING) + { + // the callback completed, one way or another, so it is up to use to do the completion + doCompleteFailure = _failure; + } + } + + if (doCompleteFailure != null) + ExceptionUtil.call(doCompleteFailure, this::doCompleteFailure); } /** @@ -561,7 +800,7 @@ public boolean isClosed() { try (AutoLock ignored = _lock.lock()) { - return _state == State.CLOSED; + return _state == State.CLOSED || _failure instanceof ClosedException; } } @@ -572,7 +811,7 @@ public boolean isFailed() { try (AutoLock ignored = _lock.lock()) { - return _state == State.FAILED; + return _failure != null; } } @@ -585,7 +824,7 @@ public boolean isSucceeded() { try (AutoLock ignored = _lock.lock()) { - return _state == State.SUCCEEDED; + return _state == State.COMPLETE && _failure == null; } } @@ -596,7 +835,7 @@ public boolean isAborted() { try (AutoLock ignored = _lock.lock()) { - return _state == State.ABORTED; + return _aborted; } } @@ -618,11 +857,10 @@ public boolean reset() case IDLE: return true; - case SUCCEEDED: - case FAILED: + case COMPLETE: _state = State.IDLE; _failure = null; - _iterate = false; + _reprocess = false; return true; default: @@ -634,6 +872,31 @@ public boolean reset() @Override public String toString() { - return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state); + try (AutoLock ignored = _lock.lock()) + { + return String.format("%s@%x[%s, %b, %s]", getClass().getSimpleName(), hashCode(), _state, _aborted, _failure); + } + } + + private static class ClosedException extends Exception + { + ClosedException() + { + super("Closed"); + } + + ClosedException(Throwable suppressed) + { + this(); + ExceptionUtil.addSuppressedIfNotAssociated(this, suppressed); + } + } + + private static class AbortingException extends Exception + { + AbortingException(Throwable cause) + { + super(cause.getMessage(), cause); + } } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java index 745e8b800d7d..1d409cd859ed 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java @@ -53,7 +53,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { _callback.failed(x); } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java index 00f6b007b7dd..cd04a884f46f 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java @@ -13,19 +13,37 @@ package org.eclipse.jetty.util; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicMarkableReference; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import org.awaitility.Awaitility; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class IteratingCallbackTest @@ -202,46 +220,31 @@ protected Action process() { processed++; - switch (i--) + return switch (i--) { - case 5: + case 5, 2 -> + { succeeded(); - return Action.SCHEDULED; - - case 4: + yield Action.SCHEDULED; + } + case 4, 1 -> + { scheduler.schedule(successTask, 5, TimeUnit.MILLISECONDS); - return Action.SCHEDULED; - - case 3: - scheduler.schedule(new Runnable() - { - @Override - public void run() - { - idle.countDown(); - } - }, 5, TimeUnit.MILLISECONDS); - return Action.IDLE; - - case 2: - succeeded(); - return Action.SCHEDULED; - - case 1: - scheduler.schedule(successTask, 5, TimeUnit.MILLISECONDS); - return Action.SCHEDULED; - - case 0: - return Action.SUCCEEDED; - - default: - throw new IllegalStateException(); - } + yield Action.SCHEDULED; + } + case 3 -> + { + scheduler.schedule(idle::countDown, 5, TimeUnit.MILLISECONDS); + yield Action.IDLE; + } + case 0 -> Action.SUCCEEDED; + default -> throw new IllegalStateException(); + }; } }; cb.iterate(); - idle.await(10, TimeUnit.SECONDS); + assertTrue(idle.await(10, TimeUnit.SECONDS)); assertTrue(cb.isIdle()); cb.iterate(); @@ -252,25 +255,21 @@ public void run() @Test public void testCloseDuringProcessingReturningScheduled() throws Exception { - testCloseDuringProcessing(IteratingCallback.Action.SCHEDULED); - } - - @Test - public void testCloseDuringProcessingReturningSucceeded() throws Exception - { - testCloseDuringProcessing(IteratingCallback.Action.SUCCEEDED); - } - - private void testCloseDuringProcessing(final IteratingCallback.Action action) throws Exception - { + final CountDownLatch abortLatch = new CountDownLatch(1); final CountDownLatch failureLatch = new CountDownLatch(1); IteratingCallback callback = new IteratingCallback() { @Override - protected Action process() throws Exception + protected Action process() { close(); - return action; + return Action.SCHEDULED; + } + + @Override + protected void onAborted(Throwable cause) + { + abortLatch.countDown(); } @Override @@ -282,27 +281,45 @@ protected void onCompleteFailure(Throwable cause) callback.iterate(); - assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + assertFalse(failureLatch.await(100, TimeUnit.MILLISECONDS)); + assertTrue(abortLatch.await(1000000000, TimeUnit.SECONDS)); + assertTrue(callback.isClosed()); + + callback.succeeded(); + assertTrue(failureLatch.await(1, TimeUnit.SECONDS)); + assertTrue(callback.isFailed()); + assertTrue(callback.isClosed()); } - private abstract static class TestCB extends IteratingCallback + @Test + public void testCloseDuringProcessingReturningSucceeded() throws Exception { - protected Runnable successTask = new Runnable() + final CountDownLatch failureLatch = new CountDownLatch(1); + IteratingCallback callback = new IteratingCallback() { @Override - public void run() + protected Action process() { - succeeded(); + close(); + return Action.SUCCEEDED; } - }; - protected Runnable failTask = new Runnable() - { + @Override - public void run() + protected void onCompleteFailure(Throwable cause) { - failed(new Exception("testing failure")); + failureLatch.countDown(); } }; + + callback.iterate(); + + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + } + + private abstract static class TestCB extends IteratingCallback + { + protected Runnable successTask = this::succeeded; + protected Runnable failTask = () -> failed(new Exception("testing failure")); protected CountDownLatch completed = new CountDownLatch(1); protected int processed = 0; @@ -320,8 +337,7 @@ public void onCompleteFailure(Throwable x) boolean waitForComplete() throws InterruptedException { - completed.await(10, TimeUnit.SECONDS); - return isSucceeded(); + return completed.await(10, TimeUnit.SECONDS) && isSucceeded(); } } @@ -390,57 +406,541 @@ protected void onCompleteFailure(Throwable cause) assertEquals(1, count.get()); - // Aborting should not iterate. icb.abort(new Exception()); assertTrue(ocfLatch.await(5, TimeUnit.SECONDS)); + assertTrue(icb.isFailed()); assertTrue(icb.isAborted()); assertEquals(1, count.get()); } @Test - public void testWhenProcessingAbortSerializesOnCompleteFailure() throws Exception + public void testWhenPendingAbortSerializesOnCompleteFailure() throws Exception { - AtomicInteger count = new AtomicInteger(); - CountDownLatch ocfLatch = new CountDownLatch(1); + AtomicReference aborted = new AtomicReference<>(); + CountDownLatch abortLatch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicMarkableReference completed = new AtomicMarkableReference<>(null, false); + IteratingCallback icb = new IteratingCallback() { @Override protected Action process() throws Throwable { - count.incrementAndGet(); - abort(new Exception()); - - // After calling abort, onCompleteFailure() must not be called yet. - assertFalse(ocfLatch.await(1, TimeUnit.SECONDS)); - return Action.SCHEDULED; } + @Override + protected void onAborted(Throwable cause) + { + aborted.set(cause); + ExceptionUtil.call(abortLatch::await, Throwable::printStackTrace); + } + @Override protected void onCompleteFailure(Throwable cause) { - ocfLatch.countDown(); + failure.set(cause); + } + + @Override + protected void onCompleted(Throwable causeOrNull) + { + completed.set(causeOrNull, true); + super.onCompleted(causeOrNull); } }; icb.iterate(); - assertEquals(1, count.get()); + assertThat(icb.toString(), containsString("[PENDING, false,")); - assertTrue(ocfLatch.await(5, TimeUnit.SECONDS)); - assertTrue(icb.isAborted()); + Throwable cause = new Throwable("test abort"); + new Thread(() -> icb.abort(cause)).start(); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("[PENDING, true,")); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> aborted.get() != null); - // Calling succeeded() won't cause further iterations. icb.succeeded(); - assertEquals(1, count.get()); + // We are now complete, but callbacks have not yet been done + assertThat(icb.toString(), containsString("[COMPLETE, true,")); + assertThat(failure.get(), nullValue()); + assertFalse(completed.isMarked()); + + abortLatch.countDown(); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(completed::isMarked); + assertThat(failure.get(), sameInstance(cause)); + assertThat(completed.getReference(), sameInstance(cause)); + } + + public enum Event + { + PROCESSED, + ABORTED, + SUCCEEDED, + FAILED + } + + public static Stream> serializedEvents() + { + return Stream.of( + List.of(Event.PROCESSED, Event.ABORTED, Event.SUCCEEDED), + List.of(Event.PROCESSED, Event.SUCCEEDED, Event.ABORTED), + + List.of(Event.SUCCEEDED, Event.PROCESSED, Event.ABORTED), + List.of(Event.SUCCEEDED, Event.ABORTED, Event.PROCESSED), + + List.of(Event.ABORTED, Event.SUCCEEDED, Event.PROCESSED), + List.of(Event.ABORTED, Event.PROCESSED, Event.SUCCEEDED), + + List.of(Event.PROCESSED, Event.ABORTED, Event.FAILED), + List.of(Event.PROCESSED, Event.FAILED, Event.ABORTED), + + List.of(Event.FAILED, Event.PROCESSED, Event.ABORTED), + List.of(Event.FAILED, Event.ABORTED, Event.PROCESSED), + + List.of(Event.ABORTED, Event.FAILED, Event.PROCESSED), + List.of(Event.ABORTED, Event.PROCESSED, Event.FAILED) + ); + } + + @ParameterizedTest + @MethodSource("serializedEvents") + public void testSerializesProcessAbortCompletion(List events) throws Exception + { + AtomicReference aborted = new AtomicReference<>(); + CountDownLatch processingLatch = new CountDownLatch(1); + CountDownLatch abortLatch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicMarkableReference completed = new AtomicMarkableReference<>(null, false); + + + Throwable cause = new Throwable("test abort"); + + IteratingCallback icb = new IteratingCallback() + { + @Override + protected Action process() throws Throwable + { + abort(cause); + ExceptionUtil.call(processingLatch::await, Throwable::printStackTrace); + return Action.SCHEDULED; + } + + @Override + protected void onAborted(Throwable cause) + { + aborted.set(cause); + ExceptionUtil.call(abortLatch::await, Throwable::printStackTrace); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + failure.set(cause); + } + + @Override + protected void onCompleted(Throwable causeOrNull) + { + completed.set(causeOrNull, true); + super.onCompleted(causeOrNull); + } + }; + + new Thread(icb::iterate).start(); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("[PROCESSING, true,")); + + // we have aborted, but onAborted not yet called + assertThat(aborted.get(), nullValue()); + + int count = 0; + for (Event event : events) + { + switch (event) + { + case PROCESSED -> + { + processingLatch.countDown(); + // We can call aborted + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> aborted.get() != null); + } + case ABORTED -> + { + abortLatch.countDown(); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> !icb.toString().contains("AbortingException")); + } + case SUCCEEDED -> icb.succeeded(); + + case FAILED -> icb.failed(new Throwable("failure")); + } + + if (++count < 3) + { + // Not complete yet + assertThat(failure.get(), nullValue()); + assertFalse(completed.isMarked()); + } + + // Extra aborts ignored + assertFalse(icb.abort(new Throwable("ignored"))); + } + + // When the callback is succeeded, the completion events can be called + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(completed::isMarked); + assertThat(failure.get(), sameInstance(cause)); + assertThat(completed.getReference(), sameInstance(cause)); + } + + @Test + public void testICBSuccess() throws Exception + { + TestIteratingCB callback = new TestIteratingCB(); + callback.iterate(); + callback.succeeded(); + assertTrue(callback._completed.await(1, TimeUnit.SECONDS)); + assertThat(callback._onFailure.get(), nullValue()); + assertThat(callback._completion.getReference(), Matchers.nullValue()); + assertTrue(callback._completion.isMarked()); + + // Everything now a noop + assertFalse(callback.abort(new Throwable())); + callback.failed(new Throwable()); + assertThat(callback._completion.getReference(), Matchers.nullValue()); + assertThat(callback._completed.getCount(), is(0L)); + + callback.checkNoBadCalls(); + } + + @Test + public void testICBFailure() throws Exception + { + Throwable failure = new Throwable(); + TestIteratingCB callback = new TestIteratingCB(); + callback.iterate(); + callback.failed(failure); + assertTrue(callback._completed.await(1, TimeUnit.SECONDS)); + assertThat(callback._onFailure.get(), sameInstance(failure)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(failure)); + assertTrue(callback._completion.isMarked()); + + // Everything now a noop, other than suppression + callback.succeeded(); + Throwable late = new Throwable(); + assertFalse(callback.abort(late)); + assertFalse(ExceptionUtil.areNotAssociated(failure, late)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(failure)); + assertThat(callback._completed.getCount(), is(0L)); + + callback.checkNoBadCalls(); + } + + @Test + public void testICBAbortSuccess() throws Exception + { + TestIteratingCB callback = new TestIteratingCB(); + callback.iterate(); + + Throwable abort = new Throwable(); + callback.abort(abort); + assertFalse(callback._completed.await(100, TimeUnit.MILLISECONDS)); + assertThat(callback._onFailure.get(), sameInstance(abort)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(abort)); + assertFalse(callback._completion.isMarked()); + + callback.succeeded(); + assertThat(callback._completion.getReference(), Matchers.sameInstance(abort)); + assertThat(callback._completed.getCount(), is(0L)); + + Throwable late = new Throwable(); + callback.failed(late); + assertFalse(callback.abort(late)); + assertTrue(ExceptionUtil.areAssociated(abort, late)); + assertTrue(ExceptionUtil.areAssociated(callback._onFailure.get(), late)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(abort)); + assertThat(callback._completed.getCount(), is(0L)); + + callback.checkNoBadCalls(); + } + + public static Stream abortTests() + { + List tests = new ArrayList<>(); + + for (IteratingCallback.State state : IteratingCallback.State.values()) + { + String name = state.name(); + + if (name.contains("PROCESSING")) + { + for (IteratingCallback.Action action : IteratingCallback.Action.values()) + { + if (name.contains("CALLED")) + { + if (action == IteratingCallback.Action.SCHEDULED) + { + tests.add(Arguments.of(name, action.toString(), Boolean.TRUE)); + tests.add(Arguments.of(name, action.toString(), Boolean.FALSE)); + } + } + else if (action == IteratingCallback.Action.SCHEDULED) + { + tests.add(Arguments.of(name, action.toString(), Boolean.TRUE)); + tests.add(Arguments.of(name, action.toString(), Boolean.FALSE)); + } + else + { + tests.add(Arguments.of(name, action.toString(), null)); + } + } + } + else if (name.equals("COMPLETE") || name.contains("PENDING")) + { + tests.add(Arguments.of(name, null, Boolean.TRUE)); + tests.add(Arguments.of(name, null, Boolean.FALSE)); + } + else + { + tests.add(Arguments.of(name, null, null)); + } + } + + return tests.stream(); + } + + @ParameterizedTest + @MethodSource("abortTests") + public void testAbortInEveryState(String state, String action, Boolean success) throws Exception + { + CountDownLatch processLatch = new CountDownLatch(1); + + AtomicReference onAbort = new AtomicReference<>(); + AtomicReference onFailure = new AtomicReference<>(null); + AtomicMarkableReference onCompleted = new AtomicMarkableReference<>(null, false); + + Throwable cause = new Throwable("abort"); + Throwable failure = new Throwable("failure"); + AtomicInteger badCalls = new AtomicInteger(0); + + IteratingCallback callback = new IteratingCallback() + { + @Override + protected Action process() throws Throwable + { + if (state.contains("CALLED")) + { + if (success) + succeeded(); + else + failed(failure); + } + + if (state.contains("PENDING")) + return Action.SCHEDULED; + + if (state.equals("COMPLETE")) + { + if (success) + return Action.SUCCEEDED; + failed(new Throwable("Complete Failure")); + return Action.SCHEDULED; + } + + if (state.equals("CLOSED")) + { + close(); + return Action.SUCCEEDED; + } + + processLatch.await(); + return IteratingCallback.Action.valueOf(action); + } + + @Override + protected void onFailure(Throwable cause) + { + if (!onFailure.compareAndSet(null, cause)) + badCalls.incrementAndGet(); + } + + @Override + protected void onAborted(Throwable cause) + { + if (!onAbort.compareAndSet(null, cause)) + badCalls.incrementAndGet(); + } + + @Override + protected void onCompleted(Throwable causeOrNull) + { + onCompleted.set(causeOrNull, true); + super.onCompleted(causeOrNull); + } + }; + + if (!state.equals("IDLE")) + { + new Thread(callback::iterate).start(); + } + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> callback.toString().contains(state)); + assertThat(callback.toString(), containsString("[" + state + ",")); + onAbort.set(null); + + if (success == Boolean.FALSE && (state.equals("COMPLETE") || state.equals("CLOSED"))) + { + // We must be failed already + assertThat(onFailure.get(), notNullValue()); + } + + boolean aborted = callback.abort(cause); + + // Check abort in completed state + if (state.equals("COMPLETE") || state.equals("CLOSED")) + { + assertThat(aborted, is(false)); + assertThat(onAbort.get(), nullValue()); + assertTrue(onCompleted.isMarked()); + if (success == Boolean.TRUE) + assertThat(onCompleted.getReference(), nullValue()); + else + assertThat(onCompleted.getReference(), notNullValue()); + return; + } + + // Check abort in non completed state + assertThat(aborted, is(true)); + + if (state.contains("PROCESSING")) + { + processLatch.countDown(); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> !callback.toString().contains("PROCESSING")); + + if (action.equals("SCHEDULED")) + { + if (success) + { + callback.succeeded(); + } + else + { + Throwable failureAfterAbort = new Throwable("failure after abort"); + callback.failed(failureAfterAbort); + assertThat(onFailure.get(), not(sameInstance(failureAfterAbort))); + assertTrue(ExceptionUtil.areAssociated(onFailure.get(), failureAfterAbort)); + } + } + } + else if (state.contains("PENDING")) + { + if (success) + callback.succeeded(); + else + callback.failed(new Throwable("failure after abort")); + } + + assertTrue(onCompleted.isMarked()); + + if (state.contains("CALLED") && !success) + { + assertThat(onCompleted.getReference(), sameInstance(failure)); + assertThat(onAbort.get(), sameInstance(failure)); + } + else + { + assertThat(onCompleted.getReference(), sameInstance(cause)); + assertThat(onAbort.get(), sameInstance(cause)); + } + + assertThat(badCalls.get(), is(0)); + } + + private static class TestIteratingCB extends IteratingCallback + { + final AtomicInteger _count; + final AtomicInteger _badCalls = new AtomicInteger(0); + final AtomicBoolean _onSuccess = new AtomicBoolean(); + final AtomicReference _onFailure = new AtomicReference<>(); + final AtomicMarkableReference _completion = new AtomicMarkableReference<>(null, false); + final CountDownLatch _completed = new CountDownLatch(2); + + private TestIteratingCB() + { + this(1); + } + + private TestIteratingCB(int count) + { + _count = new AtomicInteger(count); + } + + @Override + protected Action process() + { + return _count.getAndDecrement() == 0 ? Action.SUCCEEDED : Action.SCHEDULED; + } + + @Override + protected void onAborted(Throwable cause) + { + _completion.compareAndSet(null, cause, false, false); + } + + @Override + protected void onSuccess() + { + if (!_onSuccess.compareAndSet(false, true)) + _badCalls.incrementAndGet(); + } + + @Override + protected void onFailure(Throwable cause) + { + if (!_onFailure.compareAndSet(null, cause)) + _badCalls.incrementAndGet(); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + if (_completion.compareAndSet(null, cause, false, true)) + _completed.countDown(); + + Throwable failure = _completion.getReference(); + if (failure != null && _completion.compareAndSet(failure, failure, false, true)) + _completed.countDown(); + } + + @Override + protected void onCompleteSuccess() + { + if (_completion.compareAndSet(null, null, false, true)) + _completed.countDown(); + } + + @Override + protected void onCompleted(Throwable causeOrNull) + { + if (_completion.isMarked()) + _badCalls.incrementAndGet(); + super.onCompleted(causeOrNull); + _completed.countDown(); + } + + public void checkNoBadCalls() + { + assertThat(_badCalls.get(), is(0)); + } } @Test public void testOnSuccessCalledDespiteISE() throws Exception { CountDownLatch latch = new CountDownLatch(1); + AtomicReference aborted = new AtomicReference<>(); IteratingCallback icb = new IteratingCallback() { @Override @@ -451,13 +951,22 @@ protected Action process() } @Override - protected void onSuccess() + protected void onAborted(Throwable cause) + { + aborted.set(cause); + super.onAborted(cause); + } + + @Override + protected void onCompleted(Throwable causeOrNull) { + super.onCompleted(causeOrNull); latch.countDown(); } }; - assertThrows(IllegalStateException.class, icb::iterate); + icb.iterate(); assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertThat(aborted.get(), instanceOf(IllegalStateException.class)); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java index 698507ece9db..095c01071a47 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java @@ -635,10 +635,10 @@ private Flusher(Scheduler scheduler, int bufferSize, Generator generator, EndPoi } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { coreSession.processConnectionError(x, NOOP); - super.onCompleteFailure(x); + super.onFailure(x); } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index 3aee03b73b0c..876ec8ebb1bd 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -405,7 +405,7 @@ public void timeoutExpired() } @Override - public void onCompleteFailure(Throwable failure) + public void onFailure(Throwable failure) { if (batchBuffer != null) batchBuffer.clear(); diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index 5e8d65573f41..54b7ab3d1393 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -494,10 +494,10 @@ private boolean inflate(Frame frame, Callback callback, boolean first) throws Da } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { releasePayload(_payloadRef); - super.onCompleteFailure(cause); + super.onFailure(cause); } private void releasePayload(AtomicReference reference) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java index 41e123b0f165..9a8a2afff7d0 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java @@ -178,7 +178,7 @@ protected Action process() throws Throwable } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { Throwable suppressed = _failure.getAndSet(cause); if (suppressed != null && suppressed != cause) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java index edd5ee805ecd..51cc47495461 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java @@ -170,7 +170,7 @@ protected Action process() throws Throwable } @Override - protected void onCompleteFailure(Throwable t) + protected void onFailure(Throwable t) { if (log.isDebugEnabled()) log.debug("onCompleteFailure {}", t.toString()); @@ -180,7 +180,7 @@ protected void onCompleteFailure(Throwable t) notifyCallbackFailure(current.callback, t); current = null; } - onFailure(t); + this.onFailure(t); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java index 2ae247437e02..7ba03da9e066 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java @@ -166,11 +166,11 @@ public void testWriteTimeout() throws Exception FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather) { @Override - public void onCompleteFailure(Throwable failure) + public void onFailure(Throwable failure) { error.set(failure); flusherFailure.countDown(); - super.onCompleteFailure(failure); + super.onFailure(failure); } }; diff --git a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java index b464b5a325d0..a4a2f459ebda 100644 --- a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java +++ b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java @@ -400,7 +400,7 @@ private void process(ByteBuffer content, Callback callback, boolean finished) th } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { onError(x); } diff --git a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java index 11e048ac8236..df91f4f27305 100644 --- a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java +++ b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java @@ -192,7 +192,7 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { onError(cause); } diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java index 498079c1c042..ae49f9343d5a 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java @@ -1425,7 +1425,7 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + public void onFailure(Throwable e) { onWriteComplete(_last, e); } @@ -1461,11 +1461,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + public void onFailure(Throwable e) { try { - super.onCompleteFailure(e); + super.onFailure(e); } catch (Throwable t) { @@ -1666,11 +1666,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { _buffer.release(); IO.close(_in); - super.onCompleteFailure(x); + super.onFailure(x); } } @@ -1739,11 +1739,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { _buffer.release(); IO.close(_in); - super.onCompleteFailure(x); + super.onFailure(x); } } diff --git a/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java b/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java index bb902313989e..5110d88b3f01 100644 --- a/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java +++ b/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java @@ -400,7 +400,7 @@ private void process(ByteBuffer content, Callback callback, boolean finished) th } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { onError(x); } diff --git a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java index 557b17939fe3..d28444d8c7c4 100644 --- a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java +++ b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java @@ -1427,7 +1427,7 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + public void onFailure(Throwable e) { onWriteComplete(_last, e); } @@ -1463,11 +1463,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + public void onFailure(Throwable e) { try { - super.onCompleteFailure(e); + super.onFailure(e); } catch (Throwable t) { @@ -1668,11 +1668,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { _buffer.release(); IO.close(_in); - super.onCompleteFailure(x); + super.onFailure(x); } } @@ -1741,11 +1741,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { _buffer.release(); IO.close(_in); - super.onCompleteFailure(x); + super.onFailure(x); } } diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java index 0eb943a384b8..efec2644bdf7 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java @@ -221,7 +221,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { dispose(); callback.failed(cause); diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java index f85fba691612..02902d780da1 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java @@ -1612,7 +1612,7 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + public void onFailure(Throwable e) { onWriteComplete(_last, e); } @@ -1648,11 +1648,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + public void onFailure(Throwable e) { try { - super.onCompleteFailure(e); + super.onFailure(e); } catch (Throwable t) { @@ -1852,11 +1852,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { _buffer.release(); IO.close(_in); - super.onCompleteFailure(x); + super.onFailure(x); } } @@ -1923,11 +1923,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { _buffer.release(); IO.close(_in); - super.onCompleteFailure(x); + super.onFailure(x); } } diff --git a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java index f8d678276a62..a02c716b8623 100644 --- a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java +++ b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java @@ -400,7 +400,7 @@ private void process(ByteBuffer content, Callback callback, boolean finished) th } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { onError(x); } diff --git a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java index 321da55fd150..6c90f26e7cdf 100644 --- a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java +++ b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java @@ -192,7 +192,7 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { onError(cause); }