diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 9325de80d7..8a31353258 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -435,6 +435,11 @@ private void initializeFromClientConfig(final JedisClientConfig config) { } } + // set readonly flag to ALL connections (including master nodes) when enable read from replica + if (config.isReadOnlyForReplica()) { + fireAndForgetMsg.add(new CommandArguments(Command.READONLY)); + } + for (CommandArguments arg : fireAndForgetMsg) { sendCommand(arg); } diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 6d62646a5e..98fa4677d8 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -26,11 +26,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final ClientSetInfoConfig clientSetInfoConfig; + private final boolean readOnlyForReplica; + private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis, int blockingSocketTimeoutMillis, Supplier credentialsProvider, int database, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper, - ClientSetInfoConfig clientSetInfoConfig) { + ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForReplica) { this.redisProtocol = protocol; this.connectionTimeoutMillis = connectionTimeoutMillis; this.socketTimeoutMillis = soTimeoutMillis; @@ -44,6 +46,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi this.hostnameVerifier = hostnameVerifier; this.hostAndPortMapper = hostAndPortMapper; this.clientSetInfoConfig = clientSetInfoConfig; + this.readOnlyForReplica = readOnlyForReplica; } @Override @@ -122,6 +125,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() { return clientSetInfoConfig; } + @Override + public boolean isReadOnlyForReplica() { + return readOnlyForReplica; + } + public static Builder builder() { return new Builder(); } @@ -149,6 +157,8 @@ public static class Builder { private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT; + private boolean readOnlyForReplicas = false; + private Builder() { } @@ -160,7 +170,8 @@ public DefaultJedisClientConfig build() { return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis, blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl, - sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig); + sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig, + readOnlyForReplicas); } /** @@ -255,6 +266,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) { this.clientSetInfoConfig = setInfoConfig; return this; } + + public Builder readOnlyForReplicas() { + this.readOnlyForReplicas = true; + return this; + } } public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis, @@ -264,7 +280,8 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s return new DefaultJedisClientConfig(null, connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis, new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database, - clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null); + clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null, + false); } public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { @@ -273,6 +290,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(), copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(), copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(), - copy.getClientSetInfoConfig()); + copy.getClientSetInfoConfig(), copy.isReadOnlyForReplica()); } } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index 0ad6e979f6..57b172cb34 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -80,6 +80,10 @@ default HostAndPortMapper getHostAndPortMapper() { return null; } + default boolean isReadOnlyForReplica() { + return false; + } + /** * Modify the behavior of internally executing CLIENT SETINFO command. * @return CLIENT SETINFO config diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 6c5843c16e..68d8f4205f 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -7,6 +7,7 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.executors.ClusterCommandExecutor; import redis.clients.jedis.providers.ClusterConnectionProvider; import redis.clients.jedis.util.JedisClusterCRC16; @@ -266,4 +267,11 @@ public ClusterPipeline pipelined() { public AbstractTransaction transaction(boolean doMulti) { throw new UnsupportedOperationException(); } + + public final T executeCommandToReplica(CommandObject commandObject) { + if (!(executor instanceof ClusterCommandExecutor)) { + throw new UnsupportedOperationException("Support only execute to replica in ClusterCommandExecutor"); + } + return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject); + } } diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index da88462ef4..79be093ffb 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -38,6 +38,7 @@ public class JedisClusterInfoCache { private final Map nodes = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS]; + private final List[] replicaSlots; private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); @@ -85,6 +86,11 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(), topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS); } + if (clientConfig.isReadOnlyForReplica()) { + replicaSlots = new ArrayList[Protocol.CLUSTER_HASHSLOTS]; + } else { + replicaSlots = null; + } } /** @@ -144,6 +150,8 @@ public void discoverClusterNodesAndSlots(Connection jedis) { setupNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); + } else if (clientConfig.isReadOnlyForReplica()) { + assignSlotsToReplicaNode(slotNums, targetNode); } } } @@ -236,6 +244,8 @@ private void discoverClusterSlots(Connection jedis) { setupNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); + } else if (clientConfig.isReadOnlyForReplica()) { + assignSlotsToReplicaNode(slotNums, targetNode); } } } @@ -307,6 +317,21 @@ public void assignSlotsToNode(List targetSlots, HostAndPort targetNode) } } + public void assignSlotsToReplicaNode(List targetSlots, HostAndPort targetNode) { + w.lock(); + try { + ConnectionPool targetPool = setupNodeIfNotExist(targetNode); + for (Integer slot : targetSlots) { + if (replicaSlots[slot] == null) { + replicaSlots[slot] = new ArrayList<>(); + } + replicaSlots[slot].add(targetPool); + } + } finally { + w.unlock(); + } + } + public ConnectionPool getNode(String nodeKey) { r.lock(); try { @@ -338,6 +363,15 @@ public HostAndPort getSlotNode(int slot) { } } + public List getSlotReplicaPools(int slot) { + r.lock(); + try { + return replicaSlots[slot]; + } finally { + r.unlock(); + } + } + public Map getNodes() { r.lock(); try { diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java index 375db99e14..e5049672b8 100644 --- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java @@ -73,6 +73,14 @@ public final T broadcastCommand(CommandObject commandObject) { @Override public final T executeCommand(CommandObject commandObject) { + return doExecuteCommand(commandObject, false); + } + + public final T executeCommandToReplica(CommandObject commandObject) { + return doExecuteCommand(commandObject, true); + } + + private T doExecuteCommand(CommandObject commandObject, boolean toReplica) { Instant deadline = Instant.now().plus(maxTotalRetriesDuration); JedisRedirectionException redirect = null; @@ -88,7 +96,8 @@ public final T executeCommand(CommandObject commandObject) { connection.executeCommand(Protocol.Command.ASKING); } } else { - connection = provider.getConnection(commandObject.getArguments()); + connection = toReplica ? provider.getReplicaConnection(commandObject.getArguments()) + : provider.getConnection(commandObject.getArguments()); } return execute(connection, commandObject); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index c21640713d..925645e169 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -6,6 +6,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.ClusterCommandArguments; @@ -102,6 +104,11 @@ public Connection getConnection(CommandArguments args) { return slot >= 0 ? getConnectionFromSlot(slot) : getConnection(); } + public Connection getReplicaConnection(CommandArguments args) { + final int slot = ((ClusterCommandArguments) args).getCommandHashSlot(); + return slot >= 0 ? getReplicaConnectionFromSlot(slot) : getConnection(); + } + @Override public Connection getConnection() { // In antirez's redis-rb-cluster implementation, getRandomConnection always @@ -158,6 +165,25 @@ public Connection getConnectionFromSlot(int slot) { } } + public Connection getReplicaConnectionFromSlot(int slot) { + List connectionPools = cache.getSlotReplicaPools(slot); + ThreadLocalRandom random = ThreadLocalRandom.current(); + if (connectionPools != null && !connectionPools.isEmpty()) { + // pick up randomly a connection + int idx = random.nextInt(connectionPools.size()); + return connectionPools.get(idx).getResource(); + } + + renewSlotCache(); + connectionPools = cache.getSlotReplicaPools(slot); + if (connectionPools != null && !connectionPools.isEmpty()) { + int idx = random.nextInt(connectionPools.size()); + return connectionPools.get(idx).getResource(); + } + + return getConnectionFromSlot(slot); + } + @Override public Map getConnectionMap() { return Collections.unmodifiableMap(getNodes()); diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index 8297eb90c6..2308bb4b07 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -199,6 +199,33 @@ public void testReadonlyAndReadwrite() throws Exception { nodeSlave2.flushDB(); } + @Test + public void testReadFromReplicas() throws Exception { + node1.clusterMeet(LOCAL_IP, nodeInfoSlave2.getPort()); + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2); + + for (String nodeInfo : node2.clusterNodes().split("\n")) { + if (nodeInfo.contains("myself")) { + nodeSlave2.clusterReplicate(nodeInfo.split(" ")[0]); + break; + } + } + + DefaultJedisClientConfig READ_REPLICAS_CLIENT_CONFIG + = DefaultJedisClientConfig.builder().password("cluster").readOnlyForReplicas().build(); + ClusterCommandObjects commandObjects = new ClusterCommandObjects(); + try (JedisCluster jedisCluster = new JedisCluster(nodeInfo1, READ_REPLICAS_CLIENT_CONFIG, + DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) { + assertEquals("OK", jedisCluster.set("test", "read-from-replicas")); + + assertEquals("read-from-replicas", jedisCluster.executeCommandToReplica(commandObjects.get("test"))); + // TODO: ensure data being served from replica node(s) + } + + nodeSlave2.clusterReset(ClusterResetType.SOFT); + nodeSlave2.flushDB(); + } + /** * slot->nodes 15363 node3 e */