Skip to content

Commit

Permalink
XRead(Group) Params with allowing block=0
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Dec 7, 2020
1 parent 6b419d5 commit b7dcaea
Show file tree
Hide file tree
Showing 16 changed files with 491 additions and 113 deletions.
58 changes: 45 additions & 13 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import static redis.clients.jedis.Protocol.Keyword.WITHSCORES;
import static redis.clients.jedis.Protocol.Keyword.FREQ;
import static redis.clients.jedis.Protocol.Keyword.HELP;
import static redis.clients.jedis.Protocol.Keyword.COUNT;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -27,14 +26,7 @@
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.params.ClientKillParams;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.SafeEncoder;

public class BinaryClient extends Connection {
Expand Down Expand Up @@ -1457,10 +1449,28 @@ public void xread(final int count, final long block, final Map<byte[], byte[]> s
params[streamsIndex++] = entry.getKey();
params[idsIndex++] = entry.getValue();
}

sendCommand(XREAD, params);
}

}

public void xread(final XReadParams params, final Entry<byte[], byte[]>... streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[paramLength + 1 + streams.length * 2][];
System.arraycopy(bparams, 0, args, 0, paramLength);

args[paramLength] = Keyword.STREAMS.raw;
int keyIndex = paramLength + 1;
int idsIndex = keyIndex + streams.length;
for (final Entry<byte[], byte[]> entry : streams) {
args[keyIndex++] = entry.getKey();
args[idsIndex++] = entry.getValue();
}

sendCommand(XREAD, args);
}

public void xack(final byte[] key, final byte[] group, final byte[]... ids) {
final byte[][] params = new byte[2 + ids.length][];
int index = 0;
Expand Down Expand Up @@ -1552,7 +1562,29 @@ public void xreadGroup(byte[] groupname, byte[] consumer, int count, long block,
sendCommand(XREADGROUP, params);
}


public void xreadGroup(byte[] groupname, byte[] consumer, final XReadGroupParams params, final Entry<byte[], byte[]>... streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[3 + paramLength + 1 + streams.length * 2][];
int index = 0;
args[index++] = Keyword.GROUP.raw;
args[index++] = groupname;
args[index++] = consumer;
System.arraycopy(bparams, 0, args, index, paramLength);
index += paramLength;

args[index++] = Keyword.STREAMS.raw;
int keyIndex = index;
int idsIndex = keyIndex + streams.length;
for (final Entry<byte[], byte[]> entry : streams) {
args[keyIndex++] = entry.getKey();
args[idsIndex++] = entry.getValue();
}

sendCommand(XREADGROUP, args);
}

public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) {
if(consumername == null) {
sendCommand(XPENDING, key, groupname, start, end, toByteArray(count));
Expand Down
45 changes: 37 additions & 8 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.net.ssl.HostnameVerifier;
Expand All @@ -29,14 +30,7 @@
import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.ClientKillParams;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.JedisURIHelper;

Expand Down Expand Up @@ -4247,6 +4241,23 @@ public List<byte[]> xread(int count, long block, Map<byte[], byte[]> streams) {
}
}

@Override
public List<byte[]> xread(XReadParams xReadParams, Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
client.xread(xReadParams, streams);

if (!xReadParams.hasBlock()) {
return client.getBinaryMultiBulkReply();
}

client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, long block, boolean noAck,
Map<byte[], byte[]> streams) {
Expand All @@ -4260,6 +4271,24 @@ public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, int count, lon
}
}

@Override
public List<byte[]> xreadGroup(byte[] groupname, byte[] consumer, XReadGroupParams xReadGroupParams,
Entry<byte[], byte[]>... streams) {
checkIsInMultiOrPipeline();
client.xreadGroup(groupname, consumer, xReadGroupParams, streams);

if (!xReadGroupParams.hasBlock()) {
return client.getBinaryMultiBulkReply();
}

client.setTimeoutInfinite();
try {
return client.getBinaryMultiBulkReply();
} finally {
client.rollbackTimeout();
}
}

@Override
public byte[] xadd(byte[] key, byte[] id, Map<byte[], byte[]> hash, long maxLen, boolean approximateLength) {
checkIsInMultiOrPipeline();
Expand Down
37 changes: 31 additions & 6 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@
import redis.clients.jedis.commands.JedisClusterBinaryScriptingCommands;
import redis.clients.jedis.commands.MultiKeyBinaryJedisClusterCommands;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.JedisClusterHashTagUtil;
import redis.clients.jedis.util.KeyMergeUtil;
import redis.clients.jedis.util.SafeEncoder;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
Expand Down Expand Up @@ -2243,6 +2239,16 @@ public List<byte[]> execute(Jedis connection) {
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> xread(final XReadParams xReadParams, final Entry<byte[], byte[]>... streams) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.xread(xReadParams, streams);
}
}.runBinary(streams.length, getKeys(streams));
}

@Override
public Long xack(final byte[] key, final byte[] group, final byte[]... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down Expand Up @@ -2307,6 +2313,17 @@ public List<byte[]> execute(Jedis connection) {
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> xreadGroup(final byte[] groupname, final byte[] consumer, final XReadGroupParams xReadGroupParams,
final Entry<byte[], byte[]>... streams) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxAttempts) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.xreadGroup(groupname, consumer, xReadGroupParams, streams);
}
}.runBinary(streams.length, getKeys(streams));
}

@Override
public Long xdel(final byte[] key, final byte[]... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down Expand Up @@ -2367,4 +2384,12 @@ public Object execute(Jedis connection){
}
}.runBinary(sampleKey);
}

private static byte[][] getKeys(final Entry<byte[], ?>... entries) {
byte[][] keys = new byte[entries.length][];
for (int i = 0; i < entries.length; i++) {
keys[i] = entries[i].getKey();
}
return keys;
}
}
28 changes: 27 additions & 1 deletion src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -643,7 +644,32 @@ public String toString() {
return "StreamEntryID";
}
};


public static final Builder<List<Map.Entry<String, List<StreamEntry>>>> STREAM_READ_RESPONSE
= new Builder<List<Map.Entry<String, List<StreamEntry>>>>() {
@Override
public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
if (data == null) {
return null;
}
List<Object> streams = (List<Object>) data;

List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(streams.size());
for (Object streamObj : streams) {
List<Object> stream = (List<Object>) streamObj;
String streamId = SafeEncoder.encode((byte[]) stream.get(0));
List<StreamEntry> streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1));
result.add(new AbstractMap.SimpleEntry<>(streamId, streamEntries));
}

return result;
}

@Override
public String toString() {
return "List<Entry<String, List<StreamEntry>>>";
}
};

public static final Builder<List<StreamEntry>> STREAM_ENTRY_LIST = new Builder<List<StreamEntry>>() {
@Override
Expand Down
55 changes: 46 additions & 9 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.commands.Commands;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.GeoRadiusStoreParam;
import redis.clients.jedis.params.MigrateParams;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.params.LPosParams;
import redis.clients.jedis.params.*;
import redis.clients.jedis.util.SafeEncoder;

public class Client extends BinaryClient implements Commands {
Expand Down Expand Up @@ -1292,7 +1286,26 @@ public void xread(final int count, final long block, final Entry<String, StreamE
}
xread(count, block, bhash);
}


@Override
public void xread(final XReadParams params, final Map<String, StreamEntryID> streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[paramLength + 1 + streams.size() * 2][];
System.arraycopy(bparams, 0, args, 0, paramLength);

args[paramLength] = Protocol.Keyword.STREAMS.raw;
int keyIndex = paramLength + 1;
int idsIndex = keyIndex + streams.size();
for (Entry<String, StreamEntryID> entry : streams.entrySet()) {
args[keyIndex++] = SafeEncoder.encode(entry.getKey());
args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString());
}

sendCommand(Protocol.Command.XREAD, args);
}

@Override
public void xack(final String key, final String group, final StreamEntryID... ids) {
final byte[][] bids = new byte[ids.length][];
Expand Down Expand Up @@ -1347,6 +1360,30 @@ public void xreadGroup(String groupname, String consumer, int count, long block,
xreadGroup(SafeEncoder.encode(groupname), SafeEncoder.encode(consumer), count, block, noAck, bhash);
}

@Override
public void xreadGroup(String groupname, String consumer, XReadGroupParams params, Map<String, StreamEntryID> streams) {
final byte[][] bparams = params.getByteParams();
final int paramLength = bparams.length;

final byte[][] args = new byte[3 + paramLength + 1 + streams.size() * 2][];
int index = 0;
args[index++] = Protocol.Keyword.GROUP.raw;
args[index++] = SafeEncoder.encode(groupname);
args[index++] = SafeEncoder.encode(consumer);
System.arraycopy(bparams, 0, args, index, paramLength);
index += paramLength;

args[index++] = Protocol.Keyword.STREAMS.raw;
int keyIndex = index;
int idsIndex = keyIndex + streams.size();
for (Entry<String, StreamEntryID> entry : streams.entrySet()) {
args[keyIndex++] = SafeEncoder.encode(entry.getKey());
args[idsIndex++] = SafeEncoder.encode(entry.getValue().toString());
}

sendCommand(Protocol.Command.XREADGROUP, args);
}

@Override
public void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername) {
xpending(SafeEncoder.encode(key), SafeEncoder.encode(groupname), SafeEncoder.encode(start==null ? "-" : start.toString()),
Expand Down Expand Up @@ -1384,5 +1421,5 @@ public void xinfoConsumers(String key, String group) {
xinfoConsumers(SafeEncoder.encode(key),SafeEncoder.encode(group));

}

}
Loading

0 comments on commit b7dcaea

Please sign in to comment.