Skip to content

Commit

Permalink
Renew cluster slots strategy update
Browse files Browse the repository at this point in the history
backported from redis#2643
  • Loading branch information
yangbodong22011 committed Apr 12, 2023
1 parent dd4cd8d commit 7cb5020
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final JedisClientConfig seedNodesClientConfig,
final GenericObjectPoolConfig<Jedis> poolConfig,
final JedisClientConfig clusterNodesClientConfig) {
this.cache = new JedisClusterInfoCache(poolConfig, clusterNodesClientConfig);
this.cache = new JedisClusterInfoCache(poolConfig, clusterNodesClientConfig, nodes);
initializeSlotsCache(nodes, seedNodesClientConfig);
}

public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig<Jedis> poolConfig, final JedisClientConfig clientConfig) {
this.cache = new JedisClusterInfoCache(poolConfig, clientConfig);
this.cache = new JedisClusterInfoCache(poolConfig, clientConfig, nodes);
initializeSlotsCache(nodes, clientConfig);
}

Expand Down
35 changes: 26 additions & 9 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Jedis> poolConfig;
private final JedisClientConfig clientConfig;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;

Expand Down Expand Up @@ -134,13 +135,14 @@ public JedisClusterInfoCache(final GenericObjectPoolConfig<Jedis> poolConfig,
.blockingSocketTimeoutMillis(infiniteSoTimeout).user(user).password(password)
.clientName(clientName).ssl(ssl).sslSocketFactory(sslSocketFactory)
.sslParameters(sslParameters).hostnameVerifier(hostnameVerifier)
.hostAndPortMapper(hostAndPortMap).build());
.hostAndPortMapper(hostAndPortMap).build(), null);
}

public JedisClusterInfoCache(final GenericObjectPoolConfig<Jedis> poolConfig,
final JedisClientConfig clientConfig) {
final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
}

public void discoverClusterNodesAndSlots(Jedis jedis) {
Expand Down Expand Up @@ -181,6 +183,7 @@ public void renewClusterSlots(Jedis jedis) {
// If rediscovering is already in process - no need to start one more same rediscovering, just return
if (rediscoverLock.tryLock()) {
try {
// First, if jedis is available, use jedis renew.
if (jedis != null) {
try {
discoverClusterSlots(jedis);
Expand All @@ -190,20 +193,34 @@ public void renewClusterSlots(Jedis jedis) {
}
}

// Then, we use startNodes to try, as long as startNodes is available,
// whether it is vip, domain, or physical ip, it will succeed.
if (startNodes != null) {
for (HostAndPort hostAndPort : startNodes) {
try (Jedis j = new Jedis(hostAndPort, clientConfig)) {
discoverClusterSlots(j);
return;
} catch (JedisConnectionException e) {
// try next nodes
}
}
}

// Finally, we go back to the ShuffledNodesPool and try the remaining physical nodes.
for (JedisPool jp : getShuffledNodesPool()) {
Jedis j = null;
try {
j = jp.getResource();
try (Jedis j = jp.getResource()) {
// If already tried in startNodes, skip this node.
HostAndPort hostAndPort = new HostAndPort(j.getClient().getHost(), j.getClient().getPort());
if (startNodes != null && startNodes.contains(hostAndPort)) {
continue;
}
discoverClusterSlots(j);
return;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (j != null) {
j.close();
}
}
}

} finally {
rediscoverLock.unlock();
}
Expand Down

0 comments on commit 7cb5020

Please sign in to comment.