diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java index f00e0b5cfe..0677325780 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java @@ -64,6 +64,7 @@ @ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "buffer-size", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", type = "int", defaultValue = "128") @ConnectorAttribute(name = "unsubscribe-on-disconnection", direction = INCOMING_AND_OUTGOING, description = "This flag restore the old behavior to unsubscribe from the broken on disconnection", type = "boolean", defaultValue = "false") +@ConnectorAttribute(name = "retain", direction = OUTGOING, description = "Whether the published message should be retained", type = "boolean", defaultValue = "false") public class MqttConnector implements InboundConnector, OutboundConnector, HealthReporter { static final String CONNECTOR_NAME = "smallrye-mqtt"; diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index c6ae829528..e101b58b50 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -32,6 +32,7 @@ public class MqttSink { private final String topic; private final int qos; private final boolean healthEnabled; + private final boolean retain; private final Flow.Subscriber> sink; @@ -48,6 +49,7 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, topic = config.getTopic().orElse(channel); qos = config.getQos(); healthEnabled = config.getHealthEnabled(); + retain = config.getRetain(); sink = MultiUtils.via(m -> m.onSubscription() .call(() -> { @@ -89,7 +91,7 @@ private Uni> send(Message msg) { isRetain = mm.isRetain(); } else { actualTopicToBeUsed = this.topic; - isRetain = false; + isRetain = this.retain; actualQoS = MqttQoS.valueOf(this.qos); } diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java index 46c72134e1..69d6f202d8 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java @@ -5,14 +5,17 @@ import static org.hamcrest.core.Is.is; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jboss.weld.environment.se.Weld; import org.jboss.weld.environment.se.WeldContainer; import org.junit.jupiter.api.AfterEach; @@ -147,6 +150,42 @@ public void testABeanProducingNullPayloadsSentToMQTT() throws InterruptedExcepti } + @Test + public void testSinkUsingRawWithRetain() throws InterruptedException { + String topic = UUID.randomUUID().toString(); + CountDownLatch latch = new CountDownLatch(1); + List expected = new CopyOnWriteArrayList<>(); + usage.consumeRaw(topic, 10, 10, TimeUnit.SECONDS, + latch::countDown, + (top, msg) -> expected.add(msg)); + + Map config = new HashMap<>(); + config.put("channel-name", topic); + config.put("topic", topic); + config.put("host", address); + config.put("port", port); + config.put("retain", true); + MqttSink sink = new MqttSink(vertx, new MqttConnectorOutgoingConfiguration(new MapBasedConfig(config)), null); + + Subscriber> subscriber = sink.getSink(); + Multi.createFrom().range(1_234, 1_244) + .map(Message::of) + .subscribe((Subscriber>) subscriber); + + assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); + await().untilAsserted(() -> assertThat(expected).hasSize(10) + .allSatisfy(m -> assertThat(m.isRetained()).isFalse())); + + List expectedRetained = new CopyOnWriteArrayList<>(); + usage.consumeRaw(topic, 10, 10, TimeUnit.SECONDS, + latch::countDown, + (top, msg) -> expectedRetained.add(msg)); + + await().pollDelay(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(expectedRetained).hasSize(1) + .allSatisfy(m -> assertThat(m.isRetained()).isTrue())); + + } + private MapBasedConfig getConfig() { String prefix = "mp.messaging.outgoing.sink."; Map config = new HashMap<>();