Skip to content

Commit

Permalink
Issue jetty#3705 - WebSocket Session CompletableFuture refactor
Browse files Browse the repository at this point in the history
- sessionFutures for jetty and javax are now implemented using the
futureCoreSession which will occur after onOpen

- the request and response are set on the FrameHandler before the
upgrade

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 5, 2019
1 parent edfdbec commit 24bd38a
Show file tree
Hide file tree
Showing 23 changed files with 139 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package org.eclipse.jetty.websocket.javax.client;

import java.net.URI;
import java.util.concurrent.CompletableFuture;

import javax.websocket.Session;

import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
Expand All @@ -32,34 +31,29 @@
public class JavaxClientUpgradeRequest extends ClientUpgradeRequest
{
private final JavaxWebSocketClientContainer containerContext;
private final Object websocketPojo;
private final CompletableFuture<Session> futureSession;
private final JavaxWebSocketFrameHandler frameHandler;


public JavaxClientUpgradeRequest(JavaxWebSocketClientContainer clientContainer, WebSocketCoreClient coreClient, URI requestURI, Object websocketPojo)
{
super(coreClient, requestURI);
this.containerContext = clientContainer;
this.websocketPojo = websocketPojo;
this.futureSession = new CompletableFuture<>();

UpgradeRequest upgradeRequest = new DelegatedJavaxClientUpgradeRequest(this);
frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest);
}

@Override
protected void handleException(Throwable failure)
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
{
super.handleException(failure);
futureSession.completeExceptionally(failure);
frameHandler.setUpgradeRequest(new DelegatedJavaxClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJavaxClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
}

@Override
public FrameHandler getFrameHandler(WebSocketCoreClient coreClient)
public FrameHandler getFrameHandler()
{
UpgradeRequest upgradeRequest = new DelegatedJavaxClientUpgradeRequest(this);
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, futureSession);
return frameHandler;
}

public CompletableFuture<Session> getFutureSession()
{
return futureSession;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;

/**
Expand Down Expand Up @@ -105,17 +106,29 @@ protected WebSocketCoreClient getWebSocketCoreClient()
private CompletableFuture<Session> connect(JavaxClientUpgradeRequest upgradeRequest)
{
upgradeRequest.setConfiguration(defaultCustomizer);
CompletableFuture<Session> fut = upgradeRequest.getFutureSession();
CompletableFuture<Session> futureSession = new CompletableFuture<>();

try
{
getWebSocketCoreClient().connect(upgradeRequest);
return fut;
WebSocketCoreClient coreClient = getWebSocketCoreClient();
coreClient.connect(upgradeRequest).whenComplete((coreSession, error)->
{
if (error != null)
{
futureSession.completeExceptionally(error);
return;
}

JavaxWebSocketFrameHandler frameHandler = (JavaxWebSocketFrameHandler)upgradeRequest.getFrameHandler();
futureSession.complete(frameHandler.getSession());
});
}
catch (Exception e)
{
fut.completeExceptionally(e);
return fut;
futureSession.completeExceptionally(e);
}

return futureSession;
}

private Session connect(ConfiguredEndpoint configuredEndpoint, URI destURI) throws IOException
Expand Down Expand Up @@ -143,7 +156,7 @@ private Session connect(ConfiguredEndpoint configuredEndpoint, URI destURI) thro
try
{
Future<Session> sessionFuture = connect(upgradeRequest);
long timeout = coreClient.getHttpClient().getConnectTimeout();
long timeout = getWebSocketCoreClient().getHttpClient().getConnectTimeout();
if (timeout>0)
return sessionFuture.get(timeout+1000, TimeUnit.MILLISECONDS);
return sessionFuture.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import javax.websocket.Extension;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.io.ByteBufferPool;
Expand Down Expand Up @@ -156,10 +154,9 @@ public Set<javax.websocket.Session> getOpenSessions()
return sessionTracker.getSessions();
}

public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest,
CompletableFuture<Session> futureSession)
public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest)
{
return getFrameHandlerFactory().newJavaxWebSocketFrameHandler(websocketPojo, upgradeRequest, futureSession);
return getFrameHandlerFactory().newJavaxWebSocketFrameHandler(websocketPojo, upgradeRequest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import javax.websocket.CloseReason;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.Session;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -96,16 +94,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
private JavaxWebSocketFrameHandlerMetadata.MessageMetadata binaryMetadata;
// TODO: need pingHandle ?
private MethodHandle pongHandle;
/**
* Immutable HandshakeRequest available via Session
*/
private final UpgradeRequest upgradeRequest;
/**
* Immutable javax.websocket.HandshakeResponse available via Session
*/
private final String id;

private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;

private final EndpointConfig endpointConfig;
private final CompletableFuture<Session> futureSession;
private MessageSink textSink;
private MessageSink binarySink;
private MessageSink activeMessageSink;
Expand All @@ -117,14 +110,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler

public JavaxWebSocketFrameHandler(JavaxWebSocketContainer container,
Object endpointInstance,
UpgradeRequest upgradeRequest,
MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle,
JavaxWebSocketFrameHandlerMetadata.MessageMetadata textMetadata,
JavaxWebSocketFrameHandlerMetadata.MessageMetadata binaryMetadata,
MethodHandle pongHandle,
String id,
EndpointConfig endpointConfig,
CompletableFuture<Session> futureSession)
EndpointConfig endpointConfig)
{
this.LOG = Log.getLogger(endpointInstance.getClass());

Expand All @@ -136,7 +126,6 @@ public JavaxWebSocketFrameHandler(JavaxWebSocketContainer container,
throw oops;
}
this.endpointInstance = endpointInstance;
this.upgradeRequest = upgradeRequest;

this.openHandle = openHandle;
this.closeHandle = closeHandle;
Expand All @@ -145,9 +134,7 @@ public JavaxWebSocketFrameHandler(JavaxWebSocketContainer container,
this.binaryMetadata = binaryMetadata;
this.pongHandle = pongHandle;

this.id = id;
this.endpointConfig = endpointConfig;
this.futureSession = futureSession;
this.messageHandlerMap = new HashMap<>();
}

Expand All @@ -172,7 +159,7 @@ public void onOpen(CoreSession coreSession, Callback callback)
try
{
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
session = new JavaxWebSocketSession(container, coreSession, this, endpointConfig);

openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
closeHandle = InvokerUtils.bindTo(closeHandle, session);
Expand Down Expand Up @@ -212,13 +199,11 @@ public void onOpen(CoreSession coreSession, Callback callback)

container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session));
callback.succeeded();
futureSession.complete(session);
}
catch (Throwable cause)
{
Exception wse = new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause);
callback.failed(wse);
futureSession.completeExceptionally(wse);
}
}

Expand Down Expand Up @@ -279,8 +264,6 @@ public void onError(Throwable cause, Callback callback)
{
try
{
futureSession.completeExceptionally(cause);

if (errorHandle != null)
errorHandle.invoke(cause);
else
Expand All @@ -292,7 +275,6 @@ public void onError(Throwable cause, Callback callback)
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getSimpleName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
// TODO should futureSession be failed here?
}
}

Expand Down Expand Up @@ -628,4 +610,24 @@ public void onContinuation(Frame frame, Callback callback)
throw new ProtocolException("Unable to process continuation during dataType " + dataType);
}
}

public void setUpgradeRequest(UpgradeRequest upgradeRequest)
{
this.upgradeRequest = upgradeRequest;
}

public void setUpgradeResponse(UpgradeResponse upgradeResponse)
{
this.upgradeResponse = upgradeResponse;
}

public UpgradeRequest getUpgradeRequest()
{
return upgradeRequest;
}

public UpgradeResponse getUpgradeResponse()
{
return upgradeResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.CloseReason;
Expand Down Expand Up @@ -108,8 +107,7 @@ public JavaxWebSocketFrameHandlerMetadata getMetadata(Class<?> endpointClass, En

public abstract JavaxWebSocketFrameHandlerMetadata createMetadata(Class<?> endpointClass, EndpointConfig endpointConfig);

public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest,
CompletableFuture<Session> futureSession)
public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest)
{
Object endpoint;
EndpointConfig config;
Expand Down Expand Up @@ -161,22 +159,13 @@ public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointI
errorHandle = InvokerUtils.bindTo(errorHandle, endpoint);
pongHandle = InvokerUtils.bindTo(pongHandle, endpoint);

CompletableFuture<Session> future = futureSession;
if (future == null)
future = new CompletableFuture<>();

String id = upgradeRequest.toString();

JavaxWebSocketFrameHandler frameHandler = new JavaxWebSocketFrameHandler(
container,
endpoint,
upgradeRequest,
openHandle, closeHandle, errorHandle,
textMetadata, binaryMetadata,
pongHandle,
id,
config,
future);
config);

return frameHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
protected final SharedBlockingCallback blocking = new SharedBlockingCallback();
private final JavaxWebSocketContainer container;
private final FrameHandler.CoreSession coreSession;
private final Principal principal;
private final JavaxWebSocketFrameHandler frameHandler;
private final EndpointConfig config;
private final String id;
private final AvailableDecoders availableDecoders;
private final AvailableEncoders availableEncoders;
private final Map<String, String> pathParameters;
Expand All @@ -77,15 +75,11 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
public JavaxWebSocketSession(JavaxWebSocketContainer container,
FrameHandler.CoreSession coreSession,
JavaxWebSocketFrameHandler frameHandler,
Principal upgradeRequestPrincipal,
String id,
EndpointConfig endpointConfig)
{
this.container = container;
this.coreSession = coreSession;
this.frameHandler = frameHandler;
this.principal = upgradeRequestPrincipal;
this.id = id;

this.config = endpointConfig == null?new BasicEndpointConfig():endpointConfig;

Expand Down Expand Up @@ -139,7 +133,6 @@ public <T> void addMessageHandler(Class<T> clazz, MessageHandler.Whole<T> handle
}

frameHandler.addMessageHandler(this, clazz, handler);

}

/**
Expand Down Expand Up @@ -308,7 +301,7 @@ public JavaxWebSocketFrameHandler getFrameHandler()
@Override
public String getId()
{
return this.id;
return this.frameHandler.getUpgradeRequest().toString();
}

/**
Expand Down Expand Up @@ -516,7 +509,7 @@ public URI getRequestURI()
@Override
public Principal getUserPrincipal()
{
return this.principal;
return this.frameHandler.getUpgradeRequest().getUserPrincipal();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import javax.websocket.EndpointConfig;

Expand Down Expand Up @@ -68,8 +67,7 @@ protected JavaxWebSocketFrameHandler newJavaxFrameHandler(Object websocket)
ConfiguredEndpoint endpoint = new ConfiguredEndpoint(websocket, config);
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();

JavaxWebSocketFrameHandler localEndpoint = factory.newJavaxWebSocketFrameHandler(endpoint,
upgradeRequest, new CompletableFuture<>());
JavaxWebSocketFrameHandler localEndpoint = factory.newJavaxWebSocketFrameHandler(endpoint, upgradeRequest);

return localEndpoint;
}
Expand Down
Loading

0 comments on commit 24bd38a

Please sign in to comment.