Skip to content

Commit

Permalink
Issue #3428 - changes to AbstractDecodedMessageSink signature from re…
Browse files Browse the repository at this point in the history
…view

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 29, 2020
1 parent acf4762 commit 4fdf52b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.lang.invoke.MethodHandle;
import java.util.List;
import java.util.stream.Collectors;
import javax.websocket.CloseReason;
import javax.websocket.Decoder;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.CloseException;
import org.eclipse.jetty.websocket.javax.common.decoders.RegisteredDecoder;
import org.eclipse.jetty.websocket.util.messages.MessageSink;
import org.slf4j.Logger;
Expand All @@ -35,18 +37,16 @@ public abstract class AbstractDecodedMessageSink implements MessageSink
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractDecodedMessageSink.class);

private final CoreSession _coreSession;
private final MethodHandle _methodHandle;
private final MessageSink _messageSink;

public AbstractDecodedMessageSink(CoreSession coreSession, MethodHandle methodHandle)
{
_coreSession = coreSession;
_methodHandle = methodHandle;

try
{
_messageSink = getMessageSink();
_messageSink = newMessageSink(coreSession);
}
catch (Exception e)
{
Expand All @@ -55,21 +55,27 @@ public AbstractDecodedMessageSink(CoreSession coreSession, MethodHandle methodHa
}
}

public CoreSession getCoreSession()
{
return _coreSession;
}

public MethodHandle getMethodHandle()
/**
* Invoke the MessageSink with the decoded message.
* @param args the decoded message.
*/
public void invoke(Object... args)
{
return _methodHandle;
try
{
_methodHandle.invoke(args);
}
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
}

/**
* @return a message sink which will first decode the message then pass it to {@link #_methodHandle}.
* @throws Exception for any error in creating the message sink.
*/
abstract MessageSink getMessageSink() throws Exception;
abstract MessageSink newMessageSink(CoreSession coreSession) throws Exception;

@Override
public void accept(Frame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public DecodedBinaryMessageSink(CoreSession session, MethodHandle methodHandle,
}

@Override
MessageSink getMessageSink() throws Exception
MessageSink newMessageSink(CoreSession coreSession) throws Exception
{
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
.findVirtual(DecodedBinaryMessageSink.class, "onWholeMessage", MethodType.methodType(void.class, ByteBuffer.class))
.bindTo(this);
return new ByteBufferMessageSink(getCoreSession(), methodHandle);
return new ByteBufferMessageSink(coreSession, methodHandle);
}

@SuppressWarnings("Duplicates")
Expand All @@ -63,17 +63,13 @@ public void onWholeMessage(ByteBuffer wholeMessage)
try
{
T obj = decoder.decode(wholeMessage);
getMethodHandle().invoke(obj);
invoke(obj);
return;
}
catch (DecodeException e)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
}
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.eclipse.jetty.websocket.javax.common.messages;

import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
Expand All @@ -41,12 +42,12 @@ public DecodedBinaryStreamMessageSink(CoreSession session, MethodHandle methodHa
}

@Override
MessageSink getMessageSink() throws Exception
MessageSink newMessageSink(CoreSession coreSession) throws Exception
{
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup().findVirtual(DecodedBinaryStreamMessageSink.class,
"onStreamStart", MethodType.methodType(void.class, InputStream.class))
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
.findVirtual(DecodedBinaryStreamMessageSink.class, "onStreamStart", MethodType.methodType(void.class, InputStream.class))
.bindTo(this);
return new InputStreamMessageSink(getCoreSession(), methodHandle);
return new InputStreamMessageSink(coreSession, methodHandle);
}

@SuppressWarnings("Duplicates")
Expand All @@ -55,15 +56,11 @@ public void onStreamStart(InputStream stream)
try
{
T obj = _decoder.decode(stream);
getMethodHandle().invoke(obj);
invoke(obj);
}
catch (DecodeException e)
catch (DecodeException | IOException e)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
}
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public DecodedTextMessageSink(CoreSession session, MethodHandle methodHandle, Li
}

@Override
MessageSink getMessageSink() throws NoSuchMethodException, IllegalAccessException
MessageSink newMessageSink(CoreSession coreSession) throws NoSuchMethodException, IllegalAccessException
{
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
.findVirtual(getClass(), "onMessage", MethodType.methodType(void.class, String.class))
.bindTo(this);
return new StringMessageSink(getCoreSession(), methodHandle);
return new StringMessageSink(coreSession, methodHandle);
}

@SuppressWarnings("Duplicates")
Expand All @@ -62,17 +62,13 @@ public void onMessage(String wholeMessage)
try
{
T obj = decoder.decode(wholeMessage);
getMethodHandle().invoke(obj);
invoke(obj);
return;
}
catch (DecodeException e)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
}
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.eclipse.jetty.websocket.javax.common.messages;

import java.io.IOException;
import java.io.Reader;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodType;
Expand All @@ -41,12 +42,12 @@ public DecodedTextStreamMessageSink(CoreSession session, MethodHandle methodHand
}

@Override
MessageSink getMessageSink() throws Exception
MessageSink newMessageSink(CoreSession coreSession) throws Exception
{
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup().findVirtual(DecodedTextStreamMessageSink.class,
"onStreamStart", MethodType.methodType(void.class, Reader.class))
MethodHandle methodHandle = JavaxWebSocketFrameHandlerFactory.getServerMethodHandleLookup()
.findVirtual(DecodedTextStreamMessageSink.class, "onStreamStart", MethodType.methodType(void.class, Reader.class))
.bindTo(this);
return new ReaderMessageSink(getCoreSession(), methodHandle);
return new ReaderMessageSink(coreSession, methodHandle);
}

@SuppressWarnings("Duplicates")
Expand All @@ -55,15 +56,11 @@ public void onStreamStart(Reader reader)
try
{
T obj = _decoder.decode(reader);
getMethodHandle().invoke(obj);
invoke(obj);
}
catch (DecodeException e)
catch (DecodeException | IOException e)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Unable to decode", e);
}
catch (Throwable t)
{
throw new CloseException(CloseReason.CloseCodes.CANNOT_ACCEPT.getCode(), "Endpoint notification error", t);
}
}
}

0 comments on commit 4fdf52b

Please sign in to comment.