Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZeroMQ: optional topic wrap #9197

Merged
merged 14 commits into from
Jun 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler

private volatile Disposable socketMonoSubscriber;

private volatile boolean wrapTopic = true;

/**
* Create an instance based on the provided {@link ZContext} and connection string.
* @param context the {@link ZContext} to use for creating sockets.
Expand Down Expand Up @@ -191,6 +193,15 @@ public void setTopicExpression(Expression topicExpression) {
this.topicExpression = topicExpression;
}

/**
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
* subscriptions must be wrapped with an additional empty frame.
* @param wrapTopic true iff the topic must be wrapped with an additional empty frame.
alessiomatricardi marked this conversation as resolved.
Show resolved Hide resolved
alessiomatricardi marked this conversation as resolved.
Show resolved Hide resolved
*/
public void wrapTopic(boolean wrapTopic) {
this.wrapTopic = wrapTopic;
}

@Override
public String getComponentType() {
return "zeromq:outbound-channel-adapter";
Expand Down Expand Up @@ -244,7 +255,12 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
if (socket.base() instanceof Pub) {
String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class);
if (topic != null) {
msg.wrap(new ZFrame(topic));
var frame = new ZFrame(topic);
alessiomatricardi marked this conversation as resolved.
Show resolved Hide resolved
if (wrapTopic) {
alessiomatricardi marked this conversation as resolved.
Show resolved Hide resolved
msg.wrap(frame);
} else {
alessiomatricardi marked this conversation as resolved.
Show resolved Hide resolved
msg.push(frame);
}
}
}
}
Expand Down