Skip to content

Commit

Permalink
Merge branch '4.x' of https://github.com/vert-x3/vertx-stomp into tex…
Browse files Browse the repository at this point in the history
…t_payload
  • Loading branch information
freynder committed Jan 30, 2024
2 parents 96338cd + d818eed commit 50d9fa3
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 23 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
</parent>

<artifactId>vertx-stomp</artifactId>
<version>4.4.3-SNAPSHOT</version>
<version>4.5.2-SNAPSHOT</version>

<name>Vert.x Stomp</name>
<description>Stomp support for Vert.x 3</description>

<properties>
<stack.version>4.4.3-SNAPSHOT</stack.version>
<stack.version>4.5.2-SNAPSHOT</stack.version>
<jar.manifest>${project.basedir}/src/main/resources/META-INF/MANIFEST.MF</jar.manifest>
</properties>

Expand Down
6 changes: 3 additions & 3 deletions src/main/generated/io/vertx/ext/stomp/FrameConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class FrameConverter {
private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, Frame obj) {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, Frame obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "ack":
Expand Down Expand Up @@ -63,11 +63,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
}
}

public static void toJson(Frame obj, JsonObject json) {
static void toJson(Frame obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(Frame obj, java.util.Map<String, Object> json) {
static void toJson(Frame obj, java.util.Map<String, Object> json) {
if (obj.getAck() != null) {
json.put("ack", obj.getAck());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class StompClientOptionsConverter {
private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, StompClientOptions obj) {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, StompClientOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "acceptedVersions":
Expand Down Expand Up @@ -84,11 +84,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
}
}

public static void toJson(StompClientOptions obj, JsonObject json) {
static void toJson(StompClientOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(StompClientOptions obj, java.util.Map<String, Object> json) {
static void toJson(StompClientOptions obj, java.util.Map<String, Object> json) {
if (obj.getAcceptedVersions() != null) {
JsonArray array = new JsonArray();
obj.getAcceptedVersions().forEach(item -> array.add(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class StompServerOptionsConverter {
private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, StompServerOptions obj) {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, StompServerOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "heartbeat":
Expand Down Expand Up @@ -104,11 +104,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
}
}

public static void toJson(StompServerOptions obj, JsonObject json) {
static void toJson(StompServerOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(StompServerOptions obj, java.util.Map<String, Object> json) {
static void toJson(StompServerOptions obj, java.util.Map<String, Object> json) {
if (obj.getHeartbeat() != null) {
json.put("heartbeat", obj.getHeartbeat());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class DefaultConnectHandler implements Handler<ServerFrame> {

@Override
public void handle(ServerFrame sf) {
// Server negotiation
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/vertx/ext/stomp/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.json.annotations.JsonGen;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.stomp.impl.FrameException;
Expand All @@ -41,7 +42,8 @@
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@DataObject(generateConverter = true)
@DataObject
@JsonGen(publicConverter = false)
public class Frame {

// General headers
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/vertx/ext/stomp/StompClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.vertx.ext.stomp;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.json.annotations.JsonGen;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClientOptions;

Expand All @@ -30,7 +31,8 @@
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@DataObject(generateConverter = true)
@DataObject
@JsonGen(publicConverter = false)
public class StompClientOptions extends NetClientOptions implements StompOptions {

// The default value of reuse address for stomp client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,5 @@ public interface StompServerConnection {
* @param pingHandler the ping handler
*/
void configureHeartbeat(long ping, long pong, Handler<StompServerConnection> pingHandler);

}
4 changes: 3 additions & 1 deletion src/main/java/io/vertx/ext/stomp/StompServerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.vertx.ext.stomp;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.json.annotations.JsonGen;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetServerOptions;
Expand All @@ -29,7 +30,8 @@
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@DataObject(generateConverter = true)
@DataObject
@JsonGen(publicConverter = false)
public class StompServerOptions extends NetServerOptions implements StompOptions {

public static final int DEFAULT_MAX_HEADER_LENGTH = 1024 * 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class DefaultStompHandler implements StompServerHandler {
private final Vertx vertx;
private final Context context;

private Handler<ServerFrame> connectHandler = new DefaultConnectHandler();
private Handler<ServerFrame> connectHandler;

private Handler<ServerFrame> stompHandler;

Expand Down Expand Up @@ -125,6 +125,7 @@ public DefaultStompHandler(Vertx vertx) {
this.context = Vertx.currentContext();
this.destinations = vertx.sharedData().getLocalMap("stomp.destinations");
this.users = new ConcurrentHashMap<>();
this.connectHandler = new DefaultConnectHandler();
}

@Override
Expand Down
58 changes: 54 additions & 4 deletions src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.vertx.core.net.NetServer;
import io.vertx.ext.stomp.*;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Default implementation of the {@link StompServer}.
Expand Down Expand Up @@ -116,7 +118,17 @@ public StompServer listen(int port, String host, Handler<AsyncResult<StompServer
"server.");
server
.connectHandler(socket -> {
StompServerConnection connection = new StompServerTCPConnectionImpl(socket, this, writingFrameHandler);
AtomicBoolean connected = new AtomicBoolean();
AtomicBoolean firstFrame = new AtomicBoolean();
StompServerConnection connection = new StompServerTCPConnectionImpl(socket, this, frame -> {
if (frame.frame().getCommand() == Command.CONNECTED) {
connected.set(true);
}
Handler<ServerFrame> h = writingFrameHandler;
if (h != null) {
h.handle(frame);
}
});
FrameParser parser = new FrameParser(options);
socket.exceptionHandler((exception) -> {
LOGGER.error("The STOMP server caught a TCP socket error - closing connection", exception);
Expand All @@ -130,7 +142,21 @@ public StompServer listen(int port, String host, Handler<AsyncResult<StompServer
connection.close();
}
)
.handler(frame -> stomp.handle(new ServerFrameImpl(frame, connection)));
.handler(frame -> {
if (frame.getCommand() == Command.CONNECT || frame.getCommand() == Command.STOMP) {
if (firstFrame.compareAndSet(false, true)) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Already connected", Collections.emptyMap(), ""));
connection.close();
}
} else if (connected.get()) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Not connected", Collections.emptyMap(), ""));
connection.close();
}
});
socket.handler(parser);
})
.listen(port, host, ar -> {
Expand Down Expand Up @@ -227,7 +253,17 @@ public Handler<ServerWebSocket> webSocketHandler() {
socket.reject();
return;
}
StompServerConnection connection = new StompServerWebSocketConnectionImpl(socket, this, writingFrameHandler);
AtomicBoolean connected = new AtomicBoolean();
AtomicBoolean firstFrame = new AtomicBoolean();
StompServerConnection connection = new StompServerWebSocketConnectionImpl(socket, this, frame -> {
if (frame.frame().getCommand() == Command.CONNECTED || frame.frame().getCommand() == Command.STOMP) {
connected.set(true);
}
Handler<ServerFrame> h = writingFrameHandler;
if (h != null) {
h.handle(frame);
}
});
FrameParser parser = new FrameParser(options);
socket.exceptionHandler((exception) -> {
LOGGER.error("The STOMP server caught a WebSocket error - closing connection", exception);
Expand All @@ -241,7 +277,21 @@ public Handler<ServerWebSocket> webSocketHandler() {
connection.close();
}
)
.handler(frame -> stomp.handle(new ServerFrameImpl(frame, connection)));
.handler(frame -> {
if (frame.getCommand() == Command.CONNECT) {
if (firstFrame.compareAndSet(false, true)) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Already connected", Collections.emptyMap(), ""));
connection.close();
}
} else if (connected.get()) {
stomp.handle(new ServerFrameImpl(frame, connection));
} else {
connection.write(Frames.createErrorFrame("Not connected", Collections.emptyMap(), ""));
connection.close();
}
});
socket.handler(parser);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

package io.vertx.ext.stomp.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.auth.User;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
Expand All @@ -38,15 +44,21 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.Arrays;

/**
* Tests STOMP server with security.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@RunWith(VertxUnitRunner.class)
public class SecuredServerConnectionTest {

private Vertx vertx;
private StompServer server;
private HttpServer wsServer;
private HttpClient wsClient;
private StompClient client;

@Rule
public RunTestOnContext rule = new RunTestOnContext();
Expand All @@ -55,9 +67,17 @@ public class SecuredServerConnectionTest {
public void setUp(TestContext context) {
vertx = rule.vertx();
AuthenticationProvider provider = PropertyFileAuthentication.create(vertx, "test-auth.properties");
server = StompServer.create(vertx, new StompServerOptions().setSecured(true))
.handler(StompServerHandler.create(vertx).authProvider(provider))
.listen(context.asyncAssertSuccess());
server = StompServer.create(vertx, new StompServerOptions()
.setSecured(true)
.setWebsocketBridge(true)
.setWebsocketPath("/stomp"))
.handler(StompServerHandler.create(vertx).authProvider(provider));
server.listen(StompServerOptions.DEFAULT_STOMP_PORT).onComplete(context.asyncAssertSuccess());
wsServer = vertx.createHttpServer(new HttpServerOptions().setWebSocketSubProtocols(Arrays.asList("v10.stomp", "v11.stomp")))
.webSocketHandler(server.webSocketHandler());
wsServer.listen(8080).onComplete(context.asyncAssertSuccess());
wsClient = vertx.createHttpClient();
client = StompClient.create(vertx, new StompClientOptions().setLogin("admin").setPasscode("admin"));
}

@After
Expand Down Expand Up @@ -158,11 +178,66 @@ public void testFailedAuthenticationWithClient(TestContext context) {
}

void validate(TestContext context, Buffer buffer) {
context.assertTrue(buffer.toString().contains("CONNECTED"));
context.assertTrue(buffer.toString().contains("CONNECTED"), "Was expected <" + buffer.toString() + "> to contain 'CONNECTED'");
context.assertTrue(buffer.toString().contains("version:1.2"));

User user = server.stompHandler().getUserBySession(extractSession(buffer.toString()));
context.assertNotNull(user);
}

@Test
public void testTCPClientMustBeConnected(TestContext context) {
Async async = context.async();
NetClient client = vertx.createNetClient();
testClientMustBeConnected(context, v -> {
client.connect(server.actualPort(), "0.0.0.0").onComplete(context.asyncAssertSuccess(so -> {
Buffer received = Buffer.buffer();
so.handler(received::appendBuffer);
so.write(
"SEND\n" +
"destination:/test\n" +
"\n" +
"hello" +
FrameParser.NULL);
so.endHandler(v2 -> {
context.assertTrue(received.toString().startsWith("ERROR\n"));
async.complete();
});
}));
});
}

@Test
public void testWebSocketClientMustBeConnected(TestContext context) {
Async async = context.async();
testClientMustBeConnected(context, v -> {
wsClient.webSocket(8080, "localhost", "/stomp").onComplete(context.asyncAssertSuccess(ws -> {
Buffer received = Buffer.buffer();
ws.binaryMessageHandler(received::appendBuffer);
ws.writeBinaryMessage(
Buffer.buffer("SEND\n" +
"destination:/test\n" +
"\n" +
"hello" +
FrameParser.NULL));
ws.endHandler(v2 -> {
context.assertTrue(received.toString().startsWith("ERROR\n"));
async.complete();
});
}));
});
}

private void testClientMustBeConnected(TestContext context, Handler<Void> cont) {
client
.connect(server.actualPort(), "localhost")
.onComplete(context.asyncAssertSuccess(conn -> {
Future<String> fut = conn.subscribe("/test", frame -> {
context.fail("Should not receive a messsage");
});
fut.onComplete(context.asyncAssertSuccess(v2 -> {
cont.handle(null);
}));
}));
}
}

0 comments on commit 50d9fa3

Please sign in to comment.