Skip to content

Commit

Permalink
Merge pull request #2052 from dometec/main
Browse files Browse the repository at this point in the history
HealtCheck on MQTT Connector
  • Loading branch information
ozangunalp authored Jan 31, 2023
2 parents 9f02f93 + e5779fd commit f83fa93
Show file tree
Hide file tree
Showing 22 changed files with 450 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.reactive.messaging.mqtt;

import static io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging.log;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -34,6 +36,7 @@ static ClientHolder getHolder(Vertx vertx, MqttClientSessionOptions options) {
+ "<" + (server == null ? "" : server)
+ ">-[" + (clientId != null ? clientId : "") + "]";
return clients.computeIfAbsent(id, key -> {
log.infof("Create MQTT Client for %s.", id);
MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options);
return new ClientHolder(client);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.vertx.mutiny.core.Vertx;
Expand All @@ -31,6 +34,7 @@
@ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the client identifier")
@ConnectorAttribute(name = "auto-generated-client-id", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the MQTT client must generate clientId automatically", defaultValue = "true")
@ConnectorAttribute(name = "auto-keep-alive", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the MQTT client must handle `PINGREQ` automatically", defaultValue = "true")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "ssl", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set whether SSL/TLS is enabled", defaultValue = "false")
@ConnectorAttribute(name = "ssl.keystore.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the keystore type [`pkcs12`, `jks`, `pem`]", defaultValue = "pkcs12")
@ConnectorAttribute(name = "ssl.keystore.location", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the keystore location. In case of `pem` type this is the server ca cert path")
Expand Down Expand Up @@ -61,12 +65,12 @@
@ConnectorAttribute(name = "merge", direction = OUTGOING, description = "Whether the connector should allow multiple upstreams", type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "buffer-size", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", type = "int", defaultValue = "128")
@ConnectorAttribute(name = "unsubscribe-on-disconnection", direction = INCOMING_AND_OUTGOING, description = "This flag restore the old behavior to unsubscribe from the broken on disconnection", type = "boolean", defaultValue = "false")
public class MqttConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
public class MqttConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, HealthReporter {

static final String CONNECTOR_NAME = "smallrye-mqtt";

@Inject
private ExecutionHolder executionHolder;
ExecutionHolder executionHolder;

@Inject
@Any
Expand Down Expand Up @@ -95,22 +99,41 @@ public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config
return sink.getSink();
}

public boolean isReady() {
boolean ready = isSourceReady();
@Override
public HealthReport getStartup() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MqttSource source : sources) {
source.isStarted(builder);
}
for (MqttSink sink : sinks) {
sink.isStarted(builder);
}
return builder.build();
}

@Override
public HealthReport getReadiness() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MqttSource source : sources) {
source.isReady(builder);
}
for (MqttSink sink : sinks) {
ready = ready && sink.isReady();
sink.isReady(builder);
}
return builder.build();

return ready;
}

public boolean isSourceReady() {
boolean ready = true;
@Override
public HealthReport getLiveness() {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
for (MqttSource source : sources) {
ready = ready && source.isReady();
source.isAlive(builder);
}
for (MqttSink sink : sinks) {
sink.isAlive(builder);
}
return ready;
return builder.build();
}

public void destroy(@Observes @Destroyed(ApplicationScoped.class) final Object context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import static io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging.log;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -13,14 +11,17 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.health.HealthReport.HealthReportBuilder;
import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
Expand All @@ -29,46 +30,56 @@

public class MqttSink {

private final String channel;
private final String topic;
private final int qos;
private final boolean healthEnabled;

private final SubscriberBuilder<? extends Message<?>, Void> sink;
private final AtomicBoolean ready = new AtomicBoolean();

private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean alive = new AtomicBoolean();
private final AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();

public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config,
Instance<MqttClientSessionOptions> instances) {

MqttClientSessionOptions options = MqttHelpers.createClientOptions(config, instances);
topic = config.getTopic().orElseGet(config::getChannel);

channel = config.getChannel();
topic = config.getTopic().orElse(channel);
qos = config.getQos();
healthEnabled = config.getHealthEnabled();

AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();
sink = ReactiveStreams.<Message<?>> builder()
.flatMapCompletionStage(msg -> {
Subscriber<? extends Message<?>> subscriber = MultiUtils.via(m -> m.onSubscription()
.call(() -> {
Clients.ClientHolder client = reference.get();
if (client == null) {
client = Clients.getHolder(vertx, options);
reference.set(client);
}

return client.start()
.onSuccess(ignore -> ready.set(true))
.map(ignore -> msg)
.toCompletionStage();
return AsyncResultUni.<Void> toUni(h -> reference.get().start().onComplete(h))
.onItem().invoke(() -> {
started.set(true);
alive.set(true);
});
})
.flatMapCompletionStage(msg -> send(reference, msg))
.onComplete(() -> {
.onItem().transformToUniAndConcatenate(this::send)
.onCompletion().invoke(() -> {
Clients.ClientHolder c = reference.getAndSet(null);
if (c != null) {
c.close()
.onComplete(ignore -> ready.set(false));
}
if (c != null)
c.close();
alive.set(false);
})
.onError(log::errorWhileSendingMessageToBroker)
.ignore();
.onFailure().invoke(e -> {
alive.set(false);
log.errorWhileSendingMessageToBroker(e);
}));
sink = ReactiveStreams.fromSubscriber(subscriber);
}

private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference, Message<?> msg) {
MqttClientSession client = reference.get().getClient();
private Uni<? extends Message<?>> send(Message<?> msg) {
final MqttClientSession client = reference.get().getClient();
final String actualTopicToBeUsed;
final MqttQoS actualQoS;
final boolean isRetain;
Expand All @@ -87,7 +98,7 @@ private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference,

if (actualTopicToBeUsed == null) {
log.ignoringNoTopicSet();
return CompletableFuture.completedFuture(msg);
return Uni.createFrom().item(msg);
}

return AsyncResultUni
Expand All @@ -101,8 +112,7 @@ private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference,
OutgoingMessageMetadata.setResultOnMessage(msg, s);
return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
}
})
.subscribeAsCompletionStage();
});
}

private Buffer convert(Object payload) {
Expand Down Expand Up @@ -132,7 +142,23 @@ public SubscriberBuilder<? extends Message<?>, Void> getSink() {
return sink;
}

public boolean isReady() {
return ready.get();
private boolean isConnected() {
return reference.get() != null && reference.get().getClient().isConnected();
}

public void isStarted(HealthReportBuilder builder) {
if (healthEnabled)
builder.add(channel, started.get());
}

public void isReady(HealthReportBuilder builder) {
if (healthEnabled)
builder.add(channel, isConnected());
}

public void isAlive(HealthReportBuilder builder) {
if (healthEnabled)
builder.add(channel, alive.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport.HealthReportBuilder;
import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.internal.MqttTopicHelper;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
Expand All @@ -20,17 +21,26 @@

public class MqttSource {

private final PublisherBuilder<MqttMessage<?>> source;
private final AtomicBoolean ready = new AtomicBoolean();
private final String channel;
private final Pattern pattern;
private final boolean healthEnabled;

private final PublisherBuilder<MqttMessage<?>> source;

private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean alive = new AtomicBoolean();
private final Clients.ClientHolder holder;

public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,
Instance<MqttClientSessionOptions> instances) {
MqttClientSessionOptions options = MqttHelpers.createClientOptions(config, instances);

String topic = config.getTopic().orElseGet(config::getChannel);
channel = config.getChannel();
String topic = config.getTopic().orElse(channel);
int qos = config.getQos();
boolean broadcast = config.getBroadcast();
healthEnabled = config.getHealthEnabled();

MqttFailureHandler.Strategy strategy = MqttFailureHandler.Strategy.from(config.getFailureStrategy());
MqttFailureHandler onNack = createFailureHandler(strategy, config.getChannel());

Expand All @@ -43,26 +53,28 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,
pattern = null;
}

Clients.ClientHolder holder = Clients.getHolder(vertx, options);
holder.start();
holder = Clients.getHolder(vertx, options);
holder.start().onSuccess(ignore -> started.set(true));
holder.getClient()
.subscribe(topic, RequestedQoS.valueOf(qos))
.onComplete(outcome -> log.info("Subscription outcome: " + outcome))
.onSuccess(ignore -> ready.set(true));
.onFailure(outcome -> log.info("Subscription failed!"))
.onSuccess(outcome -> {
log.info("Subscription success on topic " + topic + ", Max QoS " + outcome + ".");
alive.set(true);
});

this.source = ReactiveStreams.fromPublisher(
holder.stream()
.select().where(m -> MqttTopicHelper.matches(topic, pattern, m))
.onItem().transform(m -> new ReceivingMqttMessage(m, onNack))
.stage(multi -> {
if (broadcast) {
if (broadcast)
return multi.broadcast().toAllSubscribers();
}
return multi;
})
.onOverflow().buffer(config.getBufferSize())
.onCancellation().call(() -> {
ready.set(false);
alive.set(false);
if (config.getUnsubscribeOnDisconnection())
return Uni
.createFrom()
Expand All @@ -71,7 +83,10 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config,
else
return Uni.createFrom().voidItem();
})
.onFailure().invoke(log::unableToConnectToBroker));
.onFailure().invoke(e -> {
alive.set(false);
log.unableToConnectToBroker(e);
}));
}

private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String channel) {
Expand All @@ -89,8 +104,19 @@ PublisherBuilder<MqttMessage<?>> getSource() {
return source;
}

public boolean isReady() {
return ready.get();
public void isStarted(HealthReportBuilder builder) {
if (healthEnabled)
builder.add(channel, started.get());
}

public void isReady(HealthReportBuilder builder) {
if (healthEnabled)
builder.add(channel, holder.getClient().isConnected());
}

public void isAlive(HealthReportBuilder builder) {
if (healthEnabled)
builder.add(channel, alive.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ public void testWithClientId() {
container = weld.initialize();

App bean = container.getBeanManager().createInstance().select(App.class).get();
MqttConnector mqttConnector = this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get();

await()
.until(
() -> this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get().isReady());

await().until(() -> mqttConnector.getReadiness().isOk());
await().atMost(2, TimeUnit.MINUTES).until(() -> bean.prices().size() >= 10);
assertThat(bean.prices()).isNotEmpty();
}
Expand Down
Loading

0 comments on commit f83fa93

Please sign in to comment.