Skip to content

Commit

Permalink
Merge pull request #1447 from outfoxx/fix/rabbitmq_connector_races
Browse files Browse the repository at this point in the history
Use `memoize` to concurrently cache in `ConnectionHolder` and `RabbitMQConnector`
  • Loading branch information
cescoffier authored Oct 11, 2021
2 parents 9d654eb + 6ce7d38 commit d6ebbbf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import io.vertx.mutiny.rabbitmq.RabbitMQClient;

public class ConnectionHolder {

private final RabbitMQClient client;
private final RabbitMQConnectorCommonConfiguration configuration;
private final AtomicReference<CurrentConnection> holder = new AtomicReference<>();
private final AtomicReference<CurrentConnection> connectionHolder = new AtomicReference<>();
private final Uni<RabbitMQClient> connector;

private final Vertx vertx;

Expand All @@ -27,6 +29,37 @@ public ConnectionHolder(RabbitMQClient client,
this.client = client;
this.configuration = configuration;
this.vertx = vertx;
this.connector = Uni.createFrom().voidItem()
.onItem().transformToUni(unused -> {
log.establishingConnection(configuration.getChannel());
return client.start()
.onSubscription().invoke(() -> log.connectionEstablished(configuration.getChannel()))
.onItem().transform(ignored -> {
connectionHolder.set(new CurrentConnection(client, Vertx.currentContext()));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}

return client;
})
.onFailure().invoke(ex -> log.unableToConnectToBroker(ex))
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
});
})
.memoize().until(() -> {
CurrentConnection connection = connectionHolder.get();
if (connection == null) {
return true;
}
return !connection.connection.isConnected();
});

}

public static CompletionStage<Void> runOnContext(Context context, Runnable runnable) {
Expand All @@ -43,7 +76,8 @@ public static CompletionStage<Void> runOnContext(Context context, Runnable runna
return future;
}

public static CompletionStage<Void> runOnContextAndReportFailure(Context context, Throwable reason, Runnable runnable) {
public static CompletionStage<Void> runOnContextAndReportFailure(Context context, Throwable reason,
Runnable runnable) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (Vertx.currentContext() == context) {
runnable.run();
Expand All @@ -58,7 +92,7 @@ public static CompletionStage<Void> runOnContextAndReportFailure(Context context
}

public Context getContext() {
CurrentConnection connection = holder.get();
CurrentConnection connection = connectionHolder.get();
if (connection != null) {
return connection.context;
} else {
Expand All @@ -84,47 +118,11 @@ public synchronized void onFailure(Consumer<Throwable> callback) {
}

public Uni<RabbitMQClient> getOrEstablishConnection() {
return Uni.createFrom().item(() -> {
final CurrentConnection connection = holder.get();
if (connection != null && connection.connection != null && connection.connection.isConnected()) {
return connection.connection;
} else {
return null;
}
})
.onItem().ifNull().switchTo(() -> {
// we don't have a connection, try to connect.
CurrentConnection reference = holder.get();

if (reference != null && reference.connection != null && reference.connection.isConnected()) {
RabbitMQClient connection = reference.connection;
return Uni.createFrom().item(connection);
}

log.establishingConnection(configuration.getChannel());
return client.start()
.onSubscribe().invoke(() -> log.connectionEstablished(configuration.getChannel()))
.onItem().transform(ignored -> {
holder.set(new CurrentConnection(client, Vertx.currentContext()));

// handle the case we are already disconnected.
if (!client.isConnected() || holder.get() == null) {
// Throwing the exception would trigger a retry.
holder.set(null);
throw ex.illegalStateConnectionDisconnected();
}

return client;
})
.onFailure().invoke(ex -> log.unableToConnectToBroker(ex))
.onFailure().invoke(t -> {
holder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
});
});
return connector;
}

private static class CurrentConnection {

final RabbitMQClient connection;
final Context context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -350,47 +349,31 @@ public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(final
// Create a client
final RabbitMQClient client = createClient(new RabbitMQConnectorCommonConfiguration(config));

// This will hold our publisher, assuming we can get hold of one
final AtomicReference<RabbitMQPublisher> sender = new AtomicReference<>();

final ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx());
final Uni<RabbitMQPublisher> getSender = Uni.createFrom().item(sender.get())
.onItem().ifNull().switchTo(() -> {

// If we already have a sender, use it.
RabbitMQPublisher current = sender.get();
if (current != null && client.isConnected()) {
return Uni.createFrom().item(current);
}

return holder.getOrEstablishConnection()
// Once connected, ensure we create the exchange to which messages are to be sent
.onItem().call(connection -> establishExchange(connection, oc))
// Once exchange exists, create ourselves a publisher
.onItem()
.transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection,
new RabbitMQPublisherOptions()
.setReconnectAttempts(oc.getReconnectAttempts())
.setReconnectInterval(oc.getReconnectInterval())
.setMaxInternalQueueSize(
oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE)))))
// Start the publisher
.onItem().call(RabbitMQPublisher::start)
.invoke(s -> {
// Make a note of the publisher and add the channel in the opened state
sender.set(s);
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED);
});
final Uni<RabbitMQPublisher> getSender = holder.getOrEstablishConnection()
// Once connected, ensure we create the exchange to which messages are to be sent
.onItem().call(connection -> {
return establishExchange(connection, oc);
})
// If the downstream cancels or on failure, drop the sender.
.onFailure().invoke(t -> {
sender.set(null);
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
// Once exchange exists, create ourselves a publisher
.onItem().transformToUni(connection -> {
return Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection,
new RabbitMQPublisherOptions()
.setReconnectAttempts(oc.getReconnectAttempts())
.setReconnectInterval(oc.getReconnectInterval())
.setMaxInternalQueueSize(
oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE))));
})
.onCancellation().invoke(() -> {
sender.set(null);
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
});
// Start the publisher
.onItem().call(RabbitMQPublisher::start)
.invoke(s -> {
// Add the channel in the opened state
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED);
})
.onFailure().invoke(t -> outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED))
.onFailure().recoverWithNull()
.memoize().indefinitely()
.onCancellation().invoke(() -> outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED));

// Set up a sender based on the publisher we established above
final RabbitMQMessageSender processor = new RabbitMQMessageSender(
Expand Down

0 comments on commit d6ebbbf

Please sign in to comment.