Skip to content

Commit

Permalink
Merge pull request #3291 from eclipse/jetty-10.0.x-3290-websocket-onOpen
Browse files Browse the repository at this point in the history
Issue #3290 async websocket onOpen, onError and onClose
  • Loading branch information
gregw authored Jan 30, 2019
2 parents 1d9e7b7 + f575161 commit 58b73c1
Show file tree
Hide file tree
Showing 42 changed files with 1,305 additions and 645 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,6 @@ private URI newURI(String uri)
@Override
public String toString()
{
return String.format("%s[%s %s %s]@%x", HttpRequest.class.getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
return String.format("%s[%s %s %s]@%x", this.getClass().getSimpleName(), getMethod(), getPath(), getVersion(), hashCode());
}
}
77 changes: 76 additions & 1 deletion jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ public InvocationType getInvocationType()
};
}

/**
* Create a callback from the passed success and failure
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
* @return a new Callback
*/
static Callback from(Runnable success, Consumer<Throwable> failure)
{
return new Callback()
Expand All @@ -129,6 +135,10 @@ public void failed(Throwable x)
};
}

/** Creaste a callback that runs completed when it succeeds or fails
* @param completed The completion to run on success or failure
* @return a new callback
*/
static Callback from(Runnable completed)
{
return new Completing()
Expand All @@ -140,6 +150,67 @@ public void completed()
};
}

/**
* Create a nested callback that runs completed after
* completing the nested callback.
* @param callback The nested callback
* @param completed The completion to run after the nested callback is completed
* @return a new callback.
*/
static Callback from(Callback callback, Runnable completed)
{
return new Nested(callback)
{
public void completed()
{
completed.run();
}
};
}

/**
* Create a nested callback that runs completed before
* completing the nested callback.
* @param callback The nested callback
* @param completed The completion to run before the nested callback is completed. Any exceptions thrown
* from completed will result in a callback failure.
* @return a new callback.
*/
static Callback from(Runnable completed, Callback callback)
{
return new Callback()
{
@Override
public void succeeded()
{
try
{
completed.run();
callback.succeeded();
}
catch(Throwable t)
{
callback.failed(t);
}
}

@Override
public void failed(Throwable x)
{
try
{
completed.run();
}
catch(Throwable t)
{
x.addSuppressed(t);
}
callback.failed(x);
}
};
}


class Completing implements Callback
{
@Override
Expand All @@ -158,7 +229,11 @@ public void completed()
{
}
}


/**
* Nested Completing Callback that completes after
* completing the nested callback
*/
class Nested extends Completing
{
private final Callback callback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse
UpgradeRequest upgradeRequest = new DelegatedClientUpgradeRequest(this);
UpgradeResponse upgradeResponse = new DelegatedClientUpgradeResponse(response);

JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo,
upgradeRequest, upgradeResponse, futureJavaxSession);
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureJavaxSession);

return frameHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import javax.websocket.ClientEndpoint;
Expand Down Expand Up @@ -171,8 +173,14 @@ private Session connect(ConfiguredEndpoint configuredEndpoint, URI destURI) thro
try
{
Future<Session> sessionFuture = connect(upgradeRequest);
// TODO: apply connect timeouts here?
return sessionFuture.get(); // TODO: unwrap IOException from ExecutionException?
long timeout = coreClient.getHttpClient().getConnectTimeout();
if (timeout>0)
return sessionFuture.get(timeout+1000, TimeUnit.MILLISECONDS);
return sessionFuture.get();
}
catch (TimeoutException e)
{
throw new IOException("Connection future not completed " + destURI, e);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,95 +192,55 @@ public void setMaxBinaryMessageBufferSize(int maxBinaryMessageBufferSize)
}

@Override
public void onClosed(CloseStatus closeStatus)
public void onOpen(CoreSession coreSession, Callback callback)
{
if (closeHandle != null)
try
{
try
{
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause);
}
}
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);

container.removeBean(session);
}
openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
closeHandle = InvokerUtils.bindTo(closeHandle, session);
errorHandle = InvokerUtils.bindTo(errorHandle, session);

@SuppressWarnings("Duplicates")
@Override
public void onError(Throwable cause)
{
futureSession.completeExceptionally(cause);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);

if (errorHandle == null)
{
LOG.warn("Unhandled Error: " + endpointInstance, cause);
return;
}

try
{
errorHandle.invoke(cause);
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
throw wsError;
}
}
pongHandle = InvokerUtils.bindTo(pongHandle, session);

@Override
public void onOpen(CoreSession coreSession) throws Exception
{
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
if (actualTextMetadata != null)
{
actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);

openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
closeHandle = InvokerUtils.bindTo(closeHandle, session);
errorHandle = InvokerUtils.bindTo(errorHandle, session);
textMetadata = actualTextMetadata;
}

JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
if (actualBinaryMetadata != null)
{
actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);

pongHandle = InvokerUtils.bindTo(pongHandle, session);
binaryMetadata = actualBinaryMetadata;
}

if (actualTextMetadata != null)
{
actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
if (openHandle != null)
openHandle.invoke();

textMetadata = actualTextMetadata;
container.addBean(session, true);
futureSession.complete(session);
callback.succeeded();
}

if (actualBinaryMetadata != null)
catch (Throwable cause)
{
actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);

binaryMetadata = actualBinaryMetadata;
}
Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);

if (openHandle != null)
{
try
{
openHandle.invoke();
}
catch (Throwable cause)
{
throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
}
// TODO This feels like double handling of the exception? Review need for futureSession
futureSession.completeExceptionally(wse);
callback.failed(wse);
}

container.addBean(session, true);
futureSession.complete(session);
}

@Override
Expand Down Expand Up @@ -314,6 +274,50 @@ public void onFrame(Frame frame, Callback callback)
dataType = OpCode.UNDEFINED;
}


@Override
public void onClosed(CloseStatus closeStatus, Callback callback)
{
try
{
if (closeHandle != null)
{
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
container.removeBean(session);
callback.succeeded();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause));
}
}

@Override
public void onError(Throwable cause, Callback callback)
{
try
{
futureSession.completeExceptionally(cause);

if (errorHandle != null)
errorHandle.invoke(cause);
else
LOG.warn("Unhandled Error: " + endpointInstance, cause);
callback.succeeded();
}
catch (Throwable t)
{
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
// TODO should futureSession be failed here?
}
}



public Set<MessageHandler> getMessageHandlers()
{
if (messageHandlerMap.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ public void filterReturnType(Object obj)
{
getBasicRemote().sendObject(obj);
}
catch (Throwable cause)
catch (Exception cause)
{
// TODO: need way to fail Channel.
frameHandler.onError(cause);
// TODO review this
throw new RuntimeException(cause);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.eclipse.jetty.websocket.javax.common;

import java.util.concurrent.TimeUnit;

import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.Session;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;

import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.Session;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -44,12 +45,12 @@ private void assertOnCloseInvocation(TrackingSocket socket, Matcher<String> even
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);

// These invocations are the same for all tests
localEndpoint.onOpen(channel);
localEndpoint.onOpen(channel, Callback.NOOP);
CloseStatus status = new CloseStatus(CloseStatus.NORMAL, "Normal");
Frame closeFrame = status.toFrame();
localEndpoint.onFrame(closeFrame, Callback.from(() ->
{
localEndpoint.onClosed(status);
localEndpoint.onClosed(status, Callback.NOOP);
}, t ->
{
throw new RuntimeException(t);
Expand Down
Loading

0 comments on commit 58b73c1

Please sign in to comment.