diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index 2b7f780dbb..960d36a383 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -15,6 +15,8 @@ */ package io.lettuce.core.cluster; +import static io.lettuce.core.event.cluster.AdaptiveRefreshTriggeredEvent.*; + import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -147,7 +149,7 @@ public void onReconnectAttempt(int attempt) { if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS) && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) { if (indicateTopologyRefreshSignal()) { - emitAdaptiveRefreshScheduledEvent(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS); + emitPersistentReconnectAdaptiveRefreshScheduledEvent(attempt); } } } @@ -157,7 +159,7 @@ public void onUncoveredSlot(int slot) { if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)) { if (indicateTopologyRefreshSignal()) { - emitAdaptiveRefreshScheduledEvent(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT); + emitUncoveredSlotAdaptiveRefreshScheduledEvent(slot); } } } @@ -180,6 +182,26 @@ private void emitAdaptiveRefreshScheduledEvent(ClusterTopologyRefreshOptions.Ref clientResources.eventBus().publish(event); } + private void emitPersistentReconnectAdaptiveRefreshScheduledEvent(int attempt) { + logger.debug("Adaptive refresh event due to: {} attempt {}", + ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS, attempt); + + AdaptiveRefreshTriggeredEvent event = new PersistentReconnectsAdaptiveRefreshTriggeredEvent(partitions, + this::scheduleRefresh, attempt); + + clientResources.eventBus().publish(event); + } + + private void emitUncoveredSlotAdaptiveRefreshScheduledEvent(int slot) { + logger.debug("Adaptive refresh event due to: {} for slot {}", + ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT, slot); + + AdaptiveRefreshTriggeredEvent event = new UncoveredSlotAdaptiveRefreshTriggeredEvent(partitions, this::scheduleRefresh, + slot); + + clientResources.eventBus().publish(event); + } + private boolean indicateTopologyRefreshSignal() { logger.debug("ClusterTopologyRefreshScheduler.indicateTopologyRefreshSignal()"); diff --git a/src/main/java/io/lettuce/core/event/cluster/AdaptiveRefreshTriggeredEvent.java b/src/main/java/io/lettuce/core/event/cluster/AdaptiveRefreshTriggeredEvent.java index 086e5c47a1..d8b87bad84 100644 --- a/src/main/java/io/lettuce/core/event/cluster/AdaptiveRefreshTriggeredEvent.java +++ b/src/main/java/io/lettuce/core/event/cluster/AdaptiveRefreshTriggeredEvent.java @@ -32,10 +32,11 @@ public class AdaptiveRefreshTriggeredEvent implements Event { private final Supplier partitionsSupplier; private final Runnable topologyRefreshScheduler; + private final ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger; public AdaptiveRefreshTriggeredEvent(Supplier partitionsSupplier, Runnable topologyRefreshScheduler, - ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) { + ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) { this.partitionsSupplier = partitionsSupplier; this.topologyRefreshScheduler = topologyRefreshScheduler; this.refreshTrigger = refreshTrigger; @@ -59,8 +60,64 @@ public Partitions getPartitions() { /** * Retrieve the {@link ClusterTopologyRefreshOptions.RefreshTrigger} that caused this event. + * + * @return the {@link ClusterTopologyRefreshOptions.RefreshTrigger} that caused this event. */ public ClusterTopologyRefreshOptions.RefreshTrigger getRefreshTrigger() { return refreshTrigger; } + + /** + * Extension to {@link AdaptiveRefreshTriggeredEvent} providing the reconnect-attempt counter value. + * + * @since 6.2.3 + */ + public static class PersistentReconnectsAdaptiveRefreshTriggeredEvent extends AdaptiveRefreshTriggeredEvent { + + private final int attempt; + + public PersistentReconnectsAdaptiveRefreshTriggeredEvent(Supplier partitionsSupplier, + Runnable topologyRefreshScheduler, int attempt) { + super(partitionsSupplier, topologyRefreshScheduler, + ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS); + this.attempt = attempt; + } + + /** + * Return the reconnection-attempt at which this event was emitted. + * + * @return the reconnection-attempt at which this event was emitted. + */ + public int getAttempt() { + return attempt; + } + + } + + /** + * Extension to {@link AdaptiveRefreshTriggeredEvent} providing the uncovered slot value. + * + * @since 6.2.3 + */ + public static class UncoveredSlotAdaptiveRefreshTriggeredEvent extends AdaptiveRefreshTriggeredEvent { + + private final int slot; + + public UncoveredSlotAdaptiveRefreshTriggeredEvent(Supplier partitionsSupplier, + Runnable topologyRefreshScheduler, int slot) { + super(partitionsSupplier, topologyRefreshScheduler, ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT); + this.slot = slot; + } + + /** + * Return the slot that is not covered. + * + * @return the slot that is not covered. + */ + public int getSlot() { + return slot; + } + + } + }