diff --git a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpExchangeDelegate.java b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpExchangeDelegate.java
index 33d0f68869fd..48de184c007e 100644
--- a/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpExchangeDelegate.java
+++ b/jetty-http-spi/src/main/java/org/eclipse/jetty/http/spi/JettyHttpExchangeDelegate.java
@@ -209,8 +209,11 @@ public void setAttribute(String name, Object value)
@Override
public void setStreams(InputStream i, OutputStream o)
{
- _is = i;
- _os = o;
+ assert _is != null;
+ if (i != null)
+ _is = i;
+ if (o != null)
+ _os = o;
}
@Override
diff --git a/jetty-http/src/main/resources/org/eclipse/jetty/http/mime.properties b/jetty-http/src/main/resources/org/eclipse/jetty/http/mime.properties
index f689e3c239e9..5d2cb60aa106 100644
--- a/jetty-http/src/main/resources/org/eclipse/jetty/http/mime.properties
+++ b/jetty-http/src/main/resources/org/eclipse/jetty/http/mime.properties
@@ -178,7 +178,7 @@ wml=text/vnd.wap.wml
wmlc=application/vnd.wap.wmlc
wmls=text/vnd.wap.wmlscript
wmlsc=application/vnd.wap.wmlscriptc
-woff=application/font-woff
+woff=font/woff
woff2=font/woff2
wrl=model/vrml
wtls-ca-certificate=application/vnd.wap.wtls-ca-certificate
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
index c28079b7e098..54f58924b7b0 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
@@ -21,8 +21,10 @@
import org.eclipse.jetty.util.Callback;
/**
- * This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
- * these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
+ *
This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
+ * these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
+ *
+ * This class is not thread safe and callers must do mutual exclusion.
*/
public class ByteBufferCallbackAccumulator
{
@@ -89,11 +91,14 @@ public void writeTo(ByteBuffer buffer)
public void fail(Throwable t)
{
- for (Entry entry : _entries)
+ // In some usages the callback recursively fails the accumulator.
+ // So we copy and clear to avoid double completing the callback.
+ ArrayList entries = new ArrayList<>(_entries);
+ _entries.clear();
+ _length = 0;
+ for (Entry entry : entries)
{
entry.callback.failed(t);
}
- _entries.clear();
- _length = 0;
}
}
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java
index 34a2529e799c..eb8901706d77 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AbstractProxyServlet.java
@@ -49,6 +49,7 @@
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
@@ -145,11 +146,13 @@ public void destroy()
{
try
{
- _client.stop();
+ LifeCycle.stop(_client);
}
catch (Exception x)
{
- if (_log.isDebugEnabled())
+ if (_log == null)
+ x.printStackTrace();
+ else if (_log.isDebugEnabled())
_log.debug("Failed to stop client", x);
}
}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AbstractProxyServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AbstractProxyServletTest.java
new file mode 100644
index 000000000000..f9c5f5af220a
--- /dev/null
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AbstractProxyServletTest.java
@@ -0,0 +1,39 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.proxy;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.api.Response.CompleteListener;
+import org.junit.jupiter.api.Test;
+
+public class AbstractProxyServletTest
+{
+
+ @Test
+ public void testNewDestroy() throws Exception
+ {
+ new AbstractProxyServlet()
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected CompleteListener newProxyResponseListener(HttpServletRequest clientRequest, HttpServletResponse proxyResponse)
+ {
+ return null;
+ }
+ }.destroy();
+ }
+}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
index 365ece15cdde..ffe2bf51ecc5 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
@@ -73,6 +73,7 @@
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ajax.JSON;
+import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -159,9 +160,15 @@ private void startClient() throws Exception
@AfterEach
public void dispose() throws Exception
{
- client.stop();
- proxy.stop();
- server.stop();
+ LifeCycle.stop(client);
+ LifeCycle.stop(proxy);
+ LifeCycle.stop(proxy);
+ }
+
+ @Test
+ public void testNewDestroy() throws Exception
+ {
+ new AsyncMiddleManServlet().destroy();
}
@Test
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
index 4ba8bc67753b..634dc52a0335 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
@@ -400,6 +400,7 @@ public void failed(Throwable x)
break;
case PENDING:
{
+ _state = State.FAILED;
failure = true;
break;
}
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java
index 668d9bf1f9da..d588ab518bc1 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java
@@ -15,6 +15,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -322,4 +323,44 @@ boolean waitForComplete() throws InterruptedException
return isSucceeded();
}
}
+
+ @Test
+ public void testMultipleFailures() throws Exception
+ {
+ AtomicInteger process = new AtomicInteger();
+ AtomicInteger failure = new AtomicInteger();
+ IteratingCallback icb = new IteratingCallback()
+ {
+ @Override
+ protected Action process() throws Throwable
+ {
+ process.incrementAndGet();
+ return Action.SCHEDULED;
+ }
+
+ @Override
+ protected void onCompleteFailure(Throwable cause)
+ {
+ super.onCompleteFailure(cause);
+ failure.incrementAndGet();
+ }
+ };
+
+ icb.iterate();
+ assertEquals(1, process.get());
+ assertEquals(0, failure.get());
+
+ icb.failed(new Throwable("test1"));
+
+ assertEquals(1, process.get());
+ assertEquals(1, failure.get());
+
+ icb.succeeded();
+ assertEquals(1, process.get());
+ assertEquals(1, failure.get());
+
+ icb.failed(new Throwable("test2"));
+ assertEquals(1, process.get());
+ assertEquals(1, failure.get());
+ }
}
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java
index bc68f27ca569..3430c65697a6 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java
@@ -464,16 +464,23 @@ private boolean inflate(Frame frame, Callback callback, boolean first) throws Da
chunk.setPayload(payload);
chunk.setFin(frame.isFin() && complete);
- boolean succeedCallback = complete;
+ // If we are complete we return true, then DemandingFlusher.process() will null out the Frame and Callback.
+ // The application may decide to hold onto the buffer and delay completing the callback, so we need to capture
+ // references to these in the payloadCallback and not rely on state of the flusher which may have moved on.
+ // This flusher could be failed while the application already has the payloadCallback, so we need protection against
+ // the flusher failing and the application completing the callback, that's why we use the payload AtomicReference.
+ boolean completeCallback = complete;
AtomicReference payloadRef = _payloadRef;
Callback payloadCallback = Callback.from(() ->
{
getBufferPool().release(payloadRef.getAndSet(null));
- if (succeedCallback)
+ if (completeCallback)
callback.succeeded();
}, t ->
{
getBufferPool().release(payloadRef.getAndSet(null));
+ if (completeCallback)
+ callback.failed(t);
failFlusher(t);
});
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/AbstractMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/AbstractMessageSink.java
index f2c287d8b3fb..f02ffd1babb6 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/AbstractMessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/AbstractMessageSink.java
@@ -28,4 +28,9 @@ public AbstractMessageSink(CoreSession session, MethodHandle methodHandle)
this.session = Objects.requireNonNull(session, "CoreSession");
this.methodHandle = Objects.requireNonNull(methodHandle, "MethodHandle");
}
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ }
}
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java
index 809ec121a8cd..466e9a4106df 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteArrayMessageSink.java
@@ -106,4 +106,11 @@ public void accept(Frame frame, Callback callback)
}
}
}
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ if (out != null)
+ out.fail(failure);
+ }
}
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java
index c28eeeb0dba7..92768c029257 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/ByteBufferMessageSink.java
@@ -113,4 +113,11 @@ public void accept(Frame frame, Callback callback)
}
}
}
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ if (out != null)
+ out.fail(failure);
+ }
}
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java
index 8a740b159ad4..acf44ca45e45 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/DispatchedMessageSink.java
@@ -164,4 +164,11 @@ public void succeeded()
typeSink.accept(frame, frameCallback);
}
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ if (typeSink != null)
+ typeSink.fail(failure);
+ }
}
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageInputStream.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageInputStream.java
index 2b333d023add..7128089d0eaf 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageInputStream.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageInputStream.java
@@ -24,6 +24,7 @@
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.websocket.core.Frame;
import org.slf4j.Logger;
@@ -127,40 +128,6 @@ public int read(ByteBuffer buffer) throws IOException
return fillLen;
}
- @Override
- public void close() throws IOException
- {
- if (LOG.isDebugEnabled())
- LOG.debug("close()");
-
- ArrayList entries = new ArrayList<>();
- try (AutoLock l = lock.lock())
- {
- if (closed)
- return;
- closed = true;
-
- if (currentEntry != null)
- {
- entries.add(currentEntry);
- currentEntry = null;
- }
-
- // Clear queue and fail all entries.
- entries.addAll(buffers);
- buffers.clear();
- buffers.offer(CLOSED);
- }
-
- // Succeed all entries as we don't need them anymore (failing would close the connection).
- for (Entry e : entries)
- {
- e.callback.succeeded();
- }
-
- super.close();
- }
-
public void setTimeout(long timeoutMs)
{
this.timeoutMs = timeoutMs;
@@ -218,6 +185,49 @@ private Entry getCurrentEntry() throws IOException
}
}
+ @Override
+ public void close() throws IOException
+ {
+ fail(null);
+ }
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("close()");
+
+ ArrayList entries = new ArrayList<>();
+ try (AutoLock l = lock.lock())
+ {
+ if (closed)
+ return;
+ closed = true;
+
+ if (currentEntry != null)
+ {
+ entries.add(currentEntry);
+ currentEntry = null;
+ }
+
+ // Clear queue and fail all entries.
+ entries.addAll(buffers);
+ buffers.clear();
+ buffers.offer(CLOSED);
+ }
+
+ // Succeed all entries as we don't need them anymore (failing would close the connection).
+ for (Entry e : entries)
+ {
+ if (failure == null)
+ e.callback.succeeded();
+ else
+ e.callback.failed(failure);
+ }
+
+ IO.close(super::close);
+ }
+
private static class Entry
{
public ByteBuffer buffer;
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageReader.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageReader.java
index 5cd427c240e3..228ab4670a86 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageReader.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageReader.java
@@ -87,6 +87,12 @@ public void close() throws IOException
stream.close();
}
+ @Override
+ public void fail(Throwable failure)
+ {
+ stream.fail(failure);
+ }
+
@Override
public void accept(Frame frame, Callback callback)
{
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageSink.java
index 0e063dc8faea..0a4d43f62e36 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/MessageSink.java
@@ -29,4 +29,12 @@ public interface MessageSink
* @param callback the callback for how the frame was consumed
*/
void accept(Frame frame, Callback callback);
+
+ /**
+ * Fail the message sink.
+ * Release any resources and fail all stored callbacks as {@link #accept(Frame, Callback)} will never be called again.
+ *
+ * @param failure the failure that occurred.
+ */
+ void fail(Throwable failure);
}
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java
index 88b5984bfa71..3fe7137b8a61 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/PartialByteArrayMessageSink.java
@@ -22,7 +22,7 @@
public class PartialByteArrayMessageSink extends AbstractMessageSink
{
- private static byte[] EMPTY_BUFFER = new byte[0];
+ private static final byte[] EMPTY_BUFFER = new byte[0];
public PartialByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java
index 984c272687c1..c79b69d660de 100644
--- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java
+++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/messages/StringMessageSink.java
@@ -69,4 +69,11 @@ public void accept(Frame frame, Callback callback)
}
}
}
+
+ @Override
+ public void fail(Throwable failure)
+ {
+ if (out != null)
+ out.reset();
+ }
}
diff --git a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java
index 6107f29fdaa5..5ebbfa9dfef1 100644
--- a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java
+++ b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java
@@ -38,6 +38,7 @@
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.internal.messages.MessageSink;
@@ -270,6 +271,12 @@ public void onClose(Frame frame, Callback callback)
@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
+ if (activeMessageSink != null)
+ {
+ activeMessageSink.fail(new CloseException(closeStatus.getCode(), closeStatus.getCause()));
+ activeMessageSink = null;
+ }
+
notifyOnClose(closeStatus, callback);
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionClosed(session));
diff --git a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractDecodedMessageSink.java b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractDecodedMessageSink.java
index d679977be402..5b272ebd1a07 100644
--- a/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractDecodedMessageSink.java
+++ b/jetty-websocket/websocket-javax-common/src/main/java/org/eclipse/jetty/websocket/javax/common/messages/AbstractDecodedMessageSink.java
@@ -80,6 +80,12 @@ public void accept(Frame frame, Callback callback)
_messageSink.accept(frame, callback);
}
+ @Override
+ public void fail(Throwable failure)
+ {
+ _messageSink.fail(failure);
+ }
+
public abstract static class Basic extends AbstractDecodedMessageSink
{
protected final List _decoders;
diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
index b428de3f2c3d..e002794c012f 100644
--- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
+++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
@@ -294,8 +294,14 @@ public void onClosed(CloseStatus closeStatus, Callback callback)
this.delayedCallback = null;
}
+ CloseException closeException = new CloseException(closeStatus.getCode(), closeStatus.getCause());
if (delayedCallback != null)
- delayedCallback.failed(new CloseException(closeStatus.getCode(), closeStatus.getCause()));
+ delayedCallback.failed(closeException);
+
+ if (textSink != null)
+ textSink.fail(closeException);
+ if (binarySink != null)
+ binarySink.fail(closeException);
notifyOnClose(closeStatus, callback);
container.notifySessionListeners((listener) -> listener.onWebSocketSessionClosed(session));
diff --git a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ClientDisconnectTest.java b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ClientDisconnectTest.java
new file mode 100644
index 000000000000..e7e6306a89c9
--- /dev/null
+++ b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ClientDisconnectTest.java
@@ -0,0 +1,115 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.tests;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool.LogarithmicRetainablePool;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientDisconnectTest
+{
+ private final CompletableFuture _serverSocketFuture = new CompletableFuture<>();
+ private final Duration _serverIdleTimeout = Duration.ofSeconds(5);
+ private final int _messageSize = 5 * 1024 * 1024;
+ private Server _server;
+ private ServerConnector _connector;
+ private WebSocketClient _client;
+
+ @WebSocket
+ public class ServerSocket extends EchoSocket
+ {
+ @Override
+ public void onOpen(Session session)
+ {
+ _serverSocketFuture.complete(this);
+ super.onOpen(session);
+ }
+ }
+
+ @BeforeEach
+ public void before() throws Exception
+ {
+ _client = new WebSocketClient();
+ _server = new Server();
+ _connector = new ServerConnector(_server);
+ _server.addConnector(_connector);
+
+ ServletContextHandler contextHandler = new ServletContextHandler();
+ JettyWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, container) ->
+ {
+ container.addMapping("/", (req, resp) -> new ServerSocket());
+ container.setIdleTimeout(_serverIdleTimeout);
+ container.setMaxBinaryMessageSize(_messageSize);
+ }));
+ _server.setHandler(contextHandler);
+
+ _server.start();
+ _client.start();
+ }
+
+ @AfterEach
+ public void after() throws Exception
+ {
+ _client.stop();
+ _server.stop();
+ }
+
+ @Test
+ public void testBuffersAfterIncompleteMessage() throws Exception
+ {
+ URI uri = URI.create("ws://localhost:" + _connector.getLocalPort());
+
+ // Open connection to the server.
+ Session session = _client.connect(new WebSocketAdapter(), uri).get(5, TimeUnit.SECONDS);
+ ServerSocket serverSocket = _serverSocketFuture.get(5, TimeUnit.SECONDS);
+ assertNotNull(serverSocket);
+
+ // Send partial payload to server then abruptly close the connection.
+ byte[] bytes = new byte[300_000];
+ Arrays.fill(bytes, (byte)'x');
+ session.setMaxBinaryMessageSize(_messageSize);
+ session.getRemote().sendPartialBytes(BufferUtil.toBuffer(bytes), false);
+ session.disconnect();
+
+ // Wait for the server to close its session.
+ assertTrue(serverSocket.closeLatch.await(_serverIdleTimeout.toSeconds() + 1, TimeUnit.SECONDS));
+
+ // We should have no buffers still used in the pool.
+ LogarithmicRetainablePool bufferPool = (LogarithmicRetainablePool)_server.getBean(ByteBufferPool.class).asRetainableByteBufferPool();
+ assertThat(bufferPool.getDirectByteBufferCount() - bufferPool.getAvailableDirectByteBufferCount(), equalTo(0L));
+ assertThat(bufferPool.getHeapByteBufferCount() - bufferPool.getAvailableHeapByteBufferCount(), equalTo(0L));
+ }
+}
diff --git a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java
index 6c3852bc1f51..56f9f0181d7e 100644
--- a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java
+++ b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/LargeDeflateTest.java
@@ -33,6 +33,7 @@
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -92,6 +93,27 @@ void testDeflate() throws Exception
assertThat(message, is(sentMessage));
}
+ @Test
+ void testDeflateLargerThanMaxMessage() throws Exception
+ {
+ ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
+ upgradeRequest.addExtensions("permessage-deflate");
+
+ EventSocket clientSocket = new EventSocket();
+ ByteBuffer message = largePayloads();
+ Session session = _client.connect(clientSocket, URI.create("ws://localhost:" + _connector.getLocalPort() + "/ws"), upgradeRequest).get();
+
+ // Set the maxBinaryMessageSize on the server to be lower than the size of the message.
+ assertTrue(_serverSocket.openLatch.await(5, TimeUnit.SECONDS));
+ _serverSocket.session.setMaxBinaryMessageSize(message.remaining() - 1024);
+
+ session.getRemote().sendBytes(message);
+ assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(_serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
+ assertThat(_serverSocket.closeCode, is(StatusCode.MESSAGE_TOO_LARGE));
+ assertThat(_serverSocket.closeReason, containsString("Binary message too large"));
+ }
+
private static ByteBuffer largePayloads()
{
var bytes = new byte[4 * 1024 * 1024];