Skip to content

Commit

Permalink
Merge branch 'jetty-10.0.x' into jetty-10-issue-3379-websocket-sessio…
Browse files Browse the repository at this point in the history
…n-tracking
  • Loading branch information
joakime committed Feb 27, 2019
2 parents 82adc20 + 47b84fe commit 177a438
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 36 deletions.
4 changes: 2 additions & 2 deletions Jmh_Jenkinsfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!groovy

def branch = params.get("JETTY_BRANCH" ,"jetty-9.4.x")
def branch = params.get("JETTY_BRANCH" ,"jetty-10.0.x")
def owner = params.get("REPO_OWNER", "eclipse")

node("linux") {
Expand All @@ -26,7 +26,7 @@ node("linux") {
timeout(time: 15, unit: 'MINUTES') {
withMaven(
maven: mvnName,
jdk: "jdk8",
jdk: "jdk11",
publisherStrategy: 'EXPLICIT',
globalMavenSettingsConfig: settingsName,
mavenOpts: mavenOpts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<div id="header"></div>

<div id="content">
<h1>Welcome to Jetty 9</h1>
<h1>Welcome to Jetty 10</h1>

<p>
The Jetty project is a 100% Java <a
Expand Down
35 changes: 13 additions & 22 deletions jetty-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,22 @@
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<mavenRepoPath>${settings.localRepository}</mavenRepoPath>
</systemPropertyVariables>
<argLine>
@{argLine} ${jetty.surefire.argLine}
--add-modules org.slf4j
</argLine>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<mavenRepoPath>${settings.localRepository}</mavenRepoPath>
</systemPropertyVariables>
<argLine>
@{argLine} ${jetty.surefire.argLine}
--add-modules jetty.servlet.api,org.slf4j
</argLine>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-perf-helper</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion jetty-websocket/javax-websocket-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<configuration>
<argLine>
@{argLine} ${jetty.surefire.argLine}
--add-opens org.eclipse.jetty.websocket.javax.server/org.eclipse.jetty.websocket.javax.server.examples=org.eclipse.jetty.websocket.javax.common
--add-exports org.eclipse.jetty.websocket.javax.server/org.eclipse.jetty.websocket.javax.server.examples=org.eclipse.jetty.websocket.javax.common
--add-reads org.eclipse.jetty.websocket.javax.server=org.eclipse.jetty.security
--add-reads org.eclipse.jetty.websocket.javax.common=org.eclipse.jetty.websocket.javax.server
</argLine>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Objects;

import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
Expand All @@ -52,6 +51,7 @@ public class FrameFlusher extends IteratingCallback
private final List<Entry> entries;
private final List<ByteBuffer> buffers;
private ByteBuffer batchBuffer = null;
private boolean canEnqueue = true;
private Throwable closedCause;

public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
Expand All @@ -78,22 +78,48 @@ public boolean enqueue(Frame frame, Callback callback, boolean batch)
{
Entry entry = new Entry(frame, callback, batch);
byte opCode = frame.getOpCode();
Throwable failure = null;

Throwable dead;

synchronized (this)
{
if (closedCause != null)
failure = closedCause;
else if (opCode == OpCode.PING || opCode == OpCode.PONG)
queue.offerFirst(entry);
if (canEnqueue)
{
dead = closedCause;
if (dead == null)
{
if (opCode == OpCode.PING || opCode == OpCode.PONG)
{
queue.offerFirst(entry);
}
else
{
queue.offerLast(entry);
}

if (opCode == OpCode.CLOSE)
{
this.canEnqueue = false;
}
}
}
else
queue.offerLast(entry);
{
dead = new ClosedChannelException();
}
}

if (failure != null)
callback.failed(failure);
if (dead == null)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Enqueued {} to {}", entry, this);
}
return true;
}

return failure==null;
notifyCallbackFailure(callback, dead);
return false;
}

public void onClose(Throwable cause)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritePendingException;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.junit.jupiter.api.Test;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FrameFlusherTest
{
public ByteBufferPool bufferPool = new MappedByteBufferPool();

/**
* Ensure post-close frames have their associated callbacks properly notified.
*/
@Test
public void testPostCloseFrameCallbacks() throws ExecutionException, InterruptedException, TimeoutException
{
Generator generator = new Generator(bufferPool);
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 1;
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);

Frame closeFrame = new Frame(OpCode.CLOSE).setPayload(CloseStatus.asPayloadBuffer(CloseStatus.MESSAGE_TOO_LARGE, "Message be to big"));
Frame textFrame = new Frame(OpCode.TEXT).setPayload("Hello").setFin(true);

FutureCallback closeCallback = new FutureCallback();
FutureCallback textFrameCallback = new FutureCallback();

assertTrue(frameFlusher.enqueue(closeFrame, closeCallback, false));
assertFalse(frameFlusher.enqueue(textFrame, textFrameCallback, false));
frameFlusher.iterate();

closeCallback.get(5, TimeUnit.SECONDS);
// If this throws a TimeoutException then the callback wasn't called.
ExecutionException x = assertThrows(ExecutionException.class,
()-> textFrameCallback.get(5, TimeUnit.SECONDS));
assertThat(x.getCause(), instanceOf(ClosedChannelException.class));
}

/**
* Ensure that FrameFlusher honors the correct order of websocket frames.
*
* @see <a href="https://github.com/eclipse/jetty.project/issues/2491">eclipse/jetty.project#2491</a>
*/
@Test
public void testLargeSmallText() throws ExecutionException, InterruptedException
{
Generator generator = new Generator(bufferPool);
CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool);
int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE;
int maxGather = 8;
FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather);

int largeMessageSize = 60000;
byte[] buf = new byte[largeMessageSize];
Arrays.fill(buf, (byte) 'x');
String largeMessage = new String(buf, UTF_8);

int messageCount = 10000;

CompletableFuture<Void> serverTask = new CompletableFuture<>();

CompletableFuture.runAsync(() -> {
// Run Server Task
try
{
for (int i = 0; i < messageCount; i++)
{
FutureCallback callback = new FutureCallback();
Frame frame;

if (i % 2 == 0)
{
frame = new Frame(OpCode.TEXT).setPayload(largeMessage).setFin(true);
}
else
{
frame = new Frame(OpCode.TEXT).setPayload("Short Message: " + i).setFin(true);
}
frameFlusher.enqueue(frame, callback, false);
frameFlusher.iterate();
callback.get();
}
}
catch (Throwable t)
{
serverTask.completeExceptionally(t);
}
serverTask.complete(null);
});

serverTask.get();
System.out.printf("Received: %,d frames%n", endPoint.incomingFrames.size());
}

public static class CapturingEndPoint extends MockEndpoint
{
public Parser parser;
public LinkedBlockingQueue<Frame> incomingFrames = new LinkedBlockingQueue<>();

public CapturingEndPoint(ByteBufferPool bufferPool)
{
parser = new Parser(bufferPool);
}

@Override
public void shutdownOutput()
{
// ignore
}

@Override
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
Objects.requireNonNull(callback);
try
{
for (ByteBuffer buffer : buffers)
{
Parser.ParsedFrame frame = parser.parse(buffer);
if(frame != null)
{
incomingFrames.offer(frame);
}
}
callback.succeeded();
}
catch (WritePendingException e)
{
throw e;
}
catch (Throwable t)
{
callback.failed(t);
}
}
}
}
Loading

0 comments on commit 177a438

Please sign in to comment.