Skip to content

Commit

Permalink
Polishing #2338
Browse files Browse the repository at this point in the history
Add event subclasses for persistent reconnects and uncovered slots exposing the retry-counter/slot value.
  • Loading branch information
mp911de committed Apr 18, 2023
1 parent 0f1b2ca commit c4b45d2
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.lettuce.core.cluster;

import static io.lettuce.core.event.cluster.AdaptiveRefreshTriggeredEvent.*;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -147,7 +149,7 @@ public void onReconnectAttempt(int attempt) {
if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
&& attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
if (indicateTopologyRefreshSignal()) {
emitAdaptiveRefreshScheduledEvent(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS);
emitPersistentReconnectAdaptiveRefreshScheduledEvent(attempt);
}
}
}
Expand All @@ -157,7 +159,7 @@ public void onUncoveredSlot(int slot) {

if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)) {
if (indicateTopologyRefreshSignal()) {
emitAdaptiveRefreshScheduledEvent(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT);
emitUncoveredSlotAdaptiveRefreshScheduledEvent(slot);
}
}
}
Expand All @@ -180,6 +182,26 @@ private void emitAdaptiveRefreshScheduledEvent(ClusterTopologyRefreshOptions.Ref
clientResources.eventBus().publish(event);
}

private void emitPersistentReconnectAdaptiveRefreshScheduledEvent(int attempt) {
logger.debug("Adaptive refresh event due to: {} attempt {}",
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS, attempt);

AdaptiveRefreshTriggeredEvent event = new PersistentReconnectsAdaptiveRefreshTriggeredEvent(partitions,
this::scheduleRefresh, attempt);

clientResources.eventBus().publish(event);
}

private void emitUncoveredSlotAdaptiveRefreshScheduledEvent(int slot) {
logger.debug("Adaptive refresh event due to: {} for slot {}",
ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT, slot);

AdaptiveRefreshTriggeredEvent event = new UncoveredSlotAdaptiveRefreshTriggeredEvent(partitions, this::scheduleRefresh,
slot);

clientResources.eventBus().publish(event);
}

private boolean indicateTopologyRefreshSignal() {

logger.debug("ClusterTopologyRefreshScheduler.indicateTopologyRefreshSignal()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public class AdaptiveRefreshTriggeredEvent implements Event {
private final Supplier<Partitions> partitionsSupplier;

private final Runnable topologyRefreshScheduler;

private final ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger;

public AdaptiveRefreshTriggeredEvent(Supplier<Partitions> partitionsSupplier, Runnable topologyRefreshScheduler,
ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {
ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {
this.partitionsSupplier = partitionsSupplier;
this.topologyRefreshScheduler = topologyRefreshScheduler;
this.refreshTrigger = refreshTrigger;
Expand All @@ -59,8 +60,64 @@ public Partitions getPartitions() {

/**
* Retrieve the {@link ClusterTopologyRefreshOptions.RefreshTrigger} that caused this event.
*
* @return the {@link ClusterTopologyRefreshOptions.RefreshTrigger} that caused this event.
*/
public ClusterTopologyRefreshOptions.RefreshTrigger getRefreshTrigger() {
return refreshTrigger;
}

/**
* Extension to {@link AdaptiveRefreshTriggeredEvent} providing the reconnect-attempt counter value.
*
* @since 6.2.3
*/
public static class PersistentReconnectsAdaptiveRefreshTriggeredEvent extends AdaptiveRefreshTriggeredEvent {

private final int attempt;

public PersistentReconnectsAdaptiveRefreshTriggeredEvent(Supplier<Partitions> partitionsSupplier,
Runnable topologyRefreshScheduler, int attempt) {
super(partitionsSupplier, topologyRefreshScheduler,
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS);
this.attempt = attempt;
}

/**
* Return the reconnection-attempt at which this event was emitted.
*
* @return the reconnection-attempt at which this event was emitted.
*/
public int getAttempt() {
return attempt;
}

}

/**
* Extension to {@link AdaptiveRefreshTriggeredEvent} providing the uncovered slot value.
*
* @since 6.2.3
*/
public static class UncoveredSlotAdaptiveRefreshTriggeredEvent extends AdaptiveRefreshTriggeredEvent {

private final int slot;

public UncoveredSlotAdaptiveRefreshTriggeredEvent(Supplier<Partitions> partitionsSupplier,
Runnable topologyRefreshScheduler, int slot) {
super(partitionsSupplier, topologyRefreshScheduler, ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT);
this.slot = slot;
}

/**
* Return the slot that is not covered.
*
* @return the slot that is not covered.
*/
public int getSlot() {
return slot;
}

}

}

0 comments on commit c4b45d2

Please sign in to comment.