diff --git a/Jmh_Jenkinsfile b/Jmh_Jenkinsfile index 5f6dd5803755..220ee0fb6771 100644 --- a/Jmh_Jenkinsfile +++ b/Jmh_Jenkinsfile @@ -1,6 +1,6 @@ #!groovy -def branch = params.get("JETTY_BRANCH" ,"jetty-9.4.x") +def branch = params.get("JETTY_BRANCH" ,"jetty-10.0.x") def owner = params.get("REPO_OWNER", "eclipse") node("linux") { @@ -26,7 +26,7 @@ node("linux") { timeout(time: 15, unit: 'MINUTES') { withMaven( maven: mvnName, - jdk: "jdk8", + jdk: "jdk11", publisherStrategy: 'EXPLICIT', globalMavenSettingsConfig: settingsName, mavenOpts: mavenOpts, diff --git a/jetty-distribution/src/main/resources/demo-base/webapps/ROOT/index.html b/jetty-distribution/src/main/resources/demo-base/webapps/ROOT/index.html index 1091c20b0405..7be710afceda 100644 --- a/jetty-distribution/src/main/resources/demo-base/webapps/ROOT/index.html +++ b/jetty-distribution/src/main/resources/demo-base/webapps/ROOT/index.html @@ -13,7 +13,7 @@
-

Welcome to Jetty 9

+

Welcome to Jetty 10

The Jetty project is a 100% Java + + org.apache.maven.plugins + maven-surefire-plugin + + + ${settings.localRepository} + + + @{argLine} ${jetty.surefire.argLine} + --add-modules org.slf4j + + + - - - - org.apache.maven.plugins - maven-surefire-plugin - - - ${settings.localRepository} - - - @{argLine} ${jetty.surefire.argLine} - --add-modules jetty.servlet.api,org.slf4j - - - - - - - org.eclipse.jetty.toolchain - jetty-servlet-api - provided - org.eclipse.jetty.toolchain jetty-perf-helper diff --git a/jetty-websocket/javax-websocket-server/pom.xml b/jetty-websocket/javax-websocket-server/pom.xml index fb8cd88ff592..5b5b1135b1b6 100644 --- a/jetty-websocket/javax-websocket-server/pom.xml +++ b/jetty-websocket/javax-websocket-server/pom.xml @@ -50,7 +50,7 @@ @{argLine} ${jetty.surefire.argLine} - --add-opens org.eclipse.jetty.websocket.javax.server/org.eclipse.jetty.websocket.javax.server.examples=org.eclipse.jetty.websocket.javax.common + --add-exports org.eclipse.jetty.websocket.javax.server/org.eclipse.jetty.websocket.javax.server.examples=org.eclipse.jetty.websocket.javax.common --add-reads org.eclipse.jetty.websocket.javax.server=org.eclipse.jetty.security --add-reads org.eclipse.jetty.websocket.javax.common=org.eclipse.jetty.websocket.javax.server diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index a437263538a1..ff44658de099 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Objects; -import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; @@ -52,6 +51,7 @@ public class FrameFlusher extends IteratingCallback private final List entries; private final List buffers; private ByteBuffer batchBuffer = null; + private boolean canEnqueue = true; private Throwable closedCause; public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) @@ -78,22 +78,48 @@ public boolean enqueue(Frame frame, Callback callback, boolean batch) { Entry entry = new Entry(frame, callback, batch); byte opCode = frame.getOpCode(); - Throwable failure = null; + + Throwable dead; synchronized (this) { - if (closedCause != null) - failure = closedCause; - else if (opCode == OpCode.PING || opCode == OpCode.PONG) - queue.offerFirst(entry); + if (canEnqueue) + { + dead = closedCause; + if (dead == null) + { + if (opCode == OpCode.PING || opCode == OpCode.PONG) + { + queue.offerFirst(entry); + } + else + { + queue.offerLast(entry); + } + + if (opCode == OpCode.CLOSE) + { + this.canEnqueue = false; + } + } + } else - queue.offerLast(entry); + { + dead = new ClosedChannelException(); + } } - if (failure != null) - callback.failed(failure); + if (dead == null) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("Enqueued {} to {}", entry, this); + } + return true; + } - return failure==null; + notifyCallbackFailure(callback, dead); + return false; } public void onClose(Throwable cause) diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java new file mode 100644 index 000000000000..3b031ece7594 --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java @@ -0,0 +1,180 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.core.internal; + +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.WritePendingException; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.WebSocketConstants; +import org.junit.jupiter.api.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FrameFlusherTest +{ + public ByteBufferPool bufferPool = new MappedByteBufferPool(); + + /** + * Ensure post-close frames have their associated callbacks properly notified. + */ + @Test + public void testPostCloseFrameCallbacks() throws ExecutionException, InterruptedException, TimeoutException + { + Generator generator = new Generator(bufferPool); + CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool); + int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE; + int maxGather = 1; + FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather); + + Frame closeFrame = new Frame(OpCode.CLOSE).setPayload(CloseStatus.asPayloadBuffer(CloseStatus.MESSAGE_TOO_LARGE, "Message be to big")); + Frame textFrame = new Frame(OpCode.TEXT).setPayload("Hello").setFin(true); + + FutureCallback closeCallback = new FutureCallback(); + FutureCallback textFrameCallback = new FutureCallback(); + + assertTrue(frameFlusher.enqueue(closeFrame, closeCallback, false)); + assertFalse(frameFlusher.enqueue(textFrame, textFrameCallback, false)); + frameFlusher.iterate(); + + closeCallback.get(5, TimeUnit.SECONDS); + // If this throws a TimeoutException then the callback wasn't called. + ExecutionException x = assertThrows(ExecutionException.class, + ()-> textFrameCallback.get(5, TimeUnit.SECONDS)); + assertThat(x.getCause(), instanceOf(ClosedChannelException.class)); + } + + /** + * Ensure that FrameFlusher honors the correct order of websocket frames. + * + * @see eclipse/jetty.project#2491 + */ + @Test + public void testLargeSmallText() throws ExecutionException, InterruptedException + { + Generator generator = new Generator(bufferPool); + CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool); + int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE; + int maxGather = 8; + FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather); + + int largeMessageSize = 60000; + byte[] buf = new byte[largeMessageSize]; + Arrays.fill(buf, (byte) 'x'); + String largeMessage = new String(buf, UTF_8); + + int messageCount = 10000; + + CompletableFuture serverTask = new CompletableFuture<>(); + + CompletableFuture.runAsync(() -> { + // Run Server Task + try + { + for (int i = 0; i < messageCount; i++) + { + FutureCallback callback = new FutureCallback(); + Frame frame; + + if (i % 2 == 0) + { + frame = new Frame(OpCode.TEXT).setPayload(largeMessage).setFin(true); + } + else + { + frame = new Frame(OpCode.TEXT).setPayload("Short Message: " + i).setFin(true); + } + frameFlusher.enqueue(frame, callback, false); + frameFlusher.iterate(); + callback.get(); + } + } + catch (Throwable t) + { + serverTask.completeExceptionally(t); + } + serverTask.complete(null); + }); + + serverTask.get(); + System.out.printf("Received: %,d frames%n", endPoint.incomingFrames.size()); + } + + public static class CapturingEndPoint extends MockEndpoint + { + public Parser parser; + public LinkedBlockingQueue incomingFrames = new LinkedBlockingQueue<>(); + + public CapturingEndPoint(ByteBufferPool bufferPool) + { + parser = new Parser(bufferPool); + } + + @Override + public void shutdownOutput() + { + // ignore + } + + @Override + public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException + { + Objects.requireNonNull(callback); + try + { + for (ByteBuffer buffer : buffers) + { + Parser.ParsedFrame frame = parser.parse(buffer); + if(frame != null) + { + incomingFrames.offer(frame); + } + } + callback.succeeded(); + } + catch (WritePendingException e) + { + throw e; + } + catch (Throwable t) + { + callback.failed(t); + } + } + } +} diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java new file mode 100644 index 000000000000..76b1e6866b14 --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java @@ -0,0 +1,178 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.core.internal; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; + +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.Callback; + +public class MockEndpoint implements EndPoint +{ + public static final String NOT_SUPPORTED = "Not supported by MockEndPoint"; + + @Override + public InetSocketAddress getLocalAddress() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public InetSocketAddress getRemoteAddress() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean isOpen() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public long getCreatedTimeStamp() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void shutdownOutput() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean isOutputShutdown() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean isInputShutdown() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void close() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void close(Throwable cause) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean flush(ByteBuffer... buffer) throws IOException + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public Object getTransport() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public long getIdleTimeout() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void setIdleTimeout(long idleTimeout) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void fillInterested(Callback callback) throws ReadPendingException + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean tryFillInterested(Callback callback) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean isFillInterested() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public Connection getConnection() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void setConnection(Connection connection) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void onOpen() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void onClose(Throwable cause) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public boolean isOptimizedForDirectBuffers() + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + + @Override + public void upgrade(Connection newConnection) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } +}