Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support INCR argument to ZADD command #2415

Merged
merged 4 commits into from
Mar 11, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into zaddincr
sazzad16 authored Mar 10, 2021
commit b5e176d52ce033ecf4e5e21565ff6ce09be7839c
108 changes: 55 additions & 53 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
@@ -123,7 +123,6 @@ public void setPassword(final String password) {
this.password = password;
}


/**
* This method should be called only after a successful SELECT command.
* @param db
@@ -1494,33 +1493,34 @@ public void hstrlen(final byte[] key, final byte[] field) {
sendCommand(HSTRLEN, key, field);
}

public void xadd(final byte[] key, final byte[] id, final Map<byte[], byte[]> hash, long maxLen, boolean approximateLength) {
int maxLexArgs = 0;
if(maxLen < Long.MAX_VALUE) { // optional arguments
if(approximateLength) {
maxLexArgs = 3; // e.g. MAXLEN ~ 1000
} else {
maxLexArgs = 2; // e.g. MAXLEN 1000
}
public void xadd(final byte[] key, final byte[] id, final Map<byte[], byte[]> hash, long maxLen,
boolean approximateLength) {
int maxLexArgs = 0;
if (maxLen < Long.MAX_VALUE) { // optional arguments
if (approximateLength) {
maxLexArgs = 3; // e.g. MAXLEN ~ 1000
} else {
maxLexArgs = 2; // e.g. MAXLEN 1000
}
}

final byte[][] params = new byte[2 + maxLexArgs + hash.size() * 2][];
int index = 0;
params[index++] = key;
if (maxLen < Long.MAX_VALUE) {
params[index++] = Keyword.MAXLEN.getRaw();
if (approximateLength) {
params[index++] = Protocol.BYTES_TILDE;
}
params[index++] = toByteArray(maxLen);
}

final byte[][] params = new byte[2 + maxLexArgs + hash.size() * 2][];
int index = 0;
params[index++] = key;
if(maxLen < Long.MAX_VALUE) {
params[index++] = Keyword.MAXLEN.getRaw();
if(approximateLength) {
params[index++] = Protocol.BYTES_TILDE;
}
params[index++] = toByteArray(maxLen);
}

params[index++] = id;
for (final Entry<byte[], byte[]> entry : hash.entrySet()) {
params[index++] = entry.getKey();
params[index++] = entry.getValue();
}
sendCommand(XADD, params);
params[index++] = id;
for (final Entry<byte[], byte[]> entry : hash.entrySet()) {
params[index++] = entry.getKey();
params[index++] = entry.getValue();
}
sendCommand(XADD, params);
}

public void xlen(final byte[] key) {
@@ -1563,7 +1563,7 @@ public void xread(final int count, final long block, final Map<byte[], byte[]> s
}

sendCommand(XREAD, params);
}
}

public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
final byte[][] params = new byte[2 + ids.length][];
@@ -1576,8 +1576,9 @@ public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
sendCommand(XACK, params);
}

public void xgroupCreate(final byte[] key, final byte[] groupname, final byte[] id, boolean makeStream) {
if(makeStream) {
public void xgroupCreate(final byte[] key, final byte[] groupname, final byte[] id,
boolean makeStream) {
if (makeStream) {
sendCommand(XGROUP, Keyword.CREATE.getRaw(), key, groupname, id, Keyword.MKSTREAM.getRaw());
} else {
sendCommand(XGROUP, Keyword.CREATE.getRaw(), key, groupname, id);
@@ -1614,7 +1615,8 @@ public void xtrim(byte[] key, long maxLen, boolean approximateLength) {
}
}

public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map<byte[], byte[]> streams) {
public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
Map<byte[], byte[]> streams) {

int optional = 0;
if (count > 0) {
@@ -1627,7 +1629,6 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
optional += 1;
}


final byte[][] params = new byte[4 + optional + streams.size() * 2][];

int streamsIndex = 0;
@@ -1656,38 +1657,39 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
sendCommand(XREADGROUP, params);
}


public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) {
if(consumername == null) {
public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count,
byte[] consumername) {
if (consumername == null) {
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count));
} else {
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count), consumername);
}
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids) {
public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[][] ids) {

ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);
ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);

arguments.add(key);
arguments.add(groupname);
arguments.add(consumername);
arguments.add(toByteArray(minIdleTime));
arguments.add(key);
arguments.add(groupname);
arguments.add(consumername);
arguments.add(toByteArray(minIdleTime));

Collections.addAll(arguments, ids);
Collections.addAll(arguments, ids);

if(newIdleTime > 0) {
arguments.add(Keyword.IDLE.getRaw());
arguments.add(toByteArray(newIdleTime));
}
if(retries > 0) {
arguments.add(Keyword.RETRYCOUNT.getRaw());
arguments.add(toByteArray(retries));
}
if(force) {
arguments.add(Keyword.FORCE.getRaw());
}
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
if (newIdleTime > 0) {
arguments.add(Keyword.IDLE.getRaw());
arguments.add(toByteArray(newIdleTime));
}
if (retries > 0) {
arguments.add(Keyword.RETRYCOUNT.getRaw());
arguments.add(toByteArray(retries));
}
if (force) {
arguments.add(Keyword.FORCE.getRaw());
}
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
}

public void xinfoStream(byte[] key) {
7 changes: 4 additions & 3 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
@@ -3346,8 +3346,9 @@ public String configResetStat() {
/**
* The CONFIG REWRITE command rewrites the redis.conf file the server was started with, applying
* the minimal changes needed to make it reflect the configuration currently used by the server,
* which may be different compared to the original one because of the use of the CONFIG SET command.
*
* which may be different compared to the original one because of the use of the CONFIG SET
* command.
* <p>
* The rewrite is performed in a very conservative way:
* <ul>
* <li>Comments and the overall structure of the original redis.conf are preserved as much as
@@ -3362,7 +3363,7 @@ public String configResetStat() {
* the current configuration has fewer or none as you disabled RDB persistence, all the lines will
* be blanked.</li>
* </ul>
*
* <p>
* CONFIG REWRITE is also able to rewrite the configuration file from scratch if the original one
* no longer exists for some reason. However if the server was started without a configuration
* file at all, the CONFIG REWRITE will just return an error.
14 changes: 8 additions & 6 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
@@ -1354,7 +1354,7 @@ public void xack(final String key, final String group, final StreamEntryID... id
final byte[][] bids = new byte[ids.length][];
for (int i = 0; i < ids.length; ++i) {
StreamEntryID id = ids[i];
bids[i] = SafeEncoder.encode(id==null ? "0-0" : id.toString());
bids[i] = SafeEncoder.encode(id == null ? "0-0" : id.toString());
}
xack(SafeEncoder.encode(key), SafeEncoder.encode(group), bids);
}
@@ -1367,7 +1367,8 @@ public void xgroupCreate(String key, String groupname, StreamEntryID id, boolean

@Override
public void xgroupSetID(String key, String groupname, StreamEntryID id) {
xgroupSetID(SafeEncoder.encode(key), SafeEncoder.encode(groupname), SafeEncoder.encode(id==null ? "0-0" : id.toString()));
xgroupSetID(SafeEncoder.encode(key), SafeEncoder.encode(groupname),
SafeEncoder.encode(id == null ? "0-0" : id.toString()));
}

@Override
@@ -1377,15 +1378,16 @@ public void xgroupDestroy(String key, String groupname) {

@Override
public void xgroupDelConsumer(String key, String groupname, String consumerName) {
xgroupDelConsumer(SafeEncoder.encode(key), SafeEncoder.encode(groupname), SafeEncoder.encode(consumerName));
xgroupDelConsumer(SafeEncoder.encode(key), SafeEncoder.encode(groupname),
SafeEncoder.encode(consumerName));
}

@Override
public void xdel(final String key, final StreamEntryID... ids) {
final byte[][] bids = new byte[ids.length][];
for (int i = 0; i < ids.length; ++i) {
StreamEntryID id = ids[i];
bids[i] = SafeEncoder.encode(id==null ? "0-0" : id.toString());
bids[i] = SafeEncoder.encode(id == null ? "0-0" : id.toString());
}
xdel(SafeEncoder.encode(key), bids);
}
@@ -1413,8 +1415,8 @@ public void xpending(String key, String groupname, StreamEntryID start, StreamEn
}

@Override
public void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries,
boolean force, StreamEntryID... ids) {
public void xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {

final byte[][] bids = new byte[ids.length][];
for (int i = 0; i < ids.length; i++) {
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
@@ -4121,7 +4121,7 @@ public List<StreamPendingEntry> xpending(final String key, final String groupnam
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
checkIsInMultiOrPipeline();
client.xclaim( key, group, consumername, minIdleTime, newIdleTime, retries, force, ids);
client.xclaim(key, group, consumername, minIdleTime, newIdleTime, retries, force, ids);

return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}
8 changes: 4 additions & 4 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
@@ -279,10 +279,10 @@ public static enum Keyword {
AGGREGATE, ALPHA, ASC, BY, DESC, GET, LIMIT, MESSAGE, NO, NOSORT, PMESSAGE, PSUBSCRIBE,
PUNSUBSCRIBE, OK, ONE, QUEUED, SET, STORE, SUBSCRIBE, UNSUBSCRIBE, WEIGHTS, WITHSCORES,
RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, KILL, LEN, REFCOUNT, ENCODING, IDLETIME,
GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR,
BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP,
ID, IDLE, TIME, RETRYCOUNT, FORCE, USAGE, SAMPLES, STREAM, GROUPS, CONSUMERS, HELP, FREQ,
SETUSER, GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR;
GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, BLOCK,
NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, ID, IDLE,
TIME, RETRYCOUNT, FORCE, USAGE, SAMPLES, STREAM, GROUPS, CONSUMERS, HELP, FREQ, SETUSER,
GETUSER, DELUSER, WHOAMI, CAT, GENPASS, USERS, LOG, INCR;

/**
* @deprecated This will be private in future. Use {@link #getRaw()}.
Original file line number Diff line number Diff line change
@@ -390,9 +390,9 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(byte[] key, byte[] member, dou
* @return lenth of the value for key
*/
Long hstrlen(byte[] key, byte[] field);


byte[] xadd(final byte[] key, final byte[] id, final Map<byte[], byte[]> hash, long maxLen, boolean approximateLength);
byte[] xadd(byte[] key, byte[] id, Map<byte[], byte[]> hash, long maxLen, boolean approximateLength);

Long xlen(byte[] key);

@@ -408,17 +408,17 @@ default List<byte[]> xrange(byte[] key, byte[] start, byte[] end, long count) {

List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count);

Long xack(final byte[] key, final byte[] group, final byte[]... ids);

String xgroupCreate(final byte[] key, final byte[] consumer, final byte[] id, boolean makeStream);
Long xack(byte[] key, byte[] group, byte[]... ids);
String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream);

String xgroupSetID(byte[] key, byte[] consumer, byte[] id);

Long xgroupDestroy(byte[] key, byte[] consumer);

Long xgroupDelConsumer(final byte[] key, final byte[] consumer, final byte[] consumerName);

Long xdel(final byte[] key, final byte[]... ids);
Long xgroupDelConsumer(byte[] key, byte[] consumer, byte[] consumerName);
Long xdel(byte[] key, byte[]... ids);

Long xtrim(byte[] key, long maxLen, boolean approximateLength);

You are viewing a condensed version of this merge commit. You can view the full changes here.