From e00027070921bb278b250418de61b2e3bbbd1956 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Fri, 31 May 2024 17:04:29 +0200 Subject: [PATCH 01/14] Update ZeroMqMessageHandler.java --- .../zeromq/outbound/ZeroMqMessageHandler.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index df6ac71d04a..868813aa9ef 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -88,6 +88,8 @@ public class ZeroMqMessageHandler extends AbstractReactiveMessageHandler private volatile Disposable socketMonoSubscriber; + private volatile boolean wrapTopic = true; + /** * Create an instance based on the provided {@link ZContext} and connection string. * @param context the {@link ZContext} to use for creating sockets. @@ -191,6 +193,15 @@ public void setTopicExpression(Expression topicExpression) { this.topicExpression = topicExpression; } + /** + * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the + * subscriptions must be wrapped with an additional empty frame. + * @param wrapTopic true iff the topic must be wrapped with an additional empty frame. + */ + public void wrapTopic(boolean wrapTopic) { + this.wrapTopic = wrapTopic; + } + @Override public String getComponentType() { return "zeromq:outbound-channel-adapter"; @@ -244,7 +255,8 @@ protected Mono handleMessageInternal(Message message) { if (socket.base() instanceof Pub) { String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class); if (topic != null) { - msg.wrap(new ZFrame(topic)); + var frame = new ZFrame(topic); + wrapTopic ? msg.wrap(frame) : msg.push(frame); } } } From 3d6db04f21728264dd27015e5bc3f8aab56bbf30 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Fri, 31 May 2024 17:07:45 +0200 Subject: [PATCH 02/14] Update ZeroMqMessageHandler.java --- .../integration/zeromq/outbound/ZeroMqMessageHandler.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index 868813aa9ef..9138a2b9099 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -256,7 +256,11 @@ protected Mono handleMessageInternal(Message message) { String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class); if (topic != null) { var frame = new ZFrame(topic); - wrapTopic ? msg.wrap(frame) : msg.push(frame); + if(wrapTopic) { + msg.wrap(frame); + } else { + msg.push(frame); + } } } } From ac9d8ff72a3ce80e98c38e5b13a13a33664551c5 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Fri, 31 May 2024 17:27:04 +0200 Subject: [PATCH 03/14] code reformat --- .../integration/zeromq/outbound/ZeroMqMessageHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index 9138a2b9099..07cbd4eb229 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -256,7 +256,7 @@ protected Mono handleMessageInternal(Message message) { String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class); if (topic != null) { var frame = new ZFrame(topic); - if(wrapTopic) { + if (wrapTopic) { msg.wrap(frame); } else { msg.push(frame); From a93c5b3bcfd14b4b36384b2f455bab86130ca036 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Wed, 5 Jun 2024 19:40:40 +0200 Subject: [PATCH 04/14] add author, fix code style, add same logic also for ZeroMqMessageProducer --- .../zeromq/inbound/ZeroMqMessageProducer.java | 26 +++++++++++++++++-- .../zeromq/outbound/ZeroMqMessageHandler.java | 8 ++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index 0f0d3c02850..426a055ae4d 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.zeromq.SocketType; import org.zeromq.ZContext; +import org.zeromq.ZFrame; import org.zeromq.ZMQ; import org.zeromq.ZMsg; import reactor.core.publisher.Flux; @@ -54,6 +55,7 @@ * When the {@link SocketType#SUB} is used, the received topic is stored in the {@link ZeroMqHeaders#TOPIC}. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -90,6 +92,8 @@ public class ZeroMqMessageProducer extends MessageProducerSupport { private volatile Mono socketMono; + private volatile boolean wrapTopic = true; + public ZeroMqMessageProducer(ZContext context) { this(context, SocketType.PAIR); } @@ -189,6 +193,17 @@ public int getBoundPort() { return this.bindPort.get(); } + /** + * Specify if the topic + * that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * @param wrapTopic true iff the received topic is wrapped with an additional empty frame. + * @since 6.2.6 + */ + public void wrapTopic(boolean wrapTopic) { + this.wrapTopic = wrapTopic; + } + @Override public String getComponentType() { return "zeromq:inbound-channel-adapter"; @@ -284,7 +299,14 @@ private Mono> convertMessage(Mono msgMono) { return msgMono.map((msg) -> { Map headers = null; if (msg.size() > 1) { - headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, msg.unwrap().getString(ZMQ.CHARSET)); + ZFrame frame; + if (this.wrapTopic) { + frame = msg.unwrap(); + } + else { + frame = msg.pop(); + } + headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, frame.getString(ZMQ.CHARSET)); } return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR }); diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index 07cbd4eb229..c7dd3cf06d8 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -60,6 +60,7 @@ * the {@link ZMsg} is sent into a socket as is and it is not destroyed for possible further reusing. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -196,7 +197,9 @@ public void setTopicExpression(Expression topicExpression) { /** * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the * subscriptions must be wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. * @param wrapTopic true iff the topic must be wrapped with an additional empty frame. + * @since 6.2.6 */ public void wrapTopic(boolean wrapTopic) { this.wrapTopic = wrapTopic; @@ -256,9 +259,10 @@ protected Mono handleMessageInternal(Message message) { String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class); if (topic != null) { var frame = new ZFrame(topic); - if (wrapTopic) { + if (this.wrapTopic) { msg.wrap(frame); - } else { + } + else { msg.push(frame); } } From 1cd15dc2462efc6488c91dfbad4f2c4d16755ecf Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Wed, 5 Jun 2024 19:41:11 +0200 Subject: [PATCH 05/14] add wrapTopic function also in DSL specs --- .../zeromq/dsl/ZeroMqMessageHandlerSpec.java | 14 ++++++++++++++ .../zeromq/dsl/ZeroMqMessageProducerSpec.java | 16 +++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index 24e14af7a77..3048865735e 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -36,6 +36,7 @@ * The {@link ReactiveMessageHandlerSpec} extension for {@link ZeroMqMessageHandler}. * * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -126,6 +127,19 @@ public ZeroMqMessageHandlerSpec topic(String topic) { return this; } + /** + * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the + * subscriptions must be wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * @param wrapTopic true iff the topic must be wrapped with an additional empty frame. + * @return the spec + * @since 6.2.6 + */ + public ZeroMqMessageHandlerSpec wrapTopic(boolean wrapTopic) { + this.reactiveMessageHandler.wrapTopic(wrapTopic); + return this; + } + /** * Specify a {@link Function} to evaluate a topic a {@link SocketType#PUB} * is going to use for distributing messages into the diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java index 9d53fad05f9..f32807d9d2b 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ /** * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -108,6 +109,19 @@ public ZeroMqMessageProducerSpec topics(String... topics) { return this; } + /** + * Specify if the topic + * that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame. + * It is ignored for all other {@link SocketType}s supported. + * @param wrapTopic true iff the received topic is wrapped with an additional empty frame. + * @return the spec + * @since 6.2.6 + */ + public ZeroMqMessageProducerSpec wrapTopic(boolean wrapTopic) { + this.target.wrapTopic(wrapTopic); + return this; + } + /** * Configure an URL for {@link org.zeromq.ZMQ.Socket#connect(String)}. * @param connectUrl the URL to connect ZeroMq socket to. From 293d3c4e0e894ea2e18ee17d727dfb79c1afa2b9 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Wed, 5 Jun 2024 19:41:19 +0200 Subject: [PATCH 06/14] add tests --- .../inbound/ZeroMqMessageProducerTests.java | 45 ++++++++++++++++++- .../outbound/ZeroMqMessageHandlerTests.java | 37 +++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java index c80c99c230c..5dd611b2f04 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ /** * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -146,4 +147,46 @@ void testMessageProducerForPubSubReceiveRaw() { socket.close(); } + @Test + void testMessageProducerForPubSubDisabledWrapTopic() { + String socketAddress = "inproc://messageProducer.test"; + ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB); + socket.bind(socketAddress); + socket.setReceiveTimeOut(10_000); + + FluxMessageChannel outputChannel = new FluxMessageChannel(); + + StepVerifier stepVerifier = + StepVerifier.create(outputChannel) + .assertNext((message) -> + assertThat(message.getPayload()) + .asInstanceOf(InstanceOfAssertFactories.type(ZMsg.class)) + .extracting(ZMsg::pop) + .isEqualTo(new ZFrame("testTopicWithNonWrappedTopic"))) + .thenCancel() + .verifyLater(); + + ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(CONTEXT, SocketType.SUB); + messageProducer.setOutputChannel(outputChannel); + messageProducer.setTopics("test"); + messageProducer.setReceiveRaw(true); + messageProducer.setConnectUrl(socketAddress); + messageProducer.setConsumeDelay(Duration.ofMillis(10)); + messageProducer.setBeanFactory(mock(BeanFactory.class)); + messageProducer.wrapTopic(false); + messageProducer.afterPropertiesSet(); + messageProducer.start(); + + assertThat(socket.recv()).isNotNull(); + + ZMsg msg = ZMsg.newStringMsg("test"); + msg.push(new ZFrame("testTopicWithNonWrappedTopic")); + msg.send(socket); + + stepVerifier.verify(Duration.ofSeconds(10)); + + messageProducer.destroy(); + socket.close(); + } + } diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java index 0d491ae487c..1431047c437 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java @@ -42,6 +42,7 @@ /** * @author Artem Bilan + * @author Alessio Matricardi * * @since 5.4 */ @@ -150,4 +151,40 @@ void testMessageHandlerForPushPullOverProxy() { proxy.destroy(); } + @Test + void testMessageHandlerForPubSubDisabledWrapTopic() { + ZMQ.Socket subSocket = CONTEXT.createSocket(SocketType.SUB); + subSocket.setReceiveTimeOut(0); + int port = subSocket.bindToRandomPort("tcp://*"); + subSocket.subscribe("test"); + + ZeroMqMessageHandler messageHandler = + new ZeroMqMessageHandler(CONTEXT, "tcp://localhost:" + port, SocketType.PUB); + messageHandler.setBeanFactory(mock(BeanFactory.class)); + messageHandler.setTopicExpression( + new FunctionExpression>((message) -> message.getHeaders().get("topic"))); + messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper()); + messageHandler.wrapTopic(false); + messageHandler.afterPropertiesSet(); + messageHandler.start(); + + Message testMessage = MessageBuilder.withPayload("test").setHeader("topic", "testTopic").build(); + + await().atMost(Duration.ofSeconds(20)).pollDelay(Duration.ofMillis(100)) + .untilAsserted(() -> { + subSocket.subscribe("test"); + messageHandler.handleMessage(testMessage).subscribe(); + ZMsg msg = ZMsg.recvMsg(subSocket); + assertThat(msg).isNotNull(); + assertThat(msg.pop().getString(ZMQ.CHARSET)).isEqualTo("testTopic"); + Message capturedMessage = + new EmbeddedJsonHeadersMessageMapper().toMessage(msg.getFirst().getData()); + assertThat(capturedMessage).isEqualTo(testMessage); + msg.destroy(); + }); + + messageHandler.destroy(); + subSocket.close(); + } + } From 581198982da0a0f8c9ed468140d50572b58e2d13 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 6 Jun 2024 21:27:41 +0200 Subject: [PATCH 07/14] address review --- .../zeromq/dsl/ZeroMqMessageHandlerSpec.java | 3 ++- .../zeromq/dsl/ZeroMqMessageProducerSpec.java | 7 ++++--- .../zeromq/inbound/ZeroMqMessageProducer.java | 19 ++++++++++--------- .../zeromq/outbound/ZeroMqMessageHandler.java | 9 +++++---- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java index 3048865735e..b99fe26996c 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageHandlerSpec.java @@ -131,7 +131,8 @@ public ZeroMqMessageHandlerSpec topic(String topic) { * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the * subscriptions must be wrapped with an additional empty frame. * It is ignored for all other {@link SocketType}s supported. - * @param wrapTopic true iff the topic must be wrapped with an additional empty frame. + * This attribute is set to {@code true} by default. + * @param wrapTopic true if the topic must be wrapped with an additional empty frame. * @return the spec * @since 6.2.6 */ diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java index f32807d9d2b..986bb4292eb 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/dsl/ZeroMqMessageProducerSpec.java @@ -113,12 +113,13 @@ public ZeroMqMessageProducerSpec topics(String... topics) { * Specify if the topic * that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame. * It is ignored for all other {@link SocketType}s supported. - * @param wrapTopic true iff the received topic is wrapped with an additional empty frame. + * This attribute is set to {@code true} by default. + * @param unwrapTopic true if the received topic is wrapped with an additional empty frame. * @return the spec * @since 6.2.6 */ - public ZeroMqMessageProducerSpec wrapTopic(boolean wrapTopic) { - this.target.wrapTopic(wrapTopic); + public ZeroMqMessageProducerSpec unwrapTopic(boolean unwrapTopic) { + this.target.unwrapTopic(unwrapTopic); return this; } diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index 426a055ae4d..65d77460aa7 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -92,7 +92,7 @@ public class ZeroMqMessageProducer extends MessageProducerSupport { private volatile Mono socketMono; - private volatile boolean wrapTopic = true; + private volatile boolean unwrapTopic = true; public ZeroMqMessageProducer(ZContext context) { this(context, SocketType.PAIR); @@ -197,11 +197,12 @@ public int getBoundPort() { * Specify if the topic * that {@link SocketType#SUB} socket is going to receive is wrapped with an additional empty frame. * It is ignored for all other {@link SocketType}s supported. - * @param wrapTopic true iff the received topic is wrapped with an additional empty frame. + * This attribute is set to {@code true} by default. + * @param unwrapTopic true if the received topic is wrapped with an additional empty frame. * @since 6.2.6 */ - public void wrapTopic(boolean wrapTopic) { - this.wrapTopic = wrapTopic; + public void unwrapTopic(boolean unwrapTopic) { + this.unwrapTopic = unwrapTopic; } @Override @@ -299,14 +300,14 @@ private Mono> convertMessage(Mono msgMono) { return msgMono.map((msg) -> { Map headers = null; if (msg.size() > 1) { - ZFrame frame; - if (this.wrapTopic) { - frame = msg.unwrap(); + ZFrame topicFrame; + if (this.unwrapTopic) { + topicFrame = msg.unwrap(); } else { - frame = msg.pop(); + topicFrame = msg.pop(); } - headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, frame.getString(ZMQ.CHARSET)); + headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, topicFrame.getString(ZMQ.CHARSET)); } return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR }); diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java index c7dd3cf06d8..2a668148318 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java @@ -198,7 +198,8 @@ public void setTopicExpression(Expression topicExpression) { * Specify if the topic that {@link SocketType#PUB} socket is going to use for distributing messages into the * subscriptions must be wrapped with an additional empty frame. * It is ignored for all other {@link SocketType}s supported. - * @param wrapTopic true iff the topic must be wrapped with an additional empty frame. + * This attribute is set to {@code true} by default. + * @param wrapTopic true if the topic must be wrapped with an additional empty frame. * @since 6.2.6 */ public void wrapTopic(boolean wrapTopic) { @@ -258,12 +259,12 @@ protected Mono handleMessageInternal(Message message) { if (socket.base() instanceof Pub) { String topic = this.topicExpression.getValue(this.evaluationContext, message, String.class); if (topic != null) { - var frame = new ZFrame(topic); + ZFrame topicFrame = new ZFrame(topic); if (this.wrapTopic) { - msg.wrap(frame); + msg.wrap(topicFrame); } else { - msg.push(frame); + msg.push(topicFrame); } } } From 8aa6beb7228d162c283b29b6aa16db12629cb4a7 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 6 Jun 2024 21:30:35 +0200 Subject: [PATCH 08/14] fix tests --- .../integration/zeromq/inbound/ZeroMqMessageProducerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java index 5dd611b2f04..4a7d46dc4c6 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java @@ -173,7 +173,7 @@ void testMessageProducerForPubSubDisabledWrapTopic() { messageProducer.setConnectUrl(socketAddress); messageProducer.setConsumeDelay(Duration.ofMillis(10)); messageProducer.setBeanFactory(mock(BeanFactory.class)); - messageProducer.wrapTopic(false); + messageProducer.unwrapTopic(false); messageProducer.afterPropertiesSet(); messageProducer.start(); From e301177765b76eaa07868082bbed95a0fd2b19e5 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 6 Jun 2024 21:53:03 +0200 Subject: [PATCH 09/14] update docs --- src/reference/antora/modules/ROOT/pages/zeromq.adoc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/reference/antora/modules/ROOT/pages/zeromq.adoc b/src/reference/antora/modules/ROOT/pages/zeromq.adoc index be9140b1f75..c074f759eb5 100644 --- a/src/reference/antora/modules/ROOT/pages/zeromq.adoc +++ b/src/reference/antora/modules/ROOT/pages/zeromq.adoc @@ -119,6 +119,12 @@ If the `receiveRaw` option is set to `true`, a `ZMsg`, consumed from the socket, Otherwise, an `InboundMessageMapper` is used to convert the consumed data into a `Message`. If the received `ZMsg` is multi-frame, the first frame is treated as the `ZeroMqHeaders.TOPIC` header this ZeroMQ message was published to. +If the `unwrapTopic` option is set to `false`, +the incoming message is considered to consist of two frames: the topic and the ZeroMQ message. +Otherwise, by default, the `ZMsg` is considered to consist of three frames: +the first one containing the topic, the last frame containing the message, +with an empty frame in the middle. + With `SocketType.SUB`, the `ZeroMqMessageProducer` uses the provided `topics` option for subscriptions; defaults to subscribe to all. Subscriptions can be adjusted at runtime using `subscribeToTopics()` and `unsubscribeFromTopics()` `@ManagedOperation` s. @@ -146,6 +152,11 @@ Only `SocketType.PAIR`, `SocketType.PUSH` and `SocketType.PUB` are supported. The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding is not supported. When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data. + +If the `wrapTopic` option is set to `false`, +the ZeroMQ message frame is sent after the injected topic, if present. +By default, an additional empty frame is sent between the topic and the message. + When the payload of the request message is a `ZMsg`, no conversion or topic extraction is performed: the `ZMsg` is sent into a socket as is and it is not destroyed for possible further reuse. Otherwise, an `OutboundMessageMapper` is used to convert a request message (or just its payload) into a ZeroMQ frame to publish. By default, a `ConvertingBytesMessageMapper` is used supplied with a `ConfigurableCompositeMessageConverter`. From 1c2a1cff97fd635ec88f49db58bc8b1452ff8b3f Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 6 Jun 2024 21:57:58 +0200 Subject: [PATCH 10/14] use ternary operator --- .../integration/zeromq/inbound/ZeroMqMessageProducer.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index 65d77460aa7..d2c484c3e19 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -300,13 +300,7 @@ private Mono> convertMessage(Mono msgMono) { return msgMono.map((msg) -> { Map headers = null; if (msg.size() > 1) { - ZFrame topicFrame; - if (this.unwrapTopic) { - topicFrame = msg.unwrap(); - } - else { - topicFrame = msg.pop(); - } + ZFrame topicFrame = this.unwrapTopic ? msg.unwrap() : msg.pop(); headers = Collections.singletonMap(ZeroMqHeaders.TOPIC, topicFrame.getString(ZMQ.CHARSET)); } return this.messageMapper.toMessage(msg.getLast().getData(), headers); // NOSONAR From b1717d596f2942795e085310a8cc60bbe81b8320 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Thu, 6 Jun 2024 22:08:24 +0200 Subject: [PATCH 11/14] ensure docs follow guidelines --- src/reference/antora/modules/ROOT/pages/zeromq.adoc | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/reference/antora/modules/ROOT/pages/zeromq.adoc b/src/reference/antora/modules/ROOT/pages/zeromq.adoc index c074f759eb5..a0b85efdfee 100644 --- a/src/reference/antora/modules/ROOT/pages/zeromq.adoc +++ b/src/reference/antora/modules/ROOT/pages/zeromq.adoc @@ -119,11 +119,8 @@ If the `receiveRaw` option is set to `true`, a `ZMsg`, consumed from the socket, Otherwise, an `InboundMessageMapper` is used to convert the consumed data into a `Message`. If the received `ZMsg` is multi-frame, the first frame is treated as the `ZeroMqHeaders.TOPIC` header this ZeroMQ message was published to. -If the `unwrapTopic` option is set to `false`, -the incoming message is considered to consist of two frames: the topic and the ZeroMQ message. -Otherwise, by default, the `ZMsg` is considered to consist of three frames: -the first one containing the topic, the last frame containing the message, -with an empty frame in the middle. +If the `unwrapTopic` option is set to `false`, the incoming message is considered to consist of two frames: the topic and the ZeroMQ message. +Otherwise, by default, the `ZMsg` is considered to consist of three frames: the first one containing the topic, the last frame containing the message, with an empty frame in the middle. With `SocketType.SUB`, the `ZeroMqMessageProducer` uses the provided `topics` option for subscriptions; defaults to subscribe to all. Subscriptions can be adjusted at runtime using `subscribeToTopics()` and `unsubscribeFromTopics()` `@ManagedOperation` s. @@ -153,8 +150,7 @@ The `ZeroMqMessageHandler` only supports connecting the ZeroMQ socket; binding i When the `SocketType.PUB` is used, the `topicExpression` is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null. The subscriber side (`SocketType.SUB`) must receive the topic frame first before parsing the actual data. -If the `wrapTopic` option is set to `false`, -the ZeroMQ message frame is sent after the injected topic, if present. +If the `wrapTopic` option is set to `false`, the ZeroMQ message frame is sent after the injected topic, if present. By default, an additional empty frame is sent between the topic and the message. When the payload of the request message is a `ZMsg`, no conversion or topic extraction is performed: the `ZMsg` is sent into a socket as is and it is not destroyed for possible further reuse. From e2e977d31e1ac9cf8bc67e8b3b392bdb2b8b4998 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Fri, 7 Jun 2024 00:53:00 +0200 Subject: [PATCH 12/14] fix wrap topic test: duplicate socket address caused binding exception --- .../integration/zeromq/inbound/ZeroMqMessageProducerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java index 4a7d46dc4c6..86031a84856 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java @@ -149,7 +149,7 @@ void testMessageProducerForPubSubReceiveRaw() { @Test void testMessageProducerForPubSubDisabledWrapTopic() { - String socketAddress = "inproc://messageProducer.test"; + String socketAddress = "inproc://messageProducerWrapTopic.test"; ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB); socket.bind(socketAddress); socket.setReceiveTimeOut(10_000); From 897271b477273d5690e694fc67faf928d5e2d367 Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Fri, 7 Jun 2024 01:17:50 +0200 Subject: [PATCH 13/14] rewrite the MessageProducer wrapTopic test --- .../zeromq/inbound/ZeroMqMessageProducerTests.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java index 86031a84856..58719c74c1f 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java @@ -32,6 +32,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.test.util.TestUtils; +import org.springframework.integration.zeromq.ZeroMqHeaders; import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; @@ -158,18 +159,13 @@ void testMessageProducerForPubSubDisabledWrapTopic() { StepVerifier stepVerifier = StepVerifier.create(outputChannel) - .assertNext((message) -> - assertThat(message.getPayload()) - .asInstanceOf(InstanceOfAssertFactories.type(ZMsg.class)) - .extracting(ZMsg::pop) - .isEqualTo(new ZFrame("testTopicWithNonWrappedTopic"))) + .assertNext((message) -> assertThat(message.getHeaders()).containsEntry(ZeroMqHeaders.TOPIC, "testTopicWithNonWrappedTopic")) .thenCancel() .verifyLater(); ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(CONTEXT, SocketType.SUB); messageProducer.setOutputChannel(outputChannel); messageProducer.setTopics("test"); - messageProducer.setReceiveRaw(true); messageProducer.setConnectUrl(socketAddress); messageProducer.setConsumeDelay(Duration.ofMillis(10)); messageProducer.setBeanFactory(mock(BeanFactory.class)); @@ -180,7 +176,7 @@ void testMessageProducerForPubSubDisabledWrapTopic() { assertThat(socket.recv()).isNotNull(); ZMsg msg = ZMsg.newStringMsg("test"); - msg.push(new ZFrame("testTopicWithNonWrappedTopic")); + msg.push("testTopicWithNonWrappedTopic"); msg.send(socket); stepVerifier.verify(Duration.ofSeconds(10)); From 9501031f7a69288606909dfd513685998798eded Mon Sep 17 00:00:00 2001 From: Alessio Matricardi Date: Sun, 9 Jun 2024 19:21:15 +0200 Subject: [PATCH 14/14] call stop instead of destroy method --- .../zeromq/inbound/ZeroMqMessageProducerTests.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java index 58719c74c1f..0847f8b751e 100644 --- a/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java +++ b/spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java @@ -90,7 +90,7 @@ void testMessageProducerForPair() { stepVerifier.verify(); - messageProducer.destroy(); + messageProducer.stop(); socket.close(); } @@ -144,7 +144,7 @@ void testMessageProducerForPubSubReceiveRaw() { stepVerifier.verify(Duration.ofSeconds(10)); - messageProducer.destroy(); + messageProducer.stop(); socket.close(); } @@ -153,7 +153,6 @@ void testMessageProducerForPubSubDisabledWrapTopic() { String socketAddress = "inproc://messageProducerWrapTopic.test"; ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB); socket.bind(socketAddress); - socket.setReceiveTimeOut(10_000); FluxMessageChannel outputChannel = new FluxMessageChannel(); @@ -167,7 +166,6 @@ void testMessageProducerForPubSubDisabledWrapTopic() { messageProducer.setOutputChannel(outputChannel); messageProducer.setTopics("test"); messageProducer.setConnectUrl(socketAddress); - messageProducer.setConsumeDelay(Duration.ofMillis(10)); messageProducer.setBeanFactory(mock(BeanFactory.class)); messageProducer.unwrapTopic(false); messageProducer.afterPropertiesSet(); @@ -179,9 +177,9 @@ void testMessageProducerForPubSubDisabledWrapTopic() { msg.push("testTopicWithNonWrappedTopic"); msg.send(socket); - stepVerifier.verify(Duration.ofSeconds(10)); + stepVerifier.verify(); - messageProducer.destroy(); + messageProducer.stop(); socket.close(); }