From 7dbd3551bc28595876eac1a5eb5145efdf29f9a4 Mon Sep 17 00:00:00 2001 From: Domenico Briganti Date: Wed, 18 Jan 2023 17:52:13 +0100 Subject: [PATCH 1/6] HealtCheck on MQTT Connector --- .../reactive/messaging/mqtt/Clients.java | 3 + .../messaging/mqtt/MqttConnector.java | 43 +++-- .../reactive/messaging/mqtt/MqttSink.java | 150 ++++++++++++++---- .../reactive/messaging/mqtt/MqttSource.java | 43 +++-- .../messaging/mqtt/ConnectionSharingTest.java | 6 +- .../mqtt/DynamicMqttTopicSourceTest.java | 10 +- .../messaging/mqtt/FailureHandlerTest.java | 4 +- .../messaging/mqtt/HealthCheckTest.java | 134 ++++++++++++++++ .../mqtt/LocalPropagationAckTest.java | 2 +- .../messaging/mqtt/LocalPropagationTest.java | 2 +- .../messaging/mqtt/MqttCustomOptionTest.java | 2 +- .../reactive/messaging/mqtt/MqttSinkTest.java | 10 +- .../messaging/mqtt/MqttSourceTest.java | 14 +- .../reactive/messaging/mqtt/MqttTestBase.java | 21 +++ .../mqtt/MultipleTopicsConsumptionTest.java | 10 +- .../mqtt/MutualTlsMqttSourceTest.java | 2 +- .../messaging/mqtt/MutualTlsMqttTestBase.java | 10 ++ .../messaging/mqtt/SecureMqttSourceTest.java | 2 +- .../messaging/mqtt/SecureMqttTestBase.java | 10 ++ .../messaging/mqtt/TlsMqttSourceTest.java | 2 +- .../messaging/mqtt/TlsMqttTestBase.java | 10 ++ 21 files changed, 409 insertions(+), 81 deletions(-) create mode 100644 smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java index 363bd13ae1..09470b23e4 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.mqtt; +import static io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging.log; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,6 +36,7 @@ static ClientHolder getHolder(Vertx vertx, MqttClientSessionOptions options) { + "<" + (server == null ? "" : server) + ">-[" + (clientId != null ? clientId : "") + "]"; return clients.computeIfAbsent(id, key -> { + log.infof("Create MQTT Client for %s.", id); MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options); return new ClientHolder(client); }); diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java index bf2a1a6542..55f8923e31 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java @@ -22,6 +22,9 @@ import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; +import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.vertx.mutiny.core.Vertx; @@ -31,6 +34,7 @@ @ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the client identifier") @ConnectorAttribute(name = "auto-generated-client-id", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the MQTT client must generate clientId automatically", defaultValue = "true") @ConnectorAttribute(name = "auto-keep-alive", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the MQTT client must handle `PINGREQ` automatically", defaultValue = "true") +@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true") @ConnectorAttribute(name = "ssl", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set whether SSL/TLS is enabled", defaultValue = "false") @ConnectorAttribute(name = "ssl.keystore.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the keystore type [`pkcs12`, `jks`, `pem`]", defaultValue = "pkcs12") @ConnectorAttribute(name = "ssl.keystore.location", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the keystore location. In case of `pem` type this is the server ca cert path") @@ -61,12 +65,12 @@ @ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "buffer-size", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", type = "int", defaultValue = "128") @ConnectorAttribute(name = "unsubscribe-on-disconnection", direction = INCOMING_AND_OUTGOING, description = "This flag restore the old behavior to unsubscribe from the broken on disconnection", type = "boolean", defaultValue = "false") -public class MqttConnector implements IncomingConnectorFactory, OutgoingConnectorFactory { +public class MqttConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, HealthReporter { static final String CONNECTOR_NAME = "smallrye-mqtt"; @Inject - private ExecutionHolder executionHolder; + ExecutionHolder executionHolder; @Inject @Any @@ -95,22 +99,41 @@ public SubscriberBuilder, Void> getSubscriberBuilder(Config return sink.getSink(); } - public boolean isReady() { - boolean ready = isSourceReady(); + @Override + public HealthReport getStartup() { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + for (MqttSource source : sources) { + source.isStarted(builder); + } + for (MqttSink sink : sinks) { + sink.isStarted(builder); + } + return builder.build(); + } + @Override + public HealthReport getReadiness() { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + for (MqttSource source : sources) { + source.isReady(builder); + } for (MqttSink sink : sinks) { - ready = ready && sink.isReady(); + sink.isReady(builder); } + return builder.build(); - return ready; } - public boolean isSourceReady() { - boolean ready = true; + @Override + public HealthReport getLiveness() { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); for (MqttSource source : sources) { - ready = ready && source.isReady(); + source.isAlive(builder); + } + for (MqttSink sink : sinks) { + sink.isAlive(builder); } - return ready; + return builder.build(); } public void destroy(@Observes @Destroyed(ApplicationScoped.class) final Object context) { diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index 429ffe2a16..22f8f04533 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -13,13 +13,17 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.reactivestreams.Processor; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import io.netty.handler.codec.mqtt.MqttQoS; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.vertx.AsyncResultUni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; +import io.smallrye.reactive.messaging.health.HealthReport.HealthReportBuilder; +import io.smallrye.reactive.messaging.mqtt.Clients.ClientHolder; import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers; -import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession; import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; @@ -29,46 +33,118 @@ public class MqttSink { + private final String channel; private final String topic; private final int qos; + private final boolean healthEnabled; private final SubscriberBuilder, Void> sink; + + private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean ready = new AtomicBoolean(); + private final AtomicReference reference = new AtomicReference<>(); public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, Instance instances) { + MqttClientSessionOptions options = MqttHelpers.createClientOptions(config, instances); - topic = config.getTopic().orElseGet(config::getChannel); + + channel = config.getChannel(); + topic = config.getTopic().orElse(channel); qos = config.getQos(); + healthEnabled = config.getHealthEnabled(); - AtomicReference reference = new AtomicReference<>(); sink = ReactiveStreams.> builder() - .flatMapCompletionStage(msg -> { - Clients.ClientHolder client = reference.get(); - if (client == null) { - client = Clients.getHolder(vertx, options); - reference.set(client); - } - - return client.start() - .onSuccess(ignore -> ready.set(true)) - .map(ignore -> msg) - .toCompletionStage(); - }) - .flatMapCompletionStage(msg -> send(reference, msg)) - .onComplete(() -> { - Clients.ClientHolder c = reference.getAndSet(null); - if (c != null) { - c.close() - .onComplete(ignore -> ready.set(false)); - } - }) + .via(new ConnectOnSubscribeProcessor(vertx, options)) + .flatMapCompletionStage(msg -> send(msg)) .onError(log::errorWhileSendingMessageToBroker) .ignore(); + } - private CompletionStage send(AtomicReference reference, Message msg) { - MqttClientSession client = reference.get().getClient(); + /* + * This processor let che client mqtt to connect on su + */ + private class ConnectOnSubscribeProcessor implements Processor, Message> { + + private Vertx vertx; + private MqttClientSessionOptions options; + private Subscriber> subscriber; + private Subscription subscription; + private long requestedItem; + private Object lock = new Object(); + + public ConnectOnSubscribeProcessor(Vertx vertx, MqttClientSessionOptions options) { + this.vertx = vertx; + this.options = options; + } + + @Override + public void subscribe(Subscriber> subscriber) { + System.out.println("subscribe"); + this.subscriber = subscriber; + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + if (ready.get()) { + subscription.request(n); + } else { + synchronized (lock) { + if (ready.get()) { + subscription.request(n); + } else { + requestedItem = n; + } + } + } + } + + @Override + public void cancel() { + subscription.cancel(); + } + }); + } + + @Override + public void onSubscribe(Subscription subscription) { + System.out.println("onSubscribe"); + this.subscription = subscription; + ClientHolder client = Clients.getHolder(vertx, options); + reference.set(client); + client.start().onSuccess(ignore -> { + started.set(true); + synchronized (lock) { + ready.set(true); + if (requestedItem > 0) + subscription.request(requestedItem); + } + }).toCompletionStage(); + } + + @Override + public void onNext(Message t) { + subscriber.onNext(t); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + Clients.ClientHolder c = reference.getAndSet(null); + if (c != null) { + c.close() + .onComplete(ignore -> ready.set(false)); + } + } + }; + + private CompletionStage send(Message msg) { + final String actualTopicToBeUsed; final MqttQoS actualQoS; final boolean isRetain; @@ -91,9 +167,11 @@ private CompletionStage send(AtomicReference reference, } return AsyncResultUni - . toUni(h -> client - .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) - .onComplete(h)) + . toUni(h -> { + reference.get().getClient() + .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) + .onComplete(h); + }) .onItemOrFailure().transformToUni((s, f) -> { if (f != null) { return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg)); @@ -132,7 +210,19 @@ public SubscriberBuilder, Void> getSink() { return sink; } - public boolean isReady() { - return ready.get(); + public void isStarted(HealthReportBuilder builder) { + if (healthEnabled) + builder.add(channel, started.get()); + } + + public void isReady(HealthReportBuilder builder) { + if (healthEnabled) + builder.add(channel, ready.get()); } + + public void isAlive(HealthReportBuilder builder) { + if (healthEnabled) + builder.add(channel, reference == null ? false : reference.get().getClient().isConnected()); + } + } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index bced78bd87..d0e4d8fe35 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -12,6 +12,7 @@ import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.health.HealthReport.HealthReportBuilder; import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers; import io.smallrye.reactive.messaging.mqtt.internal.MqttTopicHelper; import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; @@ -20,17 +21,26 @@ public class MqttSource { + private final String channel; + private final Pattern pattern; + private final boolean healthEnabled; + private final PublisherBuilder> source; + + private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean ready = new AtomicBoolean(); - private final Pattern pattern; + private final Clients.ClientHolder holder; public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, Instance instances) { MqttClientSessionOptions options = MqttHelpers.createClientOptions(config, instances); - String topic = config.getTopic().orElseGet(config::getChannel); + channel = config.getChannel(); + String topic = config.getTopic().orElse(channel); int qos = config.getQos(); boolean broadcast = config.getBroadcast(); + healthEnabled = config.getHealthEnabled(); + MqttFailureHandler.Strategy strategy = MqttFailureHandler.Strategy.from(config.getFailureStrategy()); MqttFailureHandler onNack = createFailureHandler(strategy, config.getChannel()); @@ -43,21 +53,23 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, pattern = null; } - Clients.ClientHolder holder = Clients.getHolder(vertx, options); - holder.start(); + holder = Clients.getHolder(vertx, options); + holder.start().onSuccess(ignore -> started.set(true)); holder.getClient() .subscribe(topic, RequestedQoS.valueOf(qos)) - .onComplete(outcome -> log.info("Subscription outcome: " + outcome)) - .onSuccess(ignore -> ready.set(true)); + .onFailure(outcome -> log.info("Subscription failed!")) + .onSuccess(outcome -> { + log.info("Subscription success on topic " + topic + ", Max QoS " + outcome + "."); + ready.set(true); + }); this.source = ReactiveStreams.fromPublisher( holder.stream() .select().where(m -> MqttTopicHelper.matches(topic, pattern, m)) .onItem().transform(m -> new ReceivingMqttMessage(m, onNack)) .stage(multi -> { - if (broadcast) { + if (broadcast) return multi.broadcast().toAllSubscribers(); - } return multi; }) .onOverflow().buffer(config.getBufferSize()) @@ -89,8 +101,19 @@ PublisherBuilder> getSource() { return source; } - public boolean isReady() { - return ready.get(); + public void isStarted(HealthReportBuilder builder) { + if (healthEnabled) + builder.add(channel, started.get()); + } + + public void isReady(HealthReportBuilder builder) { + if (healthEnabled) + builder.add(channel, ready.get()); + } + + public void isAlive(HealthReportBuilder builder) { + if (healthEnabled) + builder.add(channel, holder.getClient().isConnected()); } } diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConnectionSharingTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConnectionSharingTest.java index e4e67dbd6c..f7289d122d 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConnectionSharingTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConnectionSharingTest.java @@ -41,11 +41,9 @@ public void testWithClientId() { container = weld.initialize(); App bean = container.getBeanManager().createInstance().select(App.class).get(); + MqttConnector mqttConnector = this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get(); - await() - .until( - () -> this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get().isReady()); - + await().until(() -> mqttConnector.getReadiness().isOk()); await().atMost(2, TimeUnit.MINUTES).until(() -> bean.prices().size() >= 10); assertThat(bean.prices()).isNotEmpty(); } diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java index 0c7806b3a2..9a21f7340e 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/DynamicMqttTopicSourceTest.java @@ -42,11 +42,11 @@ public void cleanup() { private void awaitAndVerify() { DynamicTopicApp bean = container.getBeanManager().createInstance().select(DynamicTopicApp.class).get(); MqttConnector connector = this.container - .select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get(); + .select(MqttConnector.class, ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); await() .pollInterval(Duration.ofSeconds(1)) - .until(connector::isSourceReady); + .until(() -> connector.getReadiness().isOk()); bean.publish(); @@ -112,7 +112,9 @@ public void testWithTwoPlusAndDashInExpression() { public void testWithSpecialWord() { Weld weld = baseWeld(getConfig("$/app/#")); weld.addBeanClass(DynamicTopicApp.class); + container = weld.initialize(); + awaitAndVerify(); } @@ -128,7 +130,7 @@ private MapBasedConfig getConfig(String pattern) { config.put(prefix + "password", System.getProperty("mqtt-pwd")); } - prefix = "mp.messaging.incoming.mqtt."; + prefix = "mp.messaging.incoming.in."; config.put(prefix + "topic", pattern); config.put(prefix + "connector", MqttConnector.CONNECTOR_NAME); config.put(prefix + "host", System.getProperty("mqtt-host")); @@ -170,7 +172,7 @@ public void publish() { MqttQoS.EXACTLY_ONCE)); } - @Incoming("mqtt") + @Incoming("in") public CompletionStage received(MqttMessage message) { messages.add(message); return message.ack(); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/FailureHandlerTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/FailureHandlerTest.java index 2d38c276c5..99d3f5c21d 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/FailureHandlerTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/FailureHandlerTest.java @@ -52,7 +52,7 @@ public void testFailStrategy() { MqttConnector connector = container.getBeanManager().createInstance().select(MqttConnector.class, ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); - await().until(connector::isReady); + await().until(() -> connector.getReadiness().isOk()); usage.produceStrings("fail", 10, null, () -> Integer.toString(counter.getAndIncrement())); @@ -69,7 +69,7 @@ public void testIgnoreStrategy() { MqttConnector connector = container.getBeanManager().createInstance().select(MqttConnector.class, ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); - await().until(connector::isReady); + await().until(() -> connector.getReadiness().isOk()); usage.produceStrings("ignore", 10, null, () -> Integer.toString(counter.getAndIncrement())); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java new file mode 100644 index 0000000000..df4a2c1939 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java @@ -0,0 +1,134 @@ +package io.smallrye.reactive.messaging.mqtt; + +import static io.smallrye.reactive.messaging.mqtt.MqttConnector.CONNECTOR_NAME; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.providers.extension.HealthCenter; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class HealthCheckTest extends MqttTestBase { + + private WeldContainer container; + + @AfterEach + public void cleanup() { + if (container != null) { + container.close(); + } + } + + private void awaitAndVerify() { + + container.getBeanManager().createInstance().select(JustToStartConnector.class).get(); + MqttConnector connector = this.container.select(MqttConnector.class, ConnectorLiteral.of(CONNECTOR_NAME)).get(); + + await() + .pollInterval(Duration.ofSeconds(1)) + .until(() -> connector.getReadiness().isOk()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + + assertThat(liveness.getChannels().size()).isEqualTo(2); + + mosquitto.stop(); + + await() + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !connector.getLiveness().isOk()); + + liveness = getHealth().getLiveness(); + assertThat(liveness.isOk()).isFalse(); + + } + + @Test + public void testWithDash() { + Weld weld = baseWeld(getConfig()); + weld.addBeanClass(JustToStartConnector.class); + + container = weld.initialize(); + + awaitAndVerify(); + } + + private MapBasedConfig getConfig() { + String prefix = "mp.messaging.outgoing.out."; + Map config = new HashMap<>(); + config.put(prefix + "connector", CONNECTOR_NAME); + config.put(prefix + "host", System.getProperty("mqtt-host")); + config.put(prefix + "port", Integer.valueOf(System.getProperty("mqtt-port"))); + if (System.getProperty("mqtt-user") != null) { + config.put(prefix + "username", System.getProperty("mqtt-user")); + config.put(prefix + "password", System.getProperty("mqtt-pwd")); + } + + prefix = "mp.messaging.incoming.in."; + config.put(prefix + "connector", CONNECTOR_NAME); + config.put(prefix + "host", System.getProperty("mqtt-host")); + config.put(prefix + "port", Integer.valueOf(System.getProperty("mqtt-port"))); + if (System.getProperty("mqtt-user") != null) { + config.put(prefix + "username", System.getProperty("mqtt-user")); + config.put(prefix + "password", System.getProperty("mqtt-pwd")); + } + return new MapBasedConfig(config); + } + + public HealthCenter getHealth() { + if (container == null) { + throw new IllegalStateException("Application not started"); + } + return container.getBeanManager().createInstance().select(HealthCenter.class).get(); + } + + public boolean isStarted() { + return getHealth().getStartup().isOk(); + } + + public boolean isReady() { + return getHealth().getReadiness().isOk(); + } + + public boolean isAlive() { + return getHealth().getLiveness().isOk(); + } + + @ApplicationScoped + public static class JustToStartConnector { + + @Inject + @Channel("out") + Emitter emitter; + + @Incoming("in") + public CompletionStage received(MqttMessage message) { + return message.ack(); + } + + } + +} diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationAckTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationAckTest.java index 5629d4d831..a50638d826 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationAckTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationAckTest.java @@ -70,7 +70,7 @@ private T runApplication(MapBasedConfig config, Class beanClass) { public void waitUntilReady(WeldContainer container) { MqttConnector connector = container.select(MqttConnector.class, ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); - await().until(connector::isReady); + await().until(() -> connector.getReadiness().isOk()); } @BeforeEach diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationTest.java index cd0dc3c10b..d9a4646982 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/LocalPropagationTest.java @@ -80,7 +80,7 @@ private T runApplication(MapBasedConfig config, Class beanClass) { public void waitUntilReady(WeldContainer container) { MqttConnector connector = container.select(MqttConnector.class, ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); - await().until(connector::isReady); + await().until(() -> connector.getReadiness().isOk()); } @BeforeEach diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttCustomOptionTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttCustomOptionTest.java index c041d45409..effcd2b816 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttCustomOptionTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttCustomOptionTest.java @@ -48,7 +48,7 @@ public void testWithChannelPort() { MqttConnector connector = container.getBeanManager().createInstance().select(MqttConnector.class, ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); - await().pollDelay(Duration.ofSeconds(1)).until(() -> !connector.isReady()); + await().pollDelay(Duration.ofSeconds(1)).until(() -> !connector.getReadiness().isOk()); } } diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java index a5288ed23d..54f27c405e 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java @@ -52,6 +52,7 @@ public void testSinkUsingInteger() throws InterruptedException { MqttSink sink = new MqttSink(vertx, new MqttConnectorOutgoingConfiguration(new MapBasedConfig(config)), null); Subscriber> subscriber = sink.getSink().build(); + Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -63,7 +64,7 @@ public void testSinkUsingInteger() throws InterruptedException { @SuppressWarnings("unchecked") @Test - public void testSinkUsingChannelName() throws InterruptedException { + public void testSinkUsingIntegerAndChannelNameAsTopic() throws InterruptedException { String topic = UUID.randomUUID().toString(); CountDownLatch latch = new CountDownLatch(1); AtomicInteger expected = new AtomicInteger(0); @@ -116,8 +117,9 @@ public void testSinkUsingString() throws InterruptedException { } @RepeatedTest(5) + @Test public void testABeanProducingMessagesSentToMQTT() throws InterruptedException { - Clients.clear(); + Weld weld = baseWeld(getConfig()); weld.addBeanClass(ProducingBean.class); @@ -127,7 +129,9 @@ public void testABeanProducingMessagesSentToMQTT() throws InterruptedException { v -> latch.countDown()); container = weld.initialize(); - assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); + + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } private MapBasedConfig getConfig() { diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java index b14c9a307b..f220cacf3b 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java @@ -44,7 +44,8 @@ public void testSource() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isReady); + awaitUntilReady(source); + AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, counter::getAndIncrement)).start(); @@ -71,7 +72,7 @@ public void testSourceUsingChannelName() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isReady); + awaitUntilReady(source); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, counter::getAndIncrement)).start(); @@ -104,7 +105,7 @@ public void testBroadcast() { stream.forEach(messages1::add).run(); stream.forEach(messages2::add).run(); - await().until(source::isReady); + awaitUntilReady(source); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, @@ -146,7 +147,7 @@ public void testWithVeryLargeMessage() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isReady); + awaitUntilReady(source); new Thread(() -> usage.produce(topic, 10, null, () -> large)).start(); @@ -176,8 +177,9 @@ static MapBasedConfig getConfig() { public void testABeanConsumingTheMQTTMessages() { ConsumptionBean bean = deploy(); - await() - .until(() -> container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get().isReady()); + MqttConnector mqttConnector = this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get(); + + await().until(() -> mqttConnector.getReadiness().isOk()); List list = bean.getResults(); assertThat(list).isEmpty(); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java index 6b852bf613..6661ff56e2 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java @@ -1,5 +1,7 @@ package io.smallrye.reactive.messaging.mqtt; +import static org.awaitility.Awaitility.await; + import java.io.File; import java.time.Duration; @@ -15,11 +17,13 @@ import org.testcontainers.utility.MountableFile; import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.MediatorFactory; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; import io.smallrye.reactive.messaging.providers.extension.EmitterFactoryImpl; import io.smallrye.reactive.messaging.providers.extension.EmitterImpl; +import io.smallrye.reactive.messaging.providers.extension.HealthCenter; import io.smallrye.reactive.messaging.providers.extension.LegacyEmitterFactoryImpl; import io.smallrye.reactive.messaging.providers.extension.MediatorManager; import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl; @@ -80,6 +84,22 @@ public static void awaitForMosquittoToBeReady(GenericContainer mosquitto) { v.closeAndAwait(); } + public void awaitUntilReady(MqttSource source) { + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + source.isReady(builder); + return builder.build().isOk(); + }); + } + + public void awaitUntilReady(MqttSink sink) { + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + sink.isReady(builder); + return builder.build().isOk(); + }); + } + @AfterAll public static void stopBroker() { mosquitto.stop(); @@ -132,6 +152,7 @@ static Weld baseWeld(MapBasedConfig config) { weld.addBeanClass(EmitterFactoryImpl.class); weld.addBeanClass(MutinyEmitterFactoryImpl.class); weld.addBeanClass(LegacyEmitterFactoryImpl.class); + weld.addBeanClass(HealthCenter.class); // Add SmallRye Config weld.addExtension(new io.smallrye.config.inject.ConfigExtension()); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MultipleTopicsConsumptionTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MultipleTopicsConsumptionTest.java index 6b77a61505..fb88c3eae3 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MultipleTopicsConsumptionTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MultipleTopicsConsumptionTest.java @@ -41,10 +41,9 @@ public void testWithClientId() { container = weld.initialize(); Consumers bean = container.getBeanManager().createInstance().select(Consumers.class).get(); + MqttConnector mqttConnector = this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get(); - await() - .until( - () -> this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get().isReady()); + await().until(() -> mqttConnector.getReadiness().isOk()); assertThat(bean.prices()).isEmpty(); assertThat(bean.products()).isEmpty(); @@ -68,10 +67,9 @@ public void testWithoutClientId() { container = weld.initialize(); Consumers bean = container.getBeanManager().createInstance().select(Consumers.class).get(); + MqttConnector mqttConnector = this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get(); - await() - .until( - () -> this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get().isReady()); + await().until(() -> mqttConnector.getReadiness().isOk()); assertThat(bean.prices()).isEmpty(); assertThat(bean.products()).isEmpty(); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java index 5414c11ac1..ac2ceb83dd 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java @@ -47,7 +47,7 @@ public void testMutualTLS() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isReady); + awaitUntilReady(source); pause(); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttTestBase.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttTestBase.java index f80ec7f42a..64227535d1 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttTestBase.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttTestBase.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.mqtt; import static io.smallrye.reactive.messaging.mqtt.MqttTestBase.awaitForMosquittoToBeReady; +import static org.awaitility.Awaitility.await; import java.util.Properties; @@ -16,6 +17,7 @@ import org.testcontainers.containers.wait.strategy.Wait; import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.health.HealthReport; import io.vertx.mutiny.core.Vertx; public class MutualTlsMqttTestBase { @@ -38,6 +40,14 @@ public static void startBroker() { awaitForMosquittoToBeReady(mosquitto); } + public void awaitUntilReady(MqttSource source) { + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + source.isReady(builder); + return builder.build().isOk(); + }); + } + @AfterAll public static void stopBroker() { mosquitto.stop(); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java index 76b7825dec..3d062ba5b3 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java @@ -49,7 +49,7 @@ public void testSecureSource() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isReady); + awaitUntilReady(source); pause(); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttTestBase.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttTestBase.java index 09a27b2e04..7f96426555 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttTestBase.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttTestBase.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.mqtt; import static io.smallrye.reactive.messaging.mqtt.MqttTestBase.awaitForMosquittoToBeReady; +import static org.awaitility.Awaitility.await; import org.eclipse.microprofile.config.ConfigProvider; import org.junit.jupiter.api.AfterAll; @@ -14,6 +15,7 @@ import org.testcontainers.containers.wait.strategy.Wait; import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.health.HealthReport; import io.vertx.mutiny.core.Vertx; public class SecureMqttTestBase { @@ -35,6 +37,14 @@ public static void startBroker() { awaitForMosquittoToBeReady(mosquitto); } + public void awaitUntilReady(MqttSource source) { + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + source.isReady(builder); + return builder.build().isOk(); + }); + } + @AfterAll public static void stopBroker() { mosquitto.stop(); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java index 54218c43c6..5aba37869d 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java @@ -43,7 +43,7 @@ public void testTLS() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isReady); + awaitUntilReady(source); pause(); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttTestBase.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttTestBase.java index 5b0f5eb3b1..32bbec8f27 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttTestBase.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttTestBase.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.mqtt; import static io.smallrye.reactive.messaging.mqtt.MqttTestBase.awaitForMosquittoToBeReady; +import static org.awaitility.Awaitility.await; import java.util.Properties; @@ -16,6 +17,7 @@ import org.testcontainers.containers.wait.strategy.Wait; import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.health.HealthReport; import io.vertx.mutiny.core.Vertx; public class TlsMqttTestBase { @@ -42,6 +44,14 @@ public static void stopBroker() { mosquitto.stop(); } + public void awaitUntilReady(MqttSource source) { + await().until(() -> { + HealthReport.HealthReportBuilder builder = HealthReport.builder(); + source.isReady(builder); + return builder.build().isOk(); + }); + } + @BeforeEach public void setup() { mosquitto.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("mosquitto"))); From 5a5cc63f73c394b5854ceb5fd49e75ad44635caf Mon Sep 17 00:00:00 2001 From: Domenico Briganti Date: Thu, 19 Jan 2023 16:03:15 +0100 Subject: [PATCH 2/6] Remove SysOut and Repeated Test --- .../reactive/messaging/mqtt/MqttSink.java | 20 +++++++++---------- .../reactive/messaging/mqtt/MqttSinkTest.java | 2 -- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index 22f8f04533..a6192ff9a9 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -81,7 +81,6 @@ public ConnectOnSubscribeProcessor(Vertx vertx, MqttClientSessionOptions options @Override public void subscribe(Subscriber> subscriber) { - System.out.println("subscribe"); this.subscriber = subscriber; subscriber.onSubscribe(new Subscription() { @Override @@ -108,7 +107,6 @@ public void cancel() { @Override public void onSubscribe(Subscription subscription) { - System.out.println("onSubscribe"); this.subscription = subscription; ClientHolder client = Clients.getHolder(vertx, options); reference.set(client); @@ -168,17 +166,17 @@ private CompletionStage send(Message msg) { return AsyncResultUni . toUni(h -> { - reference.get().getClient() - .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) - .onComplete(h); + reference.get().getClient() + .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) + .onComplete(h); }) .onItemOrFailure().transformToUni((s, f) -> { - if (f != null) { - return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg)); - } else { - OutgoingMessageMetadata.setResultOnMessage(msg, s); - return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg)); - } + if (f != null) { + return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg)); + } else { + OutgoingMessageMetadata.setResultOnMessage(msg, s); + return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg)); + } }) .subscribeAsCompletionStage(); } diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java index 54f27c405e..8989670a5e 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java @@ -15,7 +15,6 @@ import org.jboss.weld.environment.se.Weld; import org.jboss.weld.environment.se.WeldContainer; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscriber; @@ -116,7 +115,6 @@ public void testSinkUsingString() throws InterruptedException { assertThat(expected).hasValue(10); } - @RepeatedTest(5) @Test public void testABeanProducingMessagesSentToMQTT() throws InterruptedException { From 7636517ecd649c2604ade9998f3bebe556e06200 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 23 Jan 2023 20:14:57 +0000 Subject: [PATCH 3/6] Mqtt health check startup probe using Mutiny API --- .../reactive/messaging/mqtt/MqttSink.java | 149 +++++------------- .../providers/helpers/MultiUtils.java | 102 ++++++++++++ 2 files changed, 142 insertions(+), 109 deletions(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index a6192ff9a9..a417747b97 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -3,8 +3,6 @@ import static io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging.log; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -13,18 +11,17 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; -import org.reactivestreams.Processor; import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import io.netty.handler.codec.mqtt.MqttQoS; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.vertx.AsyncResultUni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; import io.smallrye.reactive.messaging.health.HealthReport.HealthReportBuilder; -import io.smallrye.reactive.messaging.mqtt.Clients.ClientHolder; import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession; import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; @@ -41,7 +38,6 @@ public class MqttSink { private final SubscriberBuilder, Void> sink; private final AtomicBoolean started = new AtomicBoolean(); - private final AtomicBoolean ready = new AtomicBoolean(); private final AtomicReference reference = new AtomicReference<>(); public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, @@ -54,95 +50,29 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, qos = config.getQos(); healthEnabled = config.getHealthEnabled(); - sink = ReactiveStreams.> builder() - .via(new ConnectOnSubscribeProcessor(vertx, options)) - .flatMapCompletionStage(msg -> send(msg)) - .onError(log::errorWhileSendingMessageToBroker) - .ignore(); - - } - - /* - * This processor let che client mqtt to connect on su - */ - private class ConnectOnSubscribeProcessor implements Processor, Message> { - - private Vertx vertx; - private MqttClientSessionOptions options; - private Subscriber> subscriber; - private Subscription subscription; - private long requestedItem; - private Object lock = new Object(); - - public ConnectOnSubscribeProcessor(Vertx vertx, MqttClientSessionOptions options) { - this.vertx = vertx; - this.options = options; - } - - @Override - public void subscribe(Subscriber> subscriber) { - this.subscriber = subscriber; - subscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - if (ready.get()) { - subscription.request(n); - } else { - synchronized (lock) { - if (ready.get()) { - subscription.request(n); - } else { - requestedItem = n; - } - } + Subscriber> subscriber = MultiUtils.via(m -> m.onSubscription() + .call(() -> { + Clients.ClientHolder client = reference.get(); + if (client == null) { + client = Clients.getHolder(vertx, options); + reference.set(client); } - } - - @Override - public void cancel() { - subscription.cancel(); - } - }); - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - ClientHolder client = Clients.getHolder(vertx, options); - reference.set(client); - client.start().onSuccess(ignore -> { - started.set(true); - synchronized (lock) { - ready.set(true); - if (requestedItem > 0) - subscription.request(requestedItem); - } - }).toCompletionStage(); - } - - @Override - public void onNext(Message t) { - subscriber.onNext(t); - } - - @Override - public void onError(Throwable t) { - subscriber.onError(t); - } - - @Override - public void onComplete() { - subscriber.onComplete(); - Clients.ClientHolder c = reference.getAndSet(null); - if (c != null) { - c.close() - .onComplete(ignore -> ready.set(false)); - } - } - }; - - private CompletionStage send(Message msg) { + return AsyncResultUni. toUni(h -> reference.get().start().onComplete(h)) + .onItem().invoke(() -> started.set(true)); + }) + .onItem().transformToUniAndConcatenate(this::send) + .onCompletion().invoke(() -> { + Clients.ClientHolder c = reference.getAndSet(null); + if (c != null) { + c.close().onComplete(ignore -> started.set(false)); + } + }) + .onFailure().invoke(log::errorWhileSendingMessageToBroker)); + sink = ReactiveStreams.fromSubscriber(subscriber); + } + private Uni> send(Message msg) { + MqttClientSession client = reference.get().getClient(); final String actualTopicToBeUsed; final MqttQoS actualQoS; final boolean isRetain; @@ -161,24 +91,21 @@ private CompletionStage send(Message msg) { if (actualTopicToBeUsed == null) { log.ignoringNoTopicSet(); - return CompletableFuture.completedFuture(msg); + return Uni.createFrom().item(msg); } return AsyncResultUni - . toUni(h -> { - reference.get().getClient() - .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) - .onComplete(h); - }) + . toUni(h -> client + .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) + .onComplete(h)) .onItemOrFailure().transformToUni((s, f) -> { - if (f != null) { - return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg)); - } else { - OutgoingMessageMetadata.setResultOnMessage(msg, s); - return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg)); - } - }) - .subscribeAsCompletionStage(); + if (f != null) { + return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg)); + } else { + OutgoingMessageMetadata.setResultOnMessage(msg, s); + return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg)); + } + }); } private Buffer convert(Object payload) { @@ -208,6 +135,10 @@ public SubscriberBuilder, Void> getSink() { return sink; } + private boolean isConnected() { + return reference.get() != null && reference.get().getClient().isConnected(); + } + public void isStarted(HealthReportBuilder builder) { if (healthEnabled) builder.add(channel, started.get()); @@ -215,12 +146,12 @@ public void isStarted(HealthReportBuilder builder) { public void isReady(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, ready.get()); + builder.add(channel, isConnected()); } public void isAlive(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, reference == null ? false : reference.get().getClient().isConnected()); + builder.add(channel, isConnected()); } } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java index 56f7928610..6bdbf95791 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java @@ -2,7 +2,9 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; +import java.util.Objects; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import java.util.function.Supplier; @@ -15,6 +17,8 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.reactive.messaging.MediatorConfiguration; @@ -84,4 +88,102 @@ public void onCompletion() { }; } + public static Subscriber via(Function, Multi> function) { + return via(NoopProcessor.create(), function); + } + + public static class NoopProcessor implements Processor, Subscription { + + private volatile boolean done = false; + private volatile boolean cancelled = false; + + private volatile Subscription upstream = null; + private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(NoopProcessor.class, Subscription.class, "upstream"); + private volatile Subscriber downstream = null; + private static final AtomicReferenceFieldUpdater DOWNSTREAM_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(NoopProcessor.class, Subscriber.class, "downstream"); + + public static NoopProcessor create() { + return new NoopProcessor<>(); + } + + private NoopProcessor() { + } + + @Override + public void subscribe(Subscriber downstream) { + ParameterValidation.nonNull(downstream, "downstream"); + if (DOWNSTREAM_UPDATER.compareAndSet(this, null, downstream)) { + if (upstream != null) { + downstream.onSubscribe(this); + } + } else { + Subscriptions.fail(downstream, new IllegalStateException("Already subscribed")); + } + } + + @Override + public void onSubscribe(Subscription upstream) { + if (isDoneOrCancelled() || !UPSTREAM_UPDATER.compareAndSet(this, null, upstream)) { + upstream.cancel(); + return; + } + Subscriber subscriber = downstream; + if (subscriber != null) { + subscriber.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (isDoneOrCancelled()) { + return; + } + Subscriber subscriber = downstream; + if (subscriber != null) { + subscriber.onNext(t); + } + } + + private boolean isDoneOrCancelled() { + return done || cancelled; + } + + @Override + public void onError(Throwable failure) { + Objects.requireNonNull(failure); + if (isDoneOrCancelled()) { + return; + } + + this.done = true; + } + + @Override + public void onComplete() { + if (isDoneOrCancelled()) { + return; + } + this.done = true; + } + + @Override + public void request(long n) { + if (n > 0) { + UPSTREAM_UPDATER.get(this).request(n); + } + } + + @Override + public void cancel() { + if (cancelled) { + return; + } + this.cancelled = true; + DOWNSTREAM_UPDATER.getAndSet(this, null); + } + + } + } From ecb39bbbe9f8aaec5a259bcd260607c322f9a017 Mon Sep 17 00:00:00 2001 From: Domenico Briganti Date: Tue, 31 Jan 2023 00:03:54 +0100 Subject: [PATCH 4/6] Uniform all vars as final --- .../java/io/smallrye/reactive/messaging/mqtt/MqttSink.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index a417747b97..a27f36fae5 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -72,7 +72,7 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, } private Uni> send(Message msg) { - MqttClientSession client = reference.get().getClient(); + final MqttClientSession client = reference.get().getClient(); final String actualTopicToBeUsed; final MqttQoS actualQoS; final boolean isRetain; @@ -88,7 +88,7 @@ private Uni> send(Message msg) { isRetain = false; actualQoS = MqttQoS.valueOf(this.qos); } - + if (actualTopicToBeUsed == null) { log.ignoringNoTopicSet(); return Uni.createFrom().item(msg); From 938f568ea384662aa58dd44f9a682c758a2501cf Mon Sep 17 00:00:00 2001 From: Domenico Briganti Date: Tue, 31 Jan 2023 01:20:07 +0100 Subject: [PATCH 5/6] Update Readiness and Liveness probes following https://developers.redhat.com/blog/2020/11/10/you-probably-need-liveness-and-readiness-probes --- .../reactive/messaging/mqtt/MqttSink.java | 25 +++++++++++++------ .../reactive/messaging/mqtt/MqttSource.java | 15 ++++++----- .../mqtt/session/MqttClientSession.java | 9 +++++++ 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index a27f36fae5..0c20fc9977 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -38,6 +38,7 @@ public class MqttSink { private final SubscriberBuilder, Void> sink; private final AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean alive = new AtomicBoolean(); private final AtomicReference reference = new AtomicReference<>(); public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, @@ -58,16 +59,22 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, reference.set(client); } return AsyncResultUni. toUni(h -> reference.get().start().onComplete(h)) - .onItem().invoke(() -> started.set(true)); + .onItem().invoke(() -> { + started.set(true); + alive.set(true); + }); }) .onItem().transformToUniAndConcatenate(this::send) .onCompletion().invoke(() -> { Clients.ClientHolder c = reference.getAndSet(null); - if (c != null) { - c.close().onComplete(ignore -> started.set(false)); - } + if (c != null) + c.close(); + alive.set(false); }) - .onFailure().invoke(log::errorWhileSendingMessageToBroker)); + .onFailure().invoke(e -> { + alive.set(false); + log.errorWhileSendingMessageToBroker(e); + })); sink = ReactiveStreams.fromSubscriber(subscriber); } @@ -88,7 +95,7 @@ private Uni> send(Message msg) { isRetain = false; actualQoS = MqttQoS.valueOf(this.qos); } - + if (actualTopicToBeUsed == null) { log.ignoringNoTopicSet(); return Uni.createFrom().item(msg); @@ -139,6 +146,10 @@ private boolean isConnected() { return reference.get() != null && reference.get().getClient().isConnected(); } + private boolean isDisconnected() { + return reference.get() != null && reference.get().getClient().isDisconnected(); + } + public void isStarted(HealthReportBuilder builder) { if (healthEnabled) builder.add(channel, started.get()); @@ -151,7 +162,7 @@ public void isReady(HealthReportBuilder builder) { public void isAlive(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, isConnected()); + builder.add(channel, !isDisconnected()); } } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index d0e4d8fe35..af531c8319 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -28,7 +28,7 @@ public class MqttSource { private final PublisherBuilder> source; private final AtomicBoolean started = new AtomicBoolean(); - private final AtomicBoolean ready = new AtomicBoolean(); + private final AtomicBoolean alive = new AtomicBoolean(); private final Clients.ClientHolder holder; public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, @@ -60,7 +60,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, .onFailure(outcome -> log.info("Subscription failed!")) .onSuccess(outcome -> { log.info("Subscription success on topic " + topic + ", Max QoS " + outcome + "."); - ready.set(true); + alive.set(true); }); this.source = ReactiveStreams.fromPublisher( @@ -74,7 +74,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, }) .onOverflow().buffer(config.getBufferSize()) .onCancellation().call(() -> { - ready.set(false); + alive.set(false); if (config.getUnsubscribeOnDisconnection()) return Uni .createFrom() @@ -83,7 +83,10 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, else return Uni.createFrom().voidItem(); }) - .onFailure().invoke(log::unableToConnectToBroker)); + .onFailure().invoke(e -> { + alive.set(false); + log.unableToConnectToBroker(e); + })); } private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String channel) { @@ -108,12 +111,12 @@ public void isStarted(HealthReportBuilder builder) { public void isReady(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, ready.get()); + builder.add(channel, alive.get()); } public void isAlive(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, holder.getClient().isConnected()); + builder.add(channel, !holder.getClient().isDisconnected()); } } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java index 61c7327594..33ece79145 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java @@ -128,6 +128,15 @@ default boolean isConnected() { return getState() == SessionState.CONNECTED; } + /** + * Check if the session is currently disconnected. + * + * @return {@code true} if the session is currently disconnected, {@code false} otherwise. + */ + default boolean isDisconnected() { + return getState() == SessionState.DISCONNECTED; + } + /** * Subscribes to a single topic with related QoS level. * From e5779fda628cd22925c239cf1ab8b5e632114bba Mon Sep 17 00:00:00 2001 From: Domenico Briganti Date: Tue, 31 Jan 2023 11:32:10 +0100 Subject: [PATCH 6/6] Fix Readiness and Liveness flag --- .../io/smallrye/reactive/messaging/mqtt/MqttSink.java | 6 +----- .../io/smallrye/reactive/messaging/mqtt/MqttSource.java | 4 ++-- .../messaging/mqtt/session/MqttClientSession.java | 9 --------- .../reactive/messaging/mqtt/HealthCheckTest.java | 4 ++-- 4 files changed, 5 insertions(+), 18 deletions(-) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index 0c20fc9977..dcc0af7347 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -146,10 +146,6 @@ private boolean isConnected() { return reference.get() != null && reference.get().getClient().isConnected(); } - private boolean isDisconnected() { - return reference.get() != null && reference.get().getClient().isDisconnected(); - } - public void isStarted(HealthReportBuilder builder) { if (healthEnabled) builder.add(channel, started.get()); @@ -162,7 +158,7 @@ public void isReady(HealthReportBuilder builder) { public void isAlive(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, !isDisconnected()); + builder.add(channel, alive.get()); } } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index af531c8319..bf6b1e20ae 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -111,12 +111,12 @@ public void isStarted(HealthReportBuilder builder) { public void isReady(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, alive.get()); + builder.add(channel, holder.getClient().isConnected()); } public void isAlive(HealthReportBuilder builder) { if (healthEnabled) - builder.add(channel, !holder.getClient().isDisconnected()); + builder.add(channel, alive.get()); } } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java index 33ece79145..61c7327594 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java @@ -128,15 +128,6 @@ default boolean isConnected() { return getState() == SessionState.CONNECTED; } - /** - * Check if the session is currently disconnected. - * - * @return {@code true} if the session is currently disconnected, {@code false} otherwise. - */ - default boolean isDisconnected() { - return getState() == SessionState.DISCONNECTED; - } - /** * Subscribes to a single topic with related QoS level. * diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java index df4a2c1939..c2167b5372 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/HealthCheckTest.java @@ -59,10 +59,10 @@ private void awaitAndVerify() { await() .pollInterval(Duration.ofSeconds(1)) - .until(() -> !connector.getLiveness().isOk()); + .until(() -> !connector.getReadiness().isOk()); liveness = getHealth().getLiveness(); - assertThat(liveness.isOk()).isFalse(); + assertThat(liveness.isOk()).isTrue(); }