Skip to content

Commit

Permalink
Introducing periodic topology mechanism for JedisCluster (redis#3596)
Browse files Browse the repository at this point in the history
The main changes in this PR are as follows:

1. Add `topologyRefreshEnabled` and `topologyRefreshPeriod` to control the periodic topology refresh mechanism.
2. `topologyRefreshExecutor` is a single-threaded executor responsible for running `TopologyRefreshTask`.
3. Add `checkClusterSlotSequence` to check whether the cluster slots returned by the server are consecutive and there are no duplicates.
4. [Test] In JedisClusterTestBase, `CLUSTER RESET SOFT` was changed to `HARD`, because SOFT cannot clean up the Redis Cluster configuration and may cause a crash. Please refer to redis/redis#12701
5. [Test] Adjust the cluster-node-timeout in the Makefile to 15s, consistent with the default configuration of Redis.

---------

* Introducing periodic topology mechanism for JedisCluster

* Apply suggestions from sazzad16

Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>

* Remove topologyRefreshEnabled

* Apply suggestions from sazzad16

Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>

* remove save topologyRefreshPeriod

* Move INIT_NO_ERROR_PROPERTY to JedisCluster

* add timeout for clusterPeriodTopologyRefreshTest

* Update src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java

Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>

---------

Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com>
  • Loading branch information
yangbodong22011 and sazzad16 committed Nov 3, 2023
1 parent e247f63 commit ce27d48
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 17 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7379
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node1.pid
logfile /tmp/redis_cluster_node1.log
save ""
Expand All @@ -223,7 +223,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7380
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node2.pid
logfile /tmp/redis_cluster_node2.log
save ""
Expand All @@ -237,7 +237,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7381
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node3.pid
logfile /tmp/redis_cluster_node3.log
save ""
Expand All @@ -251,7 +251,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7382
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node4.pid
logfile /tmp/redis_cluster_node4.log
save ""
Expand All @@ -265,7 +265,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7383
cluster-node-timeout 5000
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node5.pid
logfile /tmp/redis_cluster_node5.log
save ""
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
Expand All @@ -21,6 +22,12 @@ public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientCo
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
super(new ClusterCommandObjects());
this.provider = provider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately

socket.connect(new InetSocketAddress(host.getHostAddress(), hostAndPort.getPort()), connectionTimeout);
// Passing 'host' directly will avoid another call to InetAddress.getByName() inside the InetSocketAddress constructor.
// For machines with ipv4 and ipv6, but the startNode uses ipv4 to connect, the ipv6 connection may fail.
socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout);
return socket;
} catch (Exception e) {
jce.addSuppressed(e);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

public class JedisCluster extends UnifiedJedis {

public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";

/**
* Default timeout in milliseconds.
*/
Expand Down Expand Up @@ -192,6 +194,13 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
Expand Down
85 changes: 81 additions & 4 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,17 +11,26 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

public class JedisClusterInfoCache {

private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);

private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
Expand All @@ -36,6 +46,20 @@ public class JedisClusterInfoCache {

private static final int MASTER_NODE_INDEX = 2;

/**
* The single thread executor for the topology refresh task.
*/
private ScheduledExecutorService topologyRefreshExecutor = null;

class TopologyRefreshTask implements Runnable {
@Override
public void run() {
logger.debug("Cluster topology refresh run, old nodes: {}", nodes.keySet());
renewClusterSlots(null);
logger.debug("Cluster topology refresh run, new nodes: {}", nodes.keySet());
}
}

@Deprecated
public JedisClusterInfoCache(final JedisClientConfig clientConfig) {
this(clientConfig, new GenericObjectPoolConfig<Connection>());
Expand All @@ -53,15 +77,55 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<Hos

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor();
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
}

/**
* Check whether the number and order of slots in the cluster topology are equal to CLUSTER_HASHSLOTS
* @param slotsInfo the cluster topology
* @return if slots is ok, return true, elese return false.
*/
private boolean checkClusterSlotSequence(List<Object> slotsInfo) {
List<Integer> slots = new ArrayList<>();
for (Object slotInfoObj : slotsInfo) {
List<Object> slotInfo = (List<Object>)slotInfoObj;
slots.addAll(getAssignedSlotArray(slotInfo));
}
Collections.sort(slots);
if (slots.size() != Protocol.CLUSTER_HASHSLOTS) {
return false;
}
for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) {
if (i != slots.get(i)) {
return false;
}
}
return true;
}

public void discoverClusterNodesAndSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -144,8 +208,13 @@ public void renewClusterSlots(Connection jedis) {

private void discoverClusterSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -319,6 +388,14 @@ public void reset() {
}
}

public void close() {
reset();
if (topologyRefreshExecutor != null) {
logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes);
topologyRefreshExecutor.shutdownNow();
}
}

public static String getNodeKey(HostAndPort hnp) {
//return hnp.getHost() + ":" + hnp.getPort();
return hnp.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis.providers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -17,9 +18,9 @@
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;

public class ClusterConnectionProvider implements ConnectionProvider {
import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

private static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";
public class ClusterConnectionProvider implements ConnectionProvider {

protected final JedisClusterInfoCache cache;

Expand All @@ -34,6 +35,12 @@ public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfi
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshPeriod);
initializeSlotsCache(clusterNodes, clientConfig);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
if (startNodes.isEmpty()) {
throw new JedisClusterOperationException("No nodes to initialize cluster slots cache.");
Expand Down Expand Up @@ -66,7 +73,7 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig

@Override
public void close() {
cache.reset();
cache.close();
}

public void renewSlotCache() {
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,65 @@ public void clusterRefreshNodes() throws Exception {
}
}

@Test(timeout = 30_000)
public void clusterPeriodTopologyRefreshTest() throws Exception {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(nodeInfo1);
jedisClusterNode.add(nodeInfo2);
jedisClusterNode.add(nodeInfo3);

// we set topologyRefreshPeriod is 1s
Duration topologyRefreshPeriod = Duration.ofSeconds(1);
try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG,
topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(10))) {
assertEquals(3, cluster.getClusterNodes().size());
cleanUp(); // cleanup and add node4

// at first, join node4 to cluster
node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
// split available slots across the three nodes
int slotsPerNode = CLUSTER_HASHSLOTS / 4;
int[] node1Slots = new int[slotsPerNode];
int[] node2Slots = new int[slotsPerNode];
int[] node3Slots = new int[slotsPerNode];
int[] node4Slots = new int[slotsPerNode];
for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) {
if (i < slotsPerNode) {
node1Slots[slot1++] = i;
} else if (i >= slotsPerNode && i < slotsPerNode*2) {
node2Slots[slot2++] = i;
} else if (i >= slotsPerNode*2 && i < slotsPerNode*3) {
node3Slots[slot3++] = i;
} else {
node4Slots[slot4++] = i;
}
}

node1.clusterAddSlots(node1Slots);
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
node4.clusterAddSlots(node4Slots);
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);

assertEquals(4, cluster.getClusterNodes().size());
String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort();
assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4));

// make 4 nodes to 3 nodes
cleanUp();
setUp();

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);
assertEquals(3, cluster.getClusterNodes().size());
}
}

private static String getNodeServingSlotRange(String infoOutput) {
// f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0
// 1394372400827 0 connected 5461-10922
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/redis/clients/jedis/JedisClusterTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ protected void cleanUp() {
node2.flushDB();
node3.flushDB();
node4.flushDB();
node1.clusterReset(ClusterResetType.SOFT);
node2.clusterReset(ClusterResetType.SOFT);
node3.clusterReset(ClusterResetType.SOFT);
node4.clusterReset(ClusterResetType.SOFT);
node1.clusterReset(ClusterResetType.HARD);
node2.clusterReset(ClusterResetType.HARD);
node3.clusterReset(ClusterResetType.HARD);
node4.clusterReset(ClusterResetType.HARD);
}

@After
Expand Down

0 comments on commit ce27d48

Please sign in to comment.