From f3750c3beb38305905313b43bb03c06d61f139d5 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 10 Sep 2019 11:32:19 +0200 Subject: [PATCH 01/15] Initial commit for XINFO command --- .../redis/clients/jedis/BinaryClient.java | 9 +++ src/main/java/redis/clients/jedis/Client.java | 6 ++ src/main/java/redis/clients/jedis/Jedis.java | 61 +++++++++++++++++++ .../java/redis/clients/jedis/Protocol.java | 3 +- .../redis/clients/jedis/ShardedJedis.java | 7 +++ .../java/redis/clients/jedis/StreamInfo.java | 22 +++++++ .../clients/jedis/commands/Commands.java | 1 + .../clients/jedis/commands/JedisCommands.java | 10 +++ 8 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 src/main/java/redis/clients/jedis/StreamInfo.java diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 39be157f08..f4facc7f11 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1455,4 +1455,13 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][])); } + public void xinfo(byte[] key, byte[] type) { + + ArrayList arguments = new ArrayList<>(10); + arguments.add(type); + arguments.add(key); + sendCommand(XINFO,arguments.toArray(new byte[arguments.size()][])); + + } + } diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index 20193fb911..b0038d1e4b 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1262,5 +1262,11 @@ public void xclaim(String key, String group, String consumername, long minIdleTi xclaim(SafeEncoder.encode(key), SafeEncoder.encode(group), SafeEncoder.encode(consumername), minIdleTime, newIdleTime, retries, force, bids); } + @Override + public void xinfo(String key, String type) { + xinfo(SafeEncoder.encode(key),SafeEncoder.encode(type)); + + } + } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index e5e7aec9a2..7b3baf8601 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3830,6 +3830,67 @@ public List xclaim(String key, String group, String consumername, l return BuilderFactory.STREAM_ENTRY_LIST.build(client.getObjectMultiBulkReply()); } + @Override + public String xinfo(String key, String type) { + + client.xinfo(key,type); + StringBuilder sb = new StringBuilder(); + List streamsEntries = client.getObjectMultiBulkReply(); + if(streamsEntries == null) { + return null; + } + + List>> result = new ArrayList<>(streamsEntries.size()); + Iterator iterator = streamsEntries.iterator(); + + while (iterator.hasNext()) { + Object object = iterator.next(); + if (object instanceof byte[]) { + + String mapKey = SafeEncoder.encode((byte[]) object); + if (mapKey.equals("first-entry")|| mapKey.equals("last-entry")) { + sb.append(mapKey).append(" "); + ArrayList list = (ArrayList)iterator.next(); + sb.append(SafeEncoder.encode((byte[]) ((ArrayList) list).get(0))).append(" "); + ArrayList innerList = (ArrayList) list.get(1); + sb.append(SafeEncoder.encode((byte[]) innerList.get(0))).append(" "); + sb.append(SafeEncoder.encode((byte[]) innerList.get(1))).append(" "); + + } else if (mapKey.equals("last-generated-id")) { + sb.append(mapKey).append(" "); + sb.append(SafeEncoder.encode((byte[]) iterator.next())).append(" "); + + } else { + sb.append(mapKey).append(" "); + sb.append((Long)iterator.next()).append(" "); + } + /*sb.append(SafeEncoder.encode((byte[]) object)).append(" "); + sb.append((Long)iterator.next()).append(" "); + } else if (object instanceof ArrayList ) { + + }*/ + } + /*for(Object streamObj : streamsEntries) { + if (streamObj instanceof byte[]) { + sb.append(SafeEncoder.encode((byte[]) streamObj)).append(" "); + } else if (streamObj instanceof Long) { + sb.append((Long)streamObj).append(" "); + } else { + ArrayList list = (ArrayList) streamObj; + sb.append(SafeEncoder.encode((byte[]) ((ArrayList) list).get(0))).append(" "); + ArrayList innerList = (ArrayList) list.get(1); + sb.append(SafeEncoder.encode((byte[]) innerList.get(0))).append(" "); + sb.append(SafeEncoder.encode((byte[]) innerList.get(1))).append(" "); + }*/ + /*List stream = (List)streamObj; + String streamId = SafeEncoder.encode((byte[])stream.get(0)); + List streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1)); + result.add(new AbstractMap.SimpleEntry>(streamId, streamEntries));*/ + } + return sb.toString(); + //return ""; + } + public Object sendCommand(ProtocolCommand cmd, String... args) { client.sendCommand(cmd, args); return client.getOne(); diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 130eb3f5c3..9e21067c53 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -259,7 +259,8 @@ public static enum Command implements ProtocolCommand { PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, PFADD, PFCOUNT, PFMERGE, READONLY, GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, MODULE, BITFIELD, HSTRLEN, TOUCH, SWAPDB, MEMORY, - XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM; + XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM, + XINFO; private final byte[] raw; diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 21e240f576..0bcb453bba 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -14,6 +14,7 @@ import redis.clients.jedis.params.ZAddParams; import redis.clients.jedis.params.ZIncrByParams; import redis.clients.jedis.util.Hashing; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable { @@ -1055,6 +1056,12 @@ public List xclaim(String key, String group, String consumername, l return j.xclaim(key, group, consumername, minIdleTime, newIdleTime, retries, force, ids); } + @Override + public String xinfo(String key, String type) { + + throw new NotImplementedException(); + } + public Object sendCommand(ProtocolCommand cmd, String... args) { // default since no sample key provided in JedisCommands interface String sampleKey = args.length > 0 ? args[0] : cmd.toString(); diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java new file mode 100644 index 0000000000..7fedbc9806 --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -0,0 +1,22 @@ +package redis.clients.jedis; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class StreamInfo implements Serializable { + + private Map infoMap; + + public StreamInfo(Map map) { + + if (map!= null) { + infoMap = map; + } else throw new IllegalArgumentException("InfoMap can not be null"); + } + + public Map getInfoMap() { + + return infoMap; + } +} diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index 1ca788908a..d96259b86f 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -391,4 +391,5 @@ void zrevrangeByScoreWithScores(String key, String max, String min, void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); + void xinfo (String key, String type); } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index 52a656818d..f1e15af306 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -480,4 +480,14 @@ List georadiusByMemberReadonly(String key, String member, dou */ List xclaim( String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); + + /** + * Introspection command used in order to retrieve different information about the streams + * and associated consumer groups. + * @param key + * @param type + * @return + */ + String xinfo (String key, String type); + //TODO add an enum to select stream/group/consumers } From 71787755d7f46018fafb4f9d6e71247f572fb160 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 1 Oct 2019 10:37:33 +0200 Subject: [PATCH 02/15] Refactor xinfo stream command Builder classes used for decoding Returned type changed to Map --- .../redis/clients/jedis/BuilderFactory.java | 91 +++++++++++++++++++ src/main/java/redis/clients/jedis/Jedis.java | 60 ++---------- .../redis/clients/jedis/ShardedJedis.java | 2 +- .../clients/jedis/commands/JedisCommands.java | 2 +- .../tests/commands/StreamsCommandsTest.java | 27 ++++++ 5 files changed, 129 insertions(+), 53 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index ac03f44c34..7df09a9d4c 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -527,6 +528,41 @@ public String toString() { return "List"; } }; + + public static final Builder STREAM_ENTRY = new Builder() { + @Override + @SuppressWarnings("unchecked") + public StreamEntry build(Object data) { + if (null == data) { + return null; + } + ArrayList objectList = (ArrayList) data; + + if (objectList.isEmpty()) { + return null; + } + + String entryIdString = SafeEncoder.encode((byte[]) objectList.get(0)); + StreamEntryID entryID = new StreamEntryID(entryIdString); + List hash = (List) objectList.get(1); + + Iterator hashIterator = hash.iterator(); + Map map = new HashMap<>(hash.size() / 2); + while (hashIterator.hasNext()) { + map.put(SafeEncoder.encode((byte[]) hashIterator.next()), + SafeEncoder.encode((byte[]) hashIterator.next())); + } + StreamEntry streamEntry = new StreamEntry(entryID, map); + + + return streamEntry; + } + + @Override + public String toString() { + return "List"; + } + }; public static final Builder> STREAM_PENDING_ENTRY_LIST = new Builder>() { @Override @@ -555,6 +591,61 @@ public String toString() { } }; + public static final Builder> STREAM_INFO = new Builder>() { + @Override + @SuppressWarnings("unchecked") + public Map build(Object data) { + if (null == data) { + return null; + } + + Map mappingFunctions = new HashMap<>(); + mappingFunctions.put("last-generated-id",STRING); + mappingFunctions.put("first-entry",STREAM_ENTRY); + mappingFunctions.put("length", LONG); + mappingFunctions.put("radix-tree-keys", LONG); + mappingFunctions.put("radix-tree-nodes", LONG); + mappingFunctions.put("last-generated-id", STRING); + mappingFunctions.put("last-entry",STREAM_ENTRY); + mappingFunctions.put("groups", LONG); + + + Map resultMap = new HashMap<>(); + List streamsEntries = (List)data; + Iterator iterator = streamsEntries.iterator(); + + while (iterator.hasNext()) { + + String mapKey = STRING.build(iterator.next()); + resultMap.put(mapKey,mappingFunctions.get(mapKey).build(iterator.next())); + + } + return resultMap; + } + + @Override + public String toString() { + return "Map"; + } + }; + + public static final Builder> STREAM_GROUP_INFO = new Builder>() { + @Override + @SuppressWarnings("unchecked") + public Map build(Object data) { + if (null == data) { + return null; + } + + throw new RuntimeException("Not implemented yet"); + } + + @Override + public String toString() { + return "Map"; + } + }; + public static final Builder OBJECT = new Builder() { @Override public Object build(Object data) { diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 7b3baf8601..3b699b8077 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3,6 +3,7 @@ import java.net.URI; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -3831,64 +3832,21 @@ public List xclaim(String key, String group, String consumername, l } @Override - public String xinfo(String key, String type) { - + public Map xinfo(String key, String type) { + Map resultMap; client.xinfo(key,type); - StringBuilder sb = new StringBuilder(); List streamsEntries = client.getObjectMultiBulkReply(); if(streamsEntries == null) { return null; } - List>> result = new ArrayList<>(streamsEntries.size()); - Iterator iterator = streamsEntries.iterator(); - - while (iterator.hasNext()) { - Object object = iterator.next(); - if (object instanceof byte[]) { - - String mapKey = SafeEncoder.encode((byte[]) object); - if (mapKey.equals("first-entry")|| mapKey.equals("last-entry")) { - sb.append(mapKey).append(" "); - ArrayList list = (ArrayList)iterator.next(); - sb.append(SafeEncoder.encode((byte[]) ((ArrayList) list).get(0))).append(" "); - ArrayList innerList = (ArrayList) list.get(1); - sb.append(SafeEncoder.encode((byte[]) innerList.get(0))).append(" "); - sb.append(SafeEncoder.encode((byte[]) innerList.get(1))).append(" "); - - } else if (mapKey.equals("last-generated-id")) { - sb.append(mapKey).append(" "); - sb.append(SafeEncoder.encode((byte[]) iterator.next())).append(" "); - - } else { - sb.append(mapKey).append(" "); - sb.append((Long)iterator.next()).append(" "); - } - /*sb.append(SafeEncoder.encode((byte[]) object)).append(" "); - sb.append((Long)iterator.next()).append(" "); - } else if (object instanceof ArrayList ) { - - }*/ - } - /*for(Object streamObj : streamsEntries) { - if (streamObj instanceof byte[]) { - sb.append(SafeEncoder.encode((byte[]) streamObj)).append(" "); - } else if (streamObj instanceof Long) { - sb.append((Long)streamObj).append(" "); - } else { - ArrayList list = (ArrayList) streamObj; - sb.append(SafeEncoder.encode((byte[]) ((ArrayList) list).get(0))).append(" "); - ArrayList innerList = (ArrayList) list.get(1); - sb.append(SafeEncoder.encode((byte[]) innerList.get(0))).append(" "); - sb.append(SafeEncoder.encode((byte[]) innerList.get(1))).append(" "); - }*/ - /*List stream = (List)streamObj; - String streamId = SafeEncoder.encode((byte[])stream.get(0)); - List streamEntries = BuilderFactory.STREAM_ENTRY_LIST.build(stream.get(1)); - result.add(new AbstractMap.SimpleEntry>(streamId, streamEntries));*/ + if ("stream".equalsIgnoreCase(type)) { + resultMap = BuilderFactory.STREAM_INFO.build(streamsEntries); + } else { + throw new RuntimeException("Not implemented yet"); } - return sb.toString(); - //return ""; + + return resultMap; } public Object sendCommand(ProtocolCommand cmd, String... args) { diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 0bcb453bba..7e4402cd85 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -1057,7 +1057,7 @@ public List xclaim(String key, String group, String consumername, l } @Override - public String xinfo(String key, String type) { + public Map xinfo(String key, String type) { throw new NotImplementedException(); } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index f1e15af306..59bf2ad24b 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -488,6 +488,6 @@ List xclaim( String key, String group, String consumername, long mi * @param type * @return */ - String xinfo (String key, String type); + Map xinfo (String key, String type); //TODO add an enum to select stream/group/consumers } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 99a40e4bde..12087b9c6f 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -306,6 +306,33 @@ public void xpendeing() { jedis.xclaim("xpendeing-stream", "xpendeing-group", "xpendeing-consumer2", 500, 0, 0, false, pendingRange.get(0).getID()); } + @Test + public void xinfo() { + + + try { + Map map1 = new HashMap(); + jedis.xadd("stream1", null, map1); + fail(); + } catch (JedisDataException expected) { + assertEquals("ERR wrong number of arguments for 'xadd' command", expected.getMessage()); + } + + Map map1 = new HashMap(); + map1.put("f1", "v1"); + StreamEntryID id1 = jedis.xadd("xadd-stream1", null, map1); + assertNotNull(id1); + + Map xinfo =jedis.xinfo("xadd-stream1", "stream"); + assertEquals(1L,xinfo.get("length")); + assertEquals(1L,xinfo.get("radix-tree-keys")); + assertEquals(2L,xinfo.get("radix-tree-nodes")); + assertEquals(0L,xinfo.get("groups")); + assertTrue(xinfo.get("first-entry") instanceof StreamEntry ); + assertTrue(xinfo.get("last-entry") instanceof StreamEntry ); + + } + @Test public void pipeline() { Map map = new HashMap<>(); From 00943841224e07b328fce38dcd0624ae114780dc Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Sun, 29 Dec 2019 18:36:09 +0100 Subject: [PATCH 03/15] Happy case implementation of xinfo A lot of refactoring to be done. This commit is just for showing the idea --- .../redis/clients/jedis/BinaryClient.java | 9 ++ .../redis/clients/jedis/BuilderFactory.java | 115 +++++++++++++++--- src/main/java/redis/clients/jedis/Client.java | 22 +++- src/main/java/redis/clients/jedis/Jedis.java | 37 ++++-- .../redis/clients/jedis/ShardedJedis.java | 18 ++- .../clients/jedis/StreamConsumersInfo.java | 52 ++++++++ .../redis/clients/jedis/StreamGroupInfo.java | 59 +++++++++ .../java/redis/clients/jedis/StreamInfo.java | 67 +++++++++- .../clients/jedis/commands/Commands.java | 7 +- .../clients/jedis/commands/JedisCommands.java | 9 +- .../tests/commands/StreamsCommandsTest.java | 28 +++-- 11 files changed, 381 insertions(+), 42 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/StreamConsumersInfo.java create mode 100644 src/main/java/redis/clients/jedis/StreamGroupInfo.java diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index f4facc7f11..6b0acca7f0 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1464,4 +1464,13 @@ public void xinfo(byte[] key, byte[] type) { } + public void xinfo (byte[] key, byte[] group, byte[] type) { + + ArrayList arguments = new ArrayList<>(10); + arguments.add(type); + arguments.add(key); + arguments.add(group); + sendCommand(XINFO,arguments.toArray(new byte[arguments.size()][])); + } + } diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 7df09a9d4c..e8fc244776 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1,7 +1,7 @@ package redis.clients.jedis; -import java.lang.reflect.Array; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import redis.clients.jedis.util.JedisByteHashMap; import redis.clients.jedis.util.SafeEncoder; @@ -591,23 +592,22 @@ public String toString() { } }; - public static final Builder> STREAM_INFO = new Builder>() { + public static final Builder STREAM_INFO = new Builder() { @Override @SuppressWarnings("unchecked") - public Map build(Object data) { + public StreamInfo build(Object data) { if (null == data) { return null; } Map mappingFunctions = new HashMap<>(); - mappingFunctions.put("last-generated-id",STRING); - mappingFunctions.put("first-entry",STREAM_ENTRY); - mappingFunctions.put("length", LONG); - mappingFunctions.put("radix-tree-keys", LONG); - mappingFunctions.put("radix-tree-nodes", LONG); - mappingFunctions.put("last-generated-id", STRING); - mappingFunctions.put("last-entry",STREAM_ENTRY); - mappingFunctions.put("groups", LONG); + mappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STRING); + mappingFunctions.put(StreamInfo.FIRST_ENTRY,STREAM_ENTRY); + mappingFunctions.put(StreamInfo.LENGHT, LONG); + mappingFunctions.put(StreamInfo.RADIX_TREE_KEYS, LONG); + mappingFunctions.put(StreamInfo.RADIX_TREE_NODES, LONG); + mappingFunctions.put(StreamInfo.LAST_ENTRY,STREAM_ENTRY); + mappingFunctions.put(StreamInfo.GROUPS, LONG); Map resultMap = new HashMap<>(); @@ -620,24 +620,107 @@ public Map build(Object data) { resultMap.put(mapKey,mappingFunctions.get(mapKey).build(iterator.next())); } - return resultMap; + StreamInfo streamInfo = new StreamInfo(resultMap); + return streamInfo; } @Override public String toString() { - return "Map"; + return "StreamInfo"; } }; - public static final Builder> STREAM_GROUP_INFO = new Builder>() { + public static final Builder> STREAM_GROUP_INFO = new Builder>() { @Override @SuppressWarnings("unchecked") - public Map build(Object data) { + public List build(Object data) { if (null == data) { return null; } - throw new RuntimeException("Not implemented yet"); + Map mappingFunctions = new HashMap<>(); + mappingFunctions.put(StreamGroupInfo.NAME,STRING); + mappingFunctions.put(StreamGroupInfo.CONSUMERS, LONG); + mappingFunctions.put(StreamGroupInfo.PENDING, LONG); + mappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); + + List list = new ArrayList<>(); + List streamsEntries = (List)data; + Iterator groupsArray = streamsEntries.iterator(); + + while (groupsArray.hasNext()) { + + Map resultMap = new HashMap<>(); + List groupInfo = (List) groupsArray.next(); + + Iterator groupInfoIterator = groupInfo.iterator(); + + while (groupInfoIterator.hasNext()) { + + String mapKey = STRING.build(groupInfoIterator.next()); + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(groupInfoIterator.next())); + } + StreamGroupInfo streamGroupInfo = new StreamGroupInfo(resultMap); + list.add(streamGroupInfo); + + } + //StreamGroupInfo streamInfo = new StreamGroupInfo(resultMap); + return list; + + + + + //throw new RuntimeException("Not implemented yet"); + } + + @Override + public String toString() { + return "Map"; + } + }; + + + public static final Builder> STREAM_CONSUMERS_INFO = new Builder>() { + @Override + @SuppressWarnings("unchecked") + public List build(Object data) { + if (null == data) { + return null; + } + + Map mappingFunctions = new HashMap<>(); + mappingFunctions.put(StreamConsumersInfo.NAME,STRING); + mappingFunctions.put(StreamConsumersInfo.IDLE, LONG); + mappingFunctions.put(StreamGroupInfo.PENDING, LONG); + mappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); + + List list = new ArrayList<>(); + List streamsEntries = (List)data; + Iterator groupsArray = streamsEntries.iterator(); + + while (groupsArray.hasNext()) { + + Map resultMap = new HashMap<>(); + List groupInfo = (List) groupsArray.next(); + + Iterator groupInfoIterator = groupInfo.iterator(); + + while (groupInfoIterator.hasNext()) { + + String mapKey = STRING.build(groupInfoIterator.next()); + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(groupInfoIterator.next())); + } + StreamConsumersInfo streamGroupInfo = new StreamConsumersInfo(resultMap); + list.add(streamGroupInfo); + + } + //StreamGroupInfo streamInfo = new StreamGroupInfo(resultMap); + return list; + + + + + //throw new RuntimeException("Not implemented yet"); } @Override diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index b0038d1e4b..b33eb768a1 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1263,8 +1263,26 @@ public void xclaim(String key, String group, String consumername, long minIdleTi } @Override - public void xinfo(String key, String type) { - xinfo(SafeEncoder.encode(key),SafeEncoder.encode(type)); + public void xinfo(String key, StreamInfo.StreamInfoType type) { + + + xinfo(SafeEncoder.encode(key),SafeEncoder.encode(StreamInfo.STREAM_INFO)); + + } + + @Override + public void xinfo(String key, StreamGroupInfo.StreamGroupInfoType type) { + + + xinfo(SafeEncoder.encode(key),SafeEncoder.encode(StreamGroupInfo.GROUP_INFO)); + + } + + @Override + public void xinfo(String key, String group, StreamConsumersInfo.StreamConsumersInfoType type) { + + + xinfo(SafeEncoder.encode(key),SafeEncoder.encode(group),SafeEncoder.encode(StreamConsumersInfo.CONSUMERS_INFO)); } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 3b699b8077..c0cca941a2 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3,7 +3,6 @@ import java.net.URI; import java.util.AbstractMap; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -3832,19 +3831,43 @@ public List xclaim(String key, String group, String consumername, l } @Override - public Map xinfo(String key, String type) { - Map resultMap; + public StreamInfo xinfo(String key, StreamInfo.StreamInfoType type) { + StreamInfo resultMap; client.xinfo(key,type); List streamsEntries = client.getObjectMultiBulkReply(); if(streamsEntries == null) { return null; } + resultMap = BuilderFactory.STREAM_INFO.build(streamsEntries); - if ("stream".equalsIgnoreCase(type)) { - resultMap = BuilderFactory.STREAM_INFO.build(streamsEntries); - } else { - throw new RuntimeException("Not implemented yet"); + + return resultMap; + } + + @Override + public List xinfo(String key, StreamGroupInfo.StreamGroupInfoType type) { + List resultMap; + client.xinfo(key,type); + List streamsEntries = client.getObjectMultiBulkReply(); + if(streamsEntries == null) { + return null; } + resultMap = BuilderFactory.STREAM_GROUP_INFO.build(streamsEntries); + + + return resultMap; + } + + @Override + public List xinfo(String key, String group, StreamConsumersInfo.StreamConsumersInfoType type) { + List resultMap; + client.xinfo(key,group,type); + List streamsEntries = client.getObjectMultiBulkReply(); + if(streamsEntries == null) { + return null; + } + resultMap = BuilderFactory.STREAM_CONSUMERS_INFO.build(streamsEntries); + return resultMap; } diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 7e4402cd85..6ca904a10a 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -1057,9 +1057,23 @@ public List xclaim(String key, String group, String consumername, l } @Override - public Map xinfo(String key, String type) { + public StreamInfo xinfo(String key, StreamInfo.StreamInfoType streamInfoType) { - throw new NotImplementedException(); + Jedis j = getShard(key); + return j.xinfo(key,streamInfoType); + } + + @Override + public List xinfo(String key, StreamGroupInfo.StreamGroupInfoType streamInfoType) { + + Jedis j = getShard(key); + return j.xinfo(key,streamInfoType); + } + + @Override + public List xinfo (String key, String group, StreamConsumersInfo.StreamConsumersInfoType streamConsumersInfoType){ + Jedis j = getShard(key); + return j.xinfo(key,group,streamConsumersInfoType); } public Object sendCommand(ProtocolCommand cmd, String... args) { diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java new file mode 100644 index 0000000000..4f6fa9a965 --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -0,0 +1,52 @@ +package redis.clients.jedis; + +import java.util.Map; + +public class StreamConsumersInfo { + + public final static String NAME = "name"; + public final static String IDLE = "idle"; + public final static String PENDING = "pending"; + public static final String CONSUMERS_INFO = "consumers"; + + + private final String name; + private final long idle; + private final long pending; + + public StreamConsumersInfo(Map map) { + + if (map!= null && map.size()>0) { + name = (String) map.get(NAME); + idle = (long) map.get(IDLE); + pending = (long) map.get(PENDING); + + + } else throw new IllegalArgumentException(); + + } + + public String getName() { + return name; + } + + public long getIdle() { + return idle; + } + + public long getPending() { + return pending; + } + + public static class StreamConsumersInfoType { + + private static final StreamConsumersInfoType streamConsumersInfoType = new StreamConsumersInfoType(); + private StreamConsumersInfoType() { + //Should not be used + }; + + public static StreamConsumersInfoType getStreamGroupInfoType() { + return streamConsumersInfoType; + } + } +} diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java new file mode 100644 index 0000000000..dddda8c583 --- /dev/null +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -0,0 +1,59 @@ +package redis.clients.jedis; + +import java.io.Serializable; +import java.util.Map; + +public class StreamGroupInfo implements Serializable { + + public final static String NAME = "name"; + public final static String CONSUMERS = "consumers"; + public final static String PENDING = "pending"; + public final static String LAST_DELIVERED = "last-delivered-id"; + public static final String GROUP_INFO = "groups"; + + + private final String name; + private final long consumers; + private final long pending; + private final String lastDeliveredId; + + + public StreamGroupInfo(Map map) { + if (map!= null && map.size()>0) { + name = (String) map.get(NAME); + consumers = (long) map.get(CONSUMERS); + pending = (long) map.get(PENDING); + lastDeliveredId = (String) map.get(LAST_DELIVERED); + + + } else throw new IllegalArgumentException(); + } + + public String getName() { + return name; + } + + public long getConsumers() { + return consumers; + } + + public long getPending() { + return pending; + } + + public String getLastDeliveredId() { + return lastDeliveredId; + } + + public static class StreamGroupInfoType { + + private static final StreamGroupInfoType streamInfoType = new StreamGroupInfoType(); + private StreamGroupInfoType() { + //Should not be used + }; + + public static StreamGroupInfoType getStreamGroupInfoType() { + return streamInfoType; + } + } +} diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index 7fedbc9806..640c13e29d 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -1,22 +1,79 @@ package redis.clients.jedis; import java.io.Serializable; -import java.util.HashMap; import java.util.Map; public class StreamInfo implements Serializable { - private Map infoMap; + public static final String LENGHT = "length"; + public static final String RADIX_TREE_KEYS = "radix-tree-keys"; + public static final String RADIX_TREE_NODES = "radix-tree-nodes"; + public static final String GROUPS = "groups"; + public static final String LAST_GENERATED_ID = "last-generated-id"; + public static final String FIRST_ENTRY = "first-entry"; + public static final String LAST_ENTRY = "last-entry"; + public static final String STREAM_INFO = "stream"; + + private final Long length; + private final Long radixTreeKeys; + + private final Long radixTreeNodes; + private final Long groups; + private final String lastGeneratedId; + private final StreamEntry firstEntry; + private final StreamEntry lastEntry; public StreamInfo(Map map) { if (map!= null) { - infoMap = map; + length = (Long) map.get(LENGHT); + radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); + radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); + groups = (Long) map.get(GROUPS); + lastGeneratedId = (String) map.get(LAST_GENERATED_ID); + firstEntry = (StreamEntry) map.get(FIRST_ENTRY); + lastEntry = (StreamEntry) map.get(LAST_ENTRY); + } else throw new IllegalArgumentException("InfoMap can not be null"); } - public Map getInfoMap() { + public Long getLength() { + return length; + } + + public Long getRadixTreeKeys() { + return radixTreeKeys; + } + + public Long getRadixTreeNodes() { + return radixTreeNodes; + } + + public Long getGroups() { + return groups; + } + + public String getLastGeneratedId() { + return lastGeneratedId; + } + + public StreamEntry getFirstEntry() { + return firstEntry; + } + + public StreamEntry getLastEntry() { + return lastEntry; + } + + public static class StreamInfoType { + + private static final StreamInfoType streamInfoType = new StreamInfoType(); + private StreamInfoType() { + //Should not be used + }; - return infoMap; + public static StreamInfoType getStreamInfoType() { + return streamInfoType; + } } } diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index d96259b86f..3f6c3adfd4 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -4,10 +4,13 @@ import java.util.Map.Entry; import redis.clients.jedis.BitOP; +import redis.clients.jedis.StreamConsumersInfo; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.ListPosition; import redis.clients.jedis.ScanParams; import redis.clients.jedis.SortingParams; +import redis.clients.jedis.StreamGroupInfo; +import redis.clients.jedis.StreamInfo; import redis.clients.jedis.ZParams; import redis.clients.jedis.params.MigrateParams; import redis.clients.jedis.params.ClientKillParams; @@ -391,5 +394,7 @@ void zrevrangeByScoreWithScores(String key, String max, String min, void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); - void xinfo (String key, String type); + void xinfo (String key, StreamInfo.StreamInfoType type); + void xinfo (String key, StreamGroupInfo.StreamGroupInfoType type); + void xinfo (String key, String group, StreamConsumersInfo.StreamConsumersInfoType type); } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index 59bf2ad24b..4292238d97 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -5,11 +5,14 @@ import java.util.Set; import redis.clients.jedis.BitPosParams; +import redis.clients.jedis.StreamConsumersInfo; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.GeoRadiusResponse; import redis.clients.jedis.GeoUnit; import redis.clients.jedis.ListPosition; +import redis.clients.jedis.StreamGroupInfo; +import redis.clients.jedis.StreamInfo; import redis.clients.jedis.StreamPendingEntry; import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; @@ -488,6 +491,10 @@ List xclaim( String key, String group, String consumername, long mi * @param type * @return */ - Map xinfo (String key, String type); + StreamInfo xinfo (String key, StreamInfo.StreamInfoType type); //TODO add an enum to select stream/group/consumers + + List xinfo (String key, StreamGroupInfo.StreamGroupInfoType type); + + List xinfo (String key, String group, StreamConsumersInfo.StreamConsumersInfoType type); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 12087b9c6f..f64e3a6970 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -7,7 +7,6 @@ import static org.junit.Assert.fail; import java.util.AbstractMap; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -322,17 +321,30 @@ public void xinfo() { map1.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xadd-stream1", null, map1); assertNotNull(id1); + StreamInfo xinfo =jedis.xinfo("xadd-stream1", StreamInfo.StreamInfoType.getStreamInfoType()); + assertEquals(1L,xinfo.getLength().longValue()); + assertEquals(1L,xinfo.getRadixTreeKeys().longValue()); + assertEquals(2L,xinfo.getRadixTreeNodes().longValue()); + assertEquals(0L,xinfo.getGroups().longValue()); + assertEquals("v1",xinfo.getFirstEntry().getFields().get("f1")); + assertEquals("v1",xinfo.getLastEntry().getFields().get("f1")); - Map xinfo =jedis.xinfo("xadd-stream1", "stream"); - assertEquals(1L,xinfo.get("length")); - assertEquals(1L,xinfo.get("radix-tree-keys")); - assertEquals(2L,xinfo.get("radix-tree-nodes")); - assertEquals(0L,xinfo.get("groups")); - assertTrue(xinfo.get("first-entry") instanceof StreamEntry ); - assertTrue(xinfo.get("last-entry") instanceof StreamEntry ); + jedis.xgroupCreate("xadd-stream1","G1", StreamEntryID.LAST_ENTRY,false); + jedis.xgroupCreate("xadd-stream1","G2", StreamEntryID.LAST_ENTRY,false); + + Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>("xadd-stream1", new StreamEntryID("0-0")); + + jedis.xreadGroup("G1", "myConsumer",1,0,false,streamQeury11); + + + List info = jedis.xinfo("xadd-stream1", StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType()); + List consumersInfos = jedis.xinfo("xadd-stream1", "G1", + StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType()); } + + @Test public void pipeline() { Map map = new HashMap<>(); From 5d051dd6546974bea9db26938c35198682a3a5cb Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Thu, 9 Jan 2020 19:00:36 +0100 Subject: [PATCH 04/15] Refactor of xinfo after review comments xinfo commands return now just a Map In case when (in the future) redis includes additional fields in a reply we will try to decode them using known decoders --- .../redis/clients/jedis/BinaryClient.java | 8 +- .../redis/clients/jedis/BuilderFactory.java | 133 +++++++++++++----- src/main/java/redis/clients/jedis/Client.java | 16 +-- src/main/java/redis/clients/jedis/Jedis.java | 36 ++--- .../redis/clients/jedis/ShardedJedis.java | 13 +- .../clients/jedis/StreamConsumersInfo.java | 15 +- .../redis/clients/jedis/StreamGroupInfo.java | 18 +-- .../java/redis/clients/jedis/StreamInfo.java | 20 +-- .../clients/jedis/commands/Commands.java | 6 +- .../clients/jedis/commands/JedisCommands.java | 8 +- .../tests/commands/StreamsCommandsTest.java | 22 +-- 11 files changed, 168 insertions(+), 127 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index 6b0acca7f0..ebe8f3f613 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1455,22 +1455,22 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][])); } - public void xinfo(byte[] key, byte[] type) { + public void xinfo(byte[] type, byte[] key) { ArrayList arguments = new ArrayList<>(10); arguments.add(type); arguments.add(key); - sendCommand(XINFO,arguments.toArray(new byte[arguments.size()][])); + sendCommand(XINFO,type,key); } - public void xinfo (byte[] key, byte[] group, byte[] type) { + public void xinfo ( byte[] type, byte[] key, byte[] group) { ArrayList arguments = new ArrayList<>(10); arguments.add(type); arguments.add(key); arguments.add(group); - sendCommand(XINFO,arguments.toArray(new byte[arguments.size()][])); + sendCommand(XINFO,type,key,group); } } diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index e8fc244776..89336745da 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1,7 +1,6 @@ package redis.clients.jedis; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -9,7 +8,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Stream; import redis.clients.jedis.util.JedisByteHashMap; import redis.clients.jedis.util.SafeEncoder; @@ -561,7 +559,7 @@ public StreamEntry build(Object data) { @Override public String toString() { - return "List"; + return "StreamEntry"; } }; @@ -593,6 +591,24 @@ public String toString() { }; public static final Builder STREAM_INFO = new Builder() { + + + Map mappingFunctions = createDecoderMap(); + + private Map createDecoderMap() { + + Map tempMappingFunctions = new HashMap<>(); + tempMappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STRING); + tempMappingFunctions.put(StreamInfo.FIRST_ENTRY,STREAM_ENTRY); + tempMappingFunctions.put(StreamInfo.LENGHT, LONG); + tempMappingFunctions.put(StreamInfo.RADIX_TREE_KEYS, LONG); + tempMappingFunctions.put(StreamInfo.RADIX_TREE_NODES, LONG); + tempMappingFunctions.put(StreamInfo.LAST_ENTRY,STREAM_ENTRY); + tempMappingFunctions.put(StreamInfo.GROUPS, LONG); + + return tempMappingFunctions; + } + @Override @SuppressWarnings("unchecked") public StreamInfo build(Object data) { @@ -600,16 +616,6 @@ public StreamInfo build(Object data) { return null; } - Map mappingFunctions = new HashMap<>(); - mappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STRING); - mappingFunctions.put(StreamInfo.FIRST_ENTRY,STREAM_ENTRY); - mappingFunctions.put(StreamInfo.LENGHT, LONG); - mappingFunctions.put(StreamInfo.RADIX_TREE_KEYS, LONG); - mappingFunctions.put(StreamInfo.RADIX_TREE_NODES, LONG); - mappingFunctions.put(StreamInfo.LAST_ENTRY,STREAM_ENTRY); - mappingFunctions.put(StreamInfo.GROUPS, LONG); - - Map resultMap = new HashMap<>(); List streamsEntries = (List)data; Iterator iterator = streamsEntries.iterator(); @@ -617,7 +623,20 @@ public StreamInfo build(Object data) { while (iterator.hasNext()) { String mapKey = STRING.build(iterator.next()); - resultMap.put(mapKey,mappingFunctions.get(mapKey).build(iterator.next())); + if (mappingFunctions.containsKey(mapKey)) { + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(iterator.next())); + } else { //For future - if we don't find an element in our builder map + Object unknownData = iterator.next(); + for (Builder b:mappingFunctions.values()) { + try { + resultMap.put(mapKey,b.build(unknownData)); + break; + } catch (ClassCastException e) { + //We continue with next builder + + } + } + } } StreamInfo streamInfo = new StreamInfo(resultMap); @@ -631,6 +650,21 @@ public String toString() { }; public static final Builder> STREAM_GROUP_INFO = new Builder>() { + + Map mappingFunctions = createDecoderMap(); + + private Map createDecoderMap() { + + Map tempMappingFunctions = new HashMap<>(); + tempMappingFunctions.put(StreamGroupInfo.NAME,STRING); + tempMappingFunctions.put(StreamGroupInfo.CONSUMERS, LONG); + tempMappingFunctions.put(StreamGroupInfo.PENDING, LONG); + tempMappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); + + return tempMappingFunctions; + } + + @Override @SuppressWarnings("unchecked") public List build(Object data) { @@ -638,12 +672,6 @@ public List build(Object data) { return null; } - Map mappingFunctions = new HashMap<>(); - mappingFunctions.put(StreamGroupInfo.NAME,STRING); - mappingFunctions.put(StreamGroupInfo.CONSUMERS, LONG); - mappingFunctions.put(StreamGroupInfo.PENDING, LONG); - mappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); - List list = new ArrayList<>(); List streamsEntries = (List)data; Iterator groupsArray = streamsEntries.iterator(); @@ -658,7 +686,20 @@ public List build(Object data) { while (groupInfoIterator.hasNext()) { String mapKey = STRING.build(groupInfoIterator.next()); - resultMap.put(mapKey, mappingFunctions.get(mapKey).build(groupInfoIterator.next())); + if (mappingFunctions.containsKey(mapKey)) { + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(groupInfoIterator.next())); + } else { //For future - if we don't find an element in our builder map + Object unknownData = groupInfoIterator.next(); + for (Builder b:mappingFunctions.values()) { + try { + resultMap.put(mapKey,b.build(unknownData)); + break; + } catch (ClassCastException e) { + //We continue with next builder + + } + } + } } StreamGroupInfo streamGroupInfo = new StreamGroupInfo(resultMap); list.add(streamGroupInfo); @@ -675,12 +716,26 @@ public List build(Object data) { @Override public String toString() { - return "Map"; + return "List"; } }; public static final Builder> STREAM_CONSUMERS_INFO = new Builder>() { + + Map mappingFunctions = createDecoderMap(); + + private Map createDecoderMap() { + Map tempMappingFunctions = new HashMap<>(); + tempMappingFunctions.put(StreamConsumersInfo.NAME,STRING); + tempMappingFunctions.put(StreamConsumersInfo.IDLE, LONG); + tempMappingFunctions.put(StreamGroupInfo.PENDING, LONG); + tempMappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); + return tempMappingFunctions; + + } + + @Override @SuppressWarnings("unchecked") public List build(Object data) { @@ -688,12 +743,6 @@ public List build(Object data) { return null; } - Map mappingFunctions = new HashMap<>(); - mappingFunctions.put(StreamConsumersInfo.NAME,STRING); - mappingFunctions.put(StreamConsumersInfo.IDLE, LONG); - mappingFunctions.put(StreamGroupInfo.PENDING, LONG); - mappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); - List list = new ArrayList<>(); List streamsEntries = (List)data; Iterator groupsArray = streamsEntries.iterator(); @@ -703,29 +752,37 @@ public List build(Object data) { Map resultMap = new HashMap<>(); List groupInfo = (List) groupsArray.next(); - Iterator groupInfoIterator = groupInfo.iterator(); + Iterator consumerInfoIterator = groupInfo.iterator(); - while (groupInfoIterator.hasNext()) { + while (consumerInfoIterator.hasNext()) { - String mapKey = STRING.build(groupInfoIterator.next()); - resultMap.put(mapKey, mappingFunctions.get(mapKey).build(groupInfoIterator.next())); + String mapKey = STRING.build(consumerInfoIterator.next()); + if (mappingFunctions.containsKey(mapKey)) { + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(consumerInfoIterator.next())); + } else { //For future - if we don't find an element in our builder map + Object unknownData = consumerInfoIterator.next(); + for (Builder b:mappingFunctions.values()) { + try { + resultMap.put(mapKey,b.build(unknownData)); + break; + } catch (ClassCastException e) { + //We continue with next builder + + } + } + } } StreamConsumersInfo streamGroupInfo = new StreamConsumersInfo(resultMap); list.add(streamGroupInfo); } - //StreamGroupInfo streamInfo = new StreamGroupInfo(resultMap); return list; - - - - //throw new RuntimeException("Not implemented yet"); } @Override public String toString() { - return "Map"; + return "List"; } }; diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index b33eb768a1..042eb955e9 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1263,28 +1263,24 @@ public void xclaim(String key, String group, String consumername, long minIdleTi } @Override - public void xinfo(String key, StreamInfo.StreamInfoType type) { + public void xinfo(StreamInfo.StreamInfoType type, String key) { - - xinfo(SafeEncoder.encode(key),SafeEncoder.encode(StreamInfo.STREAM_INFO)); + xinfo(SafeEncoder.encode(StreamInfo.STREAM_INFO), SafeEncoder.encode(key)); } @Override - public void xinfo(String key, StreamGroupInfo.StreamGroupInfoType type) { - + public void xinfo(StreamGroupInfo.StreamGroupInfoType type, String key) { - xinfo(SafeEncoder.encode(key),SafeEncoder.encode(StreamGroupInfo.GROUP_INFO)); + xinfo(SafeEncoder.encode(StreamGroupInfo.GROUP_INFO), SafeEncoder.encode(key)); } @Override - public void xinfo(String key, String group, StreamConsumersInfo.StreamConsumersInfoType type) { + public void xinfo(StreamConsumersInfo.StreamConsumersInfoType type, String key, String group) { - - xinfo(SafeEncoder.encode(key),SafeEncoder.encode(group),SafeEncoder.encode(StreamConsumersInfo.CONSUMERS_INFO)); + xinfo(SafeEncoder.encode(StreamConsumersInfo.CONSUMERS_INFO), SafeEncoder.encode(key),SafeEncoder.encode(group)); } - } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index c0cca941a2..c3e20afd36 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3831,45 +3831,27 @@ public List xclaim(String key, String group, String consumername, l } @Override - public StreamInfo xinfo(String key, StreamInfo.StreamInfoType type) { - StreamInfo resultMap; - client.xinfo(key,type); - List streamsEntries = client.getObjectMultiBulkReply(); - if(streamsEntries == null) { - return null; - } - resultMap = BuilderFactory.STREAM_INFO.build(streamsEntries); + public StreamInfo xinfo(StreamInfo.StreamInfoType type, String key) { + client.xinfo(type,key); + return BuilderFactory.STREAM_INFO.build(client.getObjectMultiBulkReply()); - return resultMap; } @Override - public List xinfo(String key, StreamGroupInfo.StreamGroupInfoType type) { - List resultMap; - client.xinfo(key,type); - List streamsEntries = client.getObjectMultiBulkReply(); - if(streamsEntries == null) { - return null; - } - resultMap = BuilderFactory.STREAM_GROUP_INFO.build(streamsEntries); + public List xinfo(StreamGroupInfo.StreamGroupInfoType type, String key) { + client.xinfo(type,key); + return BuilderFactory.STREAM_GROUP_INFO.build(client.getObjectMultiBulkReply()); - return resultMap; } @Override - public List xinfo(String key, String group, StreamConsumersInfo.StreamConsumersInfoType type) { - List resultMap; - client.xinfo(key,group,type); - List streamsEntries = client.getObjectMultiBulkReply(); - if(streamsEntries == null) { - return null; - } - resultMap = BuilderFactory.STREAM_CONSUMERS_INFO.build(streamsEntries); + public List xinfo(StreamConsumersInfo.StreamConsumersInfoType type, String key, String group) { + client.xinfo(type,key,group); + return BuilderFactory.STREAM_CONSUMERS_INFO.build(client.getObjectMultiBulkReply()); - return resultMap; } public Object sendCommand(ProtocolCommand cmd, String... args) { diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index 6ca904a10a..a06578be80 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -14,7 +14,6 @@ import redis.clients.jedis.params.ZAddParams; import redis.clients.jedis.params.ZIncrByParams; import redis.clients.jedis.util.Hashing; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable { @@ -1057,23 +1056,23 @@ public List xclaim(String key, String group, String consumername, l } @Override - public StreamInfo xinfo(String key, StreamInfo.StreamInfoType streamInfoType) { + public StreamInfo xinfo(StreamInfo.StreamInfoType streamInfoType, String key) { Jedis j = getShard(key); - return j.xinfo(key,streamInfoType); + return j.xinfo(streamInfoType, key); } @Override - public List xinfo(String key, StreamGroupInfo.StreamGroupInfoType streamInfoType) { + public List xinfo(StreamGroupInfo.StreamGroupInfoType streamInfoType, String key) { Jedis j = getShard(key); - return j.xinfo(key,streamInfoType); + return j.xinfo(streamInfoType, key); } @Override - public List xinfo (String key, String group, StreamConsumersInfo.StreamConsumersInfoType streamConsumersInfoType){ + public List xinfo (StreamConsumersInfo.StreamConsumersInfoType streamConsumersInfoType, String key, String group){ Jedis j = getShard(key); - return j.xinfo(key,group,streamConsumersInfoType); + return j.xinfo(streamConsumersInfoType, key, group); } public Object sendCommand(ProtocolCommand cmd, String... args) { diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java index 4f6fa9a965..078b1d92dd 100644 --- a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -10,23 +10,22 @@ public class StreamConsumersInfo { public static final String CONSUMERS_INFO = "consumers"; - private final String name; + /*private final String name; private final long idle; - private final long pending; + private final long pending;*/ + private final Map consumerInfo; public StreamConsumersInfo(Map map) { if (map!= null && map.size()>0) { - name = (String) map.get(NAME); - idle = (long) map.get(IDLE); - pending = (long) map.get(PENDING); + consumerInfo = map; } else throw new IllegalArgumentException(); } - public String getName() { + /*public String getName() { return name; } @@ -36,6 +35,10 @@ public long getIdle() { public long getPending() { return pending; + }*/ + + public Map getConsumerInfo() { + return consumerInfo; } public static class StreamConsumersInfoType { diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java index dddda8c583..2be989ed45 100644 --- a/src/main/java/redis/clients/jedis/StreamGroupInfo.java +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -12,24 +12,22 @@ public class StreamGroupInfo implements Serializable { public static final String GROUP_INFO = "groups"; - private final String name; + /*private final String name; private final long consumers; private final long pending; - private final String lastDeliveredId; + private final String lastDeliveredId;*/ + private final Map groupInfo; public StreamGroupInfo(Map map) { if (map!= null && map.size()>0) { - name = (String) map.get(NAME); - consumers = (long) map.get(CONSUMERS); - pending = (long) map.get(PENDING); - lastDeliveredId = (String) map.get(LAST_DELIVERED); + groupInfo = map; } else throw new IllegalArgumentException(); } - public String getName() { + /* public String getName() { return name; } @@ -43,7 +41,11 @@ public long getPending() { public String getLastDeliveredId() { return lastDeliveredId; - } + }*/ + + public Map getGroupInfo() { + return groupInfo; + } public static class StreamGroupInfoType { diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index 640c13e29d..0d57e57dbf 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -14,30 +14,26 @@ public class StreamInfo implements Serializable { public static final String LAST_ENTRY = "last-entry"; public static final String STREAM_INFO = "stream"; - private final Long length; + /*private final Long length; private final Long radixTreeKeys; private final Long radixTreeNodes; private final Long groups; private final String lastGeneratedId; private final StreamEntry firstEntry; - private final StreamEntry lastEntry; + private final StreamEntry lastEntry;*/ + private final Map streamInfo; + public StreamInfo(Map map) { if (map!= null) { - length = (Long) map.get(LENGHT); - radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); - radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); - groups = (Long) map.get(GROUPS); - lastGeneratedId = (String) map.get(LAST_GENERATED_ID); - firstEntry = (StreamEntry) map.get(FIRST_ENTRY); - lastEntry = (StreamEntry) map.get(LAST_ENTRY); + streamInfo = map; } else throw new IllegalArgumentException("InfoMap can not be null"); } - public Long getLength() { + /*public Long getLength() { return length; } @@ -63,6 +59,10 @@ public StreamEntry getFirstEntry() { public StreamEntry getLastEntry() { return lastEntry; + }*/ + + public Map getStreamInfo() { + return streamInfo; } public static class StreamInfoType { diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index 3f6c3adfd4..c2cd4e70b8 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -394,7 +394,7 @@ void zrevrangeByScoreWithScores(String key, String max, String min, void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); - void xinfo (String key, StreamInfo.StreamInfoType type); - void xinfo (String key, StreamGroupInfo.StreamGroupInfoType type); - void xinfo (String key, String group, StreamConsumersInfo.StreamConsumersInfoType type); + void xinfo (StreamInfo.StreamInfoType type, String key); + void xinfo (StreamGroupInfo.StreamGroupInfoType type, String key); + void xinfo (StreamConsumersInfo.StreamConsumersInfoType type, String key, String group); } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index 4292238d97..59388c3258 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -487,14 +487,14 @@ List xclaim( String key, String group, String consumername, long mi /** * Introspection command used in order to retrieve different information about the streams * and associated consumer groups. - * @param key * @param type + * @param key * @return */ - StreamInfo xinfo (String key, StreamInfo.StreamInfoType type); + StreamInfo xinfo (StreamInfo.StreamInfoType type,String key); //TODO add an enum to select stream/group/consumers - List xinfo (String key, StreamGroupInfo.StreamGroupInfoType type); + List xinfo (StreamGroupInfo.StreamGroupInfoType type,String key); - List xinfo (String key, String group, StreamConsumersInfo.StreamConsumersInfoType type); + List xinfo (StreamConsumersInfo.StreamConsumersInfoType type, String key, String group); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index f64e3a6970..01a2a737f6 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -321,13 +321,13 @@ public void xinfo() { map1.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xadd-stream1", null, map1); assertNotNull(id1); - StreamInfo xinfo =jedis.xinfo("xadd-stream1", StreamInfo.StreamInfoType.getStreamInfoType()); - assertEquals(1L,xinfo.getLength().longValue()); - assertEquals(1L,xinfo.getRadixTreeKeys().longValue()); - assertEquals(2L,xinfo.getRadixTreeNodes().longValue()); - assertEquals(0L,xinfo.getGroups().longValue()); - assertEquals("v1",xinfo.getFirstEntry().getFields().get("f1")); - assertEquals("v1",xinfo.getLastEntry().getFields().get("f1")); + StreamInfo xinfo =jedis.xinfo(StreamInfo.StreamInfoType.getStreamInfoType(), "xadd-stream1"); + assertEquals(1L,(long)xinfo.getStreamInfo().get("length")); + assertEquals(1L,xinfo.getStreamInfo().get("radix-tree-keys")); + assertEquals(2L,xinfo.getStreamInfo().get("radix-tree-nodes")); + assertEquals(0L,xinfo.getStreamInfo().get("groups")); + assertEquals("v1",((StreamEntry)xinfo.getStreamInfo().get("first-entry")).getFields().get("f1")); + assertEquals("v1",((StreamEntry)xinfo.getStreamInfo().get("last-entry")).getFields().get("f1")); jedis.xgroupCreate("xadd-stream1","G1", StreamEntryID.LAST_ENTRY,false); jedis.xgroupCreate("xadd-stream1","G2", StreamEntryID.LAST_ENTRY,false); @@ -337,9 +337,11 @@ public void xinfo() { jedis.xreadGroup("G1", "myConsumer",1,0,false,streamQeury11); - List info = jedis.xinfo("xadd-stream1", StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType()); - List consumersInfos = jedis.xinfo("xadd-stream1", "G1", - StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType()); + List info = jedis.xinfo(StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType(),"xadd-stream1"); + List consumersInfos = jedis.xinfo(StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType(), + "xadd-stream1", "G1"); + + System.out.println(info.get(0).getGroupInfo().get("name")); } From 2690e017c226d60fecab76cb69226e799a139192 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Mon, 13 Jan 2020 17:07:05 +0100 Subject: [PATCH 05/15] Add more tests Some code clean up done together with more tests --- .../redis/clients/jedis/BuilderFactory.java | 5 -- .../tests/commands/StreamsCommandsTest.java | 67 ++++++++++++------- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 89336745da..2bf0f92c3a 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -705,13 +705,8 @@ public List build(Object data) { list.add(streamGroupInfo); } - //StreamGroupInfo streamInfo = new StreamGroupInfo(resultMap); return list; - - - - //throw new RuntimeException("Not implemented yet"); } @Override diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 01a2a737f6..d1103a9db5 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -16,6 +16,7 @@ import redis.clients.jedis.*; import redis.clients.jedis.Protocol.Keyword; import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.exceptions.JedisException; public class StreamsCommandsTest extends JedisCommandTestBase { @@ -308,40 +309,60 @@ public void xpendeing() { @Test public void xinfo() { - - try { - Map map1 = new HashMap(); - jedis.xadd("stream1", null, map1); - fail(); - } catch (JedisDataException expected) { - assertEquals("ERR wrong number of arguments for 'xadd' command", expected.getMessage()); - } - Map map1 = new HashMap(); map1.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xadd-stream1", null, map1); + map1.put("f1", "v2"); + StreamEntryID id2 = jedis.xadd("xadd-stream1", null, map1); assertNotNull(id1); - StreamInfo xinfo =jedis.xinfo(StreamInfo.StreamInfoType.getStreamInfoType(), "xadd-stream1"); - assertEquals(1L,(long)xinfo.getStreamInfo().get("length")); - assertEquals(1L,xinfo.getStreamInfo().get("radix-tree-keys")); - assertEquals(2L,xinfo.getStreamInfo().get("radix-tree-nodes")); - assertEquals(0L,xinfo.getStreamInfo().get("groups")); - assertEquals("v1",((StreamEntry)xinfo.getStreamInfo().get("first-entry")).getFields().get("f1")); - assertEquals("v1",((StreamEntry)xinfo.getStreamInfo().get("last-entry")).getFields().get("f1")); + StreamInfo streamInfo =jedis.xinfo(StreamInfo.StreamInfoType.getStreamInfoType(), "xadd-stream1"); + assertNotNull(id2); jedis.xgroupCreate("xadd-stream1","G1", StreamEntryID.LAST_ENTRY,false); - jedis.xgroupCreate("xadd-stream1","G2", StreamEntryID.LAST_ENTRY,false); - Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>("xadd-stream1", new StreamEntryID("0-0")); - jedis.xreadGroup("G1", "myConsumer",1,0,false,streamQeury11); + List groupInfo = jedis.xinfo(StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType(),"xadd-stream1"); + List consumersInfo = jedis.xinfo(StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType(), + "xadd-stream1", "G1"); + //Stream info test + assertEquals(2L,(long)streamInfo.getStreamInfo().get("length")); + assertEquals(1L,streamInfo.getStreamInfo().get("radix-tree-keys")); + assertEquals(2L,streamInfo.getStreamInfo().get("radix-tree-nodes")); + assertEquals(0L,streamInfo.getStreamInfo().get("groups")); + assertEquals("v1",((StreamEntry)streamInfo.getStreamInfo().get("first-entry")).getFields().get("f1")); + assertEquals("v2",((StreamEntry)streamInfo.getStreamInfo().get("last-entry")).getFields().get("f1")); + + //Group info test + assertEquals(1,groupInfo.size()); + assertEquals("G1",groupInfo.get(0).getGroupInfo().get("name")); + assertEquals(1L,groupInfo.get(0).getGroupInfo().get("consumers")); + assertEquals(0L,groupInfo.get(0).getGroupInfo().get("pending")); + + //Consumer info test + assertEquals("myConsumer",consumersInfo.get(0).getConsumerInfo().get("name")); + assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get("pending")); + assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get("idle")>0); + + //test with more groups and consumers + jedis.xgroupCreate("xadd-stream1","G2", StreamEntryID.LAST_ENTRY,false); + jedis.xreadGroup("G1", "myConsumer2",1,0,false,streamQeury11); + jedis.xreadGroup("G2", "myConsumer",1,0,false,streamQeury11); + jedis.xreadGroup("G2", "myConsumer2",1,0,false,streamQeury11); - List info = jedis.xinfo(StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType(),"xadd-stream1"); - List consumersInfos = jedis.xinfo(StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType(), - "xadd-stream1", "G1"); + List manyGroupsInfo = jedis.xinfo(StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType(),"xadd-stream1"); + List manyConsumersInfo = jedis.xinfo(StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType(),"xadd-stream1", "G2"); + + assertEquals(2,manyGroupsInfo.size()); + assertEquals(2,manyConsumersInfo.size()); - System.out.println(info.get(0).getGroupInfo().get("name")); + //Not existing key - redis cli return error so we expect exception + try { + jedis.xinfo(StreamInfo.StreamInfoType.getStreamInfoType(), "random"); + fail("Command should fail"); + } catch (JedisException e) { + assertEquals("ERR no such key", e.getMessage()); + } } From 059190eb06f1c495e7947f002c541686c775bfe8 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 14 Jan 2020 11:07:16 +0100 Subject: [PATCH 06/15] Restore getters for map elements Some minor refactoring done together with restoring getters for elements known to be returned by redis now. In case new elements are added in the future the map can be used to obtain them. --- .../redis/clients/jedis/BinaryClient.java | 7 --- .../redis/clients/jedis/BuilderFactory.java | 14 ++--- src/main/java/redis/clients/jedis/Jedis.java | 4 +- .../clients/jedis/StreamConsumersInfo.java | 11 ++-- .../redis/clients/jedis/StreamGroupInfo.java | 13 +++-- .../java/redis/clients/jedis/StreamInfo.java | 29 ++++++---- .../tests/commands/StreamsCommandsTest.java | 57 +++++++++++++++---- 7 files changed, 87 insertions(+), 48 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index ebe8f3f613..bd8b9fc54f 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1457,19 +1457,12 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId public void xinfo(byte[] type, byte[] key) { - ArrayList arguments = new ArrayList<>(10); - arguments.add(type); - arguments.add(key); sendCommand(XINFO,type,key); } public void xinfo ( byte[] type, byte[] key, byte[] group) { - ArrayList arguments = new ArrayList<>(10); - arguments.add(type); - arguments.add(key); - arguments.add(group); sendCommand(XINFO,type,key,group); } diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 2bf0f92c3a..dd48afb453 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -514,7 +514,7 @@ public List build(Object data) { Iterator hashIterator = hash.iterator(); Map map = new HashMap<>(hash.size()/2); while(hashIterator.hasNext()) { - map.put(SafeEncoder.encode((byte[])hashIterator.next()), SafeEncoder.encode((byte[])hashIterator.next())); + map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode((byte[])hashIterator.next())); } responses.add(new StreamEntry(entryID, map)); } @@ -535,7 +535,7 @@ public StreamEntry build(Object data) { if (null == data) { return null; } - ArrayList objectList = (ArrayList) data; + List objectList = (List) data; if (objectList.isEmpty()) { return null; @@ -548,8 +548,8 @@ public StreamEntry build(Object data) { Iterator hashIterator = hash.iterator(); Map map = new HashMap<>(hash.size() / 2); while (hashIterator.hasNext()) { - map.put(SafeEncoder.encode((byte[]) hashIterator.next()), - SafeEncoder.encode((byte[]) hashIterator.next())); + map.put(SafeEncoder.encode(hashIterator.next()), + SafeEncoder.encode(hashIterator.next())); } StreamEntry streamEntry = new StreamEntry(entryID, map); @@ -600,7 +600,7 @@ private Map createDecoderMap() { Map tempMappingFunctions = new HashMap<>(); tempMappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STRING); tempMappingFunctions.put(StreamInfo.FIRST_ENTRY,STREAM_ENTRY); - tempMappingFunctions.put(StreamInfo.LENGHT, LONG); + tempMappingFunctions.put(StreamInfo.LENGTH, LONG); tempMappingFunctions.put(StreamInfo.RADIX_TREE_KEYS, LONG); tempMappingFunctions.put(StreamInfo.RADIX_TREE_NODES, LONG); tempMappingFunctions.put(StreamInfo.LAST_ENTRY,STREAM_ENTRY); @@ -649,7 +649,7 @@ public String toString() { } }; - public static final Builder> STREAM_GROUP_INFO = new Builder>() { + public static final Builder> STREAM_GROUP_INFO_LIST = new Builder>() { Map mappingFunctions = createDecoderMap(); @@ -716,7 +716,7 @@ public String toString() { }; - public static final Builder> STREAM_CONSUMERS_INFO = new Builder>() { + public static final Builder> STREAM_CONSUMERS_INFO_LIST = new Builder>() { Map mappingFunctions = createDecoderMap(); diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index c3e20afd36..a99ea7abf2 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3842,7 +3842,7 @@ public StreamInfo xinfo(StreamInfo.StreamInfoType type, String key) { public List xinfo(StreamGroupInfo.StreamGroupInfoType type, String key) { client.xinfo(type,key); - return BuilderFactory.STREAM_GROUP_INFO.build(client.getObjectMultiBulkReply()); + return BuilderFactory.STREAM_GROUP_INFO_LIST.build(client.getObjectMultiBulkReply()); } @@ -3850,7 +3850,7 @@ public List xinfo(StreamGroupInfo.StreamGroupInfoType type, Str public List xinfo(StreamConsumersInfo.StreamConsumersInfoType type, String key, String group) { client.xinfo(type,key,group); - return BuilderFactory.STREAM_CONSUMERS_INFO.build(client.getObjectMultiBulkReply()); + return BuilderFactory.STREAM_CONSUMERS_INFO_LIST.build(client.getObjectMultiBulkReply()); } diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java index 078b1d92dd..f171a65319 100644 --- a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -10,22 +10,25 @@ public class StreamConsumersInfo { public static final String CONSUMERS_INFO = "consumers"; - /*private final String name; + private final String name; private final long idle; - private final long pending;*/ + private final long pending; private final Map consumerInfo; public StreamConsumersInfo(Map map) { if (map!= null && map.size()>0) { consumerInfo = map; + name = (String) map.get(NAME); + idle = (long) map.get(IDLE); + pending = (long) map.get(PENDING); } else throw new IllegalArgumentException(); } - /*public String getName() { + public String getName() { return name; } @@ -35,7 +38,7 @@ public long getIdle() { public long getPending() { return pending; - }*/ + } public Map getConsumerInfo() { return consumerInfo; diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java index 2be989ed45..e7b530de28 100644 --- a/src/main/java/redis/clients/jedis/StreamGroupInfo.java +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -12,22 +12,25 @@ public class StreamGroupInfo implements Serializable { public static final String GROUP_INFO = "groups"; - /*private final String name; + private final String name; private final long consumers; private final long pending; - private final String lastDeliveredId;*/ + private final String lastDeliveredId; private final Map groupInfo; public StreamGroupInfo(Map map) { if (map!= null && map.size()>0) { groupInfo = map; - + name = (String) map.get(NAME); + consumers = (long) map.get(CONSUMERS); + pending = (long) map.get(PENDING); + lastDeliveredId = (String) map.get(LAST_DELIVERED); } else throw new IllegalArgumentException(); } - /* public String getName() { + public String getName() { return name; } @@ -41,7 +44,7 @@ public long getPending() { public String getLastDeliveredId() { return lastDeliveredId; - }*/ + } public Map getGroupInfo() { return groupInfo; diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index 0d57e57dbf..c6db99f634 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -5,7 +5,7 @@ public class StreamInfo implements Serializable { - public static final String LENGHT = "length"; + public static final String LENGTH = "length"; public static final String RADIX_TREE_KEYS = "radix-tree-keys"; public static final String RADIX_TREE_NODES = "radix-tree-nodes"; public static final String GROUPS = "groups"; @@ -14,14 +14,14 @@ public class StreamInfo implements Serializable { public static final String LAST_ENTRY = "last-entry"; public static final String STREAM_INFO = "stream"; - /*private final Long length; - private final Long radixTreeKeys; + private final long length; + private final long radixTreeKeys; - private final Long radixTreeNodes; - private final Long groups; + private final long radixTreeNodes; + private final long groups; private final String lastGeneratedId; private final StreamEntry firstEntry; - private final StreamEntry lastEntry;*/ + private final StreamEntry lastEntry; private final Map streamInfo; @@ -29,23 +29,30 @@ public StreamInfo(Map map) { if (map!= null) { streamInfo = map; + length = (Long) map.get(LENGTH); + radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); + radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); + groups = (Long) map.get(GROUPS); + lastGeneratedId = (String) map.get(LAST_GENERATED_ID); + firstEntry = (StreamEntry) map.get(FIRST_ENTRY); + lastEntry = (StreamEntry) map.get(LAST_ENTRY); } else throw new IllegalArgumentException("InfoMap can not be null"); } - /*public Long getLength() { + public long getLength() { return length; } - public Long getRadixTreeKeys() { + public long getRadixTreeKeys() { return radixTreeKeys; } - public Long getRadixTreeNodes() { + public long getRadixTreeNodes() { return radixTreeNodes; } - public Long getGroups() { + public long getGroups() { return groups; } @@ -59,7 +66,7 @@ public StreamEntry getFirstEntry() { public StreamEntry getLastEntry() { return lastEntry; - }*/ + } public Map getStreamInfo() { return streamInfo; diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index d1103a9db5..f8275060e6 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -5,6 +5,18 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static redis.clients.jedis.StreamGroupInfo.CONSUMERS; +import static redis.clients.jedis.StreamGroupInfo.LAST_DELIVERED; +import static redis.clients.jedis.StreamGroupInfo.NAME; +import static redis.clients.jedis.StreamGroupInfo.PENDING; +import static redis.clients.jedis.StreamInfo.FIRST_ENTRY; +import static redis.clients.jedis.StreamInfo.GROUPS; +import static redis.clients.jedis.StreamInfo.LAST_ENTRY; +import static redis.clients.jedis.StreamInfo.LENGTH; +import static redis.clients.jedis.StreamInfo.RADIX_TREE_KEYS; +import static redis.clients.jedis.StreamInfo.RADIX_TREE_NODES; +import static redis.clients.jedis.StreamConsumersInfo.IDLE; + import java.util.AbstractMap; import java.util.HashMap; @@ -326,23 +338,44 @@ public void xinfo() { "xadd-stream1", "G1"); //Stream info test - assertEquals(2L,(long)streamInfo.getStreamInfo().get("length")); - assertEquals(1L,streamInfo.getStreamInfo().get("radix-tree-keys")); - assertEquals(2L,streamInfo.getStreamInfo().get("radix-tree-nodes")); - assertEquals(0L,streamInfo.getStreamInfo().get("groups")); - assertEquals("v1",((StreamEntry)streamInfo.getStreamInfo().get("first-entry")).getFields().get("f1")); - assertEquals("v2",((StreamEntry)streamInfo.getStreamInfo().get("last-entry")).getFields().get("f1")); + assertEquals(2L,streamInfo.getStreamInfo().get(LENGTH)); + assertEquals(1L,streamInfo.getStreamInfo().get(RADIX_TREE_KEYS)); + assertEquals(2L,streamInfo.getStreamInfo().get(RADIX_TREE_NODES)); + assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); + assertEquals("v1",((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get("f1")); + assertEquals("v2",((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get("f1")); + + //Using getters + assertEquals(2,streamInfo.getLength()); + assertEquals(1,streamInfo.getRadixTreeKeys()); + assertEquals(2,streamInfo.getRadixTreeNodes()); + assertEquals(0,streamInfo.getGroups()); + assertEquals("v1",streamInfo.getFirstEntry().getFields().get("f1")); + assertEquals("v2",streamInfo.getLastEntry().getFields().get("f1")); //Group info test assertEquals(1,groupInfo.size()); - assertEquals("G1",groupInfo.get(0).getGroupInfo().get("name")); - assertEquals(1L,groupInfo.get(0).getGroupInfo().get("consumers")); - assertEquals(0L,groupInfo.get(0).getGroupInfo().get("pending")); + assertEquals("G1",groupInfo.get(0).getGroupInfo().get(NAME)); + assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); + assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); + assertNotNull(groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); + + //Using getters + assertEquals(1,groupInfo.size()); + assertEquals("G1",groupInfo.get(0).getName()); + assertEquals(1,groupInfo.get(0).getConsumers()); + assertEquals(0,groupInfo.get(0).getPending()); + assertNotNull(groupInfo.get(0).getLastDeliveredId()); //Consumer info test - assertEquals("myConsumer",consumersInfo.get(0).getConsumerInfo().get("name")); - assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get("pending")); - assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get("idle")>0); + assertEquals("myConsumer",consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); + assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING)); + assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get(IDLE)>0); + + //Using getters + assertEquals("myConsumer",consumersInfo.get(0).getName()); + assertEquals(0L,consumersInfo.get(0).getPending()); + assertTrue(consumersInfo.get(0).getIdle()>0); //test with more groups and consumers jedis.xgroupCreate("xadd-stream1","G2", StreamEntryID.LAST_ENTRY,false); From de3e555822bb2bddfb6a71d4dd3c0f99d8a31030 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 14 Jan 2020 16:13:27 +0100 Subject: [PATCH 07/15] Move decoding to a separete function The decoding code is common for all xinfo cases so it has been extracted to a separate function so it can be used by stream, groups and consumers cases. --- .../redis/clients/jedis/BuilderFactory.java | 94 ++++++------------- 1 file changed, 30 insertions(+), 64 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index dd48afb453..c9780b2387 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -616,30 +616,11 @@ public StreamInfo build(Object data) { return null; } - Map resultMap = new HashMap<>(); List streamsEntries = (List)data; Iterator iterator = streamsEntries.iterator(); - while (iterator.hasNext()) { - - String mapKey = STRING.build(iterator.next()); - if (mappingFunctions.containsKey(mapKey)) { - resultMap.put(mapKey, mappingFunctions.get(mapKey).build(iterator.next())); - } else { //For future - if we don't find an element in our builder map - Object unknownData = iterator.next(); - for (Builder b:mappingFunctions.values()) { - try { - resultMap.put(mapKey,b.build(unknownData)); - break; - } catch (ClassCastException e) { - //We continue with next builder - - } - } - } - - } - StreamInfo streamInfo = new StreamInfo(resultMap); + StreamInfo streamInfo = new StreamInfo( + createMapFromDecodingFunctions(iterator,mappingFunctions)); return streamInfo; } @@ -664,7 +645,6 @@ private Map createDecoderMap() { return tempMappingFunctions; } - @Override @SuppressWarnings("unchecked") public List build(Object data) { @@ -678,30 +658,12 @@ public List build(Object data) { while (groupsArray.hasNext()) { - Map resultMap = new HashMap<>(); List groupInfo = (List) groupsArray.next(); Iterator groupInfoIterator = groupInfo.iterator(); - while (groupInfoIterator.hasNext()) { - - String mapKey = STRING.build(groupInfoIterator.next()); - if (mappingFunctions.containsKey(mapKey)) { - resultMap.put(mapKey, mappingFunctions.get(mapKey).build(groupInfoIterator.next())); - } else { //For future - if we don't find an element in our builder map - Object unknownData = groupInfoIterator.next(); - for (Builder b:mappingFunctions.values()) { - try { - resultMap.put(mapKey,b.build(unknownData)); - break; - } catch (ClassCastException e) { - //We continue with next builder - - } - } - } - } - StreamGroupInfo streamGroupInfo = new StreamGroupInfo(resultMap); + StreamGroupInfo streamGroupInfo = new StreamGroupInfo( + createMapFromDecodingFunctions(groupInfoIterator,mappingFunctions)); list.add(streamGroupInfo); } @@ -715,7 +677,6 @@ public String toString() { } }; - public static final Builder> STREAM_CONSUMERS_INFO_LIST = new Builder>() { Map mappingFunctions = createDecoderMap(); @@ -730,7 +691,6 @@ private Map createDecoderMap() { } - @Override @SuppressWarnings("unchecked") public List build(Object data) { @@ -744,30 +704,12 @@ public List build(Object data) { while (groupsArray.hasNext()) { - Map resultMap = new HashMap<>(); List groupInfo = (List) groupsArray.next(); Iterator consumerInfoIterator = groupInfo.iterator(); - while (consumerInfoIterator.hasNext()) { - - String mapKey = STRING.build(consumerInfoIterator.next()); - if (mappingFunctions.containsKey(mapKey)) { - resultMap.put(mapKey, mappingFunctions.get(mapKey).build(consumerInfoIterator.next())); - } else { //For future - if we don't find an element in our builder map - Object unknownData = consumerInfoIterator.next(); - for (Builder b:mappingFunctions.values()) { - try { - resultMap.put(mapKey,b.build(unknownData)); - break; - } catch (ClassCastException e) { - //We continue with next builder - - } - } - } - } - StreamConsumersInfo streamGroupInfo = new StreamConsumersInfo(resultMap); + StreamConsumersInfo streamGroupInfo = new StreamConsumersInfo( + createMapFromDecodingFunctions(consumerInfoIterator,mappingFunctions)); list.add(streamGroupInfo); } @@ -798,4 +740,28 @@ private BuilderFactory() { throw new InstantiationError( "Must not instantiate this class" ); } + private static Map createMapFromDecodingFunctions( Iterator iterator, Map mappingFunctions) { + + Map resultMap = new HashMap<>(); + while (iterator.hasNext()) { + + String mapKey = STRING.build(iterator.next()); + if (mappingFunctions.containsKey(mapKey)) { + resultMap.put(mapKey, mappingFunctions.get(mapKey).build(iterator.next())); + } else { //For future - if we don't find an element in our builder map + Object unknownData = iterator.next(); + for (Builder b:mappingFunctions.values()) { + try { + resultMap.put(mapKey,b.build(unknownData)); + break; + } catch (ClassCastException e) { + //We continue with next builder + + } + } + } + } + return resultMap; + } + } From d4eaf39cce021ecf3759e57be17031daa674d9c1 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Wed, 15 Jan 2020 07:04:26 +0100 Subject: [PATCH 08/15] Remove redundant casting --- src/main/java/redis/clients/jedis/BuilderFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index c9780b2387..cbf0802549 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -514,7 +514,7 @@ public List build(Object data) { Iterator hashIterator = hash.iterator(); Map map = new HashMap<>(hash.size()/2); while(hashIterator.hasNext()) { - map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode((byte[])hashIterator.next())); + map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next())); } responses.add(new StreamEntry(entryID, map)); } From dc9cf2faec7303ba73fb23c68e0d32fc17259798 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Fri, 31 Jan 2020 09:57:13 +0100 Subject: [PATCH 09/15] Refactor xinfo method into 3 separate Refactor xinfo to simplify usage. Javadoc will be added in another commit --- .../redis/clients/jedis/BinaryClient.java | 17 +++++++++---- .../java/redis/clients/jedis/BinaryJedis.java | 24 +++++++++++++++++++ .../clients/jedis/BinaryShardedJedis.java | 18 ++++++++++++++ src/main/java/redis/clients/jedis/Client.java | 12 +++++----- src/main/java/redis/clients/jedis/Jedis.java | 12 +++++----- .../java/redis/clients/jedis/Protocol.java | 3 +++ .../redis/clients/jedis/ShardedJedis.java | 12 +++++----- .../clients/jedis/StreamConsumersInfo.java | 11 --------- .../redis/clients/jedis/StreamGroupInfo.java | 11 --------- .../java/redis/clients/jedis/StreamInfo.java | 11 --------- .../jedis/commands/BinaryJedisCommands.java | 6 +++++ .../clients/jedis/commands/Commands.java | 6 ++--- .../clients/jedis/commands/JedisCommands.java | 13 ++++------ .../tests/commands/StreamsCommandsTest.java | 13 +++++----- 14 files changed, 96 insertions(+), 73 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index bd8b9fc54f..f1dc28544d 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1,5 +1,8 @@ package redis.clients.jedis; +import static redis.clients.jedis.Protocol.CONSUMERS; +import static redis.clients.jedis.Protocol.GROUPS; +import static redis.clients.jedis.Protocol.STREAM; import static redis.clients.jedis.Protocol.toByteArray; import static redis.clients.jedis.Protocol.Command.*; import static redis.clients.jedis.Protocol.Keyword.ENCODING; @@ -1455,15 +1458,21 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId sendCommand(XCLAIM, arguments.toArray(new byte[arguments.size()][])); } - public void xinfo(byte[] type, byte[] key) { + public void xinfoStream(byte[] key) { - sendCommand(XINFO,type,key); + sendCommand(XINFO,SafeEncoder.encode(STREAM),key); } - public void xinfo ( byte[] type, byte[] key, byte[] group) { + public void xinfoGroup(byte[] key) { - sendCommand(XINFO,type,key,group); + sendCommand(XINFO,SafeEncoder.encode(GROUPS),key); + + } + + public void xinfoConsumers (byte[] key, byte[] group) { + + sendCommand(XINFO,SafeEncoder.encode(CONSUMERS),key,group); } } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 78ef561ea5..5bfddc7bde 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -4051,6 +4051,30 @@ public Object sendCommand(ProtocolCommand cmd, byte[]... args) { return client.getOne(); } + @Override + public byte[] xinfoStream(byte[] key) { + checkIsInMultiOrPipeline(); + client.xinfoStream(key); + + return client.getBinaryBulkReply(); + + } + + @Override + public List xinfoGroup (byte[] key) { + checkIsInMultiOrPipeline(); + client.xinfoGroup(key); + + return client.getBinaryMultiBulkReply(); + } + @Override + public List xinfoConsumers (byte[] key, byte[] group) { + checkIsInMultiOrPipeline(); + client.xinfoConsumers(key,group); + + return client.getBinaryMultiBulkReply(); + } + public Object sendCommand(ProtocolCommand cmd) { return sendCommand(cmd, dummyArray); } diff --git a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java index 9cd5c61829..a4cd4adad8 100644 --- a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java @@ -1039,6 +1039,24 @@ public List xclaim(byte[] key, byte[] groupname, byte[] consumername, lo return j.xclaim(key, groupname, consumername, minIdleTime, newIdleTime, retries, force, ids); } + @Override + public byte[] xinfoStream(byte[] key) { + Jedis j = getShard(key); + return j.xinfoStream(key); + } + + @Override + public List xinfoGroup(byte[] key) { + Jedis j = getShard(key); + return j.xinfoGroup(key); + } + + @Override + public List xinfoConsumers(byte[] key, byte[] group) { + Jedis j = getShard(key); + return j.xinfoConsumers(key, group); + } + public Object sendCommand(ProtocolCommand cmd, byte[]... args) { // default since no sample key provided in JedisCommands interface byte[] sampleKey = args.length > 0 ? args[0] : cmd.getRaw(); diff --git a/src/main/java/redis/clients/jedis/Client.java b/src/main/java/redis/clients/jedis/Client.java index 042eb955e9..05e0526c4d 100644 --- a/src/main/java/redis/clients/jedis/Client.java +++ b/src/main/java/redis/clients/jedis/Client.java @@ -1263,23 +1263,23 @@ public void xclaim(String key, String group, String consumername, long minIdleTi } @Override - public void xinfo(StreamInfo.StreamInfoType type, String key) { + public void xinfoStream(String key) { - xinfo(SafeEncoder.encode(StreamInfo.STREAM_INFO), SafeEncoder.encode(key)); + xinfoStream(SafeEncoder.encode(key)); } @Override - public void xinfo(StreamGroupInfo.StreamGroupInfoType type, String key) { + public void xinfoGroup(String key) { - xinfo(SafeEncoder.encode(StreamGroupInfo.GROUP_INFO), SafeEncoder.encode(key)); + xinfoGroup(SafeEncoder.encode(key)); } @Override - public void xinfo(StreamConsumersInfo.StreamConsumersInfoType type, String key, String group) { + public void xinfoConsumers(String key, String group) { - xinfo(SafeEncoder.encode(StreamConsumersInfo.CONSUMERS_INFO), SafeEncoder.encode(key),SafeEncoder.encode(group)); + xinfoConsumers(SafeEncoder.encode(key),SafeEncoder.encode(group)); } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index a99ea7abf2..535cb37a1b 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3831,24 +3831,24 @@ public List xclaim(String key, String group, String consumername, l } @Override - public StreamInfo xinfo(StreamInfo.StreamInfoType type, String key) { - client.xinfo(type,key); + public StreamInfo xinfoStream(String key) { + client.xinfoStream(key); return BuilderFactory.STREAM_INFO.build(client.getObjectMultiBulkReply()); } @Override - public List xinfo(StreamGroupInfo.StreamGroupInfoType type, String key) { - client.xinfo(type,key); + public List xinfoGroup(String key) { + client.xinfoGroup(key); return BuilderFactory.STREAM_GROUP_INFO_LIST.build(client.getObjectMultiBulkReply()); } @Override - public List xinfo(StreamConsumersInfo.StreamConsumersInfoType type, String key, String group) { - client.xinfo(type,key,group); + public List xinfoConsumers(String key, String group) { + client.xinfoConsumers(key,group); return BuilderFactory.STREAM_CONSUMERS_INFO_LIST.build(client.getObjectMultiBulkReply()); diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 9e21067c53..1eac2a4386 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -80,6 +80,9 @@ public final class Protocol { public static final byte[] POSITIVE_INFINITY_BYTES = "+inf".getBytes(); public static final byte[] NEGATIVE_INFINITY_BYTES = "-inf".getBytes(); + public static final String STREAM = "STREAM"; + public static final String GROUPS = "GROUPS"; + public static final String CONSUMERS = "CONSUMERS"; private Protocol() { // this prevent the class from instantiation } diff --git a/src/main/java/redis/clients/jedis/ShardedJedis.java b/src/main/java/redis/clients/jedis/ShardedJedis.java index a06578be80..6a8585606b 100644 --- a/src/main/java/redis/clients/jedis/ShardedJedis.java +++ b/src/main/java/redis/clients/jedis/ShardedJedis.java @@ -1056,23 +1056,23 @@ public List xclaim(String key, String group, String consumername, l } @Override - public StreamInfo xinfo(StreamInfo.StreamInfoType streamInfoType, String key) { + public StreamInfo xinfoStream(String key) { Jedis j = getShard(key); - return j.xinfo(streamInfoType, key); + return j.xinfoStream(key); } @Override - public List xinfo(StreamGroupInfo.StreamGroupInfoType streamInfoType, String key) { + public List xinfoGroup(String key) { Jedis j = getShard(key); - return j.xinfo(streamInfoType, key); + return j.xinfoGroup(key); } @Override - public List xinfo (StreamConsumersInfo.StreamConsumersInfoType streamConsumersInfoType, String key, String group){ + public List xinfoConsumers(String key, String group){ Jedis j = getShard(key); - return j.xinfo(streamConsumersInfoType, key, group); + return j.xinfoConsumers(key, group); } public Object sendCommand(ProtocolCommand cmd, String... args) { diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java index f171a65319..8a837d91a7 100644 --- a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -44,15 +44,4 @@ public Map getConsumerInfo() { return consumerInfo; } - public static class StreamConsumersInfoType { - - private static final StreamConsumersInfoType streamConsumersInfoType = new StreamConsumersInfoType(); - private StreamConsumersInfoType() { - //Should not be used - }; - - public static StreamConsumersInfoType getStreamGroupInfoType() { - return streamConsumersInfoType; - } - } } diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java index e7b530de28..3a0c226d33 100644 --- a/src/main/java/redis/clients/jedis/StreamGroupInfo.java +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -50,15 +50,4 @@ public Map getGroupInfo() { return groupInfo; } - public static class StreamGroupInfoType { - - private static final StreamGroupInfoType streamInfoType = new StreamGroupInfoType(); - private StreamGroupInfoType() { - //Should not be used - }; - - public static StreamGroupInfoType getStreamGroupInfoType() { - return streamInfoType; - } - } } diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index c6db99f634..c585b34a42 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -72,15 +72,4 @@ public Map getStreamInfo() { return streamInfo; } - public static class StreamInfoType { - - private static final StreamInfoType streamInfoType = new StreamInfoType(); - private StreamInfoType() { - //Should not be used - }; - - public static StreamInfoType getStreamInfoType() { - return streamInfoType; - } - } } diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java index cff42790c4..245750a8ca 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java @@ -356,4 +356,10 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids); + + byte[] xinfoStream (byte[] key); + + List xinfoGroup (byte[] key); + + List xinfoConsumers (byte[] key, byte[] group); } diff --git a/src/main/java/redis/clients/jedis/commands/Commands.java b/src/main/java/redis/clients/jedis/commands/Commands.java index c2cd4e70b8..5fd7167958 100644 --- a/src/main/java/redis/clients/jedis/commands/Commands.java +++ b/src/main/java/redis/clients/jedis/commands/Commands.java @@ -394,7 +394,7 @@ void zrevrangeByScoreWithScores(String key, String max, String min, void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime, int retries, boolean force, StreamEntryID... ids); - void xinfo (StreamInfo.StreamInfoType type, String key); - void xinfo (StreamGroupInfo.StreamGroupInfoType type, String key); - void xinfo (StreamConsumersInfo.StreamConsumersInfoType type, String key, String group); + void xinfoStream (String key); + void xinfoGroup (String key); + void xinfoConsumers (String key, String group); } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index 59388c3258..e5a5e742c7 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -485,16 +485,13 @@ List xclaim( String key, String group, String consumername, long mi long newIdleTime, int retries, boolean force, StreamEntryID... ids); /** - * Introspection command used in order to retrieve different information about the streams - * and associated consumer groups. - * @param type - * @param key + * Introspection command used in order to retrieve different information about the stream + * @param key Stream name * @return */ - StreamInfo xinfo (StreamInfo.StreamInfoType type,String key); - //TODO add an enum to select stream/group/consumers + StreamInfo xinfoStream (String key); - List xinfo (StreamGroupInfo.StreamGroupInfoType type,String key); + List xinfoGroup (String key); - List xinfo (StreamConsumersInfo.StreamConsumersInfoType type, String key, String group); + List xinfoConsumers (String key, String group); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index f8275060e6..b3085131b3 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -327,15 +327,14 @@ public void xinfo() { map1.put("f1", "v2"); StreamEntryID id2 = jedis.xadd("xadd-stream1", null, map1); assertNotNull(id1); - StreamInfo streamInfo =jedis.xinfo(StreamInfo.StreamInfoType.getStreamInfoType(), "xadd-stream1"); + StreamInfo streamInfo =jedis.xinfoStream("xadd-stream1"); assertNotNull(id2); jedis.xgroupCreate("xadd-stream1","G1", StreamEntryID.LAST_ENTRY,false); Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>("xadd-stream1", new StreamEntryID("0-0")); jedis.xreadGroup("G1", "myConsumer",1,0,false,streamQeury11); - List groupInfo = jedis.xinfo(StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType(),"xadd-stream1"); - List consumersInfo = jedis.xinfo(StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType(), - "xadd-stream1", "G1"); + List groupInfo = jedis.xinfoGroup("xadd-stream1"); + List consumersInfo = jedis.xinfoConsumers("xadd-stream1", "G1"); //Stream info test assertEquals(2L,streamInfo.getStreamInfo().get(LENGTH)); @@ -383,15 +382,15 @@ public void xinfo() { jedis.xreadGroup("G2", "myConsumer",1,0,false,streamQeury11); jedis.xreadGroup("G2", "myConsumer2",1,0,false,streamQeury11); - List manyGroupsInfo = jedis.xinfo(StreamGroupInfo.StreamGroupInfoType.getStreamGroupInfoType(),"xadd-stream1"); - List manyConsumersInfo = jedis.xinfo(StreamConsumersInfo.StreamConsumersInfoType.getStreamGroupInfoType(),"xadd-stream1", "G2"); + List manyGroupsInfo = jedis.xinfoGroup("xadd-stream1"); + List manyConsumersInfo = jedis.xinfoConsumers("xadd-stream1", "G2"); assertEquals(2,manyGroupsInfo.size()); assertEquals(2,manyConsumersInfo.size()); //Not existing key - redis cli return error so we expect exception try { - jedis.xinfo(StreamInfo.StreamInfoType.getStreamInfoType(), "random"); + jedis.xinfoStream("random"); fail("Command should fail"); } catch (JedisException e) { assertEquals("ERR no such key", e.getMessage()); From 76a4b6358adab2fe4ac730e81aa145194f8a6df5 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Fri, 31 Jan 2020 12:43:48 +0100 Subject: [PATCH 10/15] Update javadocs --- .../clients/jedis/StreamConsumersInfo.java | 15 ++++++++++++++- .../redis/clients/jedis/StreamGroupInfo.java | 18 +++++++++++++++--- .../java/redis/clients/jedis/StreamInfo.java | 16 +++++++++++++++- .../clients/jedis/commands/JedisCommands.java | 14 +++++++++++++- 4 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java index 8a837d91a7..04665f3ac2 100644 --- a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -2,12 +2,18 @@ import java.util.Map; +/** + * This class holds information about a consumer + * They can be access via getters. + * For future purpose there is also {@link #getConsumerInfo()} ()} method + * that returns a generic {@code Map} - in case where more info is returned from a server + * + */ public class StreamConsumersInfo { public final static String NAME = "name"; public final static String IDLE = "idle"; public final static String PENDING = "pending"; - public static final String CONSUMERS_INFO = "consumers"; private final String name; @@ -15,6 +21,10 @@ public class StreamConsumersInfo { private final long pending; private final Map consumerInfo; + /** + * @param map contains key-value pairs with consumer info + * + */ public StreamConsumersInfo(Map map) { if (map!= null && map.size()>0) { @@ -40,6 +50,9 @@ public long getPending() { return pending; } + /** + * @return Generic map containing all key-value pairs returned by the server + */ public Map getConsumerInfo() { return consumerInfo; } diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java index 3a0c226d33..829adb8f44 100644 --- a/src/main/java/redis/clients/jedis/StreamGroupInfo.java +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -3,13 +3,19 @@ import java.io.Serializable; import java.util.Map; +/** + * This class holds information about a stream group + * They can be access via getters. + * For future purpose there is also {@link #getGroupInfo()} method + * that returns a generic {@code Map} - in case where more info is returned from a server + * + */ public class StreamGroupInfo implements Serializable { public final static String NAME = "name"; public final static String CONSUMERS = "consumers"; public final static String PENDING = "pending"; public final static String LAST_DELIVERED = "last-delivered-id"; - public static final String GROUP_INFO = "groups"; private final String name; @@ -18,7 +24,10 @@ public class StreamGroupInfo implements Serializable { private final String lastDeliveredId; private final Map groupInfo; - + /** + * @param map contains key-value pairs with group info + * + */ public StreamGroupInfo(Map map) { if (map!= null && map.size()>0) { groupInfo = map; @@ -46,7 +55,10 @@ public String getLastDeliveredId() { return lastDeliveredId; } - public Map getGroupInfo() { + /** + * @return Generic map containing all key-value pairs returned by the server + */ + public Map getGroupInfo() { return groupInfo; } diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index c585b34a42..7b3dc4d5c7 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -3,6 +3,14 @@ import java.io.Serializable; import java.util.Map; +/** + * This class holds information about stream + * They can be access via getters. + * For future purpose there is also {@link #getStreamInfo} method + * that returns a generic {@code Map} - in case where more info is returned from a server + * + */ + public class StreamInfo implements Serializable { public static final String LENGTH = "length"; @@ -12,7 +20,6 @@ public class StreamInfo implements Serializable { public static final String LAST_GENERATED_ID = "last-generated-id"; public static final String FIRST_ENTRY = "first-entry"; public static final String LAST_ENTRY = "last-entry"; - public static final String STREAM_INFO = "stream"; private final long length; private final long radixTreeKeys; @@ -25,6 +32,10 @@ public class StreamInfo implements Serializable { private final Map streamInfo; + /** + * @param map contains key-value pairs with stream info + * + */ public StreamInfo(Map map) { if (map!= null) { @@ -68,6 +79,9 @@ public StreamEntry getLastEntry() { return lastEntry; } + /** + * @return Generic map containing all key-value pairs returned by the server + */ public Map getStreamInfo() { return streamInfo; } diff --git a/src/main/java/redis/clients/jedis/commands/JedisCommands.java b/src/main/java/redis/clients/jedis/commands/JedisCommands.java index e5a5e742c7..e8f42a1c63 100644 --- a/src/main/java/redis/clients/jedis/commands/JedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/JedisCommands.java @@ -487,11 +487,23 @@ List xclaim( String key, String group, String consumername, long mi /** * Introspection command used in order to retrieve different information about the stream * @param key Stream name - * @return + * @return {@link StreamInfo} that contains information about the stream */ StreamInfo xinfoStream (String key); + /** + * Introspection command used in order to retrieve different information about groups in the stream + * @param key Stream name + * @return List of {@link StreamGroupInfo} containing information about groups + */ List xinfoGroup (String key); + /** + * Introspection command used in order to retrieve different information about consumers in the group + * @param key Stream name + * @param group Group name + * @return List of {@link StreamConsumersInfo} containing information about consumers that belong + * to the the group + */ List xinfoConsumers (String key, String group); } From a442fe64e2fd0936977e2c79594a8189abb4d0db Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 4 Feb 2020 15:25:21 +0100 Subject: [PATCH 11/15] Refactor binary jedis xinfo commands Also a test for binary xinfo commands has been added --- .../redis/clients/jedis/BinaryClient.java | 12 +- .../java/redis/clients/jedis/BinaryJedis.java | 12 +- .../clients/jedis/BinaryShardedJedis.java | 6 +- .../java/redis/clients/jedis/Protocol.java | 7 +- .../jedis/commands/BinaryJedisCommands.java | 9 +- .../tests/commands/StreamsCommandsTest.java | 129 ++++++++++++++---- 6 files changed, 129 insertions(+), 46 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index f1dc28544d..ee4425c1af 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1,8 +1,8 @@ package redis.clients.jedis; -import static redis.clients.jedis.Protocol.CONSUMERS; -import static redis.clients.jedis.Protocol.GROUPS; -import static redis.clients.jedis.Protocol.STREAM; +import static redis.clients.jedis.Protocol.XINFO_CONSUMERS; +import static redis.clients.jedis.Protocol.XINFO_GROUPS; +import static redis.clients.jedis.Protocol.XINFO_STREAM; import static redis.clients.jedis.Protocol.toByteArray; import static redis.clients.jedis.Protocol.Command.*; import static redis.clients.jedis.Protocol.Keyword.ENCODING; @@ -1460,19 +1460,19 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId public void xinfoStream(byte[] key) { - sendCommand(XINFO,SafeEncoder.encode(STREAM),key); + sendCommand(XINFO,SafeEncoder.encode(XINFO_STREAM),key); } public void xinfoGroup(byte[] key) { - sendCommand(XINFO,SafeEncoder.encode(GROUPS),key); + sendCommand(XINFO,SafeEncoder.encode(XINFO_GROUPS),key); } public void xinfoConsumers (byte[] key, byte[] group) { - sendCommand(XINFO,SafeEncoder.encode(CONSUMERS),key,group); + sendCommand(XINFO,SafeEncoder.encode(XINFO_CONSUMERS),key,group); } } diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 5bfddc7bde..d5afbc9661 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -4052,27 +4052,27 @@ public Object sendCommand(ProtocolCommand cmd, byte[]... args) { } @Override - public byte[] xinfoStream(byte[] key) { + public StreamInfo xinfoStream(byte[] key) { checkIsInMultiOrPipeline(); client.xinfoStream(key); - return client.getBinaryBulkReply(); + return BuilderFactory.STREAM_INFO.build(client.getOne()); } @Override - public List xinfoGroup (byte[] key) { + public List xinfoGroup (byte[] key) { checkIsInMultiOrPipeline(); client.xinfoGroup(key); - return client.getBinaryMultiBulkReply(); + return BuilderFactory.STREAM_GROUP_INFO_LIST.build(client.getBinaryMultiBulkReply()); } @Override - public List xinfoConsumers (byte[] key, byte[] group) { + public List xinfoConsumers (byte[] key, byte[] group) { checkIsInMultiOrPipeline(); client.xinfoConsumers(key,group); - return client.getBinaryMultiBulkReply(); + return BuilderFactory.STREAM_CONSUMERS_INFO_LIST.build(client.getBinaryMultiBulkReply()); } public Object sendCommand(ProtocolCommand cmd) { diff --git a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java index a4cd4adad8..6c0ffd1e70 100644 --- a/src/main/java/redis/clients/jedis/BinaryShardedJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryShardedJedis.java @@ -1040,19 +1040,19 @@ public List xclaim(byte[] key, byte[] groupname, byte[] consumername, lo } @Override - public byte[] xinfoStream(byte[] key) { + public StreamInfo xinfoStream(byte[] key) { Jedis j = getShard(key); return j.xinfoStream(key); } @Override - public List xinfoGroup(byte[] key) { + public List xinfoGroup(byte[] key) { Jedis j = getShard(key); return j.xinfoGroup(key); } @Override - public List xinfoConsumers(byte[] key, byte[] group) { + public List xinfoConsumers(byte[] key, byte[] group) { Jedis j = getShard(key); return j.xinfoConsumers(key, group); } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 1eac2a4386..bc2c1298ef 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -73,6 +73,10 @@ public final class Protocol { public static final String PUBSUB_NUMSUB = "numsub"; public static final String PUBSUB_NUM_PAT = "numpat"; + public static final String XINFO_STREAM = "STREAM"; + public static final String XINFO_GROUPS = "GROUPS"; + public static final String XINFO_CONSUMERS = "CONSUMERS"; + public static final byte[] BYTES_TRUE = toByteArray(1); public static final byte[] BYTES_FALSE = toByteArray(0); public static final byte[] BYTES_TILDE = SafeEncoder.encode("~"); @@ -80,9 +84,6 @@ public final class Protocol { public static final byte[] POSITIVE_INFINITY_BYTES = "+inf".getBytes(); public static final byte[] NEGATIVE_INFINITY_BYTES = "-inf".getBytes(); - public static final String STREAM = "STREAM"; - public static final String GROUPS = "GROUPS"; - public static final String CONSUMERS = "CONSUMERS"; private Protocol() { // this prevent the class from instantiation } diff --git a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java index 245750a8ca..01a6b6a393 100644 --- a/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java +++ b/src/main/java/redis/clients/jedis/commands/BinaryJedisCommands.java @@ -12,6 +12,9 @@ import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; import redis.clients.jedis.SortingParams; +import redis.clients.jedis.StreamConsumersInfo; +import redis.clients.jedis.StreamGroupInfo; +import redis.clients.jedis.StreamInfo; import redis.clients.jedis.Tuple; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.SetParams; @@ -357,9 +360,9 @@ List georadiusByMemberReadonly(byte[] key, byte[] member, dou List xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids); - byte[] xinfoStream (byte[] key); + StreamInfo xinfoStream (byte[] key); - List xinfoGroup (byte[] key); + List xinfoGroup (byte[] key); - List xinfoConsumers (byte[] key, byte[] group); + List xinfoConsumers (byte[] key, byte[] group); } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index b3085131b3..6552e1bdde 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -29,6 +29,7 @@ import redis.clients.jedis.Protocol.Keyword; import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.util.SafeEncoder; public class StreamsCommandsTest extends JedisCommandTestBase { @@ -321,69 +322,78 @@ public void xpendeing() { @Test public void xinfo() { - Map map1 = new HashMap(); - map1.put("f1", "v1"); - StreamEntryID id1 = jedis.xadd("xadd-stream1", null, map1); - map1.put("f1", "v2"); - StreamEntryID id2 = jedis.xadd("xadd-stream1", null, map1); + final String STREAM_NAME = "xadd-stream1"; + final String F1 = "f1"; + final String V1 = "v1"; + final String V2 = "v2"; + final String G1 = "G1"; + final String G2 = "G2"; + final String MY_CONSUMER = "myConsumer"; + final String MY_CONSUMER2 = "myConsumer2"; + + Map map1 = new HashMap<>(); + map1.put(F1, V1); + StreamEntryID id1 = jedis.xadd(STREAM_NAME, null, map1); + map1.put(F1, V2); + StreamEntryID id2 = jedis.xadd(STREAM_NAME, null, map1); assertNotNull(id1); - StreamInfo streamInfo =jedis.xinfoStream("xadd-stream1"); + StreamInfo streamInfo =jedis.xinfoStream(STREAM_NAME); assertNotNull(id2); - jedis.xgroupCreate("xadd-stream1","G1", StreamEntryID.LAST_ENTRY,false); - Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>("xadd-stream1", new StreamEntryID("0-0")); - jedis.xreadGroup("G1", "myConsumer",1,0,false,streamQeury11); - List groupInfo = jedis.xinfoGroup("xadd-stream1"); - List consumersInfo = jedis.xinfoConsumers("xadd-stream1", "G1"); + jedis.xgroupCreate(STREAM_NAME,G1, StreamEntryID.LAST_ENTRY,false); + Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>(STREAM_NAME, new StreamEntryID("0-0")); + jedis.xreadGroup(G1, MY_CONSUMER,1,0,false,streamQeury11); + List groupInfo = jedis.xinfoGroup(STREAM_NAME); + List consumersInfo = jedis.xinfoConsumers(STREAM_NAME, G1); //Stream info test assertEquals(2L,streamInfo.getStreamInfo().get(LENGTH)); assertEquals(1L,streamInfo.getStreamInfo().get(RADIX_TREE_KEYS)); assertEquals(2L,streamInfo.getStreamInfo().get(RADIX_TREE_NODES)); assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); - assertEquals("v1",((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get("f1")); - assertEquals("v2",((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get("f1")); + assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); + assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); //Using getters assertEquals(2,streamInfo.getLength()); assertEquals(1,streamInfo.getRadixTreeKeys()); assertEquals(2,streamInfo.getRadixTreeNodes()); assertEquals(0,streamInfo.getGroups()); - assertEquals("v1",streamInfo.getFirstEntry().getFields().get("f1")); - assertEquals("v2",streamInfo.getLastEntry().getFields().get("f1")); + assertEquals(V1,streamInfo.getFirstEntry().getFields().get(F1)); + assertEquals(V2,streamInfo.getLastEntry().getFields().get(F1)); //Group info test assertEquals(1,groupInfo.size()); - assertEquals("G1",groupInfo.get(0).getGroupInfo().get(NAME)); + assertEquals(G1,groupInfo.get(0).getGroupInfo().get(NAME)); assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); assertNotNull(groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); //Using getters assertEquals(1,groupInfo.size()); - assertEquals("G1",groupInfo.get(0).getName()); + assertEquals(G1,groupInfo.get(0).getName()); assertEquals(1,groupInfo.get(0).getConsumers()); assertEquals(0,groupInfo.get(0).getPending()); assertNotNull(groupInfo.get(0).getLastDeliveredId()); //Consumer info test - assertEquals("myConsumer",consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); + assertEquals(MY_CONSUMER,consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING)); assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get(IDLE)>0); //Using getters - assertEquals("myConsumer",consumersInfo.get(0).getName()); + assertEquals(MY_CONSUMER,consumersInfo.get(0).getName()); assertEquals(0L,consumersInfo.get(0).getPending()); assertTrue(consumersInfo.get(0).getIdle()>0); //test with more groups and consumers - jedis.xgroupCreate("xadd-stream1","G2", StreamEntryID.LAST_ENTRY,false); - jedis.xreadGroup("G1", "myConsumer2",1,0,false,streamQeury11); - jedis.xreadGroup("G2", "myConsumer",1,0,false,streamQeury11); - jedis.xreadGroup("G2", "myConsumer2",1,0,false,streamQeury11); + jedis.xgroupCreate(STREAM_NAME,G2, StreamEntryID.LAST_ENTRY,false); + jedis.xreadGroup(G1, MY_CONSUMER2,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER2,1,0,false,streamQeury11); - List manyGroupsInfo = jedis.xinfoGroup("xadd-stream1"); - List manyConsumersInfo = jedis.xinfoConsumers("xadd-stream1", "G2"); + List manyGroupsInfo = jedis.xinfoGroup(STREAM_NAME); + List manyConsumersInfo = jedis.xinfoConsumers(STREAM_NAME, G2); assertEquals(2,manyGroupsInfo.size()); assertEquals(2,manyConsumersInfo.size()); @@ -398,6 +408,75 @@ public void xinfo() { } + @Test + public void xinfoBinary() { + + final String STREAM_NAME = "xadd-stream1"; + final String F1 = "f1"; + final String V1 = "v1"; + final String V2 = "v2"; + final String G1 = "G1"; + final String G2 = "G2"; + final String MY_CONSUMER = "myConsumer"; + final String MY_CONSUMER2 = "myConsumer2"; + + Map map1 = new HashMap<>(); + map1.put(F1, V1); + StreamEntryID id1 = jedis.xadd(STREAM_NAME, null, map1); + map1.put(F1, V2); + StreamEntryID id2 = jedis.xadd(STREAM_NAME, null, map1); + assertNotNull(id1); + StreamInfo streamInfo = jedis.xinfoStream(SafeEncoder.encode(STREAM_NAME)); + assertNotNull(id2); + + jedis.xgroupCreate(STREAM_NAME,G1, StreamEntryID.LAST_ENTRY,false); + Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>(STREAM_NAME, new StreamEntryID("0-0")); + jedis.xreadGroup(G1, MY_CONSUMER,1,0,false,streamQeury11); + List groupInfo = jedis.xinfoGroup(SafeEncoder.encode(STREAM_NAME)); + List consumersInfo = jedis.xinfoConsumers(SafeEncoder.encode(STREAM_NAME), SafeEncoder.encode(G1)); + + //Stream info test + assertEquals(2L,streamInfo.getStreamInfo().get(LENGTH)); + assertEquals(1L,streamInfo.getStreamInfo().get(RADIX_TREE_KEYS)); + assertEquals(2L,streamInfo.getStreamInfo().get(RADIX_TREE_NODES)); + assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); + assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); + assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); + + //Group info test + assertEquals(1,groupInfo.size()); + assertEquals(G1,groupInfo.get(0).getGroupInfo().get(NAME)); + assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); + assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); + assertNotNull(groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); + + //Consumer info test + assertEquals(MY_CONSUMER,consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); + assertEquals(0L,consumersInfo.get(0).getConsumerInfo().get(StreamConsumersInfo.PENDING)); + assertTrue((Long)consumersInfo.get(0).getConsumerInfo().get(IDLE)>0); + + //test with more groups and consumers + jedis.xgroupCreate(STREAM_NAME,G2, StreamEntryID.LAST_ENTRY,false); + jedis.xreadGroup(G1, MY_CONSUMER2,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER,1,0,false,streamQeury11); + jedis.xreadGroup(G2, MY_CONSUMER2,1,0,false,streamQeury11); + + List manyGroupsInfo = jedis.xinfoGroup(STREAM_NAME); + List manyConsumersInfo = jedis.xinfoConsumers(STREAM_NAME, G2); + + assertEquals(2,manyGroupsInfo.size()); + assertEquals(2,manyConsumersInfo.size()); + + //Not existing key - redis cli return error so we expect exception + try { + jedis.xinfoStream(SafeEncoder.encode("random")); + fail("Command should fail"); + } catch (JedisException e) { + assertEquals("ERR no such key", e.getMessage()); + } + + } + @Test From 6f6c88fa782c4ced40a73b3dfc5ab60cdb452462 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 11 Feb 2020 11:03:47 +0100 Subject: [PATCH 12/15] Small fixes after review Added test for last-generated-id Moved xinfo keywords to enum Removed throwing runtime exception from xinfo classes --- .../redis/clients/jedis/BinaryClient.java | 9 +++----- .../java/redis/clients/jedis/Protocol.java | 6 +---- .../clients/jedis/StreamConsumersInfo.java | 13 ++++------- .../redis/clients/jedis/StreamGroupInfo.java | 13 +++++------ .../java/redis/clients/jedis/StreamInfo.java | 22 ++++++++----------- .../tests/commands/StreamsCommandsTest.java | 4 ++++ 6 files changed, 27 insertions(+), 40 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BinaryClient.java b/src/main/java/redis/clients/jedis/BinaryClient.java index ee4425c1af..78d68c9eb5 100644 --- a/src/main/java/redis/clients/jedis/BinaryClient.java +++ b/src/main/java/redis/clients/jedis/BinaryClient.java @@ -1,8 +1,5 @@ package redis.clients.jedis; -import static redis.clients.jedis.Protocol.XINFO_CONSUMERS; -import static redis.clients.jedis.Protocol.XINFO_GROUPS; -import static redis.clients.jedis.Protocol.XINFO_STREAM; import static redis.clients.jedis.Protocol.toByteArray; import static redis.clients.jedis.Protocol.Command.*; import static redis.clients.jedis.Protocol.Keyword.ENCODING; @@ -1460,19 +1457,19 @@ public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minId public void xinfoStream(byte[] key) { - sendCommand(XINFO,SafeEncoder.encode(XINFO_STREAM),key); + sendCommand(XINFO,Keyword.STREAM.raw,key); } public void xinfoGroup(byte[] key) { - sendCommand(XINFO,SafeEncoder.encode(XINFO_GROUPS),key); + sendCommand(XINFO,Keyword.GROUPS.raw,key); } public void xinfoConsumers (byte[] key, byte[] group) { - sendCommand(XINFO,SafeEncoder.encode(XINFO_CONSUMERS),key,group); + sendCommand(XINFO,Keyword.CONSUMERS.raw,key,group); } } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index bc2c1298ef..fb845782d9 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -73,10 +73,6 @@ public final class Protocol { public static final String PUBSUB_NUMSUB = "numsub"; public static final String PUBSUB_NUM_PAT = "numpat"; - public static final String XINFO_STREAM = "STREAM"; - public static final String XINFO_GROUPS = "GROUPS"; - public static final String XINFO_CONSUMERS = "CONSUMERS"; - public static final byte[] BYTES_TRUE = toByteArray(1); public static final byte[] BYTES_FALSE = toByteArray(0); public static final byte[] BYTES_TILDE = SafeEncoder.encode("~"); @@ -284,7 +280,7 @@ public static enum Keyword { RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, KILL, LEN, REFCOUNT, ENCODING, IDLETIME, GETNAME, SETNAME, LIST, MATCH, COUNT, PING, PONG, UNLOAD, REPLACE, KEYS, PAUSE, DOCTOR, BLOCK, NOACK, STREAMS, KEY, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, - IDLE, TIME, RETRYCOUNT, FORCE; + IDLE, TIME, RETRYCOUNT, FORCE, STREAM, GROUPS, CONSUMERS; public final byte[] raw; diff --git a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java index 04665f3ac2..c97d865492 100644 --- a/src/main/java/redis/clients/jedis/StreamConsumersInfo.java +++ b/src/main/java/redis/clients/jedis/StreamConsumersInfo.java @@ -15,7 +15,6 @@ public class StreamConsumersInfo { public final static String IDLE = "idle"; public final static String PENDING = "pending"; - private final String name; private final long idle; private final long pending; @@ -27,14 +26,10 @@ public class StreamConsumersInfo { */ public StreamConsumersInfo(Map map) { - if (map!= null && map.size()>0) { - consumerInfo = map; - name = (String) map.get(NAME); - idle = (long) map.get(IDLE); - pending = (long) map.get(PENDING); - - - } else throw new IllegalArgumentException(); + consumerInfo = map; + name = (String) map.get(NAME); + idle = (long) map.get(IDLE); + pending = (long) map.get(PENDING); } diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java index 829adb8f44..f0b293a8fc 100644 --- a/src/main/java/redis/clients/jedis/StreamGroupInfo.java +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -29,14 +29,13 @@ public class StreamGroupInfo implements Serializable { * */ public StreamGroupInfo(Map map) { - if (map!= null && map.size()>0) { - groupInfo = map; - name = (String) map.get(NAME); - consumers = (long) map.get(CONSUMERS); - pending = (long) map.get(PENDING); - lastDeliveredId = (String) map.get(LAST_DELIVERED); - } else throw new IllegalArgumentException(); + groupInfo = map; + name = (String) map.get(NAME); + consumers = (long) map.get(CONSUMERS); + pending = (long) map.get(PENDING); + lastDeliveredId = (String) map.get(LAST_DELIVERED); + } public String getName() { diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index 7b3dc4d5c7..bbd7bc4e6b 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -23,7 +23,6 @@ public class StreamInfo implements Serializable { private final long length; private final long radixTreeKeys; - private final long radixTreeNodes; private final long groups; private final String lastGeneratedId; @@ -31,24 +30,21 @@ public class StreamInfo implements Serializable { private final StreamEntry lastEntry; private final Map streamInfo; - /** * @param map contains key-value pairs with stream info * */ public StreamInfo(Map map) { - if (map!= null) { - streamInfo = map; - length = (Long) map.get(LENGTH); - radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); - radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); - groups = (Long) map.get(GROUPS); - lastGeneratedId = (String) map.get(LAST_GENERATED_ID); - firstEntry = (StreamEntry) map.get(FIRST_ENTRY); - lastEntry = (StreamEntry) map.get(LAST_ENTRY); - - } else throw new IllegalArgumentException("InfoMap can not be null"); + streamInfo = map; + length = (Long) map.get(LENGTH); + radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); + radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); + groups = (Long) map.get(GROUPS); + lastGeneratedId = (String) map.get(LAST_GENERATED_ID); + firstEntry = (StreamEntry) map.get(FIRST_ENTRY); + lastEntry = (StreamEntry) map.get(LAST_ENTRY); + } public long getLength() { diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 6552e1bdde..e7932019ba 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -12,6 +12,7 @@ import static redis.clients.jedis.StreamInfo.FIRST_ENTRY; import static redis.clients.jedis.StreamInfo.GROUPS; import static redis.clients.jedis.StreamInfo.LAST_ENTRY; +import static redis.clients.jedis.StreamInfo.LAST_GENERATED_ID; import static redis.clients.jedis.StreamInfo.LENGTH; import static redis.clients.jedis.StreamInfo.RADIX_TREE_KEYS; import static redis.clients.jedis.StreamInfo.RADIX_TREE_NODES; @@ -353,6 +354,7 @@ public void xinfo() { assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); + assertNotNull(streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); //Using getters assertEquals(2,streamInfo.getLength()); @@ -361,6 +363,7 @@ public void xinfo() { assertEquals(0,streamInfo.getGroups()); assertEquals(V1,streamInfo.getFirstEntry().getFields().get(F1)); assertEquals(V2,streamInfo.getLastEntry().getFields().get(F1)); + assertNotNull(streamInfo.getLastGeneratedId()); //Group info test assertEquals(1,groupInfo.size()); @@ -442,6 +445,7 @@ public void xinfoBinary() { assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); + assertNotNull(streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); //Group info test assertEquals(1,groupInfo.size()); From daa90be314527340d97aa34ac8f46ab95bbab2e7 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 11 Feb 2020 12:00:48 +0100 Subject: [PATCH 13/15] Update last_generated_id test --- .../clients/jedis/tests/commands/StreamsCommandsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index e7932019ba..0cf97d317b 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -354,7 +354,7 @@ public void xinfo() { assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); - assertNotNull(streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); + assertEquals(id2.toString(),streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); //Using getters assertEquals(2,streamInfo.getLength()); @@ -363,7 +363,7 @@ public void xinfo() { assertEquals(0,streamInfo.getGroups()); assertEquals(V1,streamInfo.getFirstEntry().getFields().get(F1)); assertEquals(V2,streamInfo.getLastEntry().getFields().get(F1)); - assertNotNull(streamInfo.getLastGeneratedId()); + assertEquals(id2.toString(),streamInfo.getLastGeneratedId()); //Group info test assertEquals(1,groupInfo.size()); From df4255daac659a2b495166548502e91094a41780 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Tue, 11 Feb 2020 12:11:54 +0100 Subject: [PATCH 14/15] Change getLastGeneratedId to return StreamEntryID Better to use existing class --- src/main/java/redis/clients/jedis/BuilderFactory.java | 2 +- src/main/java/redis/clients/jedis/StreamInfo.java | 6 +++--- .../clients/jedis/tests/commands/StreamsCommandsTest.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index cbf0802549..81cc4ae8ec 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -598,7 +598,7 @@ public String toString() { private Map createDecoderMap() { Map tempMappingFunctions = new HashMap<>(); - tempMappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STRING); + tempMappingFunctions.put(StreamInfo.LAST_GENERATED_ID,STREAM_ENTRY_ID); tempMappingFunctions.put(StreamInfo.FIRST_ENTRY,STREAM_ENTRY); tempMappingFunctions.put(StreamInfo.LENGTH, LONG); tempMappingFunctions.put(StreamInfo.RADIX_TREE_KEYS, LONG); diff --git a/src/main/java/redis/clients/jedis/StreamInfo.java b/src/main/java/redis/clients/jedis/StreamInfo.java index bbd7bc4e6b..ef6c7c502d 100644 --- a/src/main/java/redis/clients/jedis/StreamInfo.java +++ b/src/main/java/redis/clients/jedis/StreamInfo.java @@ -25,7 +25,7 @@ public class StreamInfo implements Serializable { private final long radixTreeKeys; private final long radixTreeNodes; private final long groups; - private final String lastGeneratedId; + private final StreamEntryID lastGeneratedId; private final StreamEntry firstEntry; private final StreamEntry lastEntry; private final Map streamInfo; @@ -41,7 +41,7 @@ public StreamInfo(Map map) { radixTreeKeys = (Long) map.get(RADIX_TREE_KEYS); radixTreeNodes = (Long) map.get(RADIX_TREE_NODES); groups = (Long) map.get(GROUPS); - lastGeneratedId = (String) map.get(LAST_GENERATED_ID); + lastGeneratedId = (StreamEntryID) map.get(LAST_GENERATED_ID); firstEntry = (StreamEntry) map.get(FIRST_ENTRY); lastEntry = (StreamEntry) map.get(LAST_ENTRY); @@ -63,7 +63,7 @@ public long getGroups() { return groups; } - public String getLastGeneratedId() { + public StreamEntryID getLastGeneratedId() { return lastGeneratedId; } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 0cf97d317b..8d8ff2c521 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -354,7 +354,7 @@ public void xinfo() { assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); - assertEquals(id2.toString(),streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); + assertEquals(id2,streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); //Using getters assertEquals(2,streamInfo.getLength()); @@ -363,7 +363,7 @@ public void xinfo() { assertEquals(0,streamInfo.getGroups()); assertEquals(V1,streamInfo.getFirstEntry().getFields().get(F1)); assertEquals(V2,streamInfo.getLastEntry().getFields().get(F1)); - assertEquals(id2.toString(),streamInfo.getLastGeneratedId()); + assertEquals(id2,streamInfo.getLastGeneratedId()); //Group info test assertEquals(1,groupInfo.size()); From c50cf79e2dc83f1c75bdd728af5dc7d1d4490462 Mon Sep 17 00:00:00 2001 From: Michal Cholewa Date: Wed, 12 Feb 2020 11:05:16 +0100 Subject: [PATCH 15/15] Update StreamGroupInfo and test LastDeliveredId type has been changed to be StreamEntryId instead of simple String. Also unstable test for Idle Consumer time has been fixed. --- .../redis/clients/jedis/BuilderFactory.java | 2 +- .../redis/clients/jedis/StreamGroupInfo.java | 6 +++--- .../tests/commands/StreamsCommandsTest.java | 19 +++++++++++++------ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 81cc4ae8ec..e991192528 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -640,7 +640,7 @@ private Map createDecoderMap() { tempMappingFunctions.put(StreamGroupInfo.NAME,STRING); tempMappingFunctions.put(StreamGroupInfo.CONSUMERS, LONG); tempMappingFunctions.put(StreamGroupInfo.PENDING, LONG); - tempMappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STRING); + tempMappingFunctions.put(StreamGroupInfo.LAST_DELIVERED,STREAM_ENTRY_ID); return tempMappingFunctions; } diff --git a/src/main/java/redis/clients/jedis/StreamGroupInfo.java b/src/main/java/redis/clients/jedis/StreamGroupInfo.java index f0b293a8fc..8a5d62a0ce 100644 --- a/src/main/java/redis/clients/jedis/StreamGroupInfo.java +++ b/src/main/java/redis/clients/jedis/StreamGroupInfo.java @@ -21,7 +21,7 @@ public class StreamGroupInfo implements Serializable { private final String name; private final long consumers; private final long pending; - private final String lastDeliveredId; + private final StreamEntryID lastDeliveredId; private final Map groupInfo; /** @@ -34,7 +34,7 @@ public StreamGroupInfo(Map map) { name = (String) map.get(NAME); consumers = (long) map.get(CONSUMERS); pending = (long) map.get(PENDING); - lastDeliveredId = (String) map.get(LAST_DELIVERED); + lastDeliveredId = (StreamEntryID) map.get(LAST_DELIVERED); } @@ -50,7 +50,7 @@ public long getPending() { return pending; } - public String getLastDeliveredId() { + public StreamEntryID getLastDeliveredId() { return lastDeliveredId; } diff --git a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java index 8d8ff2c521..90842764f3 100644 --- a/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/tests/commands/StreamsCommandsTest.java @@ -321,7 +321,7 @@ public void xpendeing() { } @Test - public void xinfo() { + public void xinfo() throws InterruptedException { final String STREAM_NAME = "xadd-stream1"; final String F1 = "f1"; @@ -344,6 +344,9 @@ public void xinfo() { jedis.xgroupCreate(STREAM_NAME,G1, StreamEntryID.LAST_ENTRY,false); Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>(STREAM_NAME, new StreamEntryID("0-0")); jedis.xreadGroup(G1, MY_CONSUMER,1,0,false,streamQeury11); + + Thread.sleep(1); + List groupInfo = jedis.xinfoGroup(STREAM_NAME); List consumersInfo = jedis.xinfoConsumers(STREAM_NAME, G1); @@ -365,19 +368,20 @@ public void xinfo() { assertEquals(V2,streamInfo.getLastEntry().getFields().get(F1)); assertEquals(id2,streamInfo.getLastGeneratedId()); + //Group info test assertEquals(1,groupInfo.size()); assertEquals(G1,groupInfo.get(0).getGroupInfo().get(NAME)); assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); - assertNotNull(groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); + assertEquals(id2,groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); //Using getters assertEquals(1,groupInfo.size()); assertEquals(G1,groupInfo.get(0).getName()); assertEquals(1,groupInfo.get(0).getConsumers()); assertEquals(0,groupInfo.get(0).getPending()); - assertNotNull(groupInfo.get(0).getLastDeliveredId()); + assertEquals(id2,groupInfo.get(0).getLastDeliveredId()); //Consumer info test assertEquals(MY_CONSUMER,consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME)); @@ -412,7 +416,7 @@ public void xinfo() { } @Test - public void xinfoBinary() { + public void xinfoBinary() throws InterruptedException { final String STREAM_NAME = "xadd-stream1"; final String F1 = "f1"; @@ -435,6 +439,9 @@ public void xinfoBinary() { jedis.xgroupCreate(STREAM_NAME,G1, StreamEntryID.LAST_ENTRY,false); Entry streamQeury11 = new AbstractMap.SimpleImmutableEntry<>(STREAM_NAME, new StreamEntryID("0-0")); jedis.xreadGroup(G1, MY_CONSUMER,1,0,false,streamQeury11); + + Thread.sleep(1); + List groupInfo = jedis.xinfoGroup(SafeEncoder.encode(STREAM_NAME)); List consumersInfo = jedis.xinfoConsumers(SafeEncoder.encode(STREAM_NAME), SafeEncoder.encode(G1)); @@ -445,14 +452,14 @@ public void xinfoBinary() { assertEquals(0L,streamInfo.getStreamInfo().get(GROUPS)); assertEquals(V1,((StreamEntry)streamInfo.getStreamInfo().get(FIRST_ENTRY)).getFields().get(F1)); assertEquals(V2,((StreamEntry)streamInfo.getStreamInfo().get(LAST_ENTRY)).getFields().get(F1)); - assertNotNull(streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); + assertEquals(id2,streamInfo.getStreamInfo().get(LAST_GENERATED_ID)); //Group info test assertEquals(1,groupInfo.size()); assertEquals(G1,groupInfo.get(0).getGroupInfo().get(NAME)); assertEquals(1L,groupInfo.get(0).getGroupInfo().get(CONSUMERS)); assertEquals(0L,groupInfo.get(0).getGroupInfo().get(PENDING)); - assertNotNull(groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); + assertEquals(id2,groupInfo.get(0).getGroupInfo().get(LAST_DELIVERED)); //Consumer info test assertEquals(MY_CONSUMER,consumersInfo.get(0).getConsumerInfo().get(redis.clients.jedis.StreamConsumersInfo.NAME));