Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support websocket text payload #86

Open
wants to merge 3 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
</parent>

<artifactId>vertx-stomp</artifactId>
<version>4.4.2-SNAPSHOT</version>
<version>4.4.3-SNAPSHOT</version>

<name>Vert.x Stomp</name>
<description>Stomp support for Vert.x 3</description>

<properties>
<stack.version>4.4.2-SNAPSHOT</stack.version>
<stack.version>4.4.3-SNAPSHOT</stack.version>
<jar.manifest>${project.basedir}/src/main/resources/META-INF/MANIFEST.MF</jar.manifest>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setMaxSubscriptionsByClient(((Number)member.getValue()).intValue());
}
break;
case "payloadMode":
if (member.getValue() instanceof String) {
obj.setPayloadMode(io.vertx.ext.stomp.PayloadMode.valueOf((String)member.getValue()));
}
break;
case "secured":
if (member.getValue() instanceof Boolean) {
obj.setSecured((Boolean)member.getValue());
Expand Down Expand Up @@ -112,6 +117,9 @@ public static void toJson(StompServerOptions obj, java.util.Map<String, Object>
json.put("maxHeaderLength", obj.getMaxHeaderLength());
json.put("maxHeaders", obj.getMaxHeaders());
json.put("maxSubscriptionsByClient", obj.getMaxSubscriptionsByClient());
if (obj.getPayloadMode() != null) {
json.put("payloadMode", obj.getPayloadMode().name());
}
json.put("secured", obj.isSecured());
json.put("sendErrorOnNoSubscriptions", obj.isSendErrorOnNoSubscriptions());
if (obj.getSupportedVersions() != null) {
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/vertx/ext/stomp/Destination.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ static Destination bridge(Vertx vertx, BridgeOptions options) {
@Fluent
Destination dispatch(StompServerConnection connection, Frame frame);

/**
* Dispatches the given frame as a text frame
*
* @param connection the connection
* @param frame the frame
* @return the current instance of {@link Destination}
*/
@Fluent
Destination dispatchText(StompServerConnection connection, Frame frame);

/**
* Dispatches the given frame as a binary frame
*
* @param connection the connection
* @param frame the frame
* @return the current instance of {@link Destination}
*/
Destination dispatchBinary(StompServerConnection connection, Frame frame);

/**
* Handles a subscription request to the current {@link Destination}.
*
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/vertx/ext/stomp/PayloadMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.vertx.ext.stomp;

public enum PayloadMode {
TEXT,
BINARY
}
14 changes: 12 additions & 2 deletions src/main/java/io/vertx/ext/stomp/StompServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@
public interface StompServerConnection {

/**
* Writes the given frame to the socket.
* Writes the given frame to the socket using default payload type
*
* @param frame the frame, must not be {@code null}.
* @param frame the frame, must not be {@code null}.
* @return the current {@link StompServerConnection}
*/
@Fluent
StompServerConnection write(Frame frame);

/**
* Writes the given frame to the socket.
*
* @param frame the frame, must not be {@code null}.
* @param payloadMode explicitely specify the payload type for the underlying socket to use (e.g. websockets)
* @return the current {@link StompServerConnection}
*/
@Fluent
StompServerConnection write(Frame frame, PayloadMode payloadMode);

/**
* Writes the given buffer to the socket. This is a low level API that should be used carefully.
*
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/vertx/ext/stomp/StompServerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions

public static final String DEFAULT_WEBSOCKET_PATH = "/stomp";

public static final PayloadMode DEFAULT_PAYLOAD_MODE = PayloadMode.BINARY;

private int maxHeaderLength = DEFAULT_MAX_HEADER_LENGTH;
private int maxHeaders = DEFAULT_MAX_HEADERS;
Expand All @@ -68,6 +69,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions
private boolean disableTCPServer;
private boolean trailingLine = DEFAULT_TRAILING_LINE;

private PayloadMode payloadMode = DEFAULT_PAYLOAD_MODE;
/**
* Default constructor.
*/
Expand Down Expand Up @@ -102,6 +104,8 @@ public StompServerOptions(StompServerOptions other) {

this.disableTCPServer = other.disableTCPServer;
this.trailingLine = other.trailingLine;

this.payloadMode = other.payloadMode;
}

/**
Expand Down Expand Up @@ -471,4 +475,24 @@ public StompServerOptions setTrailingLine(boolean trailingLine) {
this.trailingLine = trailingLine;
return this;
}

/**
* Specify the default payload type to be used by the underlying socket. Useful for websocket transports.
*
* @return the default payload mode
*/
public PayloadMode getPayloadMode() {
return payloadMode;
}

/**
* Specify the default payload type to be used by the underlying socket. Useful for websocket transports.
*
* @param payloadMode the default payload mode to use
* @return the current {@link StompServerOptions}
*/
public StompServerOptions setPayloadMode(PayloadMode payloadMode) {
this.payloadMode = payloadMode;
return this;
}
}
6 changes: 5 additions & 1 deletion src/main/java/io/vertx/ext/stomp/impl/FrameParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ private void handleLine(Buffer buffer) {
String length = headers.get(Frame.CONTENT_LENGTH);
if (length != null) {
int contentLength = Integer.parseInt(length);
frameParser.fixedSizeMode(contentLength);
if (contentLength != 0) {
frameParser.fixedSizeMode(contentLength);
} else {
frameParser.delimitedMode(NULL);
}
} else {
frameParser.delimitedMode(NULL);
}
Expand Down
34 changes: 25 additions & 9 deletions src/main/java/io/vertx/ext/stomp/impl/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.vertx.ext.stomp.impl;

import io.vertx.core.Vertx;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.*;
import io.vertx.ext.stomp.utils.Headers;

import java.util.ArrayList;
Expand Down Expand Up @@ -58,23 +55,42 @@ public String destination() {
/**
* Dispatches the given frame.
*
* @param connection the connection
* @param frame the frame ({@code SEND} frame).
* @param connection the connection
* @param frame the frame
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
* @return the current instance of {@link Destination}
*/
@Override
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
if (subscriptions.isEmpty()) {
lastUsedSubscriptions = -1;
return this;
}
Subscription subscription = getNextSubscription();
String messageId = UUID.randomUUID().toString();
Frame message = transform(frame, subscription, messageId);
subscription.connection.write(message);
if(payloadMode == null) {
subscription.connection.write(message); // Uses server defaults
} else {
subscription.connection.write(message, payloadMode); // Explicit
}
return this;
}

@Override
public Destination dispatch(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, null);
}

@Override
public Destination dispatchText(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.TEXT);
}

@Override
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.BINARY);
}

private Subscription getNextSubscription() {
lastUsedSubscriptions = lastUsedSubscriptions + 1;
if (lastUsedSubscriptions >= subscriptions.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class StompServerTCPConnectionImpl implements StompServerConnection {

private static final Logger log = LoggerFactory.getLogger(StompServerTCPConnectionImpl.class);

private final StompServer server;
protected final StompServer server;
private final NetSocket socket;
private final String sessionId;
protected final Handler<ServerFrame> handler;
Expand Down Expand Up @@ -72,6 +72,11 @@ public StompServerConnection write(Frame frame) {
return write(frame.toBuffer(server.options().isTrailingLine()));
}

@Override
public StompServerConnection write(Frame frame, PayloadMode payloadMode) {
return write(frame);
}

@Override
public StompServerConnection write(Buffer buffer) {
socket.write(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,26 @@ public StompServerWebSocketConnectionImpl(ServerWebSocket socket, StompServer se
}

@Override
public SSLSession sslSession() {
return this.socket.sslSession();
public SSLSession sslSession() { return this.socket.sslSession(); }

@Override
public StompServerConnection write(Frame frame) {
return write(frame, server.options().getPayloadMode());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the flow would be better if that calls write(frame, null) and then in write(Frame, PayloadMode) when the mode is null then default mode from server options is used

}

@Override
public StompServerConnection write(Frame frame, PayloadMode payloadMode) {
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, this));
}
Buffer stompPayload = frame.toBuffer(server.options().isTrailingLine());
if (payloadMode == PayloadMode.BINARY) {
return write(stompPayload);
} else if (payloadMode == PayloadMode.TEXT) {
return writeText(stompPayload.toString());
} else {
return write(stompPayload); // Default
}
}

@Override
Expand All @@ -51,6 +69,11 @@ public StompServerConnection write(Buffer buffer) {
return this;
}

public StompServerConnection writeText(String message) {
socket.writeTextMessage(message);
return this;
}

@Override
public void ping() {
if (handler != null) {
Expand Down
34 changes: 25 additions & 9 deletions src/main/java/io/vertx/ext/stomp/impl/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.vertx.ext.stomp.impl;

import io.vertx.core.Vertx;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.*;
import io.vertx.ext.stomp.utils.Headers;

import java.util.ArrayList;
Expand Down Expand Up @@ -56,20 +53,39 @@ public String destination() {
/**
* Dispatches the given frame.
*
* @param connection the connection
* @param frame the frame ({@code SEND} frame).
* @param connection the connection
* @param frame the frame
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
* @return the current instance of {@link Destination}
*/
@Override
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
for (Subscription subscription : subscriptions) {
String messageId = UUID.randomUUID().toString();
Frame message = transform(frame, subscription, messageId);
subscription.connection.write(message);
if(payloadMode != null) {
subscription.connection.write(message, payloadMode);
} else {
subscription.connection.write(message);
}
}
return this;
}

@Override
public Destination dispatch(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, null);
}

@Override
public Destination dispatchText(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.TEXT);
}

@Override
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.BINARY);
}

public static Frame transform(Frame frame, Subscription subscription, String messageId) {
final Headers headers = Headers.create(frame.getHeaders())
// Destination already set in the input headers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.vertx.ext.stomp.impl;

import io.vertx.core.Vertx;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.*;
import io.vertx.ext.stomp.utils.Headers;

import java.util.ArrayList;
Expand Down Expand Up @@ -58,12 +55,12 @@ public String destination() {
/**
* Dispatches the given frame.
*
* @param connection the connection
* @param frame the frame ({@code SEND} frame).
* @param connection the connection
* @param frame the frame
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
* @return the current instance of {@link Destination}
*/
@Override
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
if (subscriptions.isEmpty()) {
lastUsedSubscriptions = -1;
return this;
Expand All @@ -72,10 +69,29 @@ public synchronized Destination dispatch(StompServerConnection connection, Frame
String messageId = UUID.randomUUID().toString();
Frame message = transform(frame, subscription, messageId);
subscription.enqueue(message);
subscription.connection().write(message);
if(payloadMode != null) {
subscription.connection.write(message, payloadMode);
} else {
subscription.connection.write(message);
}
return this;
}

@Override
public Destination dispatch(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, null);
}

@Override
public Destination dispatchText(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.TEXT);
}

@Override
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.BINARY);
}

private Subscription getNextSubscription() {
lastUsedSubscriptions = lastUsedSubscriptions + 1;
if (lastUsedSubscriptions >= subscriptions.size()) {
Expand Down