diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 39be157f08..78d68c9eb5 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1455,4 +1455,21 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][])); } + public void xinfoStream(byte[] key) { + + sendCommand(XINFO,Keyword.STREAM.raw,key); + + } + + public void xinfoGroup(byte[] key) { + + sendCommand(XINFO,Keyword.GROUPS.raw,key); + + } + + public void xinfoConsumers (byte[] key, byte[] group) { + + sendCommand(XINFO,Keyword.CONSUMERS.raw,key,group); + } + } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 78ef561ea5..d5afbc9661 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -4051,6 +4051,30 @@ public Object sendCommand(ProtocolCommand cmd, byte[]... args) { return client.getOne(); } + @Override + public StreamInfo xinfoStream(byte[] key) { + checkIsInMultiOrPipeline(); + client.xinfoStream(key); + + return BuilderFactory.STREAM_INFO.build(client.getOne()); + + } + + @Override + public List xinfoGroup (byte[] key) { + checkIsInMultiOrPipeline(); + client.xinfoGroup(key); + + return BuilderFactory.STREAM_GROUP_INFO_LIST.build(client.getBinaryMultiBulkReply()); + } + @Override + public List xinfoConsumers (byte[] key, byte[] group) { + checkIsInMultiOrPipeline(); + client.xinfoConsumers(key,group); + + return BuilderFactory.STREAM_CONSUMERS_INFO_LIST.build(client.getBinaryMultiBulkReply()); + } + public Object sendCommand(ProtocolCommand cmd) { return sendCommand(cmd, dummyArray); } diff --git a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java index 9cd5c61829..6c0ffd1e70 100644 --- a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java @@ -1039,6 +1039,24 @@ public List xclaim(byte[] key, byte[] groupname, byte[] consumername, lo return j.xclaim(key, groupname, consumername, minIdleTime, newIdleTime, retries, force, ids); } + @Override + public StreamInfo xinfoStream(byte[] key) { + Jedis j = getShard(key); + return j.xinfoStream(key); + } + + @Override + public List xinfoGroup(byte[] key) { + Jedis j = getShard(key); + return j.xinfoGroup(key); + } + + @Override + public List xinfoConsumers(byte[] key, byte[] group) { + Jedis j = getShard(key); + return j.xinfoConsumers(key, group); + } + public Object sendCommand(ProtocolCommand cmd, byte[]... args) { // default since no sample key provided in JedisCommands interface byte[] sampleKey = args.length > 0 ? args[0] : cmd.getRaw(); diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index ac03f44c34..e991192528 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -514,7 +514,7 @@ public List build(Object data) { Iterator hashIterator = hash.iterator(); Map map = new HashMap<>(hash.size()/2); while(hashIterator.hasNext()) { - map.put(SafeEncoder.encode((byte[])hashIterator.next()), SafeEncoder.encode((byte[])hashIterator.next())); + map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); } responses.add(new StreamEntry(entryID, map)); } @@ -527,6 +527,41 @@ public String toString() { return "List"; } }; + + public static final Builder STREAM_ENTRY = new Builder() { + @Override + @SuppressWarnings("unchecked") + public StreamEntry build(Object data) { + if (null == data) { + return null; + } + List objectList = (List) data; + + if (objectList.isEmpty()) { + return null; + } + + String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0)); + StreamEntryID entryID = new StreamEntryID(entryIdString); + List hash = (List) objectList.get(1); + + Iterator hashIterator = hash.iterator(); + Map map = new HashMap<>(hash.size() / 2); + while (hashIterator.hasNext()) { + map.put(SafeEncoder.encode(hashIterator.next()), + SafeEncoder.encode(hashIterator.next())); + } + StreamEntry streamEntry = new StreamEntry(entryID, map); + + + return streamEntry; + } + + @Override + public String toString() { + return "StreamEntry"; + } + }; public static final Builder> STREAM_PENDING_ENTRY_LIST = new Builder>() { @Override @@ -555,6 +590,139 @@ public String toString() { } }; + public static final Builder STREAM_INFO = new Builder() { + + + Map mappingFunctions = createDecoderMap(); + + private Map createDecoderMap() { + + Map tempMappingFunctions = new HashMap<>(); + tempMappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STREAM_ENTRY_ID); + tempMappingFunctions.put(StreamInfo.FIRST_ENTRY,STREAM_ENTRY); + tempMappingFunctions.put(StreamInfo.LENGTH, LONG); + tempMappingFunctions.put(StreamInfo.RADIX_TREE_KEYS, LONG); + tempMappingFunctions.put(StreamInfo.RADIX_TREE_NODES, LONG); + tempMappingFunctions.put(StreamInfo.LAST_ENTRY,STREAM_ENTRY); + tempMappingFunctions.put(StreamInfo.GROUPS, LONG); + + return tempMappingFunctions; + } + + @Override + @SuppressWarnings("unchecked") + public StreamInfo build(Object data) { + if (null == data) { + return null; + } + + List streamsEntries = (List)data; + Iterator iterator = streamsEntries.iterator(); + + StreamInfo streamInfo = new StreamInfo( + createMapFromDecodingFunctions(iterator,mappingFunctions)); + return streamInfo; + } + + @Override + public String toString() { + return "StreamInfo"; + } + }; + + public static final Builder> STREAM_GROUP_INFO_LIST = new Builder>() { + + Map mappingFunctions = createDecoderMap(); + + private Map createDecoderMap() { + + Map tempMappingFunctions = new HashMap<>(); + tempMappingFunctions.put(StreamGroupInfo.NAME,STRING); + tempMappingFunctions.put(StreamGroupInfo.CONSUMERS, LONG); + tempMappingFunctions.put(StreamGroupInfo.PENDING, LONG); + tempMappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STREAM_ENTRY_ID); + + return tempMappingFunctions; + } + + @Override + @SuppressWarnings("unchecked") + public List build(Object data) { + if (null == data) { + return null; + } + + List list = new ArrayList<>(); + List streamsEntries = (List)data; + Iterator groupsArray = streamsEntries.iterator(); + + while (groupsArray.hasNext()) { + + List groupInfo = (List) groupsArray.next(); + + Iterator groupInfoIterator = groupInfo.iterator(); + + StreamGroupInfo streamGroupInfo = new StreamGroupInfo( + createMapFromDecodingFunctions(groupInfoIterator,mappingFunctions)); + list.add(streamGroupInfo); + + } + return list; + + } + + @Override + public String toString() { + return "List"; + } + }; + + public static final Builder> STREAM_CONSUMERS_INFO_LIST = new Builder>() { + + Map mappingFunctions = createDecoderMap(); + + private Map createDecoderMap() { + Map tempMappingFunctions = new HashMap<>(); + tempMappingFunctions.put(StreamConsumersInfo.NAME,STRING); + tempMappingFunctions.put(StreamConsumersInfo.IDLE, LONG); + tempMappingFunctions.put(StreamGroupInfo.PENDING, LONG); + tempMappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); + return tempMappingFunctions; + + } + + @Override + @SuppressWarnings("unchecked") + public List build(Object data) { + if (null == data) { + return null; + } + + List list = new ArrayList<>(); + List streamsEntries = (List)data; + Iterator groupsArray = streamsEntries.iterator(); + + while (groupsArray.hasNext()) { + + List groupInfo = (List) groupsArray.next(); + + Iterator consumerInfoIterator = groupInfo.iterator(); + + StreamConsumersInfo streamGroupInfo = new StreamConsumersInfo( + createMapFromDecodingFunctions(consumerInfoIterator,mappingFunctions)); + list.add(streamGroupInfo); + + } + return list; + + } + + @Override + public String toString() { + return "List"; + } + }; + public static final Builder OBJECT = new Builder() { @Override public Object build(Object data) { @@ -572,4 +740,28 @@ private BuilderFactory() { throw new InstantiationError( "Must not instantiate this class" ); } + private static Map createMapFromDecodingFunctions( Iterator iterator, Map mappingFunctions) { + + Map resultMap = new HashMap<>(); + while (iterator.hasNext()) { + + String mapKey = STRING.build(iterator.next()); + if (mappingFunctions.containsKey(mapKey)) { + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(iterator.next())); + } else { //For future - if we don't find an element in our builder map + Object unknownData = iterator.next(); + for (Builder b:mappingFunctions.values()) { + try { + resultMap.put(mapKey,b.build(unknownData)); + break; + } catch (ClassCastException e) { + //We continue with next builder + + } + } + } + } + return resultMap; + } + } diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index 20193fb911..05e0526c4d 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1262,5 +1262,25 @@ public void xclaim(String key, String group, String consumername, long minIdleTi xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids); } + @Override + public void xinfoStream(String key) { + + xinfoStream(SafeEncoder.encode(key)); + + } + + @Override + public void xinfoGroup(String key) { + + xinfoGroup(SafeEncoder.encode(key)); + + } + + @Override + public void xinfoConsumers(String key, String group) { + + xinfoConsumers(SafeEncoder.encode(key),SafeEncoder.encode(group)); + + } } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index e5e7aec9a2..535cb37a1b 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3830,6 +3830,30 @@ public List xclaim(String key, String group, String consumername, l return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply()); } + @Override + public StreamInfo xinfoStream(String key) { + client.xinfoStream(key); + + return BuilderFactory.STREAM_INFO.build(client.getObjectMultiBulkReply()); + + } + + @Override + public List xinfoGroup(String key) { + client.xinfoGroup(key); + + return BuilderFactory.STREAM_GROUP_INFO_LIST.build(client.getObjectMultiBulkReply()); + + } + + @Override + public List xinfoConsumers(String key, String group) { + client.xinfoConsumers(key,group); + + return BuilderFactory.STREAM_CONSUMERS_INFO_LIST.build(client.getObjectMultiBulkReply()); + + } + public Object sendCommand(ProtocolCommand cmd, String... args) { client.sendCommand(cmd, args); return client.getOne(); diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 130eb3f5c3..fb845782d9 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -259,7 +259,8 @@ public static enum Command implements ProtocolCommand { PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, PFADD, PFCOUNT, PFMERGE, READONLY, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, MODULE, BITFIELD, HSTRLEN, TOUCH, SWAPDB, MEMORY, - XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM; + XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM, + XINFO; private final byte[] raw; @@ -279,7 +280,7 @@ public static enum Keyword { RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, KILL, LEN, REFCOUNT, ENCODING, IDLETIME, GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, - IDLE, TIME, RETRYCOUNT, FORCE; + IDLE, TIME, RETRYCOUNT, FORCE, STREAM, GROUPS, CONSUMERS; public final byte[] raw; diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 21e240f576..6a8585606b 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -1055,6 +1055,26 @@ public List xclaim(String key, String group, String consumername, l return j.xclaim(key, group, consumername, minIdleTime, newIdleTime, retries, force, ids); } + @Override + public StreamInfo xinfoStream(String key) { + + Jedis j = getShard(key); + return j.xinfoStream(key); + } + + @Override + public List xinfoGroup(String key) { + + Jedis j = getShard(key); + return j.xinfoGroup(key); + } + + @Override + public List xinfoConsumers(String key, String group){ + Jedis j = getShard(key); + return j.xinfoConsumers(key, group); + } + public Object sendCommand(ProtocolCommand cmd, String... args) { // default since no sample key provided in JedisCommands interface String sampleKey = args.length > 0 ? args[0] : cmd.toString(); diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java new file mode 100644 index 0000000000..c97d865492 --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -0,0 +1,55 @@ +package redis.clients.jedis; + +import java.util.Map; + +/** + * This class holds information about a consumer + * They can be access via getters. + * For future purpose there is also {@link #getConsumerInfo()} ()} method + * that returns a generic {@code Map} - in case where more info is returned from a server + * + */ +public class StreamConsumersInfo { + + public final static String NAME = "name"; + public final static String IDLE = "idle"; + public final static String PENDING = "pending"; + + private final String name; + private final long idle; + private final long pending; + private final Map consumerInfo; + + /** + * @param map contains key-value pairs with consumer info + * + */ + public StreamConsumersInfo(Map map) { + + consumerInfo = map; + name = (String) map.get(NAME); + idle = (long) map.get(IDLE); + pending = (long) map.get(PENDING); + + } + + public String getName() { + return name; + } + + public long getIdle() { + return idle; + } + + public long getPending() { + return pending; + } + + /** + * @return Generic map containing all key-value pairs returned by the server + */ + public Map getConsumerInfo() { + return consumerInfo; + } + +} diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java new file mode 100644 index 0000000000..8a5d62a0ce --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -0,0 +1,64 @@ +package redis.clients.jedis; + +import java.io.Serializable; +import java.util.Map; + +/** + * This class holds information about a stream group + * They can be access via getters. + * For future purpose there is also {@link #getGroupInfo()} method + * that returns a generic {@code Map} - in case where more info is returned from a server + * + */ +public class StreamGroupInfo implements Serializable { + + public final static String NAME = "name"; + public final static String CONSUMERS = "consumers"; + public final static String PENDING = "pending"; + public final static String LAST_DELIVERED = "last-delivered-id"; + + + private final String name; + private final long consumers; + private final long pending; + private final StreamEntryID lastDeliveredId; + private final Map groupInfo; + + /** + * @param map contains key-value pairs with group info + * + */ + public StreamGroupInfo(Map map) { + + groupInfo = map; + name = (String) map.get(NAME); + consumers = (long) map.get(CONSUMERS); + pending = (long) map.get(PENDING); + lastDeliveredId = (StreamEntryID) map.get(LAST_DELIVERED); + + } + + public String getName() { + return name; + } + + public long getConsumers() { + return consumers; + } + + public long getPending() { + return pending; + } + + public StreamEntryID getLastDeliveredId() { + return lastDeliveredId; + } + + /** + * @return Generic map containing all key-value pairs returned by the server + */ + public Map getGroupInfo() { + return groupInfo; + } + +} diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java new file mode 100644 index 0000000000..ef6c7c502d --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -0,0 +1,85 @@ +package redis.clients.jedis; + +import java.io.Serializable; +import java.util.Map; + +/** + * This class holds information about stream + * They can be access via getters. + * For future purpose there is also {@link #getStreamInfo} method + * that returns a generic {@code Map} - in case where more info is returned from a server + * + */ + +public class StreamInfo implements Serializable { + + public static final String LENGTH = "length"; + public static final String RADIX_TREE_KEYS = "radix-tree-keys"; + public static final String RADIX_TREE_NODES = "radix-tree-nodes"; + public static final String GROUPS = "groups"; + public static final String LAST_GENERATED_ID = "last-generated-id"; + public static final String FIRST_ENTRY = "first-entry"; + public static final String LAST_ENTRY = "last-entry"; + + private final long length; + private final long radixTreeKeys; + private final long radixTreeNodes; + private final long groups; + private final StreamEntryID lastGeneratedId; + private final StreamEntry firstEntry; + private final StreamEntry lastEntry; + private final Map streamInfo; + + /** + * @param map contains key-value pairs with stream info + * + */ + public StreamInfo(Map map) { + + streamInfo = map; + length = (Long) map.get(LENGTH); + radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); + radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); + groups = (Long) map.get(GROUPS); + lastGeneratedId = (StreamEntryID) map.get(LAST_GENERATED_ID); + firstEntry = (StreamEntry) map.get(FIRST_ENTRY); + lastEntry = (StreamEntry) map.get(LAST_ENTRY); + + } + + public long getLength() { + return length; + } + + public long getRadixTreeKeys() { + return radixTreeKeys; + } + + public long getRadixTreeNodes() { + return radixTreeNodes; + } + + public long getGroups() { + return groups; + } + + public StreamEntryID getLastGeneratedId() { + return lastGeneratedId; + } + + public StreamEntry getFirstEntry() { + return firstEntry; + } + + public StreamEntry getLastEntry() { + return lastEntry; + } + + /** + * @return Generic map containing all key-value pairs returned by the server + */ + public Map getStreamInfo() { + return streamInfo; + } + +} diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java index cff42790c4..01a6b6a393 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java @@ -12,6 +12,9 @@ import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; import redis.clients.jedis.SortingParams; +import redis.clients.jedis.StreamConsumersInfo; +import redis.clients.jedis.StreamGroupInfo; +import redis.clients.jedis.StreamInfo; import redis.clients.jedis.Tuple; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.SetParams; @@ -356,4 +359,10 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids); + + StreamInfo xinfoStream (byte[] key); + + List xinfoGroup (byte[] key); + + List xinfoConsumers (byte[] key, byte[] group); } diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index 1ca788908a..5fd7167958 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -4,10 +4,13 @@ import java.util.Map.Entry; import redis.clients.jedis.BitOP; +import redis.clients.jedis.StreamConsumersInfo; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.ListPosition; import redis.clients.jedis.ScanParams; import redis.clients.jedis.SortingParams; +import redis.clients.jedis.StreamGroupInfo; +import redis.clients.jedis.StreamInfo; import redis.clients.jedis.ZParams; import redis.clients.jedis.params.MigrateParams; import redis.clients.jedis.params.ClientKillParams; @@ -391,4 +394,7 @@ void zrevrangeByScoreWithScores(String key, String max, String min, void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); + void xinfoStream (String key); + void xinfoGroup (String key); + void xinfoConsumers (String key, String group); } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index 52a656818d..e8f42a1c63 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -5,11 +5,14 @@ import java.util.Set; import redis.clients.jedis.BitPosParams; +import redis.clients.jedis.StreamConsumersInfo; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.GeoRadiusResponse; import redis.clients.jedis.GeoUnit; import redis.clients.jedis.ListPosition; +import redis.clients.jedis.StreamGroupInfo; +import redis.clients.jedis.StreamInfo; import redis.clients.jedis.StreamPendingEntry; import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; @@ -480,4 +483,27 @@ List georadiusByMemberReadonly(String key, String member, dou */ List xclaim( String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); + + /** + * Introspection command used in order to retrieve different information about the stream + * @param key Stream name + * @return {@link StreamInfo} that contains information about the stream + */ + StreamInfo xinfoStream (String key); + + /** + * Introspection command used in order to retrieve different information about groups in the stream + * @param key Stream name + * @return List of {@link StreamGroupInfo} containing information about groups + */ + List xinfoGroup (String key); + + /** + * Introspection command used in order to retrieve different information about consumers in the group + * @param key Stream name + * @param group Group name + * @return List of {@link StreamConsumersInfo} containing information about consumers that belong + * to the the group + */ + List xinfoConsumers (String key, String group); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 99a40e4bde..90842764f3 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -5,9 +5,21 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static redis.clients.jedis.StreamGroupInfo.CONSUMERS; +import static redis.clients.jedis.StreamGroupInfo.LAST_DELIVERED; +import static redis.clients.jedis.StreamGroupInfo.NAME; +import static redis.clients.jedis.StreamGroupInfo.PENDING; +import static redis.clients.jedis.StreamInfo.FIRST_ENTRY; +import static redis.clients.jedis.StreamInfo.GROUPS; +import static redis.clients.jedis.StreamInfo.LAST_ENTRY; +import static redis.clients.jedis.StreamInfo.LAST_GENERATED_ID; +import static redis.clients.jedis.StreamInfo.LENGTH; +import static redis.clients.jedis.StreamInfo.RADIX_TREE_KEYS; +import static redis.clients.jedis.StreamInfo.RADIX_TREE_NODES; +import static redis.clients.jedis.StreamConsumersInfo.IDLE; + import java.util.AbstractMap; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -17,6 +29,8 @@ import redis.clients.jedis.*; import redis.clients.jedis.Protocol.Keyword; import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.util.SafeEncoder; public class StreamsCommandsTest extends JedisCommandTestBase { @@ -306,6 +320,176 @@ public void xpendeing() { jedis.xclaim("xpendeing-stream", "xpendeing-group", "xpendeing-consumer2", 500, 0, 0, false, pendingRange.get(0).getID()); } + @Test + public void xinfo() throws InterruptedException { + + final String STREAM_NAME = "xadd-stream1"; + final String F1 = "f1"; + final String V1 = "v1"; + final String V2 = "v2"; + final String G1 = "G1"; + final String G2 = "G2"; + final String MY_CONSUMER = "myConsumer"; + final String MY_CONSUMER2 = "myConsumer2"; + + Map map1 = new HashMap<>(); + map1.put(F1, V1); + StreamEntryID id1 = jedis.xadd(STREAM_NAME, null, map1); + map1.put(F1, V2); + StreamEntryID id2 = jedis.xadd(STREAM_NAME, null, map1); + assertNotNull(id1); + StreamInfo streamInfo =jedis.xinfoStream(STREAM_NAME); + assertNotNull(id2); + + jedis.xgroupCreate(STREAM_NAME,G1, StreamEntryID.LAST_ENTRY,false); + Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>(STREAM_NAME, new StreamEntryID("0-0")); + jedis.xreadGroup(G1, MY_CONSUMER,1,0,false,streamQeury11); + + Thread.sleep(1); + + List groupInfo = jedis.xinfoGroup(STREAM_NAME); + List consumersInfo = jedis.xinfoConsumers(STREAM_NAME, G1); + + //Stream info test + assertEquals(2L,streamInfo.getStreamInfo().get(LENGTH)); + assertEquals(1L,streamInfo.getStreamInfo().get(RADIX_TREE_KEYS)); + assertEquals(2L,streamInfo.getStreamInfo().get(RADIX_TREE_NODES)); + assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); + assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); + assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); + assertEquals(id2,streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); + + //Using getters + assertEquals(2,streamInfo.getLength()); + assertEquals(1,streamInfo.getRadixTreeKeys()); + assertEquals(2,streamInfo.getRadixTreeNodes()); + assertEquals(0,streamInfo.getGroups()); + assertEquals(V1,streamInfo.getFirstEntry().getFields().get(F1)); + assertEquals(V2,streamInfo.getLastEntry().getFields().get(F1)); + assertEquals(id2,streamInfo.getLastGeneratedId()); + + + //Group info test + assertEquals(1,groupInfo.size()); + assertEquals(G1,groupInfo.get(0).getGroupInfo().get(NAME)); + assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); + assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); + assertEquals(id2,groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); + + //Using getters + assertEquals(1,groupInfo.size()); + assertEquals(G1,groupInfo.get(0).getName()); + assertEquals(1,groupInfo.get(0).getConsumers()); + assertEquals(0,groupInfo.get(0).getPending()); + assertEquals(id2,groupInfo.get(0).getLastDeliveredId()); + + //Consumer info test + assertEquals(MY_CONSUMER,consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); + assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING)); + assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get(IDLE)>0); + + //Using getters + assertEquals(MY_CONSUMER,consumersInfo.get(0).getName()); + assertEquals(0L,consumersInfo.get(0).getPending()); + assertTrue(consumersInfo.get(0).getIdle()>0); + + //test with more groups and consumers + jedis.xgroupCreate(STREAM_NAME,G2, StreamEntryID.LAST_ENTRY,false); + jedis.xreadGroup(G1, MY_CONSUMER2,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER2,1,0,false,streamQeury11); + + List manyGroupsInfo = jedis.xinfoGroup(STREAM_NAME); + List manyConsumersInfo = jedis.xinfoConsumers(STREAM_NAME, G2); + + assertEquals(2,manyGroupsInfo.size()); + assertEquals(2,manyConsumersInfo.size()); + + //Not existing key - redis cli return error so we expect exception + try { + jedis.xinfoStream("random"); + fail("Command should fail"); + } catch (JedisException e) { + assertEquals("ERR no such key", e.getMessage()); + } + + } + + @Test + public void xinfoBinary() throws InterruptedException { + + final String STREAM_NAME = "xadd-stream1"; + final String F1 = "f1"; + final String V1 = "v1"; + final String V2 = "v2"; + final String G1 = "G1"; + final String G2 = "G2"; + final String MY_CONSUMER = "myConsumer"; + final String MY_CONSUMER2 = "myConsumer2"; + + Map map1 = new HashMap<>(); + map1.put(F1, V1); + StreamEntryID id1 = jedis.xadd(STREAM_NAME, null, map1); + map1.put(F1, V2); + StreamEntryID id2 = jedis.xadd(STREAM_NAME, null, map1); + assertNotNull(id1); + StreamInfo streamInfo = jedis.xinfoStream(SafeEncoder.encode(STREAM_NAME)); + assertNotNull(id2); + + jedis.xgroupCreate(STREAM_NAME,G1, StreamEntryID.LAST_ENTRY,false); + Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>(STREAM_NAME, new StreamEntryID("0-0")); + jedis.xreadGroup(G1, MY_CONSUMER,1,0,false,streamQeury11); + + Thread.sleep(1); + + List groupInfo = jedis.xinfoGroup(SafeEncoder.encode(STREAM_NAME)); + List consumersInfo = jedis.xinfoConsumers(SafeEncoder.encode(STREAM_NAME), SafeEncoder.encode(G1)); + + //Stream info test + assertEquals(2L,streamInfo.getStreamInfo().get(LENGTH)); + assertEquals(1L,streamInfo.getStreamInfo().get(RADIX_TREE_KEYS)); + assertEquals(2L,streamInfo.getStreamInfo().get(RADIX_TREE_NODES)); + assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); + assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); + assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); + assertEquals(id2,streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); + + //Group info test + assertEquals(1,groupInfo.size()); + assertEquals(G1,groupInfo.get(0).getGroupInfo().get(NAME)); + assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); + assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); + assertEquals(id2,groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); + + //Consumer info test + assertEquals(MY_CONSUMER,consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); + assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING)); + assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get(IDLE)>0); + + //test with more groups and consumers + jedis.xgroupCreate(STREAM_NAME,G2, StreamEntryID.LAST_ENTRY,false); + jedis.xreadGroup(G1, MY_CONSUMER2,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER2,1,0,false,streamQeury11); + + List manyGroupsInfo = jedis.xinfoGroup(STREAM_NAME); + List manyConsumersInfo = jedis.xinfoConsumers(STREAM_NAME, G2); + + assertEquals(2,manyGroupsInfo.size()); + assertEquals(2,manyConsumersInfo.size()); + + //Not existing key - redis cli return error so we expect exception + try { + jedis.xinfoStream(SafeEncoder.encode("random")); + fail("Command should fail"); + } catch (JedisException e) { + assertEquals("ERR no such key", e.getMessage()); + } + + } + + + @Test public void pipeline() { Map map = new HashMap<>();