From 05a3655c844e9474ec5768792355bdd21bfbdf77 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 13 Feb 2019 11:30:53 +1100 Subject: [PATCH] Issue #3170 - ProxyFrameHandler race between onOpen and onError - Introduced an EMPTY_SESSION in the ProxyFrameHandler as a terminal state to know whether a FailedCoreSession has been handled - Use while(true) loops to do the compareAndSet in ProxyFrameHandler - Improved the tests for the proxy so that it tests the frames received at every state (ie Client Proxy and Server) Signed-off-by: Lachlan Roberts --- .../core/proxy/BasicFrameHandler.java | 12 +- .../core/proxy/ProxyFrameHandler.java | 105 +++++++++++------- ...xyTest.java => ProxyFrameHandlerTest.java} | 46 ++++++-- 3 files changed, 107 insertions(+), 56 deletions(-) rename jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/{WebSocketProxyTest.java => ProxyFrameHandlerTest.java} (52%) diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java index ad836f269670..69f8651bad0d 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/BasicFrameHandler.java @@ -66,9 +66,9 @@ public void sendText(String message) session.sendFrame(textFrame, Callback.NOOP, false); } - public void close() throws InterruptedException + public void close(String message) throws InterruptedException { - session.close(CloseStatus.NORMAL, "standard close", Callback.NOOP); + session.close(CloseStatus.NORMAL, message, Callback.NOOP); awaitClose(); } @@ -78,9 +78,9 @@ public void awaitClose() throws InterruptedException } - public static class EchoHandler extends BasicFrameHandler + public static class ServerEchoHandler extends BasicFrameHandler { - public EchoHandler(String name) + public ServerEchoHandler(String name) { super(name); } @@ -89,6 +89,7 @@ public EchoHandler(String name) public void onFrame(Frame frame, Callback callback) { System.err.println(name + " onFrame(): " + frame); + receivedFrames.offer(Frame.copy(frame)); if (frame.isDataFrame()) { @@ -96,9 +97,10 @@ public void onFrame(Frame frame, Callback callback) session.sendFrame(new Frame(frame.getOpCode(), frame.getPayload()), callback, false); } else + { callback.succeeded(); + } - receivedFrames.offer(Frame.copy(frame)); } } } \ No newline at end of file diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java index 250680d53fba..b2634ef71913 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandler.java @@ -2,8 +2,10 @@ import java.io.IOException; import java.net.URI; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.Frame; @@ -16,23 +18,19 @@ class ProxyFrameHandler implements FrameHandler { private String name = "[ClientToProxy]"; private URI serverUri; - private WebSocketCoreClient client = new WebSocketCoreClient(); + private WebSocketCoreClient client; private CoreSession clientSession; private AtomicReference serverSession = new AtomicReference<>(); private AtomicReference closeFrameCallback = new AtomicReference<>(); - public ProxyFrameHandler() + private static CoreSession EMPTY_SESSION = new CoreSession.Empty(); + + protected BlockingQueue receivedFrames = new BlockingArrayQueue<>(); + + public ProxyFrameHandler(WebSocketCoreClient client, URI serverUri) { - try - { - serverUri = new URI("ws://localhost:8080/server"); - client.start(); - } - catch (Exception e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } + this.client = client; + this.serverUri = serverUri; } @Override @@ -48,27 +46,35 @@ public void onOpen(CoreSession coreSession, Callback callback) { if (t != null) { - // We have failed to create the client so onClosed will never be called - // so it is our responsibility to close the WebSocketCoreClient - try - { - client.stop(); - } - catch (Exception e) - { - t.addSuppressed(e); - } - // If an onError callback was waiting to be completed in serverToProxyFH onOpen, then we must fail it. - CoreSession session = this.serverSession.get(); - if (session instanceof FailedCoreSession) + while (true) { - FailedCoreSession failedSession = (FailedCoreSession)session; - failedSession.failed(t); - t.addSuppressed(failedSession.getThrowable()); + CoreSession session = serverSession.get(); + + if (session == null) + { + if (serverSession.compareAndSet(null, EMPTY_SESSION)) + break; + } + else if (session == EMPTY_SESSION) + { + break; + } + else + { + if (serverSession.compareAndSet(session, EMPTY_SESSION)) + { + if (session instanceof FailedCoreSession) + { + FailedCoreSession failedSession = (FailedCoreSession)session; + failedSession.failed(t); + t.addSuppressed(failedSession.getThrowable()); + } + + break; + } + } } - else - throw new IllegalStateException("onOpen was called but this callback was failed?"); callback.failed(t); } @@ -80,7 +86,7 @@ public void onOpen(CoreSession coreSession, Callback callback) } catch (IOException e) { - clientSession.close(CloseStatus.SERVER_ERROR, e.getMessage(), Callback.from(callback,e)); + callback.failed(e); } } @@ -88,6 +94,7 @@ public void onOpen(CoreSession coreSession, Callback callback) public void onFrame(Frame frame, Callback callback) { System.err.println(name + " onFrame(): " + frame); + receivedFrames.offer(Frame.copy(frame)); onFrame(serverSession.get(), frame, callback); } @@ -128,8 +135,28 @@ public void onError(Throwable cause, Callback callback) System.err.println(name + " onError(): " + cause); cause.printStackTrace(); - if (!serverSession.compareAndSet(null, new FailedCoreSession(cause, callback))) - serverSession.get().close(CloseStatus.SHUTDOWN, cause.getMessage(), callback); + while (true) + { + CoreSession session = serverSession.get(); + if (session == EMPTY_SESSION) + { + callback.failed(cause); + break; + } + else if (session == null) + { + if (serverSession.compareAndSet(null, new FailedCoreSession(cause, callback))) + break; + } + else + { + if (serverSession.compareAndSet(session, EMPTY_SESSION)) + { + serverSession.get().close(CloseStatus.SHUTDOWN, cause.getMessage(), callback); + break; + } + } + } } @Override @@ -162,6 +189,7 @@ public void onOpen(CoreSession coreSession, Callback callback) public void onFrame(Frame frame, Callback callback) { System.err.println(name + " onFrame(): " + frame); + receivedFrames.offer(Frame.copy(frame)); ProxyFrameHandler.this.onFrame(clientSession, frame, callback); } @@ -177,16 +205,7 @@ public void onError(Throwable cause, Callback callback) public void onClosed(CloseStatus closeStatus, Callback callback) { System.err.println(name + " onClosed(): " + closeStatus); - - try - { - client.stop(); - callback.succeeded(); - } - catch (Exception e) - { - callback.failed(e); - } + callback.succeeded(); } } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandlerTest.java similarity index 52% rename from jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java rename to jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandlerTest.java index d5f8960c2005..adbe58756c0d 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxyTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/proxy/ProxyFrameHandlerTest.java @@ -8,6 +8,8 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession; import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; @@ -17,11 +19,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class WebSocketProxyTest +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ProxyFrameHandlerTest { Server _server; WebSocketCoreClient _client; + ProxyFrameHandler proxyFrameHandler; + BasicFrameHandler.ServerEchoHandler serverFrameHandler; @BeforeEach public void start() throws Exception @@ -34,22 +42,25 @@ public void start() throws Exception HandlerList handlers = new HandlerList(); ContextHandler serverContext = new ContextHandler("/server"); - WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> new BasicFrameHandler.EchoHandler("SERVER")); + serverFrameHandler = new BasicFrameHandler.ServerEchoHandler("SERVER"); + WebSocketNegotiator negotiator = WebSocketNegotiator.from((negotiation) -> serverFrameHandler); WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator); serverContext.setHandler(upgradeHandler); handlers.addHandler(serverContext); + _client = new WebSocketCoreClient(); + _client.start(); + URI uri = new URI("ws://localhost:8080/server"); + ContextHandler proxyContext = new ContextHandler("/proxy"); - negotiator = WebSocketNegotiator.from((negotiation) -> new ProxyFrameHandler()); + proxyFrameHandler = new ProxyFrameHandler(_client, uri); + negotiator = WebSocketNegotiator.from((negotiation) -> proxyFrameHandler); upgradeHandler = new WebSocketUpgradeHandler(negotiator); proxyContext.setHandler(upgradeHandler); handlers.addHandler(proxyContext); _server.setHandler(handlers); _server.start(); - - _client = new WebSocketCoreClient(); - _client.start(); } @AfterEach @@ -59,7 +70,6 @@ public void stop() throws Exception _server.stop(); } - @Test public void testHello() throws Exception { @@ -69,6 +79,26 @@ public void testHello() throws Exception CompletableFuture response = _client.connect(upgradeRequest); response.get(5, TimeUnit.SECONDS); clientHandler.sendText("hello world"); - clientHandler.close(); + clientHandler.close("standard close"); + + Frame frame; + + // Verify the the text frame was received + assertThat(proxyFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + assertThat(serverFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + assertThat(proxyFrameHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + assertThat(clientHandler.receivedFrames.poll().getPayloadAsUTF8(), is("hello world")); + + // Verify the right close frame was received + assertThat(CloseStatus.getCloseStatus(proxyFrameHandler.receivedFrames.poll()).getReason(), is("standard close")); + assertThat(CloseStatus.getCloseStatus(serverFrameHandler.receivedFrames.poll()).getReason(), is("standard close")); + assertThat(CloseStatus.getCloseStatus(proxyFrameHandler.receivedFrames.poll()).getReason(), is("standard close")); + assertThat(CloseStatus.getCloseStatus(clientHandler.receivedFrames.poll()).getReason(), is("standard close")); + + // Verify no other frames were received + assertNull(proxyFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + assertNull(serverFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + assertNull(proxyFrameHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); + assertNull(clientHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS)); } }