Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renew cluster slots strategy update #2643

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,30 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Connection> poolConfig;
private final JedisClientConfig clientConfig;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;

@Deprecated
public JedisClusterInfoCache(final JedisClientConfig clientConfig) {
this(clientConfig, new GenericObjectPoolConfig<Connection>());
}

@Deprecated
public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig) {
this(clientConfig, poolConfig, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, new GenericObjectPoolConfig<Connection>(), startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
}

public void discoverClusterNodesAndSlots(Connection jedis) {
Expand Down Expand Up @@ -82,6 +95,7 @@ public void renewClusterSlots(Connection jedis) {
// If rediscovering is already in process - no need to start one more same rediscovering, just return
if (rediscoverLock.tryLock()) {
try {
// First, if jedis is available, use jedis renew.
if (jedis != null) {
try {
discoverClusterSlots(jedis);
Expand All @@ -91,20 +105,33 @@ public void renewClusterSlots(Connection jedis) {
}
}

// Then, we use startNodes to try, as long as startNodes is available,
// whether it is vip, domain, or physical ip, it will succeed.
if (startNodes != null) {
for (HostAndPort hostAndPort : startNodes) {
try (Connection j = new Connection(hostAndPort, clientConfig)) {
discoverClusterSlots(j);
return;
} catch (JedisConnectionException e) {
// try next nodes
}
}
}

// Finally, we go back to the ShuffledNodesPool and try the remaining physical nodes.
for (ConnectionPool jp : getShuffledNodesPool()) {
Connection j = null;
try {
j = jp.getResource();
try (Connection j = jp.getResource()) {
// If already tried in startNodes, skip this node.
if (startNodes != null && startNodes.contains(j.getHostAndPort())) {
continue;
}
discoverClusterSlots(j);
return;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (j != null) {
j.close();
}
}
}

} finally {
rediscoverLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public class ClusterConnectionProvider implements ConnectionProvider {
protected final JedisClusterInfoCache cache;

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig) {
this.cache = new JedisClusterInfoCache(clientConfig);
this.cache = new JedisClusterInfoCache(clientConfig, clusterNodes);
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig);
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes);
initializeSlotsCache(clusterNodes, clientConfig);
}

Expand Down