Skip to content

Commit

Permalink
#2663 Adding a configuration 'retain' property for outgoing MQTT mess…
Browse files Browse the repository at this point in the history
…age.

Note: test fails due to retain property not being received while it _is_ being send.
  • Loading branch information
diversit authored and ozangunalp committed Jul 2, 2024
1 parent ce97c67 commit 3f1bb22
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
@ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "buffer-size", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", type = "int", defaultValue = "128")
@ConnectorAttribute(name = "unsubscribe-on-disconnection", direction = INCOMING_AND_OUTGOING, description = "This flag restore the old behavior to unsubscribe from the broken on disconnection", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "retain", direction = OUTGOING, description = "Whether the published message should be retained", type = "boolean", defaultValue = "false")
public class MqttConnector implements InboundConnector, OutboundConnector, HealthReporter {

static final String CONNECTOR_NAME = "smallrye-mqtt";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class MqttSink {
private final String topic;
private final int qos;
private final boolean healthEnabled;
private final boolean retain;

private final Flow.Subscriber<? extends Message<?>> sink;

Expand All @@ -48,6 +49,7 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config,
topic = config.getTopic().orElse(channel);
qos = config.getQos();
healthEnabled = config.getHealthEnabled();
retain = config.getRetain();

sink = MultiUtils.via(m -> m.onSubscription()
.call(() -> {
Expand Down Expand Up @@ -89,7 +91,7 @@ private Uni<? extends Message<?>> send(Message<?> msg) {
isRetain = mm.isRetain();
} else {
actualTopicToBeUsed = this.topic;
isRetain = false;
isRetain = this.retain;
actualQoS = MqttQoS.valueOf(this.qos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import static org.hamcrest.core.Is.is;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -147,6 +150,42 @@ public void testABeanProducingNullPayloadsSentToMQTT() throws InterruptedExcepti

}

@Test
public void testSinkUsingRawWithRetain() throws InterruptedException {
String topic = UUID.randomUUID().toString();
CountDownLatch latch = new CountDownLatch(1);
List<MqttMessage> expected = new CopyOnWriteArrayList<>();
usage.consumeRaw(topic, 10, 10, TimeUnit.SECONDS,
latch::countDown,
(top, msg) -> expected.add(msg));

Map<String, Object> config = new HashMap<>();
config.put("channel-name", topic);
config.put("topic", topic);
config.put("host", address);
config.put("port", port);
config.put("retain", true);
MqttSink sink = new MqttSink(vertx, new MqttConnectorOutgoingConfiguration(new MapBasedConfig(config)), null);

Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(1_234, 1_244)
.map(Message::of)
.subscribe((Subscriber<? super Message<Integer>>) subscriber);

assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
await().untilAsserted(() -> assertThat(expected).hasSize(10)
.allSatisfy(m -> assertThat(m.isRetained()).isFalse()));

List<MqttMessage> expectedRetained = new CopyOnWriteArrayList<>();
usage.consumeRaw(topic, 10, 10, TimeUnit.SECONDS,
latch::countDown,
(top, msg) -> expectedRetained.add(msg));

await().pollDelay(2, TimeUnit.SECONDS).untilAsserted(() -> assertThat(expectedRetained).hasSize(1)
.allSatisfy(m -> assertThat(m.isRetained()).isTrue()));

}

private MapBasedConfig getConfig() {
String prefix = "mp.messaging.outgoing.sink.";
Map<String, Object> config = new HashMap<>();
Expand Down

0 comments on commit 3f1bb22

Please sign in to comment.