Skip to content

Commit

Permalink
Support client-side caching from UnifiedJedis (#3691)
Browse files Browse the repository at this point in the history
* Support client side caching from UnifiedJedis

* Support client side caching as a separate parameter

* format imports

* Support CSC in sentinel mode

* undo change
  • Loading branch information
sazzad16 authored Jan 17, 2024
1 parent d87fc6e commit 3ab6bdc
Show file tree
Hide file tree
Showing 21 changed files with 572 additions and 239 deletions.
21 changes: 17 additions & 4 deletions src/main/java/redis/clients/jedis/ClientSideCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,27 @@

public class ClientSideCache {

private final Map<ByteBuffer, Object> cache = new HashMap<>();
private final Map<ByteBuffer, Object> cache;

protected ClientSideCache() {
public ClientSideCache() {
this.cache = new HashMap<>();
}

protected void invalidateKeys(List list) {
/**
* For testing purpose only.
* @param map
*/
ClientSideCache(Map<ByteBuffer, Object> map) {
this.cache = map;
}

public final void clear() {
cache.clear();
}

public final void invalidateKeys(List list) {
if (list == null) {
cache.clear();
clear();
return;
}

Expand Down
35 changes: 26 additions & 9 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ public Connection(final HostAndPort hostAndPort) {
}

public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig));
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig);
}

public Connection(final JedisSocketFactory socketFactory) {
Expand All @@ -65,7 +63,15 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
initializeConnection(clientConfig);
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, ClientSideCache csCache) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeConnection(clientConfig);
initializeClientSideCache(csCache);
}

@Override
Expand Down Expand Up @@ -122,10 +128,6 @@ public void rollbackTimeout() {
}
}

final void setClientSideCache(ClientSideCache clientSideCache) {
this.clientSideCache = clientSideCache;
}

public Object executeCommand(final ProtocolCommand cmd) {
return executeCommand(new CommandArguments(cmd));
}
Expand Down Expand Up @@ -389,7 +391,7 @@ private static boolean validateClientInfo(String info) {
return true;
}

private void initializeFromClientConfig(final JedisClientConfig config) {
private void initializeConnection(final JedisClientConfig config) {
try {
connect();

Expand Down Expand Up @@ -516,4 +518,19 @@ public boolean ping() {
}
return true;
}

private void initializeClientSideCache(ClientSideCache csCache) {
this.clientSideCache = csCache;
if (clientSideCache != null) {
if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}

sendCommand(Protocol.Command.CLIENT, "TRACKING", "ON");
String reply = getStatusCodeReply();
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
}
}
}
}
18 changes: 13 additions & 5 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);

private final JedisSocketFactory jedisSocketFactory;

private final JedisClientConfig clientConfig;
private ClientSideCache clientSideCache = null;

public ConnectionFactory(final HostAndPort hostAndPort) {
this.clientConfig = DefaultJedisClientConfig.builder().build();
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
this.clientSideCache = csCache;
}

public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = clientConfig;
this.jedisSocketFactory = jedisSocketFactory;
}

Expand All @@ -54,9 +60,11 @@ public void destroyObject(PooledObject<Connection> pooledConnection) throws Exce

@Override
public PooledObject<Connection> makeObject() throws Exception {
Connection jedis = null;
try {
jedis = new Connection(jedisSocketFactory, clientConfig);
Connection jedis = clientSideCache == null
? new Connection(jedisSocketFactory, clientConfig)
: new Connection(jedisSocketFactory, clientConfig, clientSideCache);

return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
logger.debug("Error while makeObject", je);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache));
}

public ConnectionPool(PooledObjectFactory<Connection> factory) {
super(factory);
}
Expand All @@ -19,6 +23,11 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache), poolConfig);
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
GenericObjectPoolConfig<Connection> poolConfig) {
super(factory, poolConfig);
Expand Down
44 changes: 0 additions & 44 deletions src/main/java/redis/clients/jedis/JedisClientSideCache.java

This file was deleted.

54 changes: 44 additions & 10 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -198,28 +197,63 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
RedisProtocol protocol, ClientSideCache clientSideCache) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
}

public Map<String, ConnectionPool> getClusterNodes() {
Expand Down
43 changes: 39 additions & 4 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Connection> poolConfig;
private final JedisClientConfig clientConfig;
private final ClientSideCache clientSideCache;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;
Expand All @@ -61,19 +62,35 @@ public void run() {
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, null, startNodes);
this(clientConfig, null, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, null);
this(clientConfig, null, poolConfig, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, poolConfig, startNodes, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this(clientConfig, null, poolConfig, startNodes, topologyRefreshPeriod);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.clientSideCache = csCache;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
Expand Down Expand Up @@ -209,6 +226,9 @@ private void discoverClusterSlots(Connection jedis) {
try {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
}
Set<String> hostAndPortKeys = new HashSet<>();

for (Object slotInfoObj : slotsInfo) {
Expand Down Expand Up @@ -270,15 +290,30 @@ public ConnectionPool setupNodeIfNotExist(final HostAndPort node) {
ConnectionPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;

ConnectionPool nodePool = poolConfig == null ? new ConnectionPool(node, clientConfig)
: new ConnectionPool(node, clientConfig, poolConfig);
ConnectionPool nodePool = createNodePool(node);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
w.unlock();
}
}

private ConnectionPool createNodePool(HostAndPort node) {
if (poolConfig == null) {
if (clientSideCache == null) {
return new ConnectionPool(node, clientConfig);
} else {
return new ConnectionPool(node, clientConfig, clientSideCache);
}
} else {
if (clientSideCache == null) {
return new ConnectionPool(node, clientConfig, poolConfig);
} else {
return new ConnectionPool(node, clientConfig, clientSideCache, poolConfig);
}
}
}

public void assignSlotToNode(int slot, HostAndPort targetNode) {
w.lock();
try {
Expand Down
Loading

0 comments on commit 3ab6bdc

Please sign in to comment.