Skip to content

Commit

Permalink
Add Redis Streams (#1880)
Browse files Browse the repository at this point in the history
  • Loading branch information
gkorland authored Mar 26, 2019
1 parent 3aeb6d4 commit 421b631
Show file tree
Hide file tree
Showing 22 changed files with 2,161 additions and 13 deletions.
190 changes: 190 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1261,4 +1261,194 @@ public void bitfield(final byte[] key, final byte[]... value) {
public void hstrlen(final byte[] key, final byte[] field) {
sendCommand(HSTRLEN, key, field);
}

public void xadd(final byte[] key, final byte[] id, final Map<byte[], byte[]> hash, long maxLen, boolean approximateLength) {
int maxLexArgs = 0;
if(maxLen < Long.MAX_VALUE) { // optional arguments
if(approximateLength) {
maxLexArgs = 3; // e.g. MAXLEN ~ 1000
} else {
maxLexArgs = 2; // e.g. MAXLEN 1000
}
}

final byte[][] params = new byte[2 + maxLexArgs + hash.size() * 2][];
int index = 0;
params[index++] = key;
if(maxLen < Long.MAX_VALUE) {
params[index++] = Keyword.MAXLEN.raw;
if(approximateLength) {
params[index++] = Protocol.BYTES_TILDE;
}
params[index++] = toByteArray(maxLen);
}

params[index++] = id;
for (final Entry<byte[], byte[]> entry : hash.entrySet()) {
params[index++] = entry.getKey();
params[index++] = entry.getValue();
}
sendCommand(XADD, params);
}

public void xlen(final byte[] key) {
sendCommand(XLEN, key);
}

public void xrange(final byte[] key, final byte[] start, final byte[] end, final long count) {
sendCommand(XRANGE, key, start, end, Keyword.COUNT.raw, toByteArray(count));
}

public void xrevrange(final byte[] key, final byte[] end, final byte[] start, final int count) {
sendCommand(XREVRANGE, key, end, start, Keyword.COUNT.raw, toByteArray(count));
}

public void xread(final int count, final long block, final Map<byte[], byte[]> streams) {
final byte[][] params = new byte[3 + streams.size() * 2 + (block > 0 ? 2 : 0)][];

int streamsIndex = 0;
params[streamsIndex++] = Keyword.COUNT.raw;
params[streamsIndex++] = toByteArray(count);
if(block > 0) {
params[streamsIndex++] = Keyword.BLOCK.raw;
params[streamsIndex++] = toByteArray(block);
}

params[streamsIndex++] = Keyword.STREAMS.raw;
int idsIndex = streamsIndex + streams.size();

for (final Entry<byte[], byte[]> entry : streams.entrySet()) {
params[streamsIndex++] = entry.getKey();
params[idsIndex++] = entry.getValue();
}

sendCommand(XREAD, params);
}

public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
final byte[][] params = new byte[2 + ids.length][];
int index = 0;
params[index++] = key;
params[index++] = group;
for (final byte[] id : ids) {
params[index++] = id;
}
sendCommand(XACK, params);
}

public void xgroupCreate(final byte[] key, final byte[] groupname, final byte[] id, boolean makeStream) {
if(makeStream) {
sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id, Keyword.MKSTREAM.raw);
} else {
sendCommand(XGROUP, Keyword.CREATE.raw, key, groupname, id);
}
}

public void xgroupSetID(final byte[] key, final byte[] groupname, final byte[] id) {
sendCommand(XGROUP, Keyword.SETID.raw, key, groupname, id);
}

public void xgroupDestroy(final byte[] key, final byte[] groupname) {
sendCommand(XGROUP, Keyword.DESTROY.raw, key, groupname);
}

public void xgroupDelConsumer(final byte[] key, final byte[] groupname, final byte[] consumerName) {
sendCommand(XGROUP, Keyword.DELCONSUMER.raw, key, groupname, consumerName);
}

public void xdel(final byte[] key, final byte[]... ids) {
final byte[][] params = new byte[1 + ids.length][];
int index = 0;
params[index++] = key;
for (final byte[] id : ids) {
params[index++] = id;
}
sendCommand(XDEL, params);
}

public void xtrim(byte[] key, long maxLen, boolean approximateLength) {
if(approximateLength) {
sendCommand(XTRIM, key, Keyword.MAXLEN.raw, Protocol.BYTES_TILDE ,toByteArray(maxLen));
} else {
sendCommand(XTRIM, key, Keyword.MAXLEN.raw, toByteArray(maxLen));
}
}

public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck, Map<byte[], byte[]> streams) {

int optional = 0;
if(count>0) {
optional += 2;
}
if(block > 0) {
optional += 2;
}
if(noAck) {
optional += 1;
}


final byte[][] params = new byte[4 + optional + streams.size() * 2][];

int streamsIndex = 0;
params[streamsIndex++] = Keyword.GROUP.raw;
params[streamsIndex++] = groupname;
params[streamsIndex++] = consumer;
if(count>0) {
params[streamsIndex++] = Keyword.COUNT.raw;
params[streamsIndex++] = toByteArray(count);
}
if(block > 0) {
params[streamsIndex++] = Keyword.BLOCK.raw;
params[streamsIndex++] = toByteArray(block);
}
if(noAck) {
params[streamsIndex++] = Keyword.NOACK.raw;
}
params[streamsIndex++] = Keyword.STREAMS.raw;

int idsIndex = streamsIndex + streams.size();
for (final Entry<byte[], byte[]> entry : streams.entrySet()) {
params[streamsIndex++] = entry.getKey();
params[idsIndex++] = entry.getValue();
}

sendCommand(XREADGROUP, params);
}


public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) {
if(consumername == null) {
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count));
} else {
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count), consumername);
}
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids) {

ArrayList<byte[]> arguments = new ArrayList<>(10 + ids.length);

arguments.add(key);
arguments.add(groupname);
arguments.add(consumername);
arguments.add(toByteArray(minIdleTime));

for(byte[] id : ids) {
arguments.add(id);
}
if(newIdleTime > 0) {
arguments.add(Keyword.IDLE.raw);
arguments.add(toByteArray(newIdleTime));
}
if(retries > 0) {
arguments.add(Keyword.RETRYCOUNT.raw);
arguments.add(toByteArray(retries));
}
if(force) {
arguments.add(Keyword.FORCE.raw);
}
sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][]));
}

}
119 changes: 112 additions & 7 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public Set<byte[]> keys(final byte[] pattern) {
* Return a randomly selected key from the currently selected DB.
* <p>
* Time complexity: O(1)
* @return Singe line reply, specifically the randomly selected key or an empty string is the
* @return Single line reply, specifically the randomly selected key or an empty string is the
* database is empty
*/
@Override
Expand Down Expand Up @@ -2502,7 +2502,7 @@ protected Set<Tuple> getTupledSet() {
if (membersWithScores.isEmpty()) {
return Collections.emptySet();
}
Set<Tuple> set = new LinkedHashSet<Tuple>(membersWithScores.size() / 2, 1.0f);
Set<Tuple> set = new LinkedHashSet<>(membersWithScores.size() / 2, 1.0f);
Iterator<byte[]> iterator = membersWithScores.iterator();
while (iterator.hasNext()) {
set.add(new Tuple(iterator.next(), BuilderFactory.DOUBLE.build(iterator.next())));
Expand Down Expand Up @@ -3663,7 +3663,7 @@ public ScanResult<Map.Entry<byte[], byte[]>> hscan(final byte[] key, final byte[
while (iterator.hasNext()) {
results.add(new AbstractMap.SimpleEntry<byte[], byte[]>(iterator.next(), iterator.next()));
}
return new ScanResult<Map.Entry<byte[], byte[]>>(newcursor, results);
return new ScanResult<>(newcursor, results);
}

@Override
Expand All @@ -3678,7 +3678,7 @@ public ScanResult<byte[]> sscan(final byte[] key, final byte[] cursor, final Sca
List<Object> result = client.getObjectMultiBulkReply();
byte[] newcursor = (byte[]) result.get(0);
List<byte[]> rawResults = (List<byte[]>) result.get(1);
return new ScanResult<byte[]>(newcursor, rawResults);
return new ScanResult<>(newcursor, rawResults);
}

@Override
Expand All @@ -3692,13 +3692,13 @@ public ScanResult<Tuple> zscan(final byte[] key, final byte[] cursor, final Scan
client.zscan(key, cursor, params);
List<Object> result = client.getObjectMultiBulkReply();
byte[] newcursor = (byte[]) result.get(0);
List<Tuple> results = new ArrayList<Tuple>();
List<Tuple> results = new ArrayList<>();
List<byte[]> rawResults = (List<byte[]>) result.get(1);
Iterator<byte[]> iterator = rawResults.iterator();
while (iterator.hasNext()) {
results.add(new Tuple(iterator.next(), BuilderFactory.DOUBLE.build(iterator.next())));
}
return new ScanResult<Tuple>(newcursor, results);
return new ScanResult<>(newcursor, results);
}

@Override
Expand Down Expand Up @@ -3912,7 +3912,7 @@ public boolean retainAll(Collection<?> c) {
}

protected static <E> SetFromList<E> of(List<E> list) {
return new SetFromList<E>(list);
return new SetFromList<>(list);
}
}

Expand All @@ -3929,4 +3929,109 @@ public Long hstrlen(final byte[] key, final byte[] field) {
client.hstrlen(key, field);
return client.getIntegerReply();
}

@Override
public List<byte[]> xread(int count, long block, Map<byte[], byte[]> streams) {
checkIsInMultiOrPipeline();
client.xread(count, block, streams);
return client.getBinaryMultiBulkReply();
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
Map<byte[], byte[]> streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, count, block, noAck, streams);
return client.getBinaryMultiBulkReply();
}

@Override
public byte[] xadd(byte[] key, byte[] id, Map<byte[], byte[]> hash, long maxLen, boolean approximateLength) {
checkIsInMultiOrPipeline();
client.xadd(key, id, hash, maxLen, approximateLength);
return client.getBinaryBulkReply();
}

@Override
public Long xlen(byte[] key) {
checkIsInMultiOrPipeline();
client.xlen(key);
return client.getIntegerReply();
}

@Override
public List<byte[]> xrange(byte[] key, byte[] start, byte[] end, long count) {
checkIsInMultiOrPipeline();
client.xrange(key, start, end, count);
return client.getBinaryMultiBulkReply();
}

@Override
public List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count) {
checkIsInMultiOrPipeline();
client.xrevrange(key, end, start, count);
return client.getBinaryMultiBulkReply();
}

@Override
public Long xack(byte[] key, byte[] group, byte[]... ids) {
checkIsInMultiOrPipeline();
client.xack(key, group, ids);
return client.getIntegerReply();
}

@Override
public String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream) {
checkIsInMultiOrPipeline();
client.xgroupCreate(key, consumer, id, makeStream);
return client.getStatusCodeReply();
}

@Override
public String xgroupSetID(byte[] key, byte[] consumer, byte[] id) {
checkIsInMultiOrPipeline();
client.xgroupSetID(key, consumer, id);
return client.getStatusCodeReply();
}

@Override
public Long xgroupDestroy(byte[] key, byte[] consumer) {
checkIsInMultiOrPipeline();
client.xgroupDestroy(key, consumer);
return client.getIntegerReply();
}

@Override
public String xgroupDelConsumer(byte[] key, byte[] consumer, byte[] consumerName) {
checkIsInMultiOrPipeline();
client.xgroupDelConsumer(key, consumer, consumerName);
return client.getStatusCodeReply();
}

@Override
public Long xdel(byte[] key, byte[]... ids) {
checkIsInMultiOrPipeline();
client.xdel(key, ids);
return client.getIntegerReply();
}

@Override
public Long xtrim(byte[] key, long maxLen, boolean approximateLength) {
checkIsInMultiOrPipeline();
client.xtrim(key, maxLen, approximateLength);
return client.getIntegerReply();
}

@Override
public List<byte[]> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) {
checkIsInMultiOrPipeline();
client.xpending(key, groupname, start, end, count, consumername);
return client.getBinaryMultiBulkReply(); }

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids){
checkIsInMultiOrPipeline();
client.xclaim(key, groupname, consumername, minIdleTime, newIdleTime, retries, force, ids);
return client.getBinaryMultiBulkReply();
}
}
Loading

0 comments on commit 421b631

Please sign in to comment.