diff --git a/src/main/java/io/lettuce/core/cluster/RoundRobin.java b/src/main/java/io/lettuce/core/cluster/RoundRobin.java index 6741b1d9bd..b78c039e22 100644 --- a/src/main/java/io/lettuce/core/cluster/RoundRobin.java +++ b/src/main/java/io/lettuce/core/cluster/RoundRobin.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.function.BiFunction; /** * Circular element provider. This class allows infinite scrolling over a collection with the possibility to provide an initial @@ -31,6 +32,16 @@ class RoundRobin { protected volatile V offset; + private final BiFunction hasElementChanged; + + public RoundRobin() { + this((a, b) -> false); + } + + public RoundRobin(BiFunction hasElementChanged) { + this.hasElementChanged = hasElementChanged; + } + /** * Return whether this {@link RoundRobin} is still consistent and contains all items from the leader {@link Collection} and * vice versa. @@ -42,7 +53,19 @@ public boolean isConsistent(Collection leader) { Collection collection = this.collection; - return collection.containsAll(leader) && leader.containsAll(collection); + for (V currentElement : collection) { + boolean found = false; + for (V searchedElement : leader) { + if (searchedElement.equals(currentElement) && !hasElementChanged.apply(currentElement, searchedElement)) { + found = true; + } + } + if (!found) { + return false; + } + } + + return collection.size() == leader.size(); } /** diff --git a/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java b/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java index 0b7c3d7a44..77ec415fcd 100644 --- a/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java +++ b/src/main/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplier.java @@ -52,7 +52,7 @@ public RoundRobinSocketAddressSupplier(Supplier partitions, LettuceAssert.notNull(sortFunction, "Sort-Function must not be null"); this.partitions = partitions; - this.roundRobin = new RoundRobin<>(); + this.roundRobin = new RoundRobin<>((a, b) -> !a.getUri().equals(b.getUri())); this.sortFunction = (Function) sortFunction; this.clientResources = clientResources; resetRoundRobin(partitions.get()); diff --git a/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java b/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java index 26c7e96097..17ea1d643d 100644 --- a/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/RoundRobinSocketAddressSupplierUnitTests.java @@ -46,11 +46,13 @@ class RoundRobinSocketAddressSupplierUnitTests { private static RedisURI hap2 = new RedisURI("127.0.0.1", 2, Duration.ofSeconds(1)); private static RedisURI hap3 = new RedisURI("127.0.0.1", 3, Duration.ofSeconds(1)); private static RedisURI hap4 = new RedisURI("127.0.0.1", 4, Duration.ofSeconds(1)); + private static RedisURI hap5 = new RedisURI("127.0.0.0", 5, Duration.ofSeconds(1)); private static InetSocketAddress addr1 = new InetSocketAddress(hap1.getHost(), hap1.getPort()); private static InetSocketAddress addr2 = new InetSocketAddress(hap2.getHost(), hap2.getPort()); private static InetSocketAddress addr3 = new InetSocketAddress(hap3.getHost(), hap3.getPort()); private static InetSocketAddress addr4 = new InetSocketAddress(hap4.getHost(), hap4.getPort()); + private static InetSocketAddress addr5 = new InetSocketAddress(hap5.getHost(), hap5.getPort()); private static Partitions partitions; @@ -85,6 +87,24 @@ void noOffset() { assertThat(sut.get()).isNotEqualTo(addr3); } + @Test + void nodeIPChanges() { + + RoundRobinSocketAddressSupplier sut = new RoundRobinSocketAddressSupplier(() -> partitions, + redisClusterNodes -> redisClusterNodes, clientResourcesMock); + + assertThat(sut.get()).isEqualTo(addr1); + + assertThat(partitions.remove(new RedisClusterNode(hap1, "2", true, "", 0, 0, 0, new ArrayList<>(), new HashSet<>()))) + .isTrue(); + assertThat(partitions.add(new RedisClusterNode(hap5, "2", true, "", 0, 0, 0, new ArrayList<>(), new HashSet<>()))) + .isTrue(); + + assertThat(sut.get()).isEqualTo(addr1); + assertThat(sut.get()).isEqualTo(addr3); + assertThat(sut.get()).isEqualTo(addr5); + } + @Test void partitionTableChangesNewNode() {