Skip to content

Commit

Permalink
Merge 5375c9e into 2480b02
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Feb 27, 2024
2 parents 2480b02 + 5375c9e commit 2f3d98a
Show file tree
Hide file tree
Showing 24 changed files with 348 additions and 107 deletions.
7 changes: 4 additions & 3 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.args.ClientAttributeOption;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
Expand All @@ -34,7 +35,7 @@ public class Connection implements Closeable {
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private ClientSideCache clientSideCache;
private ClientSideCacheConfig clientSideCache;
private int soTimeout = 0;
private int infiniteSoTimeout = 0;
private boolean broken = false;
Expand Down Expand Up @@ -66,7 +67,7 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
initializeConnection(clientConfig);
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, ClientSideCache csCache) {
public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, ClientSideCacheConfig csCache) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
Expand Down Expand Up @@ -519,7 +520,7 @@ public boolean ping() {
return true;
}

private void initializeClientSideCache(ClientSideCache csCache) {
private void initializeClientSideCache(ClientSideCacheConfig csCache) {
this.clientSideCache = csCache;
if (clientSideCache != null) {
if (protocol != RedisProtocol.RESP3) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.exceptions.JedisException;

/**
Expand All @@ -18,7 +18,7 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {

private final JedisSocketFactory jedisSocketFactory;
private final JedisClientConfig clientConfig;
private ClientSideCache clientSideCache = null;
private ClientSideCacheConfig clientSideCache = null;

public ConnectionFactory(final HostAndPort hostAndPort) {
this.clientConfig = DefaultJedisClientConfig.builder().build();
Expand All @@ -30,7 +30,7 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCacheConfig csCache) {
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
this.clientSideCache = csCache;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.util.Pool;

public class ConnectionPool extends Pool<Connection> {
Expand All @@ -10,7 +11,7 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache) {
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCacheConfig csCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache));
}

Expand All @@ -23,7 +24,7 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache,
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCacheConfig csCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache), poolConfig);
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -215,39 +216,39 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig clientSideCache) {
this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
clientConfig.getRedisProtocol(), clientSideCache);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol, ClientSideCache clientSideCache) {
RedisProtocol protocol, ClientSideCacheConfig clientSideCache) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
}

Expand Down
11 changes: 6 additions & 5 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;
Expand All @@ -42,7 +43,7 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Connection> poolConfig;
private final JedisClientConfig clientConfig;
private final ClientSideCache clientSideCache;
private final ClientSideCacheConfig clientSideCache;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;
Expand All @@ -65,7 +66,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<Hos
this(clientConfig, null, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache, final Set<HostAndPort> startNodes) {
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCacheConfig csCache, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, null, startNodes);
}

Expand All @@ -74,7 +75,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
this(clientConfig, null, poolConfig, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCacheConfig csCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, poolConfig, startNodes, null);
}
Expand All @@ -85,7 +86,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
this(clientConfig, null, poolConfig, startNodes, topologyRefreshPeriod);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCacheConfig csCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
Expand Down Expand Up @@ -227,7 +228,7 @@ private void discoverClusterSlots(Connection jedis) {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
clientSideCache.getClientSideCache().clear();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -75,7 +76,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(hostAndPort, clientConfig);
}

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCacheConfig csCache) {
super(hostAndPort, clientConfig, csCache);
}

Expand Down Expand Up @@ -379,7 +380,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(new PooledConnectionProvider(hostAndPort, clientConfig, poolConfig), clientConfig.getRedisProtocol());
}

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache,
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCacheConfig csCache,
final GenericObjectPoolConfig<Connection> poolConfig) {
super(new PooledConnectionProvider(hostAndPort, clientConfig, csCache, poolConfig),
clientConfig.getRedisProtocol(), csCache);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -12,7 +13,7 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCacheConfig clientSideCache,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache,
sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
Expand All @@ -25,7 +26,7 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCacheConfig clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache, poolConfig,
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.RedisInputStream;
import redis.clients.jedis.util.RedisOutputStream;
Expand Down Expand Up @@ -228,17 +230,17 @@ public static Object read(final RedisInputStream is) {
return process(is);
}

public static Object read(final RedisInputStream is, final ClientSideCache cache) {
public static Object read(final RedisInputStream is, final ClientSideCacheConfig cache) {
readPushes(is, cache);
return process(is);
}

private static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
private static void readPushes(final RedisInputStream is, final ClientSideCacheConfig cache) {
if (cache != null) {
//System.out.println("PEEK: " + is.peekByte());
while (is.peek(GREATER_THAN_BYTE)) {
is.readByte();
processPush(is, cache);
processPush(is, cache.getClientSideCache());
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import redis.clients.jedis.commands.SampleBinaryKeyedCommands;
import redis.clients.jedis.commands.SampleKeyedCommands;
import redis.clients.jedis.commands.RedisModuleCommands;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.executors.*;
import redis.clients.jedis.gears.TFunctionListParams;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class UnifiedJedis implements JedisCommands, JedisBinaryCommands,
AutoCloseable {

@Deprecated protected RedisProtocol protocol = null;
private final ClientSideCache clientSideCache;
private final ClientSideCacheConfig clientSideCache;
protected final ConnectionProvider provider;
protected final CommandExecutor executor;
protected final CommandObjects commandObjects;
Expand Down Expand Up @@ -92,7 +93,7 @@ public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new PooledConnectionProvider(hostAndPort, clientConfig), clientConfig.getRedisProtocol());
}

public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCacheConfig clientSideCache) {
this(new PooledConnectionProvider(hostAndPort, clientConfig, clientSideCache), clientConfig.getRedisProtocol(), clientSideCache);
}

Expand All @@ -104,7 +105,7 @@ protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol) {
this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol);
}

protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol, ClientSideCache clientSideCache) {
protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol, ClientSideCacheConfig clientSideCache) {
this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol, clientSideCache);
}

Expand Down Expand Up @@ -176,7 +177,7 @@ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Dura
}

protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol, ClientSideCache clientSideCache) {
RedisProtocol protocol, ClientSideCacheConfig clientSideCache) {
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider,
new ClusterCommandObjects(), protocol, clientSideCache);
}
Expand Down Expand Up @@ -241,11 +242,11 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol) {
this(executor, provider, commandObjects, protocol, (ClientSideCache) null);
this(executor, provider, commandObjects, protocol, (ClientSideCacheConfig) null);
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol, ClientSideCache clientSideCache) {
RedisProtocol protocol, ClientSideCacheConfig clientSideCache) {

if (clientSideCache != null && protocol != RedisProtocol.RESP3) {
throw new IllegalArgumentException("Client-side caching is only supported with RESP3.");
Expand Down Expand Up @@ -300,11 +301,13 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
}

private <T> T checkAndClientSideCacheCommand(CommandObject<T> command, Object... keys) {
if (clientSideCache == null) {
return executeCommand(command);
if (clientSideCache != null) {
if (clientSideCache.isCacheable(command.getArguments().getCommand(), (Object[]) keys)) {
return clientSideCache.getClientSideCache().get((cmd) -> executeCommand(cmd), command, keys);
}
}

return clientSideCache.getValue((cmd) -> executeCommand(cmd), command, keys);
return executeCommand(command);
}

public String ping() {
Expand Down
Loading

0 comments on commit 2f3d98a

Please sign in to comment.