Skip to content

Commit

Permalink
Support automatic namespacing (#3781)
Browse files Browse the repository at this point in the history
* Proof-of-concept for automatic key prefixing

* Iteration on key-prefixing POC

- Demonstrated automatic key-prefixing for all subclasses of
  UnifiedJedis: JedisCluster, JedisPooled, and JedisSentineled
- Key-prefixing is possible as long as the underlying CommandObjects can
  be customized.
- CommandObjects cannot use commandArguments in its constructor since
  in the specific case of key-prefixing, commandArguments depends on the
  child constructor running first. So we lose caching of argument-less
  CommandObjects.
- Based on this POC, the minimum changes required to jedis would be:
  - public constructors that allow UnifiedJedis and its subclasses to
    take a custom CommandObjects.
  - Consistent use of supplied CommandObjects throughout code (e.g. in
    Pipeline, Transaction, etc).
  - Removal of caching of argument-less CommandObjects in the
    constructor of CommandObjects.
- Applications can then supply CommandObjects with custom behavior as
  necessary. Sample classes that implement the behavior of prefixed keys,
  etc are provided but these can be supplied by the application as long
  as required constructors are available.

* Second iteration on key-prefixing POC

- Restore cached key-less commands in CommandObjects
- Support Transactions
- New constructors do not take CommandExecutor
- Requested JavaDoc regarding new constructors specifying RedisProtocol
- New classes moved into 'prefix' packages
- De-duplicate prefixing code

* Support automatic key prefixing by handler interface

---------

Co-authored-by: R-J Lim <rj@n3twork.com>
  • Loading branch information
sazzad16 and R-J Lim authored Aug 21, 2024
1 parent 5570879 commit 35505f1
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 28 deletions.
1 change: 1 addition & 0 deletions src/main/java/redis/clients/jedis/AbstractTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

public abstract class AbstractTransaction extends PipeliningBase implements Closeable {

@Deprecated
protected AbstractTransaction() {
super(new CommandObjects());
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/redis/clients/jedis/ClusterCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ public class ClusterCommandObjects extends CommandObjects {

@Override
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
return new ClusterCommandArguments(command);
ClusterCommandArguments comArgs = new ClusterCommandArguments(command);
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
return comArgs;
}

private static final String CLUSTER_UNSUPPORTED_MESSAGE = "Not supported in cluster mode.";
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/CommandArguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collection;
import java.util.Iterator;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -13,6 +14,7 @@

public class CommandArguments implements Iterable<Rawable> {

private CommandKeyArgumentPreProcessor keyPreProc = null;
private final ArrayList<Rawable> args;

private boolean blocking;
Expand All @@ -30,6 +32,11 @@ public ProtocolCommand getCommand() {
return (ProtocolCommand) args.get(0);
}

@Experimental
void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) {
this.keyPreProc = keyPreProcessor;
}

public CommandArguments add(Rawable arg) {
args.add(arg);
return this;
Expand Down Expand Up @@ -100,6 +107,10 @@ public CommandArguments addObjects(Collection args) {
}

public CommandArguments key(Object key) {
if (keyPreProc != null) {
key = keyPreProc.actualKey(key);
}

if (key instanceof Rawable) {
Rawable raw = (Rawable) key;
processKey(raw.getRaw());
Expand All @@ -115,6 +126,7 @@ public CommandArguments key(Object key) {
} else {
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package redis.clients.jedis;

import redis.clients.jedis.annots.Experimental;

@Experimental
public interface CommandKeyArgumentPreProcessor {

/**
* @param paramKey key name in application
* @return key name in Redis server
*/
Object actualKey(Object paramKey);
}
22 changes: 20 additions & 2 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.args.*;
import redis.clients.jedis.bloom.*;
import redis.clients.jedis.bloom.RedisBloomProtocol.*;
Expand Down Expand Up @@ -52,18 +53,25 @@ protected RedisProtocol getProtocol() {
return protocol;
}

protected volatile CommandKeyArgumentPreProcessor keyPreProcessor = null;
private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;
private Lock mapperLock = new ReentrantLock(true);
private volatile JsonObjectMapper jsonObjectMapper;
private final AtomicInteger searchDialect = new AtomicInteger(0);

private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;
@Experimental
void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) {
this.keyPreProcessor = keyPreProcessor;
}

void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig config) {
this.broadcastAndRoundRobinConfig = config;
}

protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArguments(command);
CommandArguments comArgs = new CommandArguments(command);
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
return comArgs;
}

private final CommandObject<String> PING_COMMAND_OBJECT = new CommandObject<>(commandArguments(PING), BuilderFactory.STRING);
Expand Down Expand Up @@ -4424,6 +4432,16 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
}
// RedisGears commands

// Transaction commands
public final CommandObject<String> watch(String... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}

public final CommandObject<String> watch(byte[]... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}
// Transaction commands

/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,23 @@ public Pipeline(Connection connection) {
}

public Pipeline(Connection connection, boolean closeConnection) {
super(new CommandObjects());
this(connection, closeConnection, createCommandObjects(connection));
}

private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

Pipeline(Connection connection, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
RedisProtocol proto = this.connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
}

@Override
Expand Down
37 changes: 29 additions & 8 deletions src/main/java/redis/clients/jedis/ReliableTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import static redis.clients.jedis.Protocol.Command.WATCH;

import java.util.ArrayList;
import java.util.LinkedList;
Expand All @@ -17,8 +16,7 @@
import redis.clients.jedis.graph.GraphCommandObjects;

/**
* ReliableTransaction is a transaction where commands are immediately sent to Redis server and the
* 'QUEUED' reply checked.
* A transaction where commands are immediately sent to Redis server and the {@code QUEUED} reply checked.
*/
public class ReliableTransaction extends TransactionBase {

Expand Down Expand Up @@ -66,12 +64,37 @@ public ReliableTransaction(Connection connection, boolean doMulti) {
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection) {
this(connection, doMulti, closeConnection, createCommandObjects(connection));
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects command objects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

@Override
public final void multi() {
connection.sendCommand(MULTI);
Expand All @@ -84,16 +107,14 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/redis/clients/jedis/ShardedCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public ShardedCommandObjects(Hashing algo, Pattern tagPattern) {

@Override
protected ShardedCommandArguments commandArguments(ProtocolCommand command) {
return new ShardedCommandArguments(algo, tagPattern, command);
ShardedCommandArguments comArgs = new ShardedCommandArguments(algo, tagPattern, command);
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
return comArgs;
}

@Override
Expand Down
38 changes: 30 additions & 8 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import static redis.clients.jedis.Protocol.Command.WATCH;

import java.util.ArrayList;
import java.util.LinkedList;
Expand All @@ -16,7 +15,7 @@
import redis.clients.jedis.graph.GraphCommandObjects;

/**
* A pipeline based transaction.
* A transaction based on <a href="https://redis.io/docs/manual/pipelining/">pipelining</a>.
*/
public class Transaction extends TransactionBase {

Expand Down Expand Up @@ -59,7 +58,7 @@ public Transaction(Connection connection) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public Transaction(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
this(connection, doMulti, false, createCommandObjects(connection));
}

/**
Expand All @@ -73,12 +72,37 @@ public Transaction(Connection connection, boolean doMulti) {
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, boolean doMulti, boolean closeConnection) {
this(connection, doMulti, closeConnection, createCommandObjects(connection));
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects command objects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
Transaction(Connection connection, boolean doMulti, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

@Override
public final void multi() {
connection.sendCommand(MULTI);
Expand All @@ -88,16 +112,14 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@Deprecated
public abstract class TransactionBase extends AbstractTransaction {

@Deprecated
protected TransactionBase() {
super();
}
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -5019,7 +5019,7 @@ public PipelineBase pipelined() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
return new Pipeline(provider.getConnection(), true, commandObjects);
}
}

Expand All @@ -5040,7 +5040,7 @@ public AbstractTransaction transaction(boolean doMulti) {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, doMulti, commandObjects);
} else {
return new Transaction(provider.getConnection(), doMulti, true);
return new Transaction(provider.getConnection(), doMulti, true, commandObjects);
}
}

Expand Down Expand Up @@ -5084,6 +5084,11 @@ public Object executeCommand(CommandArguments args) {
return executeCommand(new CommandObject<>(args, BuilderFactory.RAW_OBJECT));
}

@Experimental
public void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) {
this.commandObjects.setKeyArgumentPreProcessor(keyPreProcessor);
}

public void setJsonObjectMapper(JsonObjectMapper jsonObjectMapper) {
this.commandObjects.setJsonObjectMapper(jsonObjectMapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final void multi() {
*/
@Override
public final String watch(String... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand All @@ -106,7 +106,7 @@ public final String watch(String... keys) {
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand Down
Loading

0 comments on commit 35505f1

Please sign in to comment.