diff --git a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java index d406ee1d3bd9..df5d8d6b6692 100644 --- a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java +++ b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java @@ -329,11 +329,11 @@ protected Action process() throws Throwable // Read a chunk. chunk = source.read(); - // No chunk, demand to be called back when there will be more chunks. + // If no chunk, schedule a demand callback when there are more chunks. if (chunk == null) { - source.demand(this::iterate); - return Action.IDLE; + source.demand(this::succeeded); + return Action.SCHEDULED; } // The read failed, re-throw the failure @@ -341,7 +341,7 @@ protected Action process() throws Throwable if (Content.Chunk.isFailure(chunk)) throw chunk.getFailure(); - // Copy the chunk. + // Copy the chunk by scheduling an asynchronous write. sink.write(chunk.isLast(), chunk.getByteBuffer(), this); return Action.SCHEDULED; } @@ -349,8 +349,9 @@ protected Action process() throws Throwable @Override protected void onSuccess() { - // After every successful write, release the chunk. - chunk.release(); + // After every successful write, release the chunk + // and reset to the next chunk + chunk = Content.Chunk.releaseAndNext(chunk); } @Override @@ -361,14 +362,20 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable cause) { - // In case of a failure, either on the - // read or on the write, release the chunk. - chunk.release(); - // The copy is failed, fail the callback. - callback.failed(failure); + // This method is invoked before a write() has completed, so + // the chunk is not released here, but in onCompleteFailure(). + callback.failed(cause); + } + + @Override + protected void onCompleteFailure(Throwable failure) + { + // In case of a failure, this method is invoked when the write() + // is completed, and it is now possible to release the chunk. + chunk = Content.Chunk.releaseAndNext(chunk); } @Override diff --git a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java index cc60b78e7f92..ed8c4cabf2a8 100644 --- a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java +++ b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/SelectorManagerDocs.java @@ -24,9 +24,12 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.Retainable; +import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -225,11 +228,13 @@ public void echoCorrect() // tag::echo-correct[] class EchoConnection extends AbstractConnection { + private final ByteBufferPool.Sized pool; private final IteratingCallback callback = new EchoIteratingCallback(); - public EchoConnection(EndPoint endp, Executor executor) + public EchoConnection(EndPoint endp, ByteBufferPool.Sized pool, Executor executor) { super(endp, executor); + this.pool = pool; } @Override @@ -250,20 +255,20 @@ public void onFillable() class EchoIteratingCallback extends IteratingCallback { - private ByteBuffer buffer; + private RetainableByteBuffer buffer; @Override protected Action process() throws Throwable { // Obtain a buffer if we don't already have one. if (buffer == null) - buffer = BufferUtil.allocate(1024); + buffer = pool.acquire(); - int filled = getEndPoint().fill(buffer); + int filled = getEndPoint().fill(buffer.getByteBuffer()); if (filled > 0) { // We have filled some bytes, echo them back. - getEndPoint().write(this, buffer); + getEndPoint().write(this, buffer.getByteBuffer()); // Signal that the iteration should resume // when the write() operation is completed. @@ -273,14 +278,15 @@ else if (filled == 0) { // We don't need the buffer anymore, so // don't keep it around while we are idle. - buffer = null; + buffer = Retainable.release(buffer); // No more bytes to read, declare // again interest for fill events. - fillInterested(); + fillInterested(this); - // Signal that the iteration is now IDLE. - return Action.IDLE; + // Signal that the iteration is now SCHEDULED + // for a fillable callback. + return Action.SCHEDULED; } else { @@ -291,17 +297,11 @@ else if (filled == 0) } @Override - protected void onCompleteSuccess() - { - // The iteration completed successfully. - getEndPoint().close(); - } - - @Override - protected void onCompleteFailure(Throwable cause) + protected void onCompleted(Throwable cause) { - // The iteration completed with a failure. + // The iteration completed. getEndPoint().close(cause); + buffer = Retainable.release(buffer); } @Override diff --git a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java index b0a65e64e336..eee7352a63fa 100644 --- a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java +++ b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java @@ -523,14 +523,16 @@ protected Action process() throws Throwable // <2> @Override public void succeed() { - // When the send succeeds, succeed this IteratingCallback. + // Map the o.e.j.websocket.api.Callback to o.e.j.util.Callback. + // When the send() succeeds, succeed this IteratingCallback. succeeded(); } @Override public void fail(Throwable x) { - // When the send fails, fail this IteratingCallback. + // Map the o.e.j.websocket.api.Callback to o.e.j.util.Callback. + // When the send() fails, fail this IteratingCallback. failed(x); } diff --git a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java index a109be5ed6d3..b147d5eff245 100644 --- a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java +++ b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/server/ServerDocs.java @@ -176,6 +176,7 @@ public void onOpen() @Override public void onFillable() { + // Called from fillInterested() in onOpen() to start iteration. callback.iterate(); } @@ -206,11 +207,8 @@ protected Action process() throws Throwable // the application completed the request processing. return Action.SCHEDULED; } - else - { - // Did not receive enough JSON bytes, - // loop around to try to read more. - } + // Did not receive enough JSON bytes to complete the + // JSON parsing, loop around to try to read more bytes. } else if (filled == 0) { @@ -218,12 +216,11 @@ else if (filled == 0) // don't keep it around while we are idle. buffer = null; - // No more bytes to read, declare - // again interest for fill events. - fillInterested(); + // No more bytes to read, declare again interest for fill events. + fillInterested(this); - // Signal that the iteration is now IDLE. - return Action.IDLE; + // Signal that the iteration is now SCHEDULED for fill interest callback. + return Action.SCHEDULED; } else { diff --git a/documentation/jetty/modules/programming-guide/pages/arch/io.adoc b/documentation/jetty/modules/programming-guide/pages/arch/io.adoc index 9de123c71f41..d47c348335bb 100644 --- a/documentation/jetty/modules/programming-guide/pages/arch/io.adoc +++ b/documentation/jetty/modules/programming-guide/pages/arch/io.adoc @@ -187,7 +187,7 @@ In turn, this calls `IteratingCallback.process()`, an abstract method that must Method `process()` must return: * `Action.SCHEDULED`, to indicate whether the loop has performed a non-blocking, possibly asynchronous, operation -* `Action.IDLE`, to indicate that the loop should temporarily be suspended to be resumed later +* `Action.IDLE`, to indicate that the loop should temporarily be suspended to be resumed later with another call to iterate * `Action.SUCCEEDED` to indicate that the loop exited successfully Any exception thrown within `process()` exits the loops with a failure. @@ -209,13 +209,18 @@ If this was the only active network connection, the system would now be idle, wi Eventually, the Jetty I/O system will notify that the `write()` completed; this notifies the `IteratingCallback` that can now resume the loop and call `process()` again. -When `process()` is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the `Connection` to read, or it may send them after a long pause -- in both cases we do not want to retain the memory allocated by the buffer; next, you want to call `fillInterested()` to declare again interest for read events, and return `Action.IDLE` since there is nothing to write back and therefore the loop may be suspended. -When more bytes are again available to be read from the network, `onFillable()` will be called again and that will start the iteration again. +When `process()` is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the `Connection` to read, or it may send them after a long pause -- in both cases we do not want to retain the memory allocated by the buffer; next, you want to call `fillInterested(this)` to declare again interest for read events, and return `Action.SCHEDULED` since a callback is scheduled to occur once filling is possible. Another possibility is that during `process()` the read returns `-1` indicating that the other peer has closed the connection; this means that there will not be more bytes to read and the loop can be exited, so you return `Action.SUCCEEDED`; `IteratingCallback` will then call `onCompleteSuccess()` where you can close the `EndPoint`. The last case is that during `process()` an exception is thrown, for example by `EndPoint.fill(ByteBuffer)` or, in more advanced implementations, by code that parses the bytes that have been read and finds them unacceptable; any exception thrown within `process()` will be caught by `IteratingCallback` that will exit the loop with a failure and call `onCompleteFailure(Throwable)` with the exception that has been thrown, where you can close the `EndPoint`, passing the exception that is the reason for closing prematurely the `EndPoint`. +Note that some failures may occur whilst a scheduled operation is in progress. +Such failures are notified immediately via the `onFailure(Throwable)` method, but care must be taken to not release any resources that may still be in use by the scheduled operation. +The `onCompleteFailure(Throwable)` method is called when both a failure has occurred and any scheduled operation has completed. +An example of this issue is that a buffer used for a write operation cannot be returned to a pool in `onFailure(Throwable)` as the write may still be progressing. +Either the buffer must be removed from the pool in `onFailure(Throwable)` or the release of the buffer deferred until `onCompleteFailure(Throwable)` is called. + [IMPORTANT] ==== Asynchronous programming is hard. @@ -356,9 +361,9 @@ You must initiate a second write only when the first is finished, for example: include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java[tags=sinkMany] ---- -When you need to perform an unknown number of writes, you must use an `IteratingCallback`, explained in <>, to avoid ``StackOverFlowError``s. +When you need to perform an unknown number of writes, you may use an `IteratingCallback`, explained in <>, to avoid ``StackOverFlowError``s. -For example, to copy from a `Content.Source` to a `Content.Sink` you should use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`. +For example, to copy from a `Content.Source` to a `Content.Sink` you could use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`. For illustrative purposes, below you can find the implementation of `copy(Content.Source, Content.Sink, Callback)` that uses an `IteratingCallback`: [,java,indent=0] diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java index 55476b7f48ce..4008182a9f09 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java @@ -617,14 +617,8 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { - if (chunk != null) - { - chunk.release(); - chunk = Content.Chunk.next(chunk); - } - failRequest(x); internalAbort(exchange, x); @@ -633,6 +627,14 @@ protected void onCompleteFailure(Throwable x) promise.succeeded(true); } + @Override + protected void onCompleteFailure(Throwable x) + { + if (chunk != null) + chunk.release(); + chunk = Content.Chunk.next(chunk); + } + @Override public InvocationType getInvocationType() { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java index 1f6bc4a3f756..39a7787ace58 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.Retainable; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -237,7 +238,9 @@ protected Action process() throws Exception @Override protected void onSuccess() { - release(); + headerBuffer = Retainable.release(headerBuffer); + chunkBuffer = Retainable.release(chunkBuffer); + contentByteBuffer = null; } @Override @@ -248,21 +251,16 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { - super.onCompleteFailure(cause); - release(); callback.failed(cause); } - private void release() + @Override + protected void onCompleteFailure(Throwable cause) { - if (headerBuffer != null) - headerBuffer.release(); - headerBuffer = null; - if (chunkBuffer != null) - chunkBuffer.release(); - chunkBuffer = null; + headerBuffer = Retainable.release(headerBuffer); + chunkBuffer = Retainable.release(chunkBuffer); contentByteBuffer = null; } } @@ -334,11 +332,16 @@ protected Action process() throws Exception } } + @Override + protected void onFailure(Throwable cause) + { + callback.failed(cause); + } + @Override protected void onCompleteFailure(Throwable cause) { release(); - callback.failed(cause); } private void release() diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java index 9c64ffb77943..891788aa5c8d 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java @@ -15,6 +15,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -104,43 +105,60 @@ protected void onCompleteSuccess() protected void onSuccess() { if (active != null) + { + active.release(); active.succeeded(); - active = null; + active = null; + } } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable cause) { if (active != null) - active.failed(x); - active = null; + active.failed(cause); + List entries; + try (AutoLock ignored = lock.lock()) + { + entries = new ArrayList<>(queue); + } + entries.forEach(entry -> entry.failed(cause)); + } - while (true) + @Override + protected void onCompleteFailure(Throwable cause) + { + if (active != null) + { + active.release(); + active = null; + } + List entries; + try (AutoLock ignored = lock.lock()) { - Entry entry = poll(); - if (entry == null) - break; - entry.failed(x); + entries = new ArrayList<>(queue); + queue.clear(); } + entries.forEach(Entry::release); } } - private record Entry(ByteBufferPool.Accumulator accumulator, Callback callback) implements Callback + private record Entry(ByteBufferPool.Accumulator accumulator, Callback callback) { - @Override public void succeeded() { - if (accumulator != null) - accumulator.release(); callback.succeeded(); } - @Override public void failed(Throwable x) + { + callback.failed(x); + } + + private void release() { if (accumulator != null) accumulator.release(); - callback.failed(x); } } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index fdfbcf0d6781..ba51999d4ff1 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -259,8 +259,7 @@ private void releaseInputBuffer() boolean released = inputBuffer.release(); if (LOG.isDebugEnabled()) LOG.debug("releaseInputBuffer {} {}", released, this); - if (released) - inputBuffer = null; + inputBuffer = null; } private int fillInputBuffer() diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index 1d3849f066f5..8ef2d83a6cb0 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -344,10 +344,8 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { - release(); - Throwable closed; Set allEntries; try (AutoLock ignored = lock.lock()) @@ -376,6 +374,12 @@ protected void onCompleteFailure(Throwable x) session.onWriteFailure(x); } + @Override + protected void onCompleteFailure(Throwable x) + { + release(); + } + public void terminate(Throwable cause) { Throwable closed; diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java index 59827ebe1958..77a4a1d0b722 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java @@ -519,7 +519,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { frameInfo.callback.failed(cause); } @@ -673,7 +673,7 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { frameInfo.callback.failed(cause); } diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java index deee66c03b7f..00c3a4ff0711 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java @@ -122,13 +122,11 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("failed to write {} on {}", entries, this, failure); - accumulator.release(); - List allEntries = new ArrayList<>(entries); entries.clear(); try (AutoLock ignored = lock.lock()) @@ -147,6 +145,12 @@ protected void onCompleteFailure(Throwable failure) endPoint.getQuicSession().getProtocolSession().outwardClose(error, "control_stream_failure"); } + @Override + protected void onCompleteFailure(Throwable cause) + { + accumulator.release(); + } + @Override public InvocationType getInvocationType() { diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java index 7a41ba177c6b..33e35217ab27 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java @@ -118,13 +118,11 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("failed to write buffers on {}", this, failure); - accumulator.release(); - try (AutoLock ignored = lock.lock()) { terminated = failure; @@ -138,6 +136,12 @@ protected void onCompleteFailure(Throwable failure) endPoint.getQuicSession().getProtocolSession().outwardClose(error, "instruction_stream_failure"); } + @Override + protected void onCompleteFailure(Throwable cause) + { + accumulator.release(); + } + @Override public InvocationType getInvocationType() { diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java index f450bfaf0ec2..865ef6237257 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java @@ -118,13 +118,11 @@ protected void onSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { if (LOG.isDebugEnabled()) LOG.debug("failed to write {} on {}", entry, this, cause); - accumulator.release(); - if (entry != null) { entry.callback.failed(cause); @@ -132,6 +130,12 @@ protected void onCompleteFailure(Throwable cause) } } + @Override + protected void onCompleteFailure(Throwable cause) + { + accumulator.release(); + } + @Override public InvocationType getInvocationType() { diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index 3f40f11c3902..97d0831e4def 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -145,6 +145,20 @@ public void fillInterested() getEndPoint().fillInterested(_readCallback); } + /** + *

Utility method to be called to register read interest.

+ *

After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} + * will be called back as appropriate.

+ * + * @see #onFillable() + */ + public void fillInterested(Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("fillInterested {} {}", callback, this); + getEndPoint().fillInterested(callback); + } + public void tryFillInterested(Callback callback) { getEndPoint().tryFillInterested(callback); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 01cbe9a3515d..38f168180061 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -224,7 +224,7 @@ public RetainableByteBuffer.Mutable acquire(int size, boolean direct) } @Override - public boolean removeAndRelease(RetainableByteBuffer buffer) + public boolean releaseAndRemove(RetainableByteBuffer buffer) { RetainableByteBuffer actual = buffer; while (actual instanceof RetainableByteBuffer.Wrapper wrapper) @@ -244,7 +244,7 @@ public boolean removeAndRelease(RetainableByteBuffer buffer) return buffer.release(); } - return ByteBufferPool.super.removeAndRelease(buffer); + return ByteBufferPool.super.releaseAndRemove(buffer); } private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 6a835fae7bb6..72df4e946428 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -64,11 +64,9 @@ public interface ByteBufferPool * If the buffer is not in a pool, calling this method is equivalent to calling {@link RetainableByteBuffer#release()}. * Calling this method satisfies any contract that requires a call to {@link RetainableByteBuffer#release()}. * @return {@code true} if a call to {@link RetainableByteBuffer#release()} would have returned {@code true}. - * @see RetainableByteBuffer#release() - * @deprecated This API is experimental and may be removed in future releases + * @see RetainableByteBuffer#releaseAndRemove() */ - @Deprecated - default boolean removeAndRelease(RetainableByteBuffer buffer) + default boolean releaseAndRemove(RetainableByteBuffer buffer) { return buffer != null && buffer.release(); } @@ -96,6 +94,12 @@ public ByteBufferPool getWrapped() return wrapped; } + @Override + public boolean releaseAndRemove(RetainableByteBuffer buffer) + { + return getWrapped().releaseAndRemove(buffer); + } + @Override public RetainableByteBuffer.Mutable acquire(int size, boolean direct) { diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java index 520f6f8dde1a..be0aeb452b0f 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java @@ -1080,6 +1080,27 @@ static Chunk next(Chunk chunk) return null; } + /** + * Convenience method to release a chunk and return {@link #next(Chunk)}. + * Equivalent to: + *
{@code
+         * if (chunk != null)
+         * {
+         *     chunk.release();
+         *     chunk = Chunk.next(chunk);
+         * }
+         * }
+ * @param chunk The chunk to release or {@code null} + * @return The {@link #next(Chunk)} chunk; + */ + static Chunk releaseAndNext(Chunk chunk) + { + if (chunk == null) + return null; + chunk.release(); + return next(chunk); + } + /** * @param chunk The chunk to test for an {@link Chunk#getFailure() failure}. * @return True if the chunk is non-null and {@link Chunk#getFailure() chunk.getError()} returns non-null. diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java index 531465570647..367c0985d37c 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/IOResources.java @@ -321,12 +321,18 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) + { + IO.close(channel); + super.onFailure(x); + } + + @Override + protected void onCompleteFailure(Throwable cause) { if (retainableByteBuffer != null) retainableByteBuffer.release(); - IO.close(channel); - super.onCompleteFailure(x); + super.onCompleteFailure(cause); } } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java index d92bb6edd866..d20f1d2f19d2 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java @@ -247,4 +247,30 @@ public String toString() return String.format("%s@%x[r=%d]", getClass().getSimpleName(), hashCode(), get()); } } + + /** + * Convenience method that replaces code like: + *
{@code
+     *   if (buffer != null)
+     *   {
+     *       buffer.release();
+     *       buffer = null;
+     *   }
+     * }
+     * 
+ * with: + *
{@code
+     *   buffer = Retainable.release(buffer);
+     * }
+     * 
+ * @param retainable The retainable to release, if not {@code null}. + * @param The type of the retainable + * @return always returns {@code null} + */ + static R release(R retainable) + { + if (retainable != null) + retainable.release(); + return null; + } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index 243975879eb1..8c6cc2fd0c13 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -164,6 +164,19 @@ default Mutable asMutable() throws ReadOnlyBufferException throw new ReadOnlyBufferException(); } + /** + * {@link #release() Releases} the buffer in a way that ensures it will not be recycled in a buffer pool. + * This method should be used in cases where it is unclear if operations on the buffer have completed + * (for example, when a write operation has been aborted asynchronously or timed out, but the write + * operation may still be pending). + * @return whether if the buffer was released. + * @see ByteBufferPool#releaseAndRemove(RetainableByteBuffer) + */ + default boolean releaseAndRemove() + { + return release(); + } + /** * Appends and consumes the contents of this buffer to the passed buffer, limited by the capacity of the target buffer. * @param buffer The buffer to append bytes to, whose limit will be updated. @@ -657,6 +670,12 @@ public RetainableByteBuffer getWrapped() return (RetainableByteBuffer)super.getWrapped(); } + @Override + public boolean releaseAndRemove() + { + return getWrapped().releaseAndRemove(); + } + @Override public boolean isRetained() { @@ -1301,6 +1320,12 @@ protected Pooled(ByteBufferPool pool, ByteBuffer byteBuffer, Retainable retainab _pool = pool; } + @Override + public boolean releaseAndRemove() + { + return _pool.releaseAndRemove(this); + } + @Override public RetainableByteBuffer slice(long length) { @@ -1939,6 +1964,22 @@ public boolean release() return false; } + @Override + public boolean releaseAndRemove() + { + if (LOG.isDebugEnabled()) + LOG.debug("release {}", this); + if (super.release()) + { + for (RetainableByteBuffer buffer : _buffers) + buffer.releaseAndRemove(); + _buffers.clear(); + _aggregate = null; + return true; + } + return false; + } + @Override public void clear() { @@ -2333,10 +2374,6 @@ public void writeTo(Content.Sink sink, boolean last, Callback callback) @Override protected Action process() { - // release the last buffer written - if (_buffer != null) - _buffer.release(); - // write next buffer if (_index < _buffers.size()) { @@ -2357,6 +2394,20 @@ protected Action process() _buffers.clear(); return Action.SUCCEEDED; } + + @Override + protected void onSuccess() + { + // release the last buffer written + _buffer = Retainable.release(_buffer); + } + + @Override + protected void onCompleteFailure(Throwable x) + { + // release the last buffer written + _buffer = Retainable.release(_buffer); + } }.iterate(); } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java index a4b824fd7758..334217967df5 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java @@ -145,7 +145,7 @@ public static class WithRetainable extends ByteBufferChunk public WithRetainable(ByteBuffer byteBuffer, boolean last, Retainable retainable) { super(byteBuffer, last); - this.retainable = retainable; + this.retainable = Objects.requireNonNull(retainable); } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java index b4b59f851d4b..abc14bb5673f 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java @@ -27,7 +27,7 @@ public class ContentCopier extends IteratingNestedCallback private final Content.Source source; private final Content.Sink sink; private final Content.Chunk.Processor chunkProcessor; - private Content.Chunk current; + private Content.Chunk chunk; private boolean terminated; public ContentCopier(Content.Source source, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback) @@ -47,43 +47,47 @@ public InvocationType getInvocationType() @Override protected Action process() throws Throwable { - if (current != null) - current.release(); - if (terminated) return Action.SUCCEEDED; - current = source.read(); + chunk = source.read(); - if (current == null) + if (chunk == null) { source.demand(this::succeeded); return Action.SCHEDULED; } - if (chunkProcessor != null && chunkProcessor.process(current, this)) + if (chunkProcessor != null && chunkProcessor.process(chunk, this)) return Action.SCHEDULED; - terminated = current.isLast(); + terminated = chunk.isLast(); - if (Content.Chunk.isFailure(current)) + if (Content.Chunk.isFailure(chunk)) { - failed(current.getFailure()); + failed(chunk.getFailure()); return Action.SCHEDULED; } - sink.write(current.isLast(), current.getByteBuffer(), this); + sink.write(chunk.isLast(), chunk.getByteBuffer(), this); return Action.SCHEDULED; } + @Override + protected void onSuccess() + { + chunk = Content.Chunk.releaseAndNext(chunk); + } + + @Override + protected void onFailure(Throwable cause) + { + ExceptionUtil.callAndThen(cause, source::fail, super::onFailure); + } + @Override protected void onCompleteFailure(Throwable x) { - if (current != null) - { - current.release(); - current = Content.Chunk.next(current); - } - ExceptionUtil.callAndThen(x, source::fail, super::onCompleteFailure); + chunk = Content.Chunk.releaseAndNext(chunk); } } diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 25e424f47bc4..fe6f190c392b 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -448,7 +448,7 @@ public void testReleaseExcessMemory() } @Test - public void testRemoveAndRelease() + public void testReleaseAndRemove() { ArrayByteBufferPool pool = new ArrayByteBufferPool(); @@ -471,9 +471,9 @@ public void testRemoveAndRelease() retained1 = pool.acquire(1024, false); retained1.retain(); - assertTrue(pool.removeAndRelease(reserved1)); - assertTrue(pool.removeAndRelease(acquired1)); - assertFalse(pool.removeAndRelease(retained1)); + assertTrue(reserved1.releaseAndRemove()); + assertTrue(acquired1.releaseAndRemove()); + assertFalse(retained1.releaseAndRemove()); assertTrue(retained1.release()); assertThat(pool.getHeapByteBufferCount(), is(2L)); diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index e5c1d340b659..3dba959a0d6d 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -377,7 +377,7 @@ public InvocationType getInvocationType() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { entry.callback.failed(cause); QuicConnection.this.close(); diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index b6a54be7346c..c46bc8e00ce3 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.CyclicTimeout; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.Retainable; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.quic.quiche.QuicheConnection; import org.eclipse.jetty.quic.quiche.QuicheConnectionId; @@ -533,7 +534,7 @@ protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("written cipher bytes on {}", QuicSession.this); - cipherBuffer.release(); + cipherBuffer = Retainable.release(cipherBuffer); } @Override @@ -547,22 +548,24 @@ protected void onCompleteSuccess() { if (LOG.isDebugEnabled()) LOG.debug("connection closed {}", QuicSession.this); - finish(new ClosedChannelException()); + cipherBuffer = Retainable.release(cipherBuffer); + finishOutwardClose(new ClosedChannelException()); + timeout.destroy(); } @Override - protected void onCompleteFailure(Throwable failure) + protected void onFailure(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("failed to write cipher bytes, closing session on {}", QuicSession.this, failure); - finish(failure); + finishOutwardClose(failure); + timeout.destroy(); } - private void finish(Throwable failure) + @Override + protected void onCompleteFailure(Throwable cause) { - cipherBuffer.release(); - finishOutwardClose(failure); - timeout.destroy(); + cipherBuffer = Retainable.release(cipherBuffer); } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 51fdb7e10117..985cd618caf4 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.Retainable; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.SocketChannelEndPoint; @@ -736,18 +737,17 @@ protected Action process() write(connection.getEndPoint(), byteBuffer, this); return Action.SCHEDULED; } - else if (filled == 0) - { - buffer.release(); - fillInterested(); - return Action.IDLE; - } - else + + buffer = Retainable.release(buffer); + + if (filled == 0) { - buffer.release(); - connection.getEndPoint().shutdownOutput(); - return Action.SUCCEEDED; + fillInterested(this); + return Action.SCHEDULED; } + + connection.getEndPoint().shutdownOutput(); + return Action.SUCCEEDED; } catch (IOException x) { @@ -764,18 +764,23 @@ protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this); - buffer.release(); + buffer = Retainable.release(buffer); } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("Failed to write {} bytes {}", filled, TunnelConnection.this, x); - buffer.release(); disconnect(x); } + @Override + protected void onCompleteFailure(Throwable cause) + { + buffer = Retainable.release(buffer); + } + private void disconnect(Throwable x) { TunnelConnection.this.close(x); diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java index d32e4255c39a..c6621c90fc79 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java @@ -254,14 +254,14 @@ else if (charsets.contains(StandardCharsets.ISO_8859_1)) } response.getHeaders().put(type.getContentTypeField(charset)); - response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, byteBufferPool, buffer)); + response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, buffer)); return true; } catch (Throwable x) { if (buffer != null) - byteBufferPool.removeAndRelease(buffer); + buffer.releaseAndRemove(); throw x; } } @@ -586,13 +586,11 @@ public String toString() private static class WriteErrorCallback implements Callback { private final AtomicReference _callback; - private final ByteBufferPool _pool; private final RetainableByteBuffer _buffer; - public WriteErrorCallback(Callback callback, ByteBufferPool pool, RetainableByteBuffer retainable) + public WriteErrorCallback(Callback callback, RetainableByteBuffer retainable) { _callback = new AtomicReference<>(callback); - _pool = pool; _buffer = retainable; } @@ -600,7 +598,9 @@ public WriteErrorCallback(Callback callback, ByteBufferPool pool, RetainableByte public void succeeded() { Callback callback = _callback.getAndSet(null); - if (callback != null) + if (callback == null) + _buffer.release(); + else ExceptionUtil.callAndThen(_buffer::release, callback::succeeded); } @@ -608,8 +608,10 @@ public void succeeded() public void failed(Throwable x) { Callback callback = _callback.getAndSet(null); - if (callback != null) - ExceptionUtil.callAndThen(x, t -> _pool.removeAndRelease(_buffer), callback::failed); + if (callback == null) + _buffer.releaseAndRemove(); + else + ExceptionUtil.callAndThen(x, t -> _buffer.releaseAndRemove(), callback::failed); } } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java index 221dfbe78a85..76e3fe6fc1de 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipResponseAndCallback.java @@ -321,13 +321,6 @@ public GzipBufferCB(boolean complete, Callback callback, ByteBuffer content) LOG.debug("GzipBufferCB(complete={}, callback={}, content={})", complete, callback, BufferUtil.toDetailString(content)); } - @Override - protected void onCompleteFailure(Throwable x) - { - cleanup(); - super.onCompleteFailure(x); - } - @Override protected Action process() throws Exception { @@ -373,6 +366,13 @@ protected Action process() throws Exception }; } + @Override + protected void onCompleteFailure(Throwable x) + { + cleanup(); + super.onCompleteFailure(x); + } + private void cleanup() { if (_deflaterEntry != null) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 2027b6542fc0..ca6825d9d87c 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -878,15 +878,19 @@ public Action process() throws Exception } } - private Callback release() + private Callback resetCallback() { Callback complete = _callback; _callback = null; _info = null; _content = null; + return complete; + } + + private void release() + { releaseHeader(); releaseChunk(); - return complete; } private void releaseHeader() @@ -906,13 +910,22 @@ private void releaseChunk() @Override protected void onCompleteSuccess() { - release().succeeded(); + Callback callback = resetCallback(); + release(); + callback.succeeded(); + } + + @Override + public void onFailure(final Throwable x) + { + Callback callback = resetCallback(); + failedCallback(callback, x); } @Override - public void onCompleteFailure(final Throwable x) + protected void onCompleteFailure(Throwable cause) { - failedCallback(release(), x); + release(); } @Override diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java index 8f4932f39fa8..6e86f94a4cab 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/AbstractTest.java @@ -26,7 +26,6 @@ import java.util.stream.Stream; import javax.management.MBeanServer; -import org.awaitility.Awaitility; import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClientTransport; @@ -66,7 +65,6 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tags; @@ -75,6 +73,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.fail; @ExtendWith(WorkDirExtension.class) @@ -129,9 +130,9 @@ public void dispose(TestInfo testInfo) throws Exception try { if (serverBufferPool != null && !isLeakTrackingDisabled(testInfo, "server")) - assertNoLeaks(serverBufferPool, testInfo, "server-", "\n---\nServer Leaks: " + serverBufferPool.dumpLeaks() + "---\n"); + assertNoLeaks(serverBufferPool, testInfo, "server-", "Server Leaks: "); if (clientBufferPool != null && !isLeakTrackingDisabled(testInfo, "client")) - assertNoLeaks(clientBufferPool, testInfo, "client-", "\n---\nClient Leaks: " + clientBufferPool.dumpLeaks() + "---\n"); + assertNoLeaks(clientBufferPool, testInfo, "client-", "Client Leaks: "); } finally { @@ -149,7 +150,7 @@ private void assertNoLeaks(ArrayByteBufferPool.Tracking bufferPool, TestInfo tes { try { - Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> bufferPool.getLeaks().size(), Matchers.is(0)); + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertThat("\n---\n" + msg + bufferPool.dumpLeaks(), bufferPool.getLeaks().size(), is(0))); } catch (Exception e) { diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java index a3a7ecf9a464..7b535efcbfe9 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java @@ -335,7 +335,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { // There was a write error, close the Gateway Channel. channel.close(cause); @@ -378,7 +378,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { // There was a write error, close the Gateway Channel. channel.close(cause); diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 155577f58e24..96b05f42735d 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -14,8 +14,12 @@ package org.eclipse.jetty.util; import java.io.IOException; +import java.util.Objects; +import java.util.function.Consumer; import org.eclipse.jetty.util.thread.AutoLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This specialized callback implements a pattern that allows @@ -51,10 +55,12 @@ */ public abstract class IteratingCallback implements Callback { + private static final Logger LOG = LoggerFactory.getLogger(IteratingCallback.class); + /** * The internal states of this callback. */ - private enum State + enum State { /** * This callback is idle, ready to iterate. @@ -64,48 +70,35 @@ private enum State /** * This callback is just about to call {@link #process()}, * or within it, or just exited from it, either normally - * or by throwing. + * or by throwing. Further actions are waiting for the + * {@link #process()} method to return. */ PROCESSING, /** - * Method {@link #process()} returned {@link Action#SCHEDULED} - * and this callback is waiting for the asynchronous sub-task - * to complete. - */ - PENDING, - - /** - * The asynchronous sub-task was completed successfully - * via a call to {@link #succeeded()} while in - * {@link #PROCESSING} state. + * The asynchronous sub-task was completed either with + * a call to {@link #succeeded()} or {@link #failed(Throwable)}, whilst in + * {@link #PROCESSING} state. Further actions are waiting for the + * {@link #process()} method to return. */ - CALLED, + PROCESSING_CALLED, /** - * The iteration terminated successfully as indicated by - * {@link Action#SUCCEEDED} returned from - * {@link IteratingCallback#process()}. - */ - SUCCEEDED, - - /** - * The iteration terminated with a failure via a call - * to {@link IteratingCallback#failed(Throwable)}. + * Method {@link #process()} returned {@link Action#SCHEDULED} + * and this callback is waiting for the asynchronous sub-task + * to complete via a callback to {@link #succeeded()} or {@link #failed(Throwable)} */ - FAILED, + PENDING, /** - * This callback has been {@link #close() closed} and - * cannot be {@link #reset() reset}. + * This callback is complete. */ - CLOSED, + COMPLETE, /** - * This callback has been {@link #abort(Throwable) aborted}, - * and cannot be {@link #reset() reset}. + * Complete and can't be reset. */ - ABORTED + CLOSED } /** @@ -120,6 +113,7 @@ protected enum Action * for additional events to trigger more work. */ IDLE, + /** * Indicates that {@link #process()} has initiated an asynchronous * sub-task, where the execution has started but the callback @@ -127,6 +121,7 @@ protected enum Action * may have not yet been invoked. */ SCHEDULED, + /** * Indicates that {@link #process()} has completed the whole * iteration successfully. @@ -135,9 +130,13 @@ protected enum Action } private final AutoLock _lock = new AutoLock(); + private final Runnable _onSuccess = this::onSuccess; + private final Runnable _processing = this::processing; + private final Consumer _onCompleted = this::onCompleted; private State _state; private Throwable _failure; - private boolean _iterate; + private boolean _reprocess; + private boolean _aborted; protected IteratingCallback() { @@ -146,7 +145,7 @@ protected IteratingCallback() protected IteratingCallback(boolean needReset) { - _state = needReset ? State.SUCCEEDED : State.IDLE; + _state = needReset ? State.COMPLETE : State.IDLE; } /** @@ -180,7 +179,32 @@ protected void onSuccess() } /** - * Invoked when the overall task has completed successfully. + * Invoked when the overall task has been {@link #abort(Throwable) aborted} or {@link #failed(Throwable) failed}. + *

+ * Calls to this method are serialized with respect to {@link #onAborted(Throwable)}, {@link #process()}, + * {@link #onCompleteFailure(Throwable)} and {@link #onCompleted(Throwable)}. + *

+ * Because {@code onFailure} can be called due to an {@link #abort(Throwable)} or {@link #close()} operation, it is + * possible that any resources passed to a {@link Action#SCHEDULED} operation may still be in use, and thus should not + * be recycled by this call. For example any buffers passed to a write operation should not be returned to a buffer + * pool by implementations of {@code onFailure}. Such resources may be discarded here, or safely recycled in a + * subsequent call to {@link #onCompleted(Throwable)} or {@link #onCompleteFailure(Throwable)}, when + * the {@link Action#SCHEDULED} operation has completed. + * @param cause The cause of the failure or abort + * @see #onCompleted(Throwable) + * @see #onCompleteFailure(Throwable) + */ + protected void onFailure(Throwable cause) + { + } + + /** + * Invoked when the overall task has completed successfully, specifically after any {@link Action#SCHEDULED} operations + * have {@link Callback#succeeded()} and {@link #process()} has returned {@link Action#SUCCEEDED}. + *

+ * Calls to this method are serialized with respect to {@link #process()}, {@link #onAborted(Throwable)} + * and {@link #onCompleted(Throwable)}. + * If this method is called, then {@link #onCompleteFailure(Throwable)} ()} will never be called. * * @see #onCompleteFailure(Throwable) */ @@ -190,6 +214,10 @@ protected void onCompleteSuccess() /** * Invoked when the overall task has completed with a failure. + *

+ * Calls to this method are serialized with respect to {@link #process()}, {@link #onAborted(Throwable)} + * and {@link #onCompleted(Throwable)}. + * If this method is called, then {@link #onCompleteSuccess()} will never be called. * * @param cause the throwable to indicate cause of failure * @see #onCompleteSuccess() @@ -198,6 +226,101 @@ protected void onCompleteFailure(Throwable cause) { } + /** + * Invoked when the overall task has been aborted. + *

+ * Calls to this method are serialized with respect to {@link #process()}, {@link #onCompleteFailure(Throwable)} + * and {@link #onCompleted(Throwable)}. + * If this method is called, then {@link #onCompleteSuccess()} will never be called. + *

+ * The default implementation of this method calls {@link #failed(Throwable)}. Overridden implementations of + * this method SHOULD NOT call {@code super.onAborted(Throwable)}. + *

+ * Because {@code onAborted} can be called due to an {@link #abort(Throwable)} or {@link #close()} operation, it is + * possible that any resources passed to a {@link Action#SCHEDULED} operation may still be in use, and thus should not + * be recycled by this call. For example any buffers passed to a write operation should not be returned to a buffer + * pool by implementations of {@code onFailure}. Such resources may be discarded here, or safely recycled in a + * subsequent call to {@link #onCompleted(Throwable)} or {@link #onCompleteFailure(Throwable)}, when + * the {@link Action#SCHEDULED} operation has completed. + * @param cause The cause of the abort + * @see #onCompleted(Throwable) + * @see #onCompleteFailure(Throwable) + */ + protected void onAborted(Throwable cause) + { + } + + /** + * Invoked when the overall task has completed. + *

+ * Calls to this method are serialized with respect to {@link #process()} and {@link #onAborted(Throwable)}. + * The default implementation of this method will call either {@link #onCompleteSuccess()} or {@link #onCompleteFailure(Throwable)} + * thus implementations of this method should always call {@code super.onCompleted(Throwable)}. + * + * @param causeOrNull the cause of any {@link #abort(Throwable) abort} or {@link #failed(Throwable) failure}, + * else {@code null} for {@link #succeeded() success}. + */ + protected void onCompleted(Throwable causeOrNull) + { + if (causeOrNull == null) + onCompleteSuccess(); + else + onCompleteFailure(causeOrNull); + } + + private void doOnSuccessProcessing() + { + ExceptionUtil.callAndThen(_onSuccess, _processing); + } + + private void doCompleteSuccess() + { + onCompleted(null); + } + + private void doOnCompleted(Throwable cause) + { + ExceptionUtil.call(cause, _onCompleted); + } + + private void doOnFailureOnCompleted(Throwable cause) + { + ExceptionUtil.callAndThen(cause, this::onFailure, _onCompleted); + } + + private void doOnAbortedOnFailure(Throwable cause) + { + ExceptionUtil.callAndThen(cause, this::onAborted, this::onFailure); + } + + private void doOnAbortedOnFailureOnCompleted(Throwable cause) + { + ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, _onCompleted); + } + + private void doOnAbortedOnFailureIfNotPendingDoCompleted(Throwable cause) + { + ExceptionUtil.callAndThen(cause, this::doOnAbortedOnFailure, this::ifNotPendingDoCompleted); + } + + private void ifNotPendingDoCompleted() + { + Throwable completeFailure = null; + try (AutoLock ignored = _lock.lock()) + { + _failure = _failure.getCause(); + + if (Objects.requireNonNull(_state) != State.PENDING) + { + // the callback completed, one way or another, so it is up to us to do the completion + completeFailure = _failure; + } + } + + if (completeFailure != null) + doOnCompleted(completeFailure); + } + /** * This method must be invoked by applications to start the processing * of asynchronous sub-tasks. @@ -215,28 +338,18 @@ public void iterate() { switch (_state) { - case PENDING: - case CALLED: - // process will be called when callback is handled - break; - case IDLE: _state = State.PROCESSING; process = true; break; case PROCESSING: - _iterate = true; - break; - - case FAILED: - case SUCCEEDED: + case PROCESSING_CALLED: + _reprocess = true; break; - case CLOSED: - case ABORTED: default: - throw new IllegalStateException(toString()); + break; } } if (process) @@ -248,21 +361,24 @@ private void processing() // This should only ever be called when in processing state, however a failed or close call // may happen concurrently, so state is not assumed. - boolean notifyCompleteSuccess = false; - Throwable notifyCompleteFailure = null; + boolean completeSuccess = false; + Throwable onAbortedOnFailureOnCompleted = null; + Throwable onFailureOnCompleted = null; + Throwable onAbortedOnFailureIfNotPendingDoCompleted = null; // While we are processing processing: while (true) { // Call process to get the action that we have to take. - Action action = null; + Action action; try { action = process(); } catch (Throwable x) { + action = null; failed(x); // Fall through to possibly invoke onCompleteFailure(). } @@ -271,72 +387,104 @@ private void processing() // acted on the action we have just received try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("processing {} {}", action, this); + switch (_state) { case PROCESSING: { - if (action != null) + if (action == null) + break processing; + switch (action) { - switch (action) + case IDLE: { - case IDLE: + if (_aborted) { - // Has iterate been called while we were processing? - if (_iterate) - { - // yes, so skip idle and keep processing - _iterate = false; - continue; - } - - // No, so we can go idle - _state = State.IDLE; + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + onAbortedOnFailureOnCompleted = _failure; break processing; } - case SCHEDULED: + + // Has iterate been called while we were processing? + if (_reprocess) { - // we won the race against the callback, so the callback has to process and we can break processing - _state = State.PENDING; - break processing; + // yes, so skip idle and keep processing + _reprocess = false; + continue; } - case SUCCEEDED: + + // No, so we can go idle + _state = State.IDLE; + break processing; + } + case SCHEDULED: + { + // we won the race against the callback, so the callback has to process and we can break processing + _state = State.PENDING; + if (_aborted) { - // we lost the race against the callback, - _iterate = false; - _state = State.SUCCEEDED; - notifyCompleteSuccess = true; - break processing; + onAbortedOnFailureIfNotPendingDoCompleted = _failure; + _failure = new AbortingException(onAbortedOnFailureIfNotPendingDoCompleted); + } + break processing; + } + case SUCCEEDED: + { + // we lost the race against the callback, + _reprocess = false; + if (_aborted) + { + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + onAbortedOnFailureOnCompleted = _failure; } - default: + else { - break; + _state = State.COMPLETE; + completeSuccess = true; } + break processing; + } + default: + { + break; } } throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } - case CALLED: + case PROCESSING_CALLED: { + if (action != Action.SCHEDULED && action != null) + { + _state = State.CLOSED; + onAbortedOnFailureOnCompleted = new IllegalStateException("Action not scheduled"); + if (_failure == null) + { + _failure = onAbortedOnFailureOnCompleted; + } + else + { + ExceptionUtil.addSuppressedIfNotAssociated(_failure, onAbortedOnFailureIfNotPendingDoCompleted); + onAbortedOnFailureOnCompleted = _failure; + } + break processing; + } + if (_failure != null) + { + if (_aborted) + onAbortedOnFailureOnCompleted = _failure; + else + onFailureOnCompleted = _failure; + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + break processing; + } callOnSuccess = true; - if (action != Action.SCHEDULED) - throw new IllegalStateException(String.format("%s[action=%s]", this, action)); - // we lost the race, so we have to keep processing _state = State.PROCESSING; - continue; + break; } - case FAILED: - case CLOSED: - case ABORTED: - notifyCompleteFailure = _failure; - break processing; - - case SUCCEEDED: - break processing; - - case IDLE: - case PENDING: default: throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } @@ -347,47 +495,74 @@ private void processing() onSuccess(); } } - - if (notifyCompleteSuccess) - onCompleteSuccess(); - else if (notifyCompleteFailure != null) - onCompleteFailure(notifyCompleteFailure); + if (onAbortedOnFailureOnCompleted != null) + doOnAbortedOnFailureOnCompleted(onAbortedOnFailureOnCompleted); + else if (completeSuccess) + doCompleteSuccess(); + else if (onFailureOnCompleted != null) + doOnFailureOnCompleted(onFailureOnCompleted); + else if (onAbortedOnFailureIfNotPendingDoCompleted != null) + doOnAbortedOnFailureIfNotPendingDoCompleted(onAbortedOnFailureIfNotPendingDoCompleted); } /** * Method to invoke when the asynchronous sub-task succeeds. *

- * This method should be considered final for all practical purposes. - *

+ * For most purposes, this method should be considered {@code final} and should only be + * overridden in extraordinary circumstances. + * Subclasses that override this method must always call {@code super.succeeded()}. + * Such overridden methods are not serialized with respect to {@link #process()}, {@link #onCompleteSuccess()}, + * {@link #onCompleteFailure(Throwable)}, nor {@link #onAborted(Throwable)}. They should not act on nor change any + * fields that may be used by those methods. * Eventually, {@link #onSuccess()} is * called, either by the caller thread or by the processing * thread. */ @Override - public void succeeded() + public final void succeeded() { - boolean process = false; + boolean onSuccessProcessing = false; + Throwable onCompleted = null; try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("succeeded {}", this); switch (_state) { case PROCESSING: { - _state = State.CALLED; + // Another thread is processing, so we just tell it the state + _state = State.PROCESSING_CALLED; break; } case PENDING: { - _state = State.PROCESSING; - process = true; + if (_aborted) + { + if (_failure instanceof AbortingException) + { + // Another thread is still calling onAborted, so we will let it do the completion + _state = _failure.getCause() instanceof ClosedException ? State.CLOSED : State.COMPLETE; + } + else + { + // The onAborted call is complete, so we must do the completion + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + onCompleted = _failure; + } + } + else + { + // No other thread is processing, so we will do the processing + _state = State.PROCESSING; + onSuccessProcessing = true; + } break; } - case FAILED: - case CLOSED: - case ABORTED: + case COMPLETE, CLOSED: { - // Too late! - break; + // Too late + return; } default: { @@ -395,10 +570,13 @@ public void succeeded() } } } - if (process) + if (onSuccessProcessing) { - onSuccess(); - processing(); + doOnSuccessProcessing(); + } + else if (onCompleted != null) + { + doOnCompleted(onCompleted); } } @@ -407,47 +585,84 @@ public void succeeded() * or to fail the overall asynchronous task and therefore * terminate the iteration. *

- * This method should be considered final for all practical purposes. - *

* Eventually, {@link #onCompleteFailure(Throwable)} is * called, either by the caller thread or by the processing * thread. - * + *

+ * For most purposes, this method should be considered {@code final} and should only be + * overridden in extraordinary circumstances. + * Subclasses that override this method must always call {@code super.succeeded()}. + * Such overridden methods are not serialized with respect to {@link #process()}, {@link #onCompleteSuccess()}, + * {@link #onCompleteFailure(Throwable)}, nor {@link #onAborted(Throwable)}. They should not act on nor change any + * fields that may be used by those methods. * @see #isFailed() */ @Override - public void failed(Throwable x) + public final void failed(Throwable cause) { - boolean failure = false; + cause = Objects.requireNonNullElseGet(cause, IOException::new); + + Throwable onFailureOnCompleted = null; + Throwable onCompleted = null; try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("failed {}", this, cause); switch (_state) { - case CALLED: - case SUCCEEDED: - case FAILED: - case CLOSED: - case ABORTED: - // Too late! + case PROCESSING: + { + // Another thread is processing, so we just tell it the state + _state = State.PROCESSING_CALLED; + if (_failure == null) + _failure = cause; + else + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); break; + } case PENDING: { - _state = State.FAILED; - failure = true; + if (_aborted) + { + if (_failure instanceof AbortingException) + { + // Another thread is still calling onAborted, so we will let it do the completion + ExceptionUtil.addSuppressedIfNotAssociated(_failure.getCause(), cause); + _state = _failure.getCause() instanceof ClosedException ? State.CLOSED : State.COMPLETE; + } + else + { + // The onAborted call is complete, so we must do the completion + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + _state = _failure instanceof ClosedException ? State.CLOSED : State.COMPLETE; + onCompleted = _failure; + } + } + else + { + // No other thread is processing, so we will do the processing + _state = State.COMPLETE; + _failure = cause; + onFailureOnCompleted = _failure; + } break; } - case PROCESSING: + case COMPLETE, CLOSED: { - _state = State.FAILED; - _failure = x; - break; + // Too late + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + return; } default: + { throw new IllegalStateException(toString()); + } } } - if (failure) - onCompleteFailure(x); + if (onFailureOnCompleted != null) + doOnFailureOnCompleted(onFailureOnCompleted); + else if (onCompleted != null) + doOnCompleted(onCompleted); } /** @@ -459,37 +674,63 @@ public void failed(Throwable x) * * @see #isClosed() */ - public void close() + public final void close() { - String failure = null; + Throwable onAbortedOnFailureIfNotPendingDoCompleted = null; + Throwable onAbortedOnFailureOnCompleted = null; + try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("close {}", this); switch (_state) { - case IDLE: - case SUCCEEDED: - case FAILED: - _state = State.CLOSED; - break; - - case PROCESSING: - _failure = new IOException(String.format("Close %s in state %s", this, _state)); + case IDLE -> + { + // Nothing happening so we can abort and complete _state = State.CLOSED; - break; + _failure = new ClosedException(); + onAbortedOnFailureOnCompleted = _failure; + } + case PROCESSING, PROCESSING_CALLED -> + { + // Another thread is processing, so we just tell it the state and let it handle it + if (_aborted) + { + ExceptionUtil.addSuppressedIfNotAssociated(_failure, new ClosedException()); + } + else + { + _aborted = true; + _failure = new ClosedException(); + } + } - case CLOSED: - case ABORTED: - break; + case PENDING -> + { + // We are waiting for the callback, so we can only call onAbort and then keep waiting + onAbortedOnFailureIfNotPendingDoCompleted = new ClosedException(); + _failure = new AbortingException(onAbortedOnFailureIfNotPendingDoCompleted); + _aborted = true; + } - default: - failure = String.format("Close %s in state %s", this, _state); + case COMPLETE -> + { _state = State.CLOSED; - break; + } + + case CLOSED -> + { + // too late + return; + } } } - if (failure != null) - onCompleteFailure(new IOException(failure)); + if (onAbortedOnFailureIfNotPendingDoCompleted != null) + doOnAbortedOnFailureIfNotPendingDoCompleted(onAbortedOnFailureIfNotPendingDoCompleted); + else if (onAbortedOnFailureOnCompleted != null) + doOnAbortedOnFailureOnCompleted(onAbortedOnFailureOnCompleted); } /** @@ -498,49 +739,83 @@ public void close() * ultimately be invoked, either during this call or later after * any call to {@link #process()} has returned.

* - * @param failure the cause of the abort + * @param cause the cause of the abort + * @return {@code true} if abort was called before the callback was complete. * @see #isAborted() */ - public void abort(Throwable failure) + public final boolean abort(Throwable cause) { - boolean abort = false; + cause = Objects.requireNonNullElseGet(cause, Throwable::new); + + boolean onAbort = false; + boolean onAbortedOnFailureOnCompleted = false; try (AutoLock ignored = _lock.lock()) { + if (LOG.isDebugEnabled()) + LOG.debug("abort {}", this, cause); + + // Are we already aborted? + if (_aborted) + { + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + return false; + } + switch (_state) { - case SUCCEEDED: - case FAILED: - case CLOSED: - case ABORTED: + case IDLE: { - // Too late. + // Nothing happening so we can abort and complete + _state = State.COMPLETE; + _failure = cause; + _aborted = true; + onAbortedOnFailureOnCompleted = true; break; } - case IDLE: - case PENDING: + case PROCESSING: { - _failure = failure; - _state = State.ABORTED; - abort = true; + // Another thread is processing, so we just tell it the state and let it handle everything + _failure = cause; + _aborted = true; break; } - case PROCESSING: - case CALLED: + case PROCESSING_CALLED: { - _failure = failure; - _state = State.ABORTED; + // Another thread is processing, but we have already succeeded or failed. + if (_failure == null) + _failure = cause; + else + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + _aborted = true; break; } - default: - throw new IllegalStateException(toString()); + case PENDING: + { + // We are waiting for the callback, so we can only call onAbort and then keep waiting + onAbort = true; + _failure = new AbortingException(cause); + _aborted = true; + break; + } + + case COMPLETE, CLOSED: + { + // too late + ExceptionUtil.addSuppressedIfNotAssociated(_failure, cause); + return false; + } } } - if (abort) - onCompleteFailure(failure); + if (onAbortedOnFailureOnCompleted) + doOnAbortedOnFailureOnCompleted(cause); + else if (onAbort) + doOnAbortedOnFailureIfNotPendingDoCompleted(cause); + + return true; } /** @@ -561,7 +836,7 @@ public boolean isClosed() { try (AutoLock ignored = _lock.lock()) { - return _state == State.CLOSED; + return _state == State.CLOSED || _failure instanceof ClosedException; } } @@ -572,7 +847,7 @@ public boolean isFailed() { try (AutoLock ignored = _lock.lock()) { - return _state == State.FAILED; + return _failure != null; } } @@ -585,7 +860,7 @@ public boolean isSucceeded() { try (AutoLock ignored = _lock.lock()) { - return _state == State.SUCCEEDED; + return _state == State.COMPLETE && _failure == null; } } @@ -596,7 +871,7 @@ public boolean isAborted() { try (AutoLock ignored = _lock.lock()) { - return _state == State.ABORTED; + return _aborted; } } @@ -618,11 +893,10 @@ public boolean reset() case IDLE: return true; - case SUCCEEDED: - case FAILED: + case COMPLETE: _state = State.IDLE; _failure = null; - _iterate = false; + _reprocess = false; return true; default: @@ -634,6 +908,31 @@ public boolean reset() @Override public String toString() { - return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state); + try (AutoLock ignored = _lock.lock()) + { + return String.format("%s@%x[%s, %b, %s]", getClass().getSimpleName(), hashCode(), _state, _aborted, _failure); + } + } + + private static class ClosedException extends Exception + { + ClosedException() + { + super("Closed"); + } + + ClosedException(Throwable suppressed) + { + this(); + ExceptionUtil.addSuppressedIfNotAssociated(this, suppressed); + } + } + + private static class AbortingException extends Exception + { + AbortingException(Throwable cause) + { + super(cause.getMessage(), cause); + } } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java index 745e8b800d7d..1d409cd859ed 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingNestedCallback.java @@ -53,7 +53,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { _callback.failed(x); } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java index 00f6b007b7dd..686e1397704b 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java @@ -13,19 +13,37 @@ package org.eclipse.jetty.util; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicMarkableReference; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import org.awaitility.Awaitility; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class IteratingCallbackTest @@ -202,46 +220,31 @@ protected Action process() { processed++; - switch (i--) + return switch (i--) { - case 5: + case 5, 2 -> + { succeeded(); - return Action.SCHEDULED; - - case 4: + yield Action.SCHEDULED; + } + case 4, 1 -> + { scheduler.schedule(successTask, 5, TimeUnit.MILLISECONDS); - return Action.SCHEDULED; - - case 3: - scheduler.schedule(new Runnable() - { - @Override - public void run() - { - idle.countDown(); - } - }, 5, TimeUnit.MILLISECONDS); - return Action.IDLE; - - case 2: - succeeded(); - return Action.SCHEDULED; - - case 1: - scheduler.schedule(successTask, 5, TimeUnit.MILLISECONDS); - return Action.SCHEDULED; - - case 0: - return Action.SUCCEEDED; - - default: - throw new IllegalStateException(); - } + yield Action.SCHEDULED; + } + case 3 -> + { + scheduler.schedule(idle::countDown, 5, TimeUnit.MILLISECONDS); + yield Action.IDLE; + } + case 0 -> Action.SUCCEEDED; + default -> throw new IllegalStateException(); + }; } }; cb.iterate(); - idle.await(10, TimeUnit.SECONDS); + assertTrue(idle.await(10, TimeUnit.SECONDS)); assertTrue(cb.isIdle()); cb.iterate(); @@ -252,25 +255,21 @@ public void run() @Test public void testCloseDuringProcessingReturningScheduled() throws Exception { - testCloseDuringProcessing(IteratingCallback.Action.SCHEDULED); - } - - @Test - public void testCloseDuringProcessingReturningSucceeded() throws Exception - { - testCloseDuringProcessing(IteratingCallback.Action.SUCCEEDED); - } - - private void testCloseDuringProcessing(final IteratingCallback.Action action) throws Exception - { + final CountDownLatch abortLatch = new CountDownLatch(1); final CountDownLatch failureLatch = new CountDownLatch(1); IteratingCallback callback = new IteratingCallback() { @Override - protected Action process() throws Exception + protected Action process() { close(); - return action; + return Action.SCHEDULED; + } + + @Override + protected void onAborted(Throwable cause) + { + abortLatch.countDown(); } @Override @@ -282,27 +281,45 @@ protected void onCompleteFailure(Throwable cause) callback.iterate(); - assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + assertFalse(failureLatch.await(100, TimeUnit.MILLISECONDS)); + assertTrue(abortLatch.await(1000000000, TimeUnit.SECONDS)); + assertTrue(callback.isClosed()); + + callback.succeeded(); + assertTrue(failureLatch.await(1, TimeUnit.SECONDS)); + assertTrue(callback.isFailed()); + assertTrue(callback.isClosed()); } - private abstract static class TestCB extends IteratingCallback + @Test + public void testCloseDuringProcessingReturningSucceeded() throws Exception { - protected Runnable successTask = new Runnable() + final CountDownLatch failureLatch = new CountDownLatch(1); + IteratingCallback callback = new IteratingCallback() { @Override - public void run() + protected Action process() { - succeeded(); + close(); + return Action.SUCCEEDED; } - }; - protected Runnable failTask = new Runnable() - { + @Override - public void run() + protected void onCompleteFailure(Throwable cause) { - failed(new Exception("testing failure")); + failureLatch.countDown(); } }; + + callback.iterate(); + + assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + } + + private abstract static class TestCB extends IteratingCallback + { + protected Runnable successTask = this::succeeded; + protected Runnable failTask = () -> failed(new Exception("testing failure")); protected CountDownLatch completed = new CountDownLatch(1); protected int processed = 0; @@ -320,8 +337,7 @@ public void onCompleteFailure(Throwable x) boolean waitForComplete() throws InterruptedException { - completed.await(10, TimeUnit.SECONDS); - return isSucceeded(); + return completed.await(10, TimeUnit.SECONDS) && isSucceeded(); } } @@ -342,7 +358,6 @@ protected Action process() throws Throwable @Override protected void onCompleteFailure(Throwable cause) { - super.onCompleteFailure(cause); failure.incrementAndGet(); } }; @@ -390,57 +405,544 @@ protected void onCompleteFailure(Throwable cause) assertEquals(1, count.get()); - // Aborting should not iterate. icb.abort(new Exception()); assertTrue(ocfLatch.await(5, TimeUnit.SECONDS)); + assertTrue(icb.isFailed()); assertTrue(icb.isAborted()); assertEquals(1, count.get()); } @Test - public void testWhenProcessingAbortSerializesOnCompleteFailure() throws Exception + public void testWhenPendingAbortSerializesOnCompleteFailure() throws Exception { - AtomicInteger count = new AtomicInteger(); - CountDownLatch ocfLatch = new CountDownLatch(1); + AtomicReference aborted = new AtomicReference<>(); + CountDownLatch abortLatch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicMarkableReference completed = new AtomicMarkableReference<>(null, false); + IteratingCallback icb = new IteratingCallback() { @Override protected Action process() throws Throwable { - count.incrementAndGet(); - abort(new Exception()); + return Action.SCHEDULED; + } - // After calling abort, onCompleteFailure() must not be called yet. - assertFalse(ocfLatch.await(1, TimeUnit.SECONDS)); + @Override + protected void onAborted(Throwable cause) + { + aborted.set(cause); + ExceptionUtil.call(abortLatch::await, Throwable::printStackTrace); + } - return Action.SCHEDULED; + @Override + protected void onCompleteSuccess() + { + completed.set(null, true); } @Override protected void onCompleteFailure(Throwable cause) { - ocfLatch.countDown(); + completed.set(cause, true); + failure.set(cause); } }; icb.iterate(); - assertEquals(1, count.get()); + assertThat(icb.toString(), containsString("[PENDING, false,")); - assertTrue(ocfLatch.await(5, TimeUnit.SECONDS)); - assertTrue(icb.isAborted()); + Throwable cause = new Throwable("test abort"); + new Thread(() -> icb.abort(cause)).start(); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("[PENDING, true,")); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> aborted.get() != null); - // Calling succeeded() won't cause further iterations. icb.succeeded(); - assertEquals(1, count.get()); + // We are now complete, but callbacks have not yet been done + assertThat(icb.toString(), containsString("[COMPLETE, true,")); + assertThat(failure.get(), nullValue()); + assertFalse(completed.isMarked()); + + abortLatch.countDown(); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(completed::isMarked); + assertThat(failure.get(), sameInstance(cause)); + assertThat(completed.getReference(), sameInstance(cause)); + } + + public enum Event + { + PROCESSED, + ABORTED, + SUCCEEDED, + FAILED + } + + public static Stream> serializedEvents() + { + return Stream.of( + List.of(Event.PROCESSED, Event.ABORTED, Event.SUCCEEDED), + List.of(Event.PROCESSED, Event.SUCCEEDED, Event.ABORTED), + + List.of(Event.SUCCEEDED, Event.PROCESSED, Event.ABORTED), + List.of(Event.SUCCEEDED, Event.ABORTED, Event.PROCESSED), + + List.of(Event.ABORTED, Event.SUCCEEDED, Event.PROCESSED), + List.of(Event.ABORTED, Event.PROCESSED, Event.SUCCEEDED), + + List.of(Event.PROCESSED, Event.ABORTED, Event.FAILED), + List.of(Event.PROCESSED, Event.FAILED, Event.ABORTED), + + List.of(Event.FAILED, Event.PROCESSED, Event.ABORTED), + List.of(Event.FAILED, Event.ABORTED, Event.PROCESSED), + + List.of(Event.ABORTED, Event.FAILED, Event.PROCESSED), + List.of(Event.ABORTED, Event.PROCESSED, Event.FAILED) + ); + } + + @ParameterizedTest + @MethodSource("serializedEvents") + public void testSerializesProcessAbortCompletion(List events) throws Exception + { + AtomicReference aborted = new AtomicReference<>(); + CountDownLatch processingLatch = new CountDownLatch(1); + CountDownLatch abortLatch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicMarkableReference completed = new AtomicMarkableReference<>(null, false); + + + Throwable cause = new Throwable("test abort"); + + IteratingCallback icb = new IteratingCallback() + { + @Override + protected Action process() throws Throwable + { + abort(cause); + ExceptionUtil.call(processingLatch::await, Throwable::printStackTrace); + return Action.SCHEDULED; + } + + @Override + protected void onAborted(Throwable cause) + { + aborted.set(cause); + ExceptionUtil.call(abortLatch::await, Throwable::printStackTrace); + } + + @Override + protected void onCompleteSuccess() + { + completed.set(null, true); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + completed.set(cause, true); + failure.set(cause); + } + }; + + new Thread(icb::iterate).start(); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> icb.toString().contains("[PROCESSING, true,")); + + // we have aborted, but onAborted not yet called + assertThat(aborted.get(), nullValue()); + + int count = 0; + for (Event event : events) + { + switch (event) + { + case PROCESSED -> + { + processingLatch.countDown(); + // We can call aborted + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> aborted.get() != null); + } + case ABORTED -> + { + abortLatch.countDown(); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> !icb.toString().contains("AbortingException")); + } + case SUCCEEDED -> icb.succeeded(); + + case FAILED -> icb.failed(new Throwable("failure")); + } + + if (++count < 3) + { + // Not complete yet + assertThat(failure.get(), nullValue()); + assertFalse(completed.isMarked()); + } + + // Extra aborts ignored + assertFalse(icb.abort(new Throwable("ignored"))); + } + + // When the callback is succeeded, the completion events can be called + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(completed::isMarked); + assertThat(failure.get(), sameInstance(cause)); + assertThat(completed.getReference(), sameInstance(cause)); + } + + @Test + public void testICBSuccess() throws Exception + { + TestIteratingCB callback = new TestIteratingCB(); + callback.iterate(); + callback.succeeded(); + assertTrue(callback._completed.await(1, TimeUnit.SECONDS)); + assertThat(callback._onFailure.get(), nullValue()); + assertThat(callback._completion.getReference(), Matchers.nullValue()); + assertTrue(callback._completion.isMarked()); + + // Everything now a noop + assertFalse(callback.abort(new Throwable())); + callback.failed(new Throwable()); + assertThat(callback._completion.getReference(), Matchers.nullValue()); + assertThat(callback._completed.getCount(), is(0L)); + + callback.checkNoBadCalls(); + } + + @Test + public void testICBFailure() throws Exception + { + Throwable failure = new Throwable(); + TestIteratingCB callback = new TestIteratingCB(); + callback.iterate(); + callback.failed(failure); + assertTrue(callback._completed.await(1, TimeUnit.SECONDS)); + assertThat(callback._onFailure.get(), sameInstance(failure)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(failure)); + assertTrue(callback._completion.isMarked()); + + // Everything now a noop, other than suppression + callback.succeeded(); + Throwable late = new Throwable(); + assertFalse(callback.abort(late)); + assertFalse(ExceptionUtil.areNotAssociated(failure, late)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(failure)); + assertThat(callback._completed.getCount(), is(0L)); + + callback.checkNoBadCalls(); + } + + @Test + public void testICBAbortSuccess() throws Exception + { + TestIteratingCB callback = new TestIteratingCB(); + callback.iterate(); + + Throwable abort = new Throwable(); + callback.abort(abort); + assertFalse(callback._completed.await(100, TimeUnit.MILLISECONDS)); + assertThat(callback._onFailure.get(), sameInstance(abort)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(abort)); + assertFalse(callback._completion.isMarked()); + + callback.succeeded(); + assertThat(callback._completion.getReference(), Matchers.sameInstance(abort)); + assertThat(callback._completed.getCount(), is(0L)); + + Throwable late = new Throwable(); + callback.failed(late); + assertFalse(callback.abort(late)); + assertTrue(ExceptionUtil.areAssociated(abort, late)); + assertTrue(ExceptionUtil.areAssociated(callback._onFailure.get(), late)); + assertThat(callback._completion.getReference(), Matchers.sameInstance(abort)); + assertThat(callback._completed.getCount(), is(0L)); + + callback.checkNoBadCalls(); + } + + public static Stream abortTests() + { + List tests = new ArrayList<>(); + + for (IteratingCallback.State state : IteratingCallback.State.values()) + { + String name = state.name(); + + if (name.contains("PROCESSING")) + { + for (IteratingCallback.Action action : IteratingCallback.Action.values()) + { + if (name.contains("CALLED")) + { + if (action == IteratingCallback.Action.SCHEDULED) + { + tests.add(Arguments.of(name, action.toString(), Boolean.TRUE)); + tests.add(Arguments.of(name, action.toString(), Boolean.FALSE)); + } + } + else if (action == IteratingCallback.Action.SCHEDULED) + { + tests.add(Arguments.of(name, action.toString(), Boolean.TRUE)); + tests.add(Arguments.of(name, action.toString(), Boolean.FALSE)); + } + else + { + tests.add(Arguments.of(name, action.toString(), null)); + } + } + } + else if (name.equals("COMPLETE") || name.contains("PENDING")) + { + tests.add(Arguments.of(name, null, Boolean.TRUE)); + tests.add(Arguments.of(name, null, Boolean.FALSE)); + } + else + { + tests.add(Arguments.of(name, null, null)); + } + } + + return tests.stream(); + } + + @ParameterizedTest + @MethodSource("abortTests") + public void testAbortInEveryState(String state, String action, Boolean success) throws Exception + { + CountDownLatch processLatch = new CountDownLatch(1); + + AtomicReference onAbort = new AtomicReference<>(); + AtomicReference onFailure = new AtomicReference<>(null); + AtomicMarkableReference onCompleted = new AtomicMarkableReference<>(null, false); + + Throwable cause = new Throwable("abort"); + Throwable failure = new Throwable("failure"); + AtomicInteger badCalls = new AtomicInteger(0); + + IteratingCallback callback = new IteratingCallback() + { + @Override + protected Action process() throws Throwable + { + if (state.contains("CALLED")) + { + if (success) + succeeded(); + else + failed(failure); + } + + if (state.contains("PENDING")) + return Action.SCHEDULED; + + if (state.equals("COMPLETE")) + { + if (success) + return Action.SUCCEEDED; + failed(new Throwable("Complete Failure")); + return Action.SCHEDULED; + } + + if (state.equals("CLOSED")) + { + close(); + return Action.SUCCEEDED; + } + + processLatch.await(); + return IteratingCallback.Action.valueOf(action); + } + + @Override + protected void onFailure(Throwable cause) + { + if (!onFailure.compareAndSet(null, cause)) + badCalls.incrementAndGet(); + } + + @Override + protected void onAborted(Throwable cause) + { + if (!onAbort.compareAndSet(null, cause)) + badCalls.incrementAndGet(); + } + + @Override + protected void onCompleteSuccess() + { + onCompleted.set(null, true); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + onCompleted.set(cause, true); + } + }; + + if (!state.equals("IDLE")) + { + new Thread(callback::iterate).start(); + } + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> callback.toString().contains(state)); + assertThat(callback.toString(), containsString("[" + state + ",")); + onAbort.set(null); + + if (success == Boolean.FALSE && (state.equals("COMPLETE") || state.equals("CLOSED"))) + { + // We must be failed already + assertThat(onFailure.get(), notNullValue()); + } + + boolean aborted = callback.abort(cause); + + // Check abort in completed state + if (state.equals("COMPLETE") || state.equals("CLOSED")) + { + assertThat(aborted, is(false)); + assertThat(onAbort.get(), nullValue()); + assertTrue(onCompleted.isMarked()); + if (success == Boolean.TRUE) + assertThat(onCompleted.getReference(), nullValue()); + else + assertThat(onCompleted.getReference(), notNullValue()); + return; + } + + // Check abort in non completed state + assertThat(aborted, is(true)); + + if (state.contains("PROCESSING")) + { + processLatch.countDown(); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(() -> !callback.toString().contains("PROCESSING")); + + if (action.equals("SCHEDULED")) + { + if (success) + { + callback.succeeded(); + } + else + { + Throwable failureAfterAbort = new Throwable("failure after abort"); + callback.failed(failureAfterAbort); + assertThat(onFailure.get(), not(sameInstance(failureAfterAbort))); + assertTrue(ExceptionUtil.areAssociated(onFailure.get(), failureAfterAbort)); + } + } + } + else if (state.contains("PENDING")) + { + if (success) + callback.succeeded(); + else + callback.failed(new Throwable("failure after abort")); + } + + assertTrue(onCompleted.isMarked()); + + if (state.contains("CALLED") && !success) + { + assertThat(onCompleted.getReference(), sameInstance(failure)); + assertThat(onAbort.get(), sameInstance(failure)); + } + else + { + assertThat(onCompleted.getReference(), sameInstance(cause)); + assertThat(onAbort.get(), sameInstance(cause)); + } + + assertThat(badCalls.get(), is(0)); + } + + private static class TestIteratingCB extends IteratingCallback + { + final AtomicInteger _count; + final AtomicInteger _badCalls = new AtomicInteger(0); + final AtomicBoolean _onSuccess = new AtomicBoolean(); + final AtomicReference _onFailure = new AtomicReference<>(); + final AtomicMarkableReference _completion = new AtomicMarkableReference<>(null, false); + final CountDownLatch _completed = new CountDownLatch(1); + + private TestIteratingCB() + { + this(1); + } + + private TestIteratingCB(int count) + { + _count = new AtomicInteger(count); + } + + @Override + protected Action process() + { + return _count.getAndDecrement() == 0 ? Action.SUCCEEDED : Action.SCHEDULED; + } + + @Override + protected void onAborted(Throwable cause) + { + _completion.compareAndSet(null, cause, false, false); + } + + @Override + protected void onSuccess() + { + if (!_onSuccess.compareAndSet(false, true)) + _badCalls.incrementAndGet(); + } + + @Override + protected void onFailure(Throwable cause) + { + if (!_onFailure.compareAndSet(null, cause)) + _badCalls.incrementAndGet(); + } + + @Override + protected void onCompleteSuccess() + { + if (_completion.isMarked()) + _badCalls.incrementAndGet(); + + if (_completion.compareAndSet(null, null, false, true)) + _completed.countDown(); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + if (_completion.isMarked()) + _badCalls.incrementAndGet(); + + if (_completion.compareAndSet(null, cause, false, true)) + _completed.countDown(); + + // Try again the CAS if there was a call to onAborted(). + Throwable failure = _completion.getReference(); + if (failure != null && _completion.compareAndSet(failure, failure, false, true)) + _completed.countDown(); + } + + public void checkNoBadCalls() + { + assertThat(_badCalls.get(), is(0)); + } } @Test public void testOnSuccessCalledDespiteISE() throws Exception { CountDownLatch latch = new CountDownLatch(1); + AtomicReference aborted = new AtomicReference<>(); IteratingCallback icb = new IteratingCallback() { @Override @@ -451,13 +953,27 @@ protected Action process() } @Override - protected void onSuccess() + protected void onAborted(Throwable cause) + { + aborted.set(cause); + super.onAborted(cause); + } + + @Override + protected void onCompleteSuccess() + { + latch.countDown(); + } + + @Override + protected void onCompleteFailure(Throwable cause) { latch.countDown(); } }; - assertThrows(IllegalStateException.class, icb::iterate); + icb.iterate(); assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertThat(aborted.get(), instanceOf(IllegalStateException.class)); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java index 698507ece9db..095c01071a47 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java @@ -635,10 +635,10 @@ private Flusher(Scheduler scheduler, int bufferSize, Generator generator, EndPoi } @Override - public void onCompleteFailure(Throwable x) + public void onFailure(Throwable x) { coreSession.processConnectionError(x, NOOP); - super.onCompleteFailure(x); + super.onFailure(x); } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index 3aee03b73b0c..719ec026ab1d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -405,11 +405,8 @@ public void timeoutExpired() } @Override - public void onCompleteFailure(Throwable failure) + public void onFailure(Throwable failure) { - if (batchBuffer != null) - batchBuffer.clear(); - releaseAggregate(); try (AutoLock l = lock.lock()) { failedEntries.addAll(queue); @@ -418,9 +415,6 @@ public void onCompleteFailure(Throwable failure) failedEntries.addAll(entries); entries.clear(); - releasableBuffers.forEach(RetainableByteBuffer::release); - releasableBuffers.clear(); - if (closedCause == null) closedCause = failure; else if (closedCause != failure) @@ -436,6 +430,19 @@ else if (closedCause != failure) endPoint.close(closedCause); } + @Override + protected void onCompleteFailure(Throwable cause) + { + if (batchBuffer != null) + batchBuffer.clear(); + releaseAggregate(); + try (AutoLock l = lock.lock()) + { + releasableBuffers.forEach(RetainableByteBuffer::release); + releasableBuffers.clear(); + } + } + private void releaseAggregate() { if (batchBuffer != null && batchBuffer.isEmpty()) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index 5e8d65573f41..c0a9a22e2a3b 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -497,7 +497,6 @@ private boolean inflate(Frame frame, Callback callback, boolean first) throws Da protected void onCompleteFailure(Throwable cause) { releasePayload(_payloadRef); - super.onCompleteFailure(cause); } private void releasePayload(AtomicReference reference) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java index 41e123b0f165..4465873e3f39 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java @@ -153,7 +153,7 @@ protected Action process() throws Throwable throw failure; if (!_demand.get()) - break; + return Action.IDLE; if (_needContent) { @@ -173,12 +173,10 @@ protected Action process() throws Throwable _callback = null; } } - - return Action.IDLE; } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { Throwable suppressed = _failure.getAndSet(cause); if (suppressed != null && suppressed != cause) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java index edd5ee805ecd..542992d94cd6 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/TransformingFlusher.java @@ -170,7 +170,7 @@ protected Action process() throws Throwable } @Override - protected void onCompleteFailure(Throwable t) + protected void onFailure(Throwable t) { if (log.isDebugEnabled()) log.debug("onCompleteFailure {}", t.toString()); @@ -180,7 +180,7 @@ protected void onCompleteFailure(Throwable t) notifyCallbackFailure(current.callback, t); current = null; } - onFailure(t); + TransformingFlusher.this.onFailure(t); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java index 2ae247437e02..7ba03da9e066 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java @@ -166,11 +166,11 @@ public void testWriteTimeout() throws Exception FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather) { @Override - public void onCompleteFailure(Throwable failure) + public void onFailure(Throwable failure) { error.set(failure); flusherFailure.countDown(); - super.onCompleteFailure(failure); + super.onFailure(failure); } }; diff --git a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java index 8d5394954543..b5fba5f0ca7a 100644 --- a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java +++ b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncMiddleManServlet.java @@ -400,7 +400,7 @@ private void process(ByteBuffer content, Callback callback, boolean finished) th } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { onError(x); } diff --git a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java index 11e048ac8236..df91f4f27305 100644 --- a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java +++ b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java @@ -192,7 +192,7 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { onError(cause); } diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java index ed81c81277d8..6bd878ed64c5 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java @@ -132,7 +132,6 @@ enum ApiState private State _state = State.OPEN; private boolean _softClose = false; private long _written; - private long _flushed; private long _firstByteNanoTime = -1; private ByteBufferPool.Sized _pool; private RetainableByteBuffer _aggregate; @@ -222,7 +221,8 @@ private void onWriteComplete(boolean last, Throwable failure) _state = State.CLOSED; closedCallback = _closedCallback; _closedCallback = null; - lockedReleaseBuffer(failure != null); + if (failure == null) + lockedReleaseBuffer(); wake = updateApiState(failure); } else if (_state == State.CLOSE) @@ -444,7 +444,7 @@ public void completed(Throwable failure) try (AutoLock ignored = _channelState.lock()) { _state = State.CLOSED; - lockedReleaseBuffer(failure != null); + lockedReleaseBuffer(); } } @@ -598,18 +598,13 @@ private RetainableByteBuffer lockedAcquireBuffer() return _aggregate; } - private void lockedReleaseBuffer(boolean failure) + private void lockedReleaseBuffer() { assert _channelState.isLockHeldByCurrentThread(); - if (_aggregate != null) { - if (failure && _pool != null) - _pool.removeAndRelease(_aggregate); - else - _aggregate.release(); + _aggregate.release(); _aggregate = null; - _pool = null; } } @@ -1251,7 +1246,7 @@ public void recycle() { try (AutoLock ignored = _channelState.lock()) { - lockedReleaseBuffer(_state != State.CLOSED); + lockedReleaseBuffer(); _state = State.OPEN; _apiState = ApiState.BLOCKING; _softClose = true; // Stay closed until next request @@ -1264,7 +1259,6 @@ public void recycle() _writeListener = null; _onError = null; _firstByteNanoTime = -1; - _flushed = 0; _closedCallback = null; } } @@ -1404,10 +1398,19 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + protected void onFailure(Throwable e) { onWriteComplete(_last, e); } + + @Override + protected void onCompleteFailure(Throwable cause) + { + try (AutoLock ignored = _channelState.lock()) + { + lockedReleaseBuffer(); + } + } } private abstract class NestedChannelWriteCB extends ChannelWriteCB @@ -1440,11 +1443,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + protected void onFailure(Throwable e) { try { - super.onCompleteFailure(e); + super.onFailure(e); } catch (Throwable t) { @@ -1467,7 +1470,7 @@ private AsyncFlush(boolean last) } @Override - protected Action process() throws Exception + protected Action process() { if (_aggregate != null && _aggregate.hasRemaining()) { @@ -1518,7 +1521,7 @@ private AsyncWrite(ByteBuffer buffer, boolean last) } @Override - protected Action process() throws Exception + protected Action process() { // flush any content from the aggregate if (_aggregate != null && _aggregate.hasRemaining()) @@ -1641,15 +1644,18 @@ protected void onCompleteSuccess() { _buffer.release(); IO.close(_in); - super.onCompleteSuccess(); + } + + @Override + protected void onFailure(Throwable cause) + { + IO.close(_in); } @Override public void onCompleteFailure(Throwable x) { _buffer.release(); - IO.close(_in); - super.onCompleteFailure(x); } } @@ -1714,15 +1720,18 @@ protected void onCompleteSuccess() { _buffer.release(); IO.close(_in); - super.onCompleteSuccess(); + } + + @Override + protected void onFailure(Throwable cause) + { + IO.close(_in); } @Override public void onCompleteFailure(Throwable x) { _buffer.release(); - IO.close(_in); - super.onCompleteFailure(x); } } diff --git a/jetty-ee10/jetty-ee10-webapp/src/main/java/org/eclipse/jetty/ee10/webapp/WebInfConfiguration.java b/jetty-ee10/jetty-ee10-webapp/src/main/java/org/eclipse/jetty/ee10/webapp/WebInfConfiguration.java index 7ae00e196f58..8998dd25dcec 100644 --- a/jetty-ee10/jetty-ee10-webapp/src/main/java/org/eclipse/jetty/ee10/webapp/WebInfConfiguration.java +++ b/jetty-ee10/jetty-ee10-webapp/src/main/java/org/eclipse/jetty/ee10/webapp/WebInfConfiguration.java @@ -18,11 +18,8 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; import jakarta.servlet.ServletContext; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.FileID; import org.eclipse.jetty.util.IO; diff --git a/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java b/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java index b7f9d294456f..865094f0a93e 100644 --- a/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java +++ b/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncMiddleManServlet.java @@ -400,7 +400,7 @@ private void process(ByteBuffer content, Callback callback, boolean finished) th } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { onError(x); } diff --git a/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncProxyServlet.java b/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncProxyServlet.java index 00350bc2491a..ce317f0f0b4d 100644 --- a/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncProxyServlet.java +++ b/jetty-ee11/jetty-ee11-proxy/src/main/java/org/eclipse/jetty/ee11/proxy/AsyncProxyServlet.java @@ -192,9 +192,8 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest } @Override - public void failed(Throwable x) + public void onFailure(Throwable x) { - super.failed(x); onError(x); } } diff --git a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java index 09fc8b7bb042..755da4df1be5 100644 --- a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java +++ b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/HttpOutput.java @@ -132,7 +132,6 @@ enum ApiState private State _state = State.OPEN; private boolean _softClose = false; private long _written; - private long _flushed; private long _firstByteNanoTime = -1; private ByteBufferPool.Sized _pool; private RetainableByteBuffer _aggregate; @@ -222,7 +221,8 @@ private void onWriteComplete(boolean last, Throwable failure) _state = State.CLOSED; closedCallback = _closedCallback; _closedCallback = null; - lockedReleaseBuffer(failure != null); + if (failure == null) + lockedReleaseBuffer(); wake = updateApiState(failure); } else if (_state == State.CLOSE) @@ -325,10 +325,9 @@ public ByteBuffer takeContentAndClose() { if (_state != State.OPEN) throw new IllegalStateException(stateString()); - // TODO avoid this copy. ByteBuffer content = _aggregate != null && _aggregate.hasRemaining() ? BufferUtil.copy(_aggregate.getByteBuffer()) : BufferUtil.EMPTY_BUFFER; _state = State.CLOSED; - lockedReleaseBuffer(false); + lockedReleaseBuffer(); return content; } } @@ -458,7 +457,7 @@ public void completed(Throwable failure) try (AutoLock ignored = _channelState.lock()) { _state = State.CLOSED; - lockedReleaseBuffer(failure != null); + lockedReleaseBuffer(); } } @@ -612,18 +611,13 @@ private RetainableByteBuffer lockedAcquireBuffer() return _aggregate; } - private void lockedReleaseBuffer(boolean failure) + private void lockedReleaseBuffer() { assert _channelState.isLockHeldByCurrentThread(); - if (_aggregate != null) { - if (failure && _pool != null) - _pool.removeAndRelease(_aggregate); - else - _aggregate.release(); + _aggregate.release(); _aggregate = null; - _pool = null; } } @@ -1265,7 +1259,7 @@ public void recycle() { try (AutoLock ignored = _channelState.lock()) { - lockedReleaseBuffer(_state != State.CLOSED); + lockedReleaseBuffer(); _state = State.OPEN; _apiState = ApiState.BLOCKING; _softClose = true; // Stay closed until next request @@ -1278,7 +1272,6 @@ public void recycle() _writeListener = null; _onError = null; _firstByteNanoTime = -1; - _flushed = 0; _closedCallback = null; } } @@ -1418,10 +1411,19 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + protected void onFailure(Throwable e) { onWriteComplete(_last, e); } + + @Override + protected void onCompleteFailure(Throwable cause) + { + try (AutoLock ignored = _channelState.lock()) + { + lockedReleaseBuffer(); + } + } } private abstract class NestedChannelWriteCB extends ChannelWriteCB @@ -1454,11 +1456,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + protected void onFailure(Throwable e) { try { - super.onCompleteFailure(e); + super.onFailure(e); } catch (Throwable t) { @@ -1481,7 +1483,7 @@ private AsyncFlush(boolean last) } @Override - protected Action process() throws Exception + protected Action process() { if (_aggregate != null && _aggregate.hasRemaining()) { @@ -1532,7 +1534,7 @@ private AsyncWrite(ByteBuffer buffer, boolean last) } @Override - protected Action process() throws Exception + protected Action process() { // flush any content from the aggregate if (_aggregate != null && _aggregate.hasRemaining()) @@ -1655,15 +1657,18 @@ protected void onCompleteSuccess() { _buffer.release(); IO.close(_in); - super.onCompleteSuccess(); + } + + @Override + protected void onFailure(Throwable cause) + { + IO.close(_in); } @Override public void onCompleteFailure(Throwable x) { _buffer.release(); - IO.close(_in); - super.onCompleteFailure(x); } } @@ -1728,15 +1733,18 @@ protected void onCompleteSuccess() { _buffer.release(); IO.close(_in); - super.onCompleteSuccess(); + } + + @Override + protected void onFailure(Throwable cause) + { + IO.close(_in); } @Override public void onCompleteFailure(Throwable x) { _buffer.release(); - IO.close(_in); - super.onCompleteFailure(x); } } diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java index 0eb943a384b8..efec2644bdf7 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/FileBufferedResponseHandler.java @@ -221,7 +221,7 @@ protected void onCompleteSuccess() } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { dispose(); callback.failed(cause); diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java index 90352d561949..c6629ba20321 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java @@ -196,7 +196,6 @@ default void resetBuffer() throws IllegalStateException private boolean _softClose = false; private Interceptor _interceptor; private long _written; - private long _flushed; private long _firstByteNanoTime = -1; private ByteBufferPool.Sized _pool; private RetainableByteBuffer _aggregate; @@ -300,7 +299,8 @@ private void onWriteComplete(boolean last, Throwable failure) _state = State.CLOSED; closedCallback = _closedCallback; _closedCallback = null; - lockedReleaseBuffer(failure != null); + if (failure == null) + lockedReleaseBuffer(); wake = updateApiState(failure); } else if (_state == State.CLOSE) @@ -521,7 +521,7 @@ public void completed(Throwable failure) try (AutoLock l = _channelState.lock()) { _state = State.CLOSED; - lockedReleaseBuffer(failure != null); + lockedReleaseBuffer(); } } @@ -672,16 +672,13 @@ private RetainableByteBuffer lockedAcquireBuffer() return _aggregate; } - private void lockedReleaseBuffer(boolean failure) + private void lockedReleaseBuffer() { assert _channelState.isLockHeldByCurrentThread(); if (_aggregate != null) { - if (failure && _pool != null) - _pool.removeAndRelease(_aggregate); - else - _aggregate.release(); + _aggregate.release(); _aggregate = null; _pool = null; } @@ -1455,7 +1452,7 @@ public void recycle() { try (AutoLock l = _channelState.lock()) { - lockedReleaseBuffer(_state != State.CLOSED); + lockedReleaseBuffer(); _state = State.OPEN; _apiState = ApiState.BLOCKING; _softClose = true; // Stay closed until next request @@ -1469,7 +1466,6 @@ public void recycle() _writeListener = null; _onError = null; _firstByteNanoTime = -1; - _flushed = 0; _closedCallback = null; } } @@ -1616,10 +1612,19 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + protected void onFailure(Throwable e) { onWriteComplete(_last, e); } + + @Override + protected void onCompleteFailure(Throwable cause) + { + try (AutoLock l = _channelState.lock()) + { + lockedReleaseBuffer(); + } + } } private abstract class NestedChannelWriteCB extends ChannelWriteCB @@ -1652,11 +1657,11 @@ protected void onCompleteSuccess() } @Override - public void onCompleteFailure(Throwable e) + protected void onFailure(Throwable e) { try { - super.onCompleteFailure(e); + super.onFailure(e); } catch (Throwable t) { @@ -1679,7 +1684,7 @@ private AsyncFlush(boolean last) } @Override - protected Action process() throws Exception + protected Action process() { if (_aggregate != null && _aggregate.hasRemaining()) { @@ -1730,7 +1735,7 @@ private AsyncWrite(ByteBuffer buffer, boolean last) } @Override - protected Action process() throws Exception + protected Action process() { // flush any content from the aggregate if (_aggregate != null && _aggregate.hasRemaining()) @@ -1850,17 +1855,23 @@ protected Action process() throws Exception @Override protected void onCompleteSuccess() { + super.onCompleteSuccess(); _buffer.release(); IO.close(_in); - super.onCompleteSuccess(); } @Override - public void onCompleteFailure(Throwable x) + protected void onFailure(Throwable cause) { - _buffer.release(); + super.onFailure(cause); IO.close(_in); + } + + @Override + public void onCompleteFailure(Throwable x) + { super.onCompleteFailure(x); + _buffer.release(); } } @@ -1921,17 +1932,23 @@ protected Action process() throws Exception @Override protected void onCompleteSuccess() { + super.onCompleteSuccess(); _buffer.release(); IO.close(_in); - super.onCompleteSuccess(); } @Override - public void onCompleteFailure(Throwable x) + protected void onFailure(Throwable cause) { - _buffer.release(); + super.onFailure(cause); IO.close(_in); + } + + @Override + public void onCompleteFailure(Throwable x) + { super.onCompleteFailure(x); + _buffer.release(); } } diff --git a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java index b37e0cfbb89c..c5a5095bed97 100644 --- a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java +++ b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncMiddleManServlet.java @@ -400,7 +400,7 @@ private void process(ByteBuffer content, Callback callback, boolean finished) th } @Override - protected void onCompleteFailure(Throwable x) + protected void onFailure(Throwable x) { onError(x); } diff --git a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java index 321da55fd150..6c90f26e7cdf 100644 --- a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java +++ b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java @@ -192,7 +192,7 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest } @Override - protected void onCompleteFailure(Throwable cause) + protected void onFailure(Throwable cause) { onError(cause); } diff --git a/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java b/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java index ba36b7666061..569a5f9c7f5a 100644 --- a/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java +++ b/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java @@ -103,6 +103,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) { throw new RuntimeException(e); } + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> _history.size() == 5); } }), "/"); diff --git a/jetty-ee9/jetty-ee9-webapp/src/main/java/org/eclipse/jetty/ee9/webapp/WebInfConfiguration.java b/jetty-ee9/jetty-ee9-webapp/src/main/java/org/eclipse/jetty/ee9/webapp/WebInfConfiguration.java index ccb0ad828510..95dacdff2148 100644 --- a/jetty-ee9/jetty-ee9-webapp/src/main/java/org/eclipse/jetty/ee9/webapp/WebInfConfiguration.java +++ b/jetty-ee9/jetty-ee9-webapp/src/main/java/org/eclipse/jetty/ee9/webapp/WebInfConfiguration.java @@ -20,12 +20,9 @@ import java.nio.file.Path; import jakarta.servlet.ServletContext; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.FileID; import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.URIUtil; import org.eclipse.jetty.util.resource.MountedPathResource; import org.eclipse.jetty.util.resource.Resource; diff --git a/jetty-home/pom.xml b/jetty-home/pom.xml index 1972104a95e8..3816d1e97dbc 100644 --- a/jetty-home/pom.xml +++ b/jetty-home/pom.xml @@ -236,14 +236,6 @@ jar true - - org.eclipse.jetty.demos - jetty-servlet5-demo-jndi-webapp - ${project.version} - config - jar - true - org.eclipse.jetty.demos jetty-servlet5-demo-jsp-webapp