Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue #2220 - client side caching PoC test 1 #2236

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.params.ClientKillParams;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.ClientTrackingParams;
import redis.clients.jedis.util.SafeEncoder;

public class BinaryClient extends Connection {
Expand All @@ -44,6 +47,9 @@ public class BinaryClient extends Connection {

private boolean isInWatch;

private RedisClientCache redisClientCache = null;
protected static Logger log = LoggerFactory.getLogger(BinaryClient.class.getName());

public BinaryClient() {
super();
}
Expand Down Expand Up @@ -1101,6 +1107,40 @@ public void clientId() {
sendCommand(CLIENT, Keyword.ID.raw);
}

public boolean isCachedConnection() {
return (redisClientCache != null);
}

public Object getValueFromClientCache(final String key) {
// TODO Remove/change level
log.info("Try to get the key: {} from client cache", key);
return redisClientCache.get(key);
}

public void putValueInClientCache(final String key, Object value )
{
// TODO Remove/change level
log.info("Put key: {} in client cache", key);
redisClientCache.put(key, value);
}

// TODO: Check the best way to retrieve/pass the invalidation connection
public void clientTracking(boolean enabled, BinaryJedis jedis, ClientTrackingParams params){
if (params == null) {
params = ClientTrackingParams.clientTrackingParams();
}

// TODO check RESP 3 vs RESP 2
redisClientCache = new RedisClientCache(jedis);
Long cacheListernerClientId = redisClientCache.getRedisCacheClientId();
params.redirect(cacheListernerClientId);

// TODO Remove/change level
log.info("BinaryJedis clientTracking {} .", params);

sendCommand(CLIENT, joinParameters(Keyword.TRACKING.raw, (enabled?"ON":"OFF").getBytes(), params.getByteParams()));
}

public void time() {
sendCommand(TIME);
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.ClientTrackingParams;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.JedisURIHelper;

Expand Down Expand Up @@ -3723,6 +3724,13 @@ public Long clientId() {
return client.getIntegerReply();
}

// TODO Check the best way to pass the invalidation connection
@Override
public String clientTracking(boolean enabled, BinaryJedis jedis, ClientTrackingParams params) {
client.clientTracking(enabled, jedis, params);
return client.getBulkReply();
}

public String clientPause(final long timeout) {
checkIsInMultiOrPipeline();
client.clientPause(timeout);
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/redis/clients/jedis/BinaryJedisPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;

import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

public abstract class BinaryJedisPubSub {
private int subscribedChannels = 0;
Expand All @@ -19,6 +20,9 @@ public abstract class BinaryJedisPubSub {
public void onMessage(byte[] channel, byte[] message) {
}

public void onMessage(byte[] channel, List<byte[]> messages) {
}

public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
}

Expand Down Expand Up @@ -100,8 +104,16 @@ private void process(Client client) {
onUnsubscribe(bchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
final byte[] bchannel = (byte[]) reply.get(1);
final byte[] bmesg = (byte[]) reply.get(2);
onMessage(bchannel, bmesg);

// Client Side Caching messages are sent as an Array List not a simple String
if ( reply.get(2) instanceof List ) {
final List bmesgs = (List)reply.get(2);
final List messages = (bmesgs == null) ? null : bmesgs ;
onMessage(bchannel, messages);
} else {
final byte[] bmesg = (byte[]) reply.get(2);
onMessage(bchannel, bmesg);
}
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final byte[] bchannel = (byte[]) reply.get(2);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
}
}

/**
* TODO: check a better way, especially when using pool
* @return the socket factory of this connection
*/
protected JedisSocketFactory getJedisSocketFactory(){
return jedisSocketFactory;
}

public String getHost() {
return jedisSocketFactory.getHost();
}
Expand Down
52 changes: 42 additions & 10 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.commands.ScriptingCommands;
import redis.clients.jedis.commands.SentinelCommands;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.SafeEncoder;
import redis.clients.jedis.util.Slowlog;

Expand Down Expand Up @@ -180,9 +176,24 @@ public String set(final String key, final String value, final SetParams params)
*/
@Override
public String get(final String key) {
checkIsInMultiOrPipeline();
client.get(key);
return client.getBulkReply();
String value = null;
// TODO : for Client Side Caching define the work that must be done with multi/pipeline
checkIsInMultiOrPipeline();

if (client.isCachedConnection()) {
value = (String)client.getValueFromClientCache(key);
if (value == null) { // value is not found in the cache, we must check Redis
client.get(key);
value = client.getBulkReply();
if (client.isCachedConnection() && value != null) {
client.putValueInClientCache(key, value);
}
}
} else { // connection is not using ClienSideCaching
client.get(key);
value = client.getBulkReply();
}
return value;
}

/**
Expand Down Expand Up @@ -941,9 +952,23 @@ public List<String> hvals(final String key) {
*/
@Override
public Map<String, String> hgetAll(final String key) {
Map<String, String> value = null;
checkIsInMultiOrPipeline();
client.hgetAll(key);
return BuilderFactory.STRING_MAP.build(client.getBinaryMultiBulkReply());

if (client.isCachedConnection()) {
value = (Map<String, String>)client.getValueFromClientCache(key);
if (value == null) {
client.hgetAll(key);
value = BuilderFactory.STRING_MAP.build(client.getBinaryMultiBulkReply());
if (client.isCachedConnection() && value != null) {
client.putValueInClientCache(key, value);
}
}
} else { // connection is not using ClienSideCaching
client.hgetAll(key);
value = BuilderFactory.STRING_MAP.build(client.getBinaryMultiBulkReply());
}
return value;
}

/**
Expand Down Expand Up @@ -3250,6 +3275,13 @@ public Long clientId() {
return client.getIntegerReply();
}

// TODO check the best way to pass the invalidation connection
@Override
public String clientTracking(boolean enabled, Jedis jedis, ClientTrackingParams params) {
client.clientTracking(enabled,jedis, params);
return client.getBulkReply();
}

@Override
public String migrate(final String host, final int port, final String key,
final int destinationDb, final int timeout) {
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/redis/clients/jedis/JedisPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public abstract class JedisPubSub {
public void onMessage(String channel, String message) {
}

public void onMessage(String channel, List<String> messages) {
}

public void onPMessage(String pattern, String channel, String message) {
}

Expand Down Expand Up @@ -138,10 +141,20 @@ private void process(Client client) {
onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
final byte[] bchannel = (byte[]) reply.get(1);
final byte[] bmesg = (byte[]) reply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onMessage(strchannel, strmesg);

// Client Side Caching messages are sent as an Array List not a simple String
if ( reply.get(2) instanceof List ) {
final List bmesgs = (List)reply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final List messages = (bmesgs == null) ? null : SafeEncoder.encode(bmesgs) ;
onMessage(strchannel, messages);
} else {
final byte[] bmesg = (byte[]) reply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onMessage(strchannel, strmesg);
}

} else if (Arrays.equals(PMESSAGE.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final byte[] bchannel = (byte[]) reply.get(2);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ public static enum Keyword {
GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR,
BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP,
ID, IDLE, TIME, RETRYCOUNT, FORCE, STREAM, GROUPS, CONSUMERS, HELP, FREQ,
SETUSER, GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS;
SETUSER, GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS,TRACKING, REDIRECT, BCAST, PREFIX, OPTIN, CACHING,
NOLOOP ;

public final byte[] raw;

Expand Down
85 changes: 85 additions & 0 deletions src/main/java/redis/clients/jedis/RedisClientCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package redis.clients.jedis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.util.SafeEncoder;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* This class is used to manage the client side caching service
* This class starts a new thread to listen to invalitation event RESP2
*/
public class RedisClientCache {

protected static Logger log = LoggerFactory.getLogger(RedisClientCache.class.getName());

public final static String INVALIDATION_CHANNEL = "__redis__:invalidate";

private Map<String, Object> simpleCache = new HashMap();

// TODO : need to check what is the best way to get it from pool/configuration
BinaryJedis jedis = null; // the connection to Redis
Long clientId = null;

/**
* Create a new client cache
* For RESP2, the application must create a new connection on the same instances
* to get the Pub/Sub on invalidate channel
* @param client
*/
public RedisClientCache(BinaryJedis jedis) {
this.jedis = jedis;
clientId = jedis.clientId();
this.startInvalidationListener();
}

public Long getRedisCacheClientId(){
if (jedis != null && clientId == null) {
clientId = jedis.clientId();
}
return clientId;
}

/**
* RESP2 Create a new thread that will listen to invalidation event
*
*/
private void startInvalidationListener() {
new Thread(new Runnable() {

@Override
public void run() {

jedis.subscribe(new BinaryJedisPubSub() {



@Override
public void onMessage(byte[] channel, List<byte[]> messages) {
log.info("Invalidate cache value for {} ", SafeEncoder.encode(messages));
// iterate on each and remove it from the cache
for (byte[] key : messages) {
simpleCache.remove(SafeEncoder.encode(key)); // TODO : cleanup
}
}

}, SafeEncoder.encode(INVALIDATION_CHANNEL));

}
}, "ClientCacheThread-"+ clientId).start();


}

public void put(String key, Object value) {
simpleCache.put(key, value);
}

public Object get(String key) {
return simpleCache.get(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import java.util.List;

import redis.clients.jedis.AccessControlUser;
import redis.clients.jedis.BinaryJedis;
import redis.clients.jedis.Connection;
import redis.clients.jedis.params.ClientTrackingParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.ClientKillParams;

Expand Down Expand Up @@ -48,6 +51,9 @@ public interface AdvancedBinaryJedisCommands {

Long clientId();

// TODO check the best way to pass the client side caching connection
String clientTracking(boolean enabled, BinaryJedis jedis, ClientTrackingParams params);

byte[] memoryDoctorBinary();

byte[] aclWhoAmIBinary();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.List;

import redis.clients.jedis.AccessControlUser;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.ClientTrackingParams;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.ClientKillParams;
import redis.clients.jedis.util.Slowlog;
Expand Down Expand Up @@ -48,6 +50,9 @@ public interface AdvancedJedisCommands {

Long clientId();

// TODO check the best way to pass the client side caching connection
String clientTracking(boolean enabled, Jedis jedis, ClientTrackingParams params);

String memoryDoctor();

String aclWhoAmI();
Expand Down
Loading