Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding CacheConfig #3919

Merged
merged 11 commits into from
Aug 28, 2024
10 changes: 9 additions & 1 deletion src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

public class JedisCluster extends UnifiedJedis {
Expand Down Expand Up @@ -225,10 +227,16 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

@Experimental
public JedisCluster(Set<HostAndPort> hnp, JedisClientConfig jedisClientConfig, CacheConfig cacheConfig) {
this(hnp, jedisClientConfig, new CacheProvider().getCache(cacheConfig));
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, Cache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts,
maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
Comment on lines +238 to 240
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts,
maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts,
maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);

}

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheProvider;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -82,6 +84,11 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
super(hostAndPort, clientConfig, clientSideCache);
}

@Experimental
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, CacheConfig cacheConfig) {
this(hostAndPort, clientConfig, new CacheProvider().getCache(cacheConfig));
}

public JedisPooled(PooledObjectFactory<Connection> factory) {
this(new PooledConnectionProvider(factory));
}
Expand Down Expand Up @@ -389,6 +396,12 @@ public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig client
clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisPooled(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, CacheConfig cacheConfig,
final GenericObjectPoolConfig<Connection> poolConfig) {
this(hostAndPort, clientConfig, new CacheProvider().getCache(cacheConfig), poolConfig);
}

public JedisPooled(final GenericObjectPoolConfig<Connection> poolConfig,
final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
super(new PooledConnectionProvider(new ConnectionFactory(jedisSocketFactory, clientConfig), poolConfig),
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheProvider;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -21,6 +23,13 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
sentinels, sentinelClientConfig), masterClientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, CacheConfig cacheConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, new CacheProvider().getCache(cacheConfig),
sentinels, sentinelClientConfig);
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,8 @@ public static void readPushes(final RedisInputStream is, final Cache cache, bool
is.readByte();
processPush(is, cache);
}
} catch (JedisConnectionException e) {
// TODO handle it properly
} catch (IOException e) {
// TODO handle it properly
throw new JedisConnectionException("Failed to read pending buffer for push messages !", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new JedisConnectionException("Failed to read pending buffer for push messages !", e);
throw new JedisConnectionException("Failed to read pending buffer for push messages!", e);

}
} else {
while (is.peek(GREATER_THAN_BYTE)) {
Expand Down
38 changes: 28 additions & 10 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import redis.clients.jedis.commands.SampleKeyedCommands;
import redis.clients.jedis.commands.RedisModuleCommands;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheConnection;
import redis.clients.jedis.csc.CacheProvider;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.executors.*;
import redis.clients.jedis.gears.TFunctionListParams;
Expand Down Expand Up @@ -58,6 +61,7 @@ public class UnifiedJedis implements JedisCommands, JedisBinaryCommands,
protected final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects;
private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;
private final Cache cache;

public UnifiedJedis() {
this(new HostAndPort(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT));
Expand Down Expand Up @@ -95,9 +99,14 @@ public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
}

@Experimental
public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, Cache clientSideCache) {
this(new PooledConnectionProvider(hostAndPort, clientConfig, clientSideCache), clientConfig.getRedisProtocol(),
clientSideCache);
public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, Cache cache) {
this(new PooledConnectionProvider(hostAndPort, clientConfig, cache), clientConfig.getRedisProtocol(),
cache);
}

@Experimental
public UnifiedJedis(HostAndPort hostAndPort, JedisClientConfig clientConfig, CacheConfig cacheConfig) {
this(hostAndPort, clientConfig, new CacheProvider().getCache(cacheConfig));
}

public UnifiedJedis(ConnectionProvider provider) {
Expand All @@ -109,8 +118,8 @@ protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol) {
}

@Experimental
protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol, Cache clientSideCache) {
this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol, clientSideCache);
protected UnifiedJedis(ConnectionProvider provider, RedisProtocol protocol, Cache cache) {
this(new DefaultCommandExecutor(provider), provider, new CommandObjects(), protocol, cache);
}

/**
Expand Down Expand Up @@ -147,6 +156,11 @@ public UnifiedJedis(Connection connection) {
if (proto != null)
this.commandObjects.setProtocol(proto);
this.graphCommandObjects = new GraphCommandObjects(this);
if (connection instanceof CacheConnection) {
this.cache = ((CacheConnection) connection).getCache();
} else {
this.cache = null;
}
}

@Deprecated
Expand Down Expand Up @@ -183,9 +197,9 @@ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Dura

@Experimental
protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol, Cache clientSideCache) {
RedisProtocol protocol, Cache cache) {
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider,
new ClusterCommandObjects(), protocol, clientSideCache);
new ClusterCommandObjects(), protocol, cache);
}

/**
Expand Down Expand Up @@ -259,9 +273,9 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm

@Experimental
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol, Cache clientSideCache) {
RedisProtocol protocol, Cache cache) {

if (clientSideCache != null && protocol != RedisProtocol.RESP3) {
if (cache != null && protocol != RedisProtocol.RESP3) {
throw new IllegalArgumentException("Client-side caching is only supported with RESP3.");
}

Expand All @@ -274,7 +288,7 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm

this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));

this.cache = cache;
}

@Override
Expand Down Expand Up @@ -314,6 +328,10 @@ public void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig co
this.commandObjects.setBroadcastAndRoundRobinConfig(this.broadcastAndRoundRobinConfig);
}

public Cache getCache() {
return cache;
}

public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/redis/clients/jedis/csc/CacheConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package redis.clients.jedis.csc;

public class CacheConfig {

private int maxSize;
private Cacheable cacheable;
private EvictionPolicy evictionPolicy;

public int getMaxSize() {
return maxSize;
}

public Cacheable getCacheable() {
return cacheable;
}

public EvictionPolicy getEvictionPolicy() {
return evictionPolicy;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private int maxSize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set the default value.

private Cacheable cacheable = DefaultCacheable.INSTANCE;
private EvictionPolicy evictionPolicy;

public Builder maxSize(int maxSize) {
this.maxSize = maxSize;
return this;
}

public Builder evictionPolicy(EvictionPolicy policy) {
this.evictionPolicy = policy;
return this;
}

public Builder cacheable(Cacheable cacheable) {
this.cacheable = cacheable;
return this;
}

public CacheConfig build() {
CacheConfig cacheConfig = new CacheConfig();
cacheConfig.maxSize = this.maxSize;
cacheConfig.cacheable = this.cacheable;
cacheConfig.evictionPolicy = this.evictionPolicy;
return cacheConfig;
}
}
}
34 changes: 19 additions & 15 deletions src/main/java/redis/clients/jedis/csc/CacheConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

public class CacheConnection extends Connection {

private final Cache clientSideCache;
private final Cache cache;
private ReentrantLock lock;

public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Cache clientSideCache) {
public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Cache cache) {
super(socketFactory, clientConfig);

if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}
this.clientSideCache = Objects.requireNonNull(clientSideCache);
this.cache = Objects.requireNonNull(cache);
initializeClientSideCache();
}

Expand All @@ -37,7 +37,7 @@ protected void initializeFromClientConfig(JedisClientConfig config) {
protected Object protocolRead(RedisInputStream inputStream) {
lock.lock();
try {
return Protocol.read(inputStream, clientSideCache);
return Protocol.read(inputStream, cache);
} finally {
lock.unlock();
}
Expand All @@ -47,7 +47,7 @@ protected Object protocolRead(RedisInputStream inputStream) {
protected void protocolReadPushes(RedisInputStream inputStream) {
if (lock.tryLock()) {
try {
Protocol.readPushes(inputStream, clientSideCache, true);
Protocol.readPushes(inputStream, cache, true);
} finally {
lock.unlock();
}
Expand All @@ -57,37 +57,41 @@ protected void protocolReadPushes(RedisInputStream inputStream) {
@Override
public void disconnect() {
super.disconnect();
clientSideCache.flush();
cache.flush();
}

@Override
public <T> T executeCommand(final CommandObject<T> commandObject) {
final CacheKey cacheKey = new CacheKey(commandObject);
if (!clientSideCache.isCacheable(cacheKey)) {
clientSideCache.getStats().nonCacheable();
if (!cache.isCacheable(cacheKey)) {
cache.getStats().nonCacheable();
return super.executeCommand(commandObject);
}

CacheEntry<T> cacheEntry = clientSideCache.get(cacheKey);
CacheEntry<T> cacheEntry = cache.get(cacheKey);
if (cacheEntry != null) { // (probable) CACHE HIT !!
cacheEntry = validateEntry(cacheEntry);
if (cacheEntry != null) {
// CACHE HIT confirmed !!!
clientSideCache.getStats().hit();
cache.getStats().hit();
return cacheEntry.getValue();
}
}

// CACHE MISS !!
clientSideCache.getStats().miss();
cache.getStats().miss();
T value = super.executeCommand(commandObject);
cacheEntry = new CacheEntry<>(cacheKey, value, this);
clientSideCache.set(cacheKey, cacheEntry);
cache.set(cacheKey, cacheEntry);
// this line actually provides a deep copy of cached object instance
value = cacheEntry.getValue();
return value;
}

public Cache getCache() {
return cache;
}

private void initializeClientSideCache() {
sendCommand(Protocol.Command.CLIENT, "TRACKING", "ON");
String reply = getStatusCodeReply();
Expand All @@ -99,17 +103,17 @@ private void initializeClientSideCache() {
private CacheEntry validateEntry(CacheEntry cacheEntry) {
CacheConnection cacheOwner = cacheEntry.getConnection();
if (cacheOwner == null || cacheOwner.isBroken() || !cacheOwner.isConnected()) {
clientSideCache.delete(cacheEntry.getCacheKey());
cache.delete(cacheEntry.getCacheKey());
return null;
} else {
try {
cacheOwner.readPushesWithCheckingBroken();
} catch (JedisException e) {
clientSideCache.delete(cacheEntry.getCacheKey());
cache.delete(cacheEntry.getCacheKey());
return null;
}

return clientSideCache.get(cacheEntry.getCacheKey());
return cache.get(cacheEntry.getCacheKey());
}
}
}
23 changes: 23 additions & 0 deletions src/main/java/redis/clients/jedis/csc/CacheProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package redis.clients.jedis.csc;

import java.util.HashMap;

public class CacheProvider {
atakavci marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should rename this class. IMHO, the way the term Provider is used in this repo doesn't go along with the usage/behavior of this class.

(Suggestions: CacheManager, CacheUtil, CacheConfigManager, CacheConfigHandler.)


public Cache getCache(CacheConfig config) {
return getCache(config, new HashMap<CacheKey, CacheEntry>());
}

public Cache getCache(CacheConfig config, HashMap<CacheKey, CacheEntry> map) {
return new DefaultCache(config.getMaxSize(), map, config.getCacheable(),
getEvictionPolicy(config));
}
Comment on lines +7 to +14
Copy link
Collaborator

@sazzad16 sazzad16 Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just asking/discussing: How about renaming getCache to createCache?

BTW, if this class is rename to CacheManager or CacheUtil, createCache or simply create makes more sense.


private EvictionPolicy getEvictionPolicy(CacheConfig config) {
if (config.getEvictionPolicy() == null) {
// It will be default to LRUEviction, until we have other eviction implementations
return new LRUEviction(config.getMaxSize());
}
return config.getEvictionPolicy();
}
}
Loading
Loading