Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use memoize to concurrently cache in ConnectionHolder and RabbitMQConnector #1447

Merged
merged 1 commit into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import io.vertx.mutiny.rabbitmq.RabbitMQClient;

public class ConnectionHolder {

private final RabbitMQClient client;
private final RabbitMQConnectorCommonConfiguration configuration;
private final AtomicReference<CurrentConnection> holder = new AtomicReference<>();
private final AtomicReference<CurrentConnection> connectionHolder = new AtomicReference<>();
private final Uni<RabbitMQClient> connector;

private final Vertx vertx;

Expand All @@ -27,6 +29,37 @@ public ConnectionHolder(RabbitMQClient client,
this.client = client;
this.configuration = configuration;
this.vertx = vertx;
this.connector = Uni.createFrom().voidItem()
.onItem().transformToUni(unused -> {
log.establishingConnection(configuration.getChannel());
return client.start()
.onSubscription().invoke(() -> log.connectionEstablished(configuration.getChannel()))
.onItem().transform(ignored -> {
connectionHolder.set(new CurrentConnection(client, Vertx.currentContext()));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
// Throwing the exception would trigger a retry.
connectionHolder.set(null);
throw ex.illegalStateConnectionDisconnected();
}

return client;
})
.onFailure().invoke(ex -> log.unableToConnectToBroker(ex))
.onFailure().invoke(t -> {
connectionHolder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
});
})
.memoize().until(() -> {
CurrentConnection connection = connectionHolder.get();
if (connection == null) {
return true;
}
return !connection.connection.isConnected();
});

}

public static CompletionStage<Void> runOnContext(Context context, Runnable runnable) {
Expand All @@ -43,7 +76,8 @@ public static CompletionStage<Void> runOnContext(Context context, Runnable runna
return future;
}

public static CompletionStage<Void> runOnContextAndReportFailure(Context context, Throwable reason, Runnable runnable) {
public static CompletionStage<Void> runOnContextAndReportFailure(Context context, Throwable reason,
Runnable runnable) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (Vertx.currentContext() == context) {
runnable.run();
Expand All @@ -58,7 +92,7 @@ public static CompletionStage<Void> runOnContextAndReportFailure(Context context
}

public Context getContext() {
CurrentConnection connection = holder.get();
CurrentConnection connection = connectionHolder.get();
if (connection != null) {
return connection.context;
} else {
Expand All @@ -84,47 +118,11 @@ public synchronized void onFailure(Consumer<Throwable> callback) {
}

public Uni<RabbitMQClient> getOrEstablishConnection() {
return Uni.createFrom().item(() -> {
final CurrentConnection connection = holder.get();
if (connection != null && connection.connection != null && connection.connection.isConnected()) {
return connection.connection;
} else {
return null;
}
})
.onItem().ifNull().switchTo(() -> {
// we don't have a connection, try to connect.
CurrentConnection reference = holder.get();

if (reference != null && reference.connection != null && reference.connection.isConnected()) {
RabbitMQClient connection = reference.connection;
return Uni.createFrom().item(connection);
}

log.establishingConnection(configuration.getChannel());
return client.start()
.onSubscribe().invoke(() -> log.connectionEstablished(configuration.getChannel()))
.onItem().transform(ignored -> {
holder.set(new CurrentConnection(client, Vertx.currentContext()));

// handle the case we are already disconnected.
if (!client.isConnected() || holder.get() == null) {
// Throwing the exception would trigger a retry.
holder.set(null);
throw ex.illegalStateConnectionDisconnected();
}

return client;
})
.onFailure().invoke(ex -> log.unableToConnectToBroker(ex))
.onFailure().invoke(t -> {
holder.set(null);
log.unableToRecoverFromConnectionDisruption(t);
});
});
return connector;
}

private static class CurrentConnection {

final RabbitMQClient connection;
final Context context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -350,47 +349,31 @@ public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(final
// Create a client
final RabbitMQClient client = createClient(new RabbitMQConnectorCommonConfiguration(config));

// This will hold our publisher, assuming we can get hold of one
final AtomicReference<RabbitMQPublisher> sender = new AtomicReference<>();

final ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx());
final Uni<RabbitMQPublisher> getSender = Uni.createFrom().item(sender.get())
.onItem().ifNull().switchTo(() -> {

// If we already have a sender, use it.
RabbitMQPublisher current = sender.get();
if (current != null && client.isConnected()) {
return Uni.createFrom().item(current);
}

return holder.getOrEstablishConnection()
// Once connected, ensure we create the exchange to which messages are to be sent
.onItem().call(connection -> establishExchange(connection, oc))
// Once exchange exists, create ourselves a publisher
.onItem()
.transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection,
new RabbitMQPublisherOptions()
.setReconnectAttempts(oc.getReconnectAttempts())
.setReconnectInterval(oc.getReconnectInterval())
.setMaxInternalQueueSize(
oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE)))))
// Start the publisher
.onItem().call(RabbitMQPublisher::start)
.invoke(s -> {
// Make a note of the publisher and add the channel in the opened state
sender.set(s);
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED);
});
final Uni<RabbitMQPublisher> getSender = holder.getOrEstablishConnection()
// Once connected, ensure we create the exchange to which messages are to be sent
.onItem().call(connection -> {
return establishExchange(connection, oc);
})
// If the downstream cancels or on failure, drop the sender.
.onFailure().invoke(t -> {
sender.set(null);
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
// Once exchange exists, create ourselves a publisher
.onItem().transformToUni(connection -> {
return Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection,
new RabbitMQPublisherOptions()
.setReconnectAttempts(oc.getReconnectAttempts())
.setReconnectInterval(oc.getReconnectInterval())
.setMaxInternalQueueSize(
oc.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE))));
})
.onCancellation().invoke(() -> {
sender.set(null);
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED);
});
// Start the publisher
.onItem().call(RabbitMQPublisher::start)
.invoke(s -> {
// Add the channel in the opened state
outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.CONNECTED);
})
.onFailure().invoke(t -> outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED))
.onFailure().recoverWithNull()
.memoize().indefinitely()
.onCancellation().invoke(() -> outgoingChannelStatus.put(oc.getChannel(), ChannelStatus.NOT_CONNECTED));

// Set up a sender based on the publisher we established above
final RabbitMQMessageSender processor = new RabbitMQMessageSender(
Expand Down