Skip to content

Commit

Permalink
Support CSC in sentinel mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Jan 17, 2024
1 parent e1f86c2 commit fbb24aa
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 15 deletions.
6 changes: 5 additions & 1 deletion src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ public ClientSideCache() {
this.cache = map;
}

public final void clear() {
cache.clear();
}

public final void invalidateKeys(List list) {
if (list == null) {
cache.clear();
clear();
return;
}

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.executors.ClusterCommandExecutor;

import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -193,14 +191,14 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
this(clusterNodes, clientConfig, DEFAULT_MAX_ATTEMPTS, poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
int maxAttempts, GenericObjectPoolConfig<Connection> poolConfig) {
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
GenericObjectPoolConfig<Connection> poolConfig) {
this(clusterNodes, clientConfig, maxAttempts,
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ private void discoverClusterSlots(Connection jedis) {
try {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
}
Set<String> hostAndPortKeys = new HashSet<>();

for (Object slotInfoObj : slotsInfo) {
Expand Down
1 change: 0 additions & 1 deletion src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,26 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache,
sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig),
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache, poolConfig,
sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
}

public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider) {
super(sentineledConnectionProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.ClientSideCache;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;
import redis.clients.jedis.ConnectionPool;
Expand All @@ -35,6 +36,8 @@ public class SentineledConnectionProvider implements ConnectionProvider {

private final JedisClientConfig masterClientConfig;

private final ClientSideCache clientSideCache;

private final GenericObjectPoolConfig<Connection> masterPoolConfig;

protected final Collection<SentinelListener> sentinelListeners = new ArrayList<>();
Expand All @@ -47,7 +50,12 @@ public class SentineledConnectionProvider implements ConnectionProvider {

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, /*poolConfig*/ null, sentinels, sentinelClientConfig);
this(masterName, masterClientConfig, null, null, sentinels, sentinelClientConfig);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
ClientSideCache clientSideCache, Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, clientSideCache, null, sentinels, sentinelClientConfig);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Expand All @@ -57,13 +65,28 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
ClientSideCache clientSideCache, final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, clientSideCache, poolConfig, sentinels, sentinelClientConfig,
DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {
this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig, subscribeRetryWaitTimeMillis);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
ClientSideCache clientSideCache, final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {

this.masterName = masterName;
this.masterClientConfig = masterClientConfig;
this.clientSideCache = clientSideCache;
this.masterPoolConfig = poolConfig;

this.sentinelClientConfig = sentinelClientConfig;
Expand Down Expand Up @@ -99,13 +122,14 @@ private void initMaster(HostAndPort master) {
if (!master.equals(currentMaster)) {
currentMaster = master;

ConnectionPool newPool = masterPoolConfig != null
? new ConnectionPool(currentMaster, masterClientConfig, masterPoolConfig)
: new ConnectionPool(currentMaster, masterClientConfig);
ConnectionPool newPool = createNodePool(currentMaster);

ConnectionPool existingPool = pool;
pool = newPool;
LOG.info("Created connection pool to master at {}.", master);
if (clientSideCache != null) {
clientSideCache.clear();
}

if (existingPool != null) {
// although we clear the pool, we still have to check the returned object in getResource,
Expand All @@ -117,6 +141,22 @@ private void initMaster(HostAndPort master) {
}
}

private ConnectionPool createNodePool(HostAndPort master) {
if (masterPoolConfig == null) {
if (clientSideCache == null) {
return new ConnectionPool(master, masterClientConfig);
} else {
return new ConnectionPool(master, masterClientConfig, clientSideCache);
}
} else {
if (clientSideCache == null) {
return new ConnectionPool(master, masterClientConfig, masterPoolConfig);
} else {
return new ConnectionPool(master, masterClientConfig, clientSideCache, masterPoolConfig);
}
}
}

private HostAndPort initSentinels(Set<HostAndPort> sentinels) {

HostAndPort master = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package redis.clients.jedis;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.hamcrest.Matchers;
import org.junit.Test;

public class JedisSentineledClientSideCacheTest {

private static final String MASTER_NAME = "mymaster";

protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1);
protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(3);

private static final Set<HostAndPort> sentinels = Arrays.asList(sentinel1, sentinel2).stream().collect(Collectors.toSet());

private static final JedisClientConfig masterClientConfig = DefaultJedisClientConfig.builder().resp3().password("foobared").build();

private static final JedisClientConfig sentinelClientConfig = DefaultJedisClientConfig.builder().resp3().build();

private static final Supplier<GenericObjectPoolConfig<Connection>> singleConnectionPoolConfig
= () -> {
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(1);
return poolConfig;
};

@Test
public void simple() {
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(), sentinels, sentinelClientConfig)) {
jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo"));
jedis.del("foo");
assertThat(jedis.get("foo"), Matchers.oneOf("bar", null)); // ?
}
}

@Test
public void simpleWithSimpleMap() {
HashMap<ByteBuffer, Object> map = new HashMap<>();
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(map), sentinels, sentinelClientConfig)) {
jedis.set("foo", "bar");
assertThat(map, Matchers.aMapWithSize(0));
assertEquals("bar", jedis.get("foo"));
assertThat(map, Matchers.aMapWithSize(1));
jedis.del("foo");
assertThat(map, Matchers.aMapWithSize(1));
assertEquals("bar", jedis.get("foo"));
assertThat(map, Matchers.aMapWithSize(1));
jedis.ping();
assertThat(map, Matchers.aMapWithSize(0));
assertNull(jedis.get("foo"));
assertThat(map, Matchers.aMapWithSize(0));
}
}

@Test
public void flushAll() {
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(), sentinels, sentinelClientConfig)) {
jedis.set("foo", "bar");
assertEquals("bar", jedis.get("foo"));
jedis.flushAll();
assertThat(jedis.get("foo"), Matchers.oneOf("bar", null)); // ?
}
}

@Test
public void flushAllWithSimpleMap() {
HashMap<ByteBuffer, Object> map = new HashMap<>();
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, masterClientConfig, new ClientSideCache(map), sentinels, sentinelClientConfig)) {
jedis.set("foo", "bar");
assertThat(map, Matchers.aMapWithSize(0));
assertEquals("bar", jedis.get("foo"));
assertThat(map, Matchers.aMapWithSize(1));
jedis.flushAll();
assertThat(map, Matchers.aMapWithSize(1));
assertEquals("bar", jedis.get("foo"));
assertThat(map, Matchers.aMapWithSize(1));
jedis.ping();
assertThat(map, Matchers.aMapWithSize(0));
assertNull(jedis.get("foo"));
assertThat(map, Matchers.aMapWithSize(0));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ public class SentineledConnectionProviderTest {

private static final String MASTER_NAME = "mymaster";

//protected static HostAndPort master = HostAndPorts.getRedisServers().get(2);
//protected static HostAndPort slave1 = HostAndPorts.getRedisServers().get(3);

protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1);
protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(3);

Expand Down

0 comments on commit fbb24aa

Please sign in to comment.