Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support white-list and black-list commands and keys #3740

Closed
wants to merge 12 commits into from
16 changes: 9 additions & 7 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.args.ClientAttributeOption;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
Expand All @@ -34,7 +35,7 @@ public class Connection implements Closeable {
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private ClientSideCache clientSideCache;
private ClientSideCacheConfig clientSideCacheConfig;
private int soTimeout = 0;
private int infiniteSoTimeout = 0;
private boolean broken = false;
Expand Down Expand Up @@ -66,12 +67,13 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
initializeConnection(clientConfig);
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, ClientSideCache csCache) {
public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig,
ClientSideCacheConfig csCacheConfig) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeConnection(clientConfig);
initializeClientSideCache(csCache);
initializeClientSideCache(csCacheConfig);
}

@Override
Expand Down Expand Up @@ -354,7 +356,7 @@ protected Object readProtocolWithCheckingBroken() {
}

try {
return Protocol.read(inputStream, clientSideCache);
return Protocol.read(inputStream, clientSideCacheConfig);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
Expand Down Expand Up @@ -519,9 +521,9 @@ public boolean ping() {
return true;
}

private void initializeClientSideCache(ClientSideCache csCache) {
this.clientSideCache = csCache;
if (clientSideCache != null) {
private void initializeClientSideCache(ClientSideCacheConfig csCacheConfig) {
this.clientSideCacheConfig = csCacheConfig;
if (clientSideCacheConfig != null) {
if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.exceptions.JedisException;

/**
Expand All @@ -18,7 +18,7 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {

private final JedisSocketFactory jedisSocketFactory;
private final JedisClientConfig clientConfig;
private ClientSideCache clientSideCache = null;
private ClientSideCacheConfig clientSideCacheConfig = null;

public ConnectionFactory(final HostAndPort hostAndPort) {
this.clientConfig = DefaultJedisClientConfig.builder().build();
Expand All @@ -30,10 +30,11 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig,
ClientSideCacheConfig csCacheConfig) {
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
this.clientSideCache = csCache;
this.clientSideCacheConfig = csCacheConfig;
}

public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
Expand Down Expand Up @@ -61,9 +62,9 @@ public void destroyObject(PooledObject<Connection> pooledConnection) throws Exce
@Override
public PooledObject<Connection> makeObject() throws Exception {
try {
Connection jedis = clientSideCache == null
Connection jedis = clientSideCacheConfig == null
? new Connection(jedisSocketFactory, clientConfig)
: new Connection(jedisSocketFactory, clientConfig, clientSideCache);
: new Connection(jedisSocketFactory, clientConfig, clientSideCacheConfig);

return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.util.Pool;

public class ConnectionPool extends Pool<Connection> {
Expand All @@ -10,8 +11,8 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache));
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCacheConfig));
}

public ConnectionPool(PooledObjectFactory<Connection> factory) {
Expand All @@ -23,9 +24,9 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
}

public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache csCache,
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, csCache), poolConfig);
this(new ConnectionFactory(hostAndPort, clientConfig, csCacheConfig), poolConfig);
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
Expand Down
33 changes: 17 additions & 16 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -215,40 +216,40 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig) {
this(clusterNodes, clientConfig, csCacheConfig, DEFAULT_MAX_ATTEMPTS,
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, csCacheConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), csCacheConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, csCacheConfig, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), csCacheConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
this(new ClusterConnectionProvider(clusterNodes, clientConfig, csCacheConfig, poolConfig),
DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
clientConfig.getRedisProtocol(), clientSideCache);
clientConfig.getRedisProtocol(), csCacheConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, csCacheConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), csCacheConfig);
}

private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol, ClientSideCache clientSideCache) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
RedisProtocol protocol, ClientSideCacheConfig csCacheConfig) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, csCacheConfig);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;
Expand All @@ -42,7 +43,7 @@ public class JedisClusterInfoCache {

private final GenericObjectPoolConfig<Connection> poolConfig;
private final JedisClientConfig clientConfig;
private final ClientSideCache clientSideCache;
private final ClientSideCacheConfig csCacheConfig;
private final Set<HostAndPort> startNodes;

private static final int MASTER_NODE_INDEX = 2;
Expand All @@ -65,7 +66,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<Hos
this(clientConfig, null, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache, final Set<HostAndPort> startNodes) {
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCacheConfig csCache, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, null, startNodes);
}

Expand All @@ -74,7 +75,7 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
this(clientConfig, null, poolConfig, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCacheConfig csCache,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, csCache, poolConfig, startNodes, null);
}
Expand All @@ -85,12 +86,12 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
this(clientConfig, null, poolConfig, startNodes, topologyRefreshPeriod);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCache csCache,
public JedisClusterInfoCache(final JedisClientConfig clientConfig, ClientSideCacheConfig csCacheConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.clientSideCache = csCache;
this.csCacheConfig = csCacheConfig;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
Expand Down Expand Up @@ -226,8 +227,8 @@ private void discoverClusterSlots(Connection jedis) {
try {
Arrays.fill(slots, null);
Arrays.fill(slotNodes, null);
if (clientSideCache != null) {
clientSideCache.clear();
if (csCacheConfig != null) {
csCacheConfig.getClientSideCache().clear();
}
Set<String> hostAndPortKeys = new HashSet<>();

Expand Down Expand Up @@ -300,16 +301,16 @@ public ConnectionPool setupNodeIfNotExist(final HostAndPort node) {

private ConnectionPool createNodePool(HostAndPort node) {
if (poolConfig == null) {
if (clientSideCache == null) {
if (csCacheConfig == null) {
return new ConnectionPool(node, clientConfig);
} else {
return new ConnectionPool(node, clientConfig, clientSideCache);
return new ConnectionPool(node, clientConfig, csCacheConfig);
}
} else {
if (clientSideCache == null) {
if (csCacheConfig == null) {
return new ConnectionPool(node, clientConfig, poolConfig);
} else {
return new ConnectionPool(node, clientConfig, clientSideCache, poolConfig);
return new ConnectionPool(node, clientConfig, csCacheConfig, poolConfig);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -75,7 +76,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(hostAndPort, clientConfig);
}

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCacheConfig csCache) {
super(hostAndPort, clientConfig, csCache);
}

Expand Down Expand Up @@ -379,7 +380,7 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(new PooledConnectionProvider(hostAndPort, clientConfig, poolConfig), clientConfig.getRedisProtocol());
}

public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache,
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCacheConfig csCache,
final GenericObjectPoolConfig<Connection> poolConfig) {
super(new PooledConnectionProvider(hostAndPort, clientConfig, csCache, poolConfig),
clientConfig.getRedisProtocol(), csCache);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -12,7 +13,7 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCacheConfig clientSideCache,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache,
sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
Expand All @@ -25,7 +26,7 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCache clientSideCache,
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, ClientSideCacheConfig clientSideCache,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
super(new SentineledConnectionProvider(masterName, masterClientConfig, clientSideCache, poolConfig,
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import redis.clients.jedis.exceptions.*;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.csc.ClientSideCacheConfig;
import redis.clients.jedis.util.KeyValue;
import redis.clients.jedis.util.RedisInputStream;
import redis.clients.jedis.util.RedisOutputStream;
Expand Down Expand Up @@ -228,17 +230,17 @@ public static Object read(final RedisInputStream is) {
return process(is);
}

public static Object read(final RedisInputStream is, final ClientSideCache cache) {
public static Object read(final RedisInputStream is, final ClientSideCacheConfig cache) {
readPushes(is, cache);
return process(is);
}

private static void readPushes(final RedisInputStream is, final ClientSideCache cache) {
private static void readPushes(final RedisInputStream is, final ClientSideCacheConfig cache) {
if (cache != null) {
//System.out.println("PEEK: " + is.peekByte());
while (is.peek(GREATER_THAN_BYTE)) {
is.readByte();
processPush(is, cache);
processPush(is, cache.getClientSideCache());
}
}
}
Expand Down
Loading
Loading