diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 161dd3b32e..9d3916abc9 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2228,6 +2228,10 @@ public final CommandObject xpending(String key, String gro BuilderFactory.STREAM_PENDING_SUMMARY); } + /** + * @deprecated Use {@link CommandObjects#xpending(java.lang.String, java.lang.String, redis.clients.jedis.params.XPendingParams)}. + */ + @Deprecated public final CommandObject> xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername) { CommandArguments args = commandArguments(XPENDING).key(key).add(groupname) @@ -2246,6 +2250,10 @@ public final CommandObject xpending(byte[] key, byte[] groupname) { BuilderFactory.RAW_OBJECT); } + /** + * @deprecated Use {@link CommandObjects#xpending(byte[], byte[], redis.clients.jedis.params.XPendingParams)}. + */ + @Deprecated public final CommandObject> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) { CommandArguments args = commandArguments(XPENDING).key(key).add(groupname) diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index aa12c18eaa..4a65eebd6c 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -4566,7 +4566,11 @@ public long xtrim(byte[] key, XTrimParams params) { return connection.executeCommand(commandObjects.xtrim(key, params)); } + /** + * @deprecated Use {@link Jedis#xpending(byte[], byte[], redis.clients.jedis.params.XPendingParams)}. + */ @Override + @Deprecated public List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername) { checkIsInMultiOrPipeline(); @@ -8780,7 +8784,11 @@ public StreamPendingSummary xpending(final String key, final String groupname) { return connection.executeCommand(commandObjects.xpending(key, groupname)); } + /** + * @deprecated Use {@link Jedis#xpending(java.lang.String, java.lang.String, redis.clients.jedis.params.XPendingParams)}. + */ @Override + @Deprecated public List xpending(final String key, final String groupname, final StreamEntryID start, final StreamEntryID end, final int count, final String consumername) { checkIsInMultiOrPipeline(); diff --git a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java index b5e9c46007..ce1dc4caae 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamBinaryCommands.java @@ -4,7 +4,6 @@ import java.util.Map; import redis.clients.jedis.params.*; -import redis.clients.jedis.resps.StreamFullInfo; public interface StreamBinaryCommands { @@ -42,6 +41,10 @@ default byte[] xadd(byte[] key, Map hash, XAddParams params) { Object xpending(byte[] key, byte[] groupname); + /** + * @deprecated Use {@link StreamBinaryCommands#xpending(byte[], byte[], redis.clients.jedis.params.XPendingParams)}. + */ + @Deprecated List xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); List xpending(byte[] key, byte[] groupname, XPendingParams params); diff --git a/src/main/java/redis/clients/jedis/commands/StreamCommands.java b/src/main/java/redis/clients/jedis/commands/StreamCommands.java index b5000ed509..2872f8bfeb 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamCommands.java @@ -153,7 +153,9 @@ default StreamEntryID xadd(String key, Map hash, XAddParams para * @param end * @param count * @param consumername + * @deprecated Use {@link StreamCommands#xpending(java.lang.String, java.lang.String, redis.clients.jedis.params.XPendingParams)}. */ + @Deprecated List xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java index f5028dfaa9..daa99da0fc 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineBinaryCommands.java @@ -5,7 +5,6 @@ import redis.clients.jedis.Response; import redis.clients.jedis.params.*; -import redis.clients.jedis.resps.StreamFullInfo; public interface StreamPipelineBinaryCommands { // @@ -45,6 +44,10 @@ default Response xadd(byte[] key, Map hash, XAddParams p Response xpending(byte[] key, byte[] groupname); + /** + * @deprecated Use {@link StreamPipelineBinaryCommands#xpending(byte[], byte[], redis.clients.jedis.params.XPendingParams)}. + */ + @Deprecated Response> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername); Response> xpending(byte[] key, byte[] groupname, XPendingParams params); diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java index 5437abd0d1..b99d377c3c 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java @@ -154,7 +154,9 @@ default Response xadd(String key, Map hash, XAddP * @param end * @param count * @param consumername + * @deprecated Use {@link StreamPipelineCommands#xpending(java.lang.String, java.lang.String, redis.clients.jedis.params.XPendingParams)}. */ + @Deprecated Response> xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername); diff --git a/src/main/java/redis/clients/jedis/params/XPendingParams.java b/src/main/java/redis/clients/jedis/params/XPendingParams.java index 59273917fb..7ce3e687fd 100644 --- a/src/main/java/redis/clients/jedis/params/XPendingParams.java +++ b/src/main/java/redis/clients/jedis/params/XPendingParams.java @@ -1,26 +1,66 @@ package redis.clients.jedis.params; import static redis.clients.jedis.Protocol.Keyword.IDLE; +import static redis.clients.jedis.Protocol.toByteArray; +import static redis.clients.jedis.args.RawableFactory.from; import redis.clients.jedis.CommandArguments; -import redis.clients.jedis.Protocol; import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.util.SafeEncoder; +import redis.clients.jedis.args.Rawable; public class XPendingParams implements IParams { + private boolean legacy = true; private Long idle; + private Rawable start; // TODO: final + private Rawable end; // TODO: final + private int count = Integer.MIN_VALUE; // TODO: final + private Rawable consumer; + + /** + * @deprecated Use {@link XPendingParams#XPendingParams(redis.clients.jedis.StreamEntryID, redis.clients.jedis.StreamEntryID, int)}. + */ + @Deprecated + public XPendingParams() { + } - private String consumer; + /** + * @deprecated Use {@link XPendingParams#xPendingParams(redis.clients.jedis.StreamEntryID, redis.clients.jedis.StreamEntryID, int)}. + */ + @Deprecated + public static XPendingParams xPendingParams() { + return new XPendingParams(); + } - private StreamEntryID start; + public XPendingParams(StreamEntryID start, StreamEntryID end, int count) { + this(start.toString(), end.toString(), count); + } + + public XPendingParams(String start, String end, int count) { + this(from(start), from(end), count); + } - private StreamEntryID end; + public XPendingParams(byte[] start, byte[] end, int count) { + this(from(start), from(end), count); + } - private Integer count; + private XPendingParams(Rawable start, Rawable end, int count) { + this.legacy = false; + this.start = start; + this.end = end; + this.count = count; + } - public static XPendingParams xPendingParams() { - return new XPendingParams(); + public static XPendingParams xPendingParams(StreamEntryID start, StreamEntryID end, int count) { + return new XPendingParams(start, end, count); + } + + public static XPendingParams xPendingParams(String start, String end, int count) { + return new XPendingParams(start, end, count); + } + + public static XPendingParams xPendingParams(byte[] start, byte[] end, int count) { + return new XPendingParams(start, end, count); } public XPendingParams idle(long idle) { @@ -28,13 +68,15 @@ public XPendingParams idle(long idle) { return this; } + @Deprecated public XPendingParams start(StreamEntryID start) { - this.start = start; + this.start = from(start.toString()); return this; } + @Deprecated public XPendingParams end(StreamEntryID end) { - this.end = end; + this.end = from(end.toString()); return this; } @@ -44,7 +86,12 @@ public XPendingParams count(int count) { } public XPendingParams consumer(String consumer) { - this.consumer = consumer; + this.consumer = from(consumer); + return this; + } + + public XPendingParams consumer(byte[] consumer) { + this.consumer = from(consumer); return this; } @@ -52,28 +99,31 @@ public XPendingParams consumer(String consumer) { public void addParams(CommandArguments args) { if (idle != null) { - args.add(IDLE.getRaw()); - args.add(Protocol.toByteArray(idle)); + args.add(IDLE).add(toByteArray(idle)); } - if (start == null) { - args.add(SafeEncoder.encode("-")); + if (legacy) { + if (start == null) { + args.add("-"); + } else { + args.add(start); + } + + if (end == null) { + args.add("+"); + } else { + args.add(end); + } + + if (count != Integer.MIN_VALUE) { + args.add(toByteArray(count)); + } } else { - args.add(SafeEncoder.encode(start.toString())); - } - - if (end == null) { - args.add(SafeEncoder.encode("+")); - } else { - args.add(SafeEncoder.encode(end.toString())); - } - - if (count != null) { - args.add(Protocol.toByteArray(count)); + args.add(start).add(end).add(toByteArray(count)); } if (consumer != null) { - args.add(SafeEncoder.encode(consumer)); + args.add(consumer); } } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 432e6b8cac..57c7be1cad 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -423,7 +423,6 @@ public void xreadGroupWithParams() { @Test public void xack() { - Map map = new HashMap(); map.put("f1", "v1"); jedis.xadd("xack-stream", (StreamEntryID) null, map); @@ -482,6 +481,29 @@ public void xpendingWithParams() { assertEquals(0, pendingRange.size()); } + @Test + public void xpendingRange() { + Map map = new HashMap<>(); + map.put("foo", "bar"); + StreamEntryID m1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + StreamEntryID m2 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false); + + // read 1 message from the group with each consumer + Map streamQeury = Collections.singletonMap( + "xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY); + jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); + jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); + + List response = jedis.xpending("xpendeing-stream", "xpendeing-group", + XPendingParams.xPendingParams("(0", "+", 5)); + assertEquals(2, response.size()); + assertEquals(m1, response.get(0).getID()); + assertEquals("consumer1", response.get(0).getConsumerName()); + assertEquals(m2, response.get(1).getID()); + assertEquals("consumer2", response.get(1).getConsumerName()); + } + @Test public void xclaimWithParams() { Map map = new HashMap<>();