Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support execute the read-only command on replica nodes #3848

Merged
merged 7 commits into from
Aug 12, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
@@ -435,6 +435,11 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}
}

// set readonly flag to ALL connections (including master nodes) when enable read from replica
if (config.isReadOnlyForReplica()) {
fireAndForgetMsg.add(new CommandArguments(Command.READONLY));
}

for (CommandArguments arg : fireAndForgetMsg) {
sendCommand(arg);
}
25 changes: 21 additions & 4 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
@@ -26,11 +26,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final ClientSetInfoConfig clientSetInfoConfig;

private final boolean readOnlyForReplica;

private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis,
int blockingSocketTimeoutMillis, Supplier<RedisCredentials> credentialsProvider, int database,
String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper,
ClientSetInfoConfig clientSetInfoConfig) {
ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForReplica) {
this.redisProtocol = protocol;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.socketTimeoutMillis = soTimeoutMillis;
@@ -44,6 +46,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMapper = hostAndPortMapper;
this.clientSetInfoConfig = clientSetInfoConfig;
this.readOnlyForReplica = readOnlyForReplica;
}

@Override
@@ -122,6 +125,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() {
return clientSetInfoConfig;
}

@Override
public boolean isReadOnlyForReplica() {
return readOnlyForReplica;
}

public static Builder builder() {
return new Builder();
}
@@ -149,6 +157,8 @@ public static class Builder {

private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT;

private boolean readOnlyForReplicas = false;

private Builder() {
}

@@ -160,7 +170,8 @@ public DefaultJedisClientConfig build() {

return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis,
blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl,
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig);
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig,
readOnlyForReplicas);
}

/**
@@ -255,6 +266,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) {
this.clientSetInfoConfig = setInfoConfig;
return this;
}

public Builder readOnlyForReplicas() {
this.readOnlyForReplicas = true;
return this;
}
}

public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis,
@@ -264,7 +280,8 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s
return new DefaultJedisClientConfig(null,
connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis,
new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database,
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null);
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null,
false);
}

public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
@@ -273,6 +290,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(),
copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(),
copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(),
copy.getClientSetInfoConfig());
copy.getClientSetInfoConfig(), copy.isReadOnlyForReplica());
}
}
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
@@ -80,6 +80,10 @@ default HostAndPortMapper getHostAndPortMapper() {
return null;
}

default boolean isReadOnlyForReplica() {
return false;
}

/**
* Modify the behavior of internally executing CLIENT SETINFO command.
* @return CLIENT SETINFO config
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -38,6 +38,8 @@ public class JedisClusterInfoCache {
private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
private List<ConnectionPool>[] replicaSlots;
private List<HostAndPort>[] replicaSlotNodes;
jjz921024 marked this conversation as resolved.
Show resolved Hide resolved
jjz921024 marked this conversation as resolved.
Show resolved Hide resolved

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
@@ -85,6 +87,10 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
if (clientConfig.isReadOnlyForReplica()) {
replicaSlots = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
replicaSlotNodes = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
}
jjz921024 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
@@ -144,6 +150,8 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
@@ -236,6 +244,8 @@ private void discoverClusterSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
@@ -307,6 +317,25 @@ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode)
}
}

public void assignSlotsToReplicaNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
ConnectionPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
if (replicaSlots[slot] == null) {
replicaSlots[slot] = new ArrayList<>();
}
replicaSlots[slot].add(targetPool);
if (replicaSlotNodes[slot] == null) {
replicaSlotNodes[slot] = new ArrayList<>();
}
replicaSlotNodes[slot].add(targetNode);
}
} finally {
w.unlock();
}
}

public ConnectionPool getNode(String nodeKey) {
r.lock();
try {
@@ -338,6 +367,15 @@ public HostAndPort getSlotNode(int slot) {
}
}

public List<ConnectionPool> getSlotReplicaPools(int slot) {
r.lock();
try {
return replicaSlots[slot];
} finally {
r.unlock();
}
}

public Map<String, ConnectionPool> getNodes() {
r.lock();
try {
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
@@ -256,6 +256,10 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
return executor.executeCommand(commandObject);
}

public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
jjz921024 marked this conversation as resolved.
Show resolved Hide resolved
return executor.executeCommandToReplica(commandObject);
}

public final <T> T broadcastCommand(CommandObject<T> commandObject) {
return executor.broadcastCommand(commandObject);
}
Original file line number Diff line number Diff line change
@@ -73,6 +73,15 @@ public final <T> T broadcastCommand(CommandObject<T> commandObject) {

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, false);
}

@Override
public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, true);
}

private <T> T doExecuteCommand(CommandObject<T> commandObject, boolean toReplica) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

JedisRedirectionException redirect = null;
@@ -88,7 +97,8 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getConnection(commandObject.getArguments());
connection = toReplica ? provider.getReplicaConnection(commandObject.getArguments())
: provider.getConnection(commandObject.getArguments());
}

return execute(connection, commandObject);
Original file line number Diff line number Diff line change
@@ -6,6 +6,10 @@ public interface CommandExecutor extends AutoCloseable {

<T> T executeCommand(CommandObject<T> commandObject);

default <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return executeCommand(commandObject);
}

jjz921024 marked this conversation as resolved.
Show resolved Hide resolved
default <T> T broadcastCommand(CommandObject<T> commandObject) {
return executeCommand(commandObject);
}
Original file line number Diff line number Diff line change
@@ -6,6 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.ClusterCommandArguments;
@@ -102,6 +104,11 @@ public Connection getConnection(CommandArguments args) {
return slot >= 0 ? getConnectionFromSlot(slot) : getConnection();
}

public Connection getReplicaConnection(CommandArguments args) {
final int slot = ((ClusterCommandArguments) args).getCommandHashSlot();
return slot >= 0 ? getReplicaConnectionFromSlot(slot) : getConnection();
}

@Override
public Connection getConnection() {
// In antirez's redis-rb-cluster implementation, getRandomConnection always
@@ -158,6 +165,25 @@ public Connection getConnectionFromSlot(int slot) {
}
}

public Connection getReplicaConnectionFromSlot(int slot) {
List<ConnectionPool> connectionPools = cache.getSlotReplicaPools(slot);
ThreadLocalRandom random = ThreadLocalRandom.current();
if (connectionPools != null && !connectionPools.isEmpty()) {
// pick up randomly a connection
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

renewSlotCache();
connectionPools = cache.getSlotReplicaPools(slot);
if (connectionPools != null && !connectionPools.isEmpty()) {
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

return getConnectionFromSlot(slot);
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
26 changes: 26 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
@@ -199,6 +199,32 @@ public void testReadonlyAndReadwrite() throws Exception {
nodeSlave2.flushDB();
}

@Test
public void testReadFromReplicas() throws Exception {
node1.clusterMeet(LOCAL_IP, nodeInfoSlave2.getPort());
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, nodeSlave2);

for (String nodeInfo : node2.clusterNodes().split("\n")) {
if (nodeInfo.contains("myself")) {
nodeSlave2.clusterReplicate(nodeInfo.split(" ")[0]);
break;
}
}

DefaultJedisClientConfig READ_REPLICAS_CLIENT_CONFIG
= DefaultJedisClientConfig.builder().password("cluster").readOnlyForReplicas().build();
try (JedisCluster jedisCluster = new JedisCluster(nodeInfo1, READ_REPLICAS_CLIENT_CONFIG,
DEFAULT_REDIRECTIONS, DEFAULT_POOL_CONFIG)) {
assertEquals("OK", jedisCluster.set("test", "read-from-replicas"));

ClusterCommandObjects commandObjects = new ClusterCommandObjects();
assertEquals("read-from-replicas", jedisCluster.executeCommandToReplica(commandObjects.get("test")));
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
}

nodeSlave2.clusterReset(ClusterResetType.SOFT);
nodeSlave2.flushDB();
}

/**
* slot->nodes 15363 node3 e
*/