diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java index 8317b355ca..ca6af90549 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ConnectionHolder.java @@ -15,9 +15,11 @@ import io.vertx.mutiny.rabbitmq.RabbitMQClient; public class ConnectionHolder { + private final RabbitMQClient client; private final RabbitMQConnectorCommonConfiguration configuration; - private final AtomicReference holder = new AtomicReference<>(); + private final AtomicReference connectionHolder = new AtomicReference<>(); + private final Uni connector; private final Vertx vertx; @@ -27,6 +29,37 @@ public ConnectionHolder(RabbitMQClient client, this.client = client; this.configuration = configuration; this.vertx = vertx; + this.connector = Uni.createFrom().voidItem() + .onItem().transformToUni(unused -> { + log.establishingConnection(configuration.getChannel()); + return client.start() + .onSubscription().invoke(() -> log.connectionEstablished(configuration.getChannel())) + .onItem().transform(ignored -> { + connectionHolder.set(new CurrentConnection(client, Vertx.currentContext())); + + // handle the case we are already disconnected. + if (!client.isConnected() || connectionHolder.get() == null) { + // Throwing the exception would trigger a retry. + connectionHolder.set(null); + throw ex.illegalStateConnectionDisconnected(); + } + + return client; + }) + .onFailure().invoke(ex -> log.unableToConnectToBroker(ex)) + .onFailure().invoke(t -> { + connectionHolder.set(null); + log.unableToRecoverFromConnectionDisruption(t); + }); + }) + .memoize().until(() -> { + CurrentConnection connection = connectionHolder.get(); + if (connection == null) { + return true; + } + return !connection.connection.isConnected(); + }); + } public static CompletionStage runOnContext(Context context, Runnable runnable) { @@ -43,7 +76,8 @@ public static CompletionStage runOnContext(Context context, Runnable runna return future; } - public static CompletionStage runOnContextAndReportFailure(Context context, Throwable reason, Runnable runnable) { + public static CompletionStage runOnContextAndReportFailure(Context context, Throwable reason, + Runnable runnable) { CompletableFuture future = new CompletableFuture<>(); if (Vertx.currentContext() == context) { runnable.run(); @@ -58,7 +92,7 @@ public static CompletionStage runOnContextAndReportFailure(Context context } public Context getContext() { - CurrentConnection connection = holder.get(); + CurrentConnection connection = connectionHolder.get(); if (connection != null) { return connection.context; } else { @@ -84,47 +118,11 @@ public synchronized void onFailure(Consumer callback) { } public Uni getOrEstablishConnection() { - return Uni.createFrom().item(() -> { - final CurrentConnection connection = holder.get(); - if (connection != null && connection.connection != null && connection.connection.isConnected()) { - return connection.connection; - } else { - return null; - } - }) - .onItem().ifNull().switchTo(() -> { - // we don't have a connection, try to connect. - CurrentConnection reference = holder.get(); - - if (reference != null && reference.connection != null && reference.connection.isConnected()) { - RabbitMQClient connection = reference.connection; - return Uni.createFrom().item(connection); - } - - log.establishingConnection(configuration.getChannel()); - return client.start() - .onSubscribe().invoke(() -> log.connectionEstablished(configuration.getChannel())) - .onItem().transform(ignored -> { - holder.set(new CurrentConnection(client, Vertx.currentContext())); - - // handle the case we are already disconnected. - if (!client.isConnected() || holder.get() == null) { - // Throwing the exception would trigger a retry. - holder.set(null); - throw ex.illegalStateConnectionDisconnected(); - } - - return client; - }) - .onFailure().invoke(ex -> log.unableToConnectToBroker(ex)) - .onFailure().invoke(t -> { - holder.set(null); - log.unableToRecoverFromConnectionDisruption(t); - }); - }); + return connector; } private static class CurrentConnection { + final RabbitMQClient connection; final Context context; diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 303f17900b..2bd41cdee1 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -10,7 +10,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -350,47 +349,31 @@ public SubscriberBuilder, Void> getSubscriberBuilder(final // Create a client final RabbitMQClient client = createClient(new RabbitMQConnectorCommonConfiguration(config)); - // This will hold our publisher, assuming we can get hold of one - final AtomicReference sender = new AtomicReference<>(); - final ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx()); - final Uni getSender = Uni.createFrom().item(sender.get()) - .onItem().ifNull().switchTo(() -> { - - // If we already have a sender, use it. - RabbitMQPublisher current = sender.get(); - if (current != null && client.isConnected()) { - return Uni.createFrom().item(current); - } - - return holder.getOrEstablishConnection() - // Once connected, ensure we create the exchange to which messages are to be sent - .onItem().call(connection -> establishExchange(connection, oc)) - // Once exchange exists, create ourselves a publisher - .onItem() - .transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection, - new RabbitMQPublisherOptions() - .setReconnectAttempts(oc.getReconnectAttempts()) - .setReconnectInterval(oc.getReconnectInterval()) - .setMaxInternalQueueSize( - oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE))))) - // Start the publisher - .onItem().call(RabbitMQPublisher::start) - .invoke(s -> { - // Make a note of the publisher and add the channel in the opened state - sender.set(s); - outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED); - }); + final Uni getSender = holder.getOrEstablishConnection() + // Once connected, ensure we create the exchange to which messages are to be sent + .onItem().call(connection -> { + return establishExchange(connection, oc); }) - // If the downstream cancels or on failure, drop the sender. - .onFailure().invoke(t -> { - sender.set(null); - outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED); + // Once exchange exists, create ourselves a publisher + .onItem().transformToUni(connection -> { + return Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection, + new RabbitMQPublisherOptions() + .setReconnectAttempts(oc.getReconnectAttempts()) + .setReconnectInterval(oc.getReconnectInterval()) + .setMaxInternalQueueSize( + oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE)))); }) - .onCancellation().invoke(() -> { - sender.set(null); - outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED); - }); + // Start the publisher + .onItem().call(RabbitMQPublisher::start) + .invoke(s -> { + // Add the channel in the opened state + outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED); + }) + .onFailure().invoke(t -> outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED)) + .onFailure().recoverWithNull() + .memoize().indefinitely() + .onCancellation().invoke(() -> outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED)); // Set up a sender based on the publisher we established above final RabbitMQMessageSender processor = new RabbitMQMessageSender(