Skip to content

Commit

Permalink
remove duplicate TestFrameHandler from websocket core tests
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Nov 28, 2019
1 parent 7ebf673 commit e9c3eb0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TestAsyncFrameHandler implements FrameHandler

public CoreSession coreSession;
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
public CloseStatus closeStatus;
public volatile Throwable error;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -72,6 +73,7 @@ public void onClosed(CloseStatus closeStatus, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("[{}] onClosed {}", name, closeStatus);
this.closeStatus = closeStatus;
closeLatch.countDown();
callback.succeeded();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TestFrameHandler implements SynchronousFrameHandler

protected CoreSession coreSession;
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected CloseStatus closeStatus;
protected Throwable failure;

public CountDownLatch open = new CountDownLatch(1);
Expand Down Expand Up @@ -76,6 +77,7 @@ public void onClosed(CloseStatus closeStatus)
{
if (LOG.isDebugEnabled())
LOG.debug("onClosed {}", closeStatus);
this.closeStatus = closeStatus;
closed.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.eclipse.jetty.websocket.core;

import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
Expand All @@ -51,7 +49,7 @@ public class WebSocketOpenTest extends WebSocketTester
private static Logger LOG = Log.getLogger(WebSocketOpenTest.class);

private WebSocketServer server;
private TestFrameHandler serverHandler;
private DemandingAsyncFrameHandler serverHandler;
private Socket client;

@AfterEach
Expand All @@ -63,7 +61,7 @@ public void after() throws Exception

public void setup(BiFunction<FrameHandler.CoreSession, Callback, Void> onOpen) throws Exception
{
serverHandler = new TestFrameHandler(onOpen);
serverHandler = new DemandingAsyncFrameHandler(onOpen);
server = new WebSocketServer(serverHandler);
server.start();
client = newClient(server.getLocalPort());
Expand All @@ -75,7 +73,7 @@ public void testSendFrameInOnOpen() throws Exception
setup((s, c) ->
{
assertThat(s.toString(), containsString("CONNECTED"));
WebSocketOpenTest.TestFrameHandler.sendText(s, "Hello", Callback.NOOP);
s.sendFrame(new Frame(OpCode.TEXT, "Hello"), NOOP, false);
c.succeeded();
s.demand(1);
return null;
Expand All @@ -84,7 +82,7 @@ public void testSendFrameInOnOpen() throws Exception
assertThat(frame.getPayloadAsUTF8(), is("Hello"));

client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));

frame = receiveFrame(client.getInputStream());
Expand All @@ -104,10 +102,10 @@ public void testFailureInOnOpen() throws Exception
return null;
});

assertTrue(serverHandler.onError.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.error, notNullValue());

assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));

Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
Expand All @@ -131,7 +129,7 @@ public void testCloseInOnOpen() throws Exception
assertThat(new CloseStatus(frame).getCode(), is(CloseStatus.SHUTDOWN));

client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SHUTDOWN));
}

Expand Down Expand Up @@ -160,18 +158,18 @@ public void testAsyncOnOpen() throws Exception
Thread.sleep(100);

// Can send while onOpen is active
WebSocketOpenTest.TestFrameHandler.sendText(coreSession, "Hello", NOOP);
coreSession.sendFrame(new Frame(OpCode.TEXT, "Hello"), NOOP, false);
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(), is("Hello"));

// But cannot receive
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(serverHandler.onClosed.await(1, TimeUnit.SECONDS));
assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS));

// Can't demand until open
assertThrows(Throwable.class, () -> coreSession.demand(1));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(serverHandler.onClosed.await(1, TimeUnit.SECONDS));
assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS));

// Succeeded moves to OPEN state and still does not read CLOSE frame
onOpenCallback.succeeded();
Expand All @@ -180,111 +178,39 @@ public void testAsyncOnOpen() throws Exception
// Demand start receiving frames
coreSession.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));

// Closed handled normally
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
assertThat(new CloseStatus(frame).getCode(), is(CloseStatus.NORMAL));
}

static class TestFrameHandler implements SynchronousFrameHandler
static class DemandingAsyncFrameHandler extends TestAsyncFrameHandler
{
private CoreSession coreSession;
private BiFunction<CoreSession, Callback, Void> onOpen;
private CloseStatus closeStatus;
private CountDownLatch onClosed = new CountDownLatch(1);
private Throwable error;
private CountDownLatch onError = new CountDownLatch(1);
private Frame frame;
private CountDownLatch onFrame = new CountDownLatch(1);

public CoreSession getCoreSession()
{
synchronized (this)
{
return coreSession;
}
}

TestFrameHandler(BiFunction<CoreSession, Callback, Void> onOpen)
DemandingAsyncFrameHandler(BiFunction<CoreSession, Callback, Void> onOpen)
{
this.onOpen = onOpen;
}

@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.info("onOpen {}", coreSession);
synchronized (this)
{
this.coreSession = coreSession;
}
if (LOG.isDebugEnabled())
LOG.debug("[{}] onOpen {}", name, coreSession);
this.coreSession = coreSession;
onOpen.apply(coreSession, callback);
}

@Override
public void onFrame(Frame frame, Callback callback)
{
LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload()));
callback.succeeded();
if (onFrame.getCount() == 1)
{
this.frame = frame;
onFrame.countDown();
}
}

@Override
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null ? null : cause.toString());
if (onError.getCount() != 1)
throw new IllegalStateException();
error = cause;
onError.countDown();
}

@Override
public void onClosed(CloseStatus closeStatus)
{
LOG.info("onClosed {}", closeStatus);
if (onClosed.getCount() != 1)
throw new IllegalStateException();
this.closeStatus = closeStatus;
onClosed.countDown();
openLatch.countDown();
}

@Override
public boolean isDemanding()
{
return true;
}

public void sendText(String text)
{
sendText(coreSession, text);
}

public void sendText(String text, Callback callback)
{
sendText(coreSession, text, callback);
}

static void sendText(FrameHandler.CoreSession coreSession, String text)
{
sendText(coreSession, text, NOOP);
}

static void sendText(FrameHandler.CoreSession coreSession, String text, Callback callback)
{
Frame frame = new Frame(OpCode.TEXT);
frame.setFin(true);
frame.setPayload(text);

coreSession.sendFrame(frame, callback, false);
}
}
}

0 comments on commit e9c3eb0

Please sign in to comment.