diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index 24e14af7a77..b99fe26996c 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -36,6 +36,7 @@ * The {@link ReactiveMessageHandlerSpec} extension for {@link ZeroMqMessageHandler}. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -126,6 +127,20 @@ public ZeroMqMessageHandlerSpec topic(String topic) { return this; } + /** + * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the + * subscriptions must be wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * This attribute is set to {@code true} by default. + * @param wrapTopic true if the topic must be wrapped with an additional empty frame. + * @return the spec + * @since 6.2.6 + */ + public ZeroMqMessageHandlerSpec wrapTopic(boolean wrapTopic) { + this.reactiveMessageHandler.wrapTopic(wrapTopic); + return this; + } + /** * Specify a {@link Function} to evaluate a topic a {@link SocketType#PUB} * is going to use for distributing messages into the diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java index 9d53fad05f9..986bb4292eb 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ /** * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -108,6 +109,20 @@ public ZeroMqMessageProducerSpec topics(String... topics) { return this; } + /** + * Specify if the topic + * that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * This attribute is set to {@code true} by default. + * @param unwrapTopic true if the received topic is wrapped with an additional empty frame. + * @return the spec + * @since 6.2.6 + */ + public ZeroMqMessageProducerSpec unwrapTopic(boolean unwrapTopic) { + this.target.unwrapTopic(unwrapTopic); + return this; + } + /** * Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}. * @param connectUrl the URL to connect ZeroMq socket to. diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index 0f0d3c02850..d2c484c3e19 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.zeromq.SocketType; import org.zeromq.ZContext; +import org.zeromq.ZFrame; import org.zeromq.ZMQ; import org.zeromq.ZMsg; import reactor.core.publisher.Flux; @@ -54,6 +55,7 @@ * When the {@link SocketType#SUB} is used, the received topic is stored in the {@link ZeroMqHeaders#TOPIC}. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -90,6 +92,8 @@ public class ZeroMqMessageProducer extends MessageProducerSupport { private volatile Mono socketMono; + private volatile boolean unwrapTopic = true; + public ZeroMqMessageProducer(ZContext context) { this(context, SocketType.PAIR); } @@ -189,6 +193,18 @@ public int getBoundPort() { return this.bindPort.get(); } + /** + * Specify if the topic + * that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * This attribute is set to {@code true} by default. + * @param unwrapTopic true if the received topic is wrapped with an additional empty frame. + * @since 6.2.6 + */ + public void unwrapTopic(boolean unwrapTopic) { + this.unwrapTopic = unwrapTopic; + } + @Override public String getComponentType() { return "zeromq:inbound-channel-adapter"; @@ -284,7 +300,8 @@ private Mono> convertMessage(Mono msgMono) { return msgMono.map((msg) -> { Map headers = null; if (msg.size() > 1) { - headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, msg.unwrap().getString(ZMQ.CHARSET)); + ZFrame topicFrame = this.unwrapTopic ? msg.unwrap() : msg.pop(); + headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, topicFrame.getString(ZMQ.CHARSET)); } return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR }); diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index df6ac71d04a..2a668148318 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -60,6 +60,7 @@ * the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -88,6 +89,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private volatile Disposable socketMonoSubscriber; + private volatile boolean wrapTopic = true; + /** * Create an instance based on the provided {@link ZContext} and connection string. * @param context the {@link ZContext} to use for creating sockets. @@ -191,6 +194,18 @@ public void setTopicExpression(Expression topicExpression) { this.topicExpression = topicExpression; } + /** + * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the + * subscriptions must be wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * This attribute is set to {@code true} by default. + * @param wrapTopic true if the topic must be wrapped with an additional empty frame. + * @since 6.2.6 + */ + public void wrapTopic(boolean wrapTopic) { + this.wrapTopic = wrapTopic; + } + @Override public String getComponentType() { return "zeromq:outbound-channel-adapter"; @@ -244,7 +259,13 @@ protected Mono handleMessageInternal(Message message) { if (socket.base() instanceof Pub) { String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class); if (topic != null) { - msg.wrap(new ZFrame(topic)); + ZFrame topicFrame = new ZFrame(topic); + if (this.wrapTopic) { + msg.wrap(topicFrame); + } + else { + msg.push(topicFrame); + } } } } diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java index c80c99c230c..0847f8b751e 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.test.util.TestUtils; +import org.springframework.integration.zeromq.ZeroMqHeaders; import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; @@ -40,6 +41,7 @@ /** * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -88,7 +90,7 @@ void testMessageProducerForPair() { stepVerifier.verify(); - messageProducer.destroy(); + messageProducer.stop(); socket.close(); } @@ -142,7 +144,42 @@ void testMessageProducerForPubSubReceiveRaw() { stepVerifier.verify(Duration.ofSeconds(10)); - messageProducer.destroy(); + messageProducer.stop(); + socket.close(); + } + + @Test + void testMessageProducerForPubSubDisabledWrapTopic() { + String socketAddress = "inproc://messageProducerWrapTopic.test"; + ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB); + socket.bind(socketAddress); + + FluxMessageChannel outputChannel = new FluxMessageChannel(); + + StepVerifier stepVerifier = + StepVerifier.create(outputChannel) + .assertNext((message) -> assertThat(message.getHeaders()).containsEntry(ZeroMqHeaders.TOPIC, "testTopicWithNonWrappedTopic")) + .thenCancel() + .verifyLater(); + + ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(CONTEXT, SocketType.SUB); + messageProducer.setOutputChannel(outputChannel); + messageProducer.setTopics("test"); + messageProducer.setConnectUrl(socketAddress); + messageProducer.setBeanFactory(mock(BeanFactory.class)); + messageProducer.unwrapTopic(false); + messageProducer.afterPropertiesSet(); + messageProducer.start(); + + assertThat(socket.recv()).isNotNull(); + + ZMsg msg = ZMsg.newStringMsg("test"); + msg.push("testTopicWithNonWrappedTopic"); + msg.send(socket); + + stepVerifier.verify(); + + messageProducer.stop(); socket.close(); } diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java index 0d491ae487c..1431047c437 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java @@ -42,6 +42,7 @@ /** * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -150,4 +151,40 @@ void testMessageHandlerForPushPullOverProxy() { proxy.destroy(); } + @Test + void testMessageHandlerForPubSubDisabledWrapTopic() { + ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB); + subSocket.setReceiveTimeOut(0); + int port = subSocket.bindToRandomPort("tcp://*"); + subSocket.subscribe("test"); + + ZeroMqMessageHandler messageHandler = + new ZeroMqMessageHandler(CONTEXT, "tcp://localhost:" + port, SocketType.PUB); + messageHandler.setBeanFactory(mock(BeanFactory.class)); + messageHandler.setTopicExpression( + new FunctionExpression>((message) -> message.getHeaders().get("topic"))); + messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); + messageHandler.wrapTopic(false); + messageHandler.afterPropertiesSet(); + messageHandler.start(); + + Message testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build(); + + await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> { + subSocket.subscribe("test"); + messageHandler.handleMessage(testMessage).subscribe(); + ZMsg msg = ZMsg.recvMsg(subSocket); + assertThat(msg).isNotNull(); + assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic"); + Message capturedMessage = + new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData()); + assertThat(capturedMessage).isEqualTo(testMessage); + msg.destroy(); + }); + + messageHandler.destroy(); + subSocket.close(); + } + } diff --git a/src/reference/antora/modules/ROOT/pages/zeromq.adoc b/src/reference/antora/modules/ROOT/pages/zeromq.adoc index be9140b1f75..a0b85efdfee 100644 --- a/src/reference/antora/modules/ROOT/pages/zeromq.adoc +++ b/src/reference/antora/modules/ROOT/pages/zeromq.adoc @@ -119,6 +119,9 @@ If the `receiveRaw` option is set to `true`, a `ZMsg`, consumed from the socket, Otherwise, an `InboundMessageMapper` is used to convert the consumed data into a `Message`. If the received `ZMsg` is multi-frame, the first frame is treated as the `ZeroMqHeaders.TOPIC` header this ZeroMQ message was published to. +If the `unwrapTopic` option is set to `false`, the incoming message is considered to consist of two frames: the topic and the ZeroMQ message. +Otherwise, by default, the `ZMsg` is considered to consist of three frames: the first one containing the topic, the last frame containing the message, with an empty frame in the middle. + With `SocketType.SUB`, the `ZeroMqMessageProducer` uses the provided `topics` option for subscriptions; defaults to subscribe to all. Subscriptions can be adjusted at runtime using `subscribeToTopics()` and `unsubscribeFromTopics()` `@ManagedOperation` s. @@ -146,6 +149,10 @@ Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported. The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported. When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data. + +If the `wrapTopic` option is set to `false`, the ZeroMQ message frame is sent after the injected topic, if present. +By default, an additional empty frame is sent between the topic and the message. + When the payload of the request message is a `ZMsg`, no conversion or topic extraction is performed: the `ZMsg` is sent into a socket as is and it is not destroyed for possible further reuse. Otherwise, an `OutboundMessageMapper` is used to convert a request message (or just its payload) into a ZeroMQ frame to publish. By default, a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`.