Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZeroMQ: optional topic wrap #9197

Merged
merged 14 commits into from
Jun 10, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* The {@link ReactiveMessageHandlerSpec} extension for {@link ZeroMqMessageHandler}.
*
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -126,6 +127,20 @@ public ZeroMqMessageHandlerSpec topic(String topic) {
return this;
}

/**
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
* subscriptions must be wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* This attribute is set to {@code true} by default.
* @param wrapTopic true if the topic must be wrapped with an additional empty frame.
* @return the spec
* @since 6.2.6
*/
public ZeroMqMessageHandlerSpec wrapTopic(boolean wrapTopic) {
this.reactiveMessageHandler.wrapTopic(wrapTopic);
return this;
}

/**
* Specify a {@link Function} to evaluate a topic a {@link SocketType#PUB}
* is going to use for distributing messages into the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@

/**
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -108,6 +109,20 @@ public ZeroMqMessageProducerSpec topics(String... topics) {
return this;
}

/**
* Specify if the topic
* that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* This attribute is set to {@code true} by default.
* @param unwrapTopic true if the received topic is wrapped with an additional empty frame.
* @return the spec
* @since 6.2.6
*/
public ZeroMqMessageProducerSpec unwrapTopic(boolean unwrapTopic) {
this.target.unwrapTopic(unwrapTopic);
return this;
}

/**
* Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}.
* @param connectUrl the URL to connect ZeroMq socket to.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -54,6 +55,7 @@
* When the {@link SocketType#SUB} is used, the received topic is stored in the {@link ZeroMqHeaders#TOPIC}.
*
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -90,6 +92,8 @@ public class ZeroMqMessageProducer extends MessageProducerSupport {

private volatile Mono<ZMQ.Socket> socketMono;

private volatile boolean unwrapTopic = true;

public ZeroMqMessageProducer(ZContext context) {
this(context, SocketType.PAIR);
}
Expand Down Expand Up @@ -189,6 +193,18 @@ public int getBoundPort() {
return this.bindPort.get();
}

/**
* Specify if the topic
* that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* This attribute is set to {@code true} by default.
* @param unwrapTopic true if the received topic is wrapped with an additional empty frame.
* @since 6.2.6
*/
public void unwrapTopic(boolean unwrapTopic) {
this.unwrapTopic = unwrapTopic;
}

@Override
public String getComponentType() {
return "zeromq:inbound-channel-adapter";
Expand Down Expand Up @@ -284,7 +300,8 @@ private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {
return msgMono.map((msg) -> {
Map<String, Object> headers = null;
if (msg.size() > 1) {
headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, msg.unwrap().getString(ZMQ.CHARSET));
ZFrame topicFrame = this.unwrapTopic ? msg.unwrap() : msg.pop();
headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, topicFrame.getString(ZMQ.CHARSET));
}
return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing.
*
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -88,6 +89,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler

private volatile Disposable socketMonoSubscriber;

private volatile boolean wrapTopic = true;

/**
* Create an instance based on the provided {@link ZContext} and connection string.
* @param context the {@link ZContext} to use for creating sockets.
Expand Down Expand Up @@ -191,6 +194,18 @@ public void setTopicExpression(Expression topicExpression) {
this.topicExpression = topicExpression;
}

/**
* Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the
* subscriptions must be wrapped with an additional empty frame.
* It is ignored for all other {@link SocketType}s supported.
* This attribute is set to {@code true} by default.
* @param wrapTopic true if the topic must be wrapped with an additional empty frame.
* @since 6.2.6
*/
public void wrapTopic(boolean wrapTopic) {
this.wrapTopic = wrapTopic;
}

@Override
public String getComponentType() {
return "zeromq:outbound-channel-adapter";
Expand Down Expand Up @@ -244,7 +259,13 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
if (socket.base() instanceof Pub) {
String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class);
if (topic != null) {
msg.wrap(new ZFrame(topic));
ZFrame topicFrame = new ZFrame(topic);
if (this.wrapTopic) {
msg.wrap(topicFrame);
}
else {
msg.push(topicFrame);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.integration.zeromq.ZeroMqHeaders;
import org.springframework.messaging.support.GenericMessage;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -40,6 +41,7 @@

/**
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -88,7 +90,7 @@ void testMessageProducerForPair() {

stepVerifier.verify();

messageProducer.destroy();
messageProducer.stop();
socket.close();
}

Expand Down Expand Up @@ -142,7 +144,42 @@ void testMessageProducerForPubSubReceiveRaw() {

stepVerifier.verify(Duration.ofSeconds(10));

messageProducer.destroy();
messageProducer.stop();
socket.close();
}

@Test
void testMessageProducerForPubSubDisabledWrapTopic() {
String socketAddress = "inproc://messageProducerWrapTopic.test";
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB);
socket.bind(socketAddress);

FluxMessageChannel outputChannel = new FluxMessageChannel();

StepVerifier stepVerifier =
StepVerifier.create(outputChannel)
.assertNext((message) -> assertThat(message.getHeaders()).containsEntry(ZeroMqHeaders.TOPIC, "testTopicWithNonWrappedTopic"))
.thenCancel()
.verifyLater();

ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(CONTEXT, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("test");
messageProducer.setConnectUrl(socketAddress);
messageProducer.setBeanFactory(mock(BeanFactory.class));
messageProducer.unwrapTopic(false);
messageProducer.afterPropertiesSet();
messageProducer.start();

assertThat(socket.recv()).isNotNull();

ZMsg msg = ZMsg.newStringMsg("test");
msg.push("testTopicWithNonWrappedTopic");
msg.send(socket);

stepVerifier.verify();

messageProducer.stop();
socket.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

/**
* @author Artem Bilan
* @author Alessio Matricardi
*
* @since 5.4
*/
Expand Down Expand Up @@ -150,4 +151,40 @@ void testMessageHandlerForPushPullOverProxy() {
proxy.destroy();
}

@Test
void testMessageHandlerForPubSubDisabledWrapTopic() {
ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB);
subSocket.setReceiveTimeOut(0);
int port = subSocket.bindToRandomPort("tcp://*");
subSocket.subscribe("test");

ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(CONTEXT, "tcp://localhost:" + port, SocketType.PUB);
messageHandler.setBeanFactory(mock(BeanFactory.class));
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
messageHandler.wrapTopic(false);
messageHandler.afterPropertiesSet();
messageHandler.start();

Message<?> testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build();

await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100))
.untilAsserted(() -> {
subSocket.subscribe("test");
messageHandler.handleMessage(testMessage).subscribe();
ZMsg msg = ZMsg.recvMsg(subSocket);
assertThat(msg).isNotNull();
assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic");
Message<?> capturedMessage =
new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData());
assertThat(capturedMessage).isEqualTo(testMessage);
msg.destroy();
});

messageHandler.destroy();
subSocket.close();
}

}
7 changes: 7 additions & 0 deletions src/reference/antora/modules/ROOT/pages/zeromq.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ If the `receiveRaw` option is set to `true`, a `ZMsg`, consumed from the socket,
Otherwise, an `InboundMessageMapper` is used to convert the consumed data into a `Message`.
If the received `ZMsg` is multi-frame, the first frame is treated as the `ZeroMqHeaders.TOPIC` header this ZeroMQ message was published to.

If the `unwrapTopic` option is set to `false`, the incoming message is considered to consist of two frames: the topic and the ZeroMQ message.
Otherwise, by default, the `ZMsg` is considered to consist of three frames: the first one containing the topic, the last frame containing the message, with an empty frame in the middle.

With `SocketType.SUB`, the `ZeroMqMessageProducer` uses the provided `topics` option for subscriptions; defaults to subscribe to all.
Subscriptions can be adjusted at runtime using `subscribeToTopics()` and `unsubscribeFromTopics()` `@ManagedOperation` s.

Expand Down Expand Up @@ -146,6 +149,10 @@ Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported.
The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported.
When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null.
The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data.

If the `wrapTopic` option is set to `false`, the ZeroMQ message frame is sent after the injected topic, if present.
By default, an additional empty frame is sent between the topic and the message.

When the payload of the request message is a `ZMsg`, no conversion or topic extraction is performed: the `ZMsg` is sent into a socket as is and it is not destroyed for possible further reuse.
Otherwise, an `OutboundMessageMapper<byte[]>` is used to convert a request message (or just its payload) into a ZeroMQ frame to publish.
By default, a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`.
Expand Down