From a6666c11779f6f2c7de1bb70575258c4d3e36397 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 26 Mar 2024 22:40:02 +0600 Subject: [PATCH 1/5] Add last entry id for XREAD variants and support XREAD reply as map --- .../redis/clients/jedis/BuilderFactory.java | 37 +++++- .../redis/clients/jedis/CommandObjects.java | 21 ++++ src/main/java/redis/clients/jedis/Jedis.java | 18 ++- .../redis/clients/jedis/PipeliningBase.java | 10 ++ .../redis/clients/jedis/StreamEntryID.java | 50 +++++++-- .../redis/clients/jedis/UnifiedJedis.java | 11 ++ .../jedis/commands/StreamCommands.java | 12 ++ .../commands/StreamPipelineCommands.java | 12 ++ .../commands/jedis/StreamsCommandsTest.java | 105 +++++++++++++----- .../pipeline/StreamsPipelineCommandsTest.java | 28 +++-- .../jedis/params/XPendingParamsTest.java | 4 +- 11 files changed, 255 insertions(+), 53 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 18bcc5a31c..2f9f5aa56c 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1420,10 +1420,10 @@ public List>> build(Object data) { .collect(Collectors.toList()); } else { List>> result = new ArrayList<>(list.size()); - for (Object streamObj : list) { - List stream = (List) streamObj; - String streamKey = STRING.build(stream.get(0)); - List streamEntries = STREAM_ENTRY_LIST.build(stream.get(1)); + for (Object anObj : list) { + List streamObj = (List) anObj; + String streamKey = STRING.build(streamObj.get(0)); + List streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1)); result.add(KeyValue.of(streamKey, streamEntries)); } return result; @@ -1436,6 +1436,35 @@ public String toString() { } }; + public static final Builder>> STREAM_READ_RESPONSE_MAP + = new Builder>>() { + @Override + public Map> build(Object data) { + if (data == null) return null; + List list = (List) data; + if (list.isEmpty()) return Collections.emptyMap(); + + if (list.get(0) instanceof KeyValue) { + return ((List) list).stream() + .collect(Collectors.toMap(kv -> STRING.build(kv.getKey()), kv -> STREAM_ENTRY_LIST.build(kv.getValue()))); + } else { + Map> result = new HashMap<>(list.size()); + for (Object anObj : list) { + List streamObj = (List) anObj; + String streamKey = STRING.build(streamObj.get(0)); + List streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1)); + result.put(streamKey, streamEntries); + } + return result; + } + } + + @Override + public String toString() { + return "Map>"; + } + }; + public static final Builder> STREAM_PENDING_ENTRY_LIST = new Builder>() { @Override @SuppressWarnings("unchecked") diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 1de7affa6a..1c56c4576c 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2666,6 +2666,15 @@ public final CommandObject>>> xread( return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE); } + public final CommandObject>> xreadAsMap( + XReadParams xReadParams, Map streams) { + CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE_MAP); + } + public final CommandObject>>> xreadGroup( String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { @@ -2678,6 +2687,18 @@ public final CommandObject>>> xreadGrou return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE); } + public final CommandObject>> xreadGroupAsMap( + String groupName, String consumer, XReadGroupParams xReadGroupParams, + Map streams) { + CommandArguments args = commandArguments(XREADGROUP) + .add(GROUP).add(groupName).add(consumer) + .addParams(xReadGroupParams).add(STREAMS); + Set> entrySet = streams.entrySet(); + entrySet.forEach(entry -> args.key(entry.getKey())); + entrySet.forEach(entry -> args.add(entry.getValue())); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE_MAP); + } + public final CommandObject> xread(XReadParams xReadParams, Map.Entry... streams) { CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS); for (Map.Entry entry : streams) { diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index a9f6ceafe7..03a2bc1ed1 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -9394,6 +9394,12 @@ public List>> xread(final XReadParams xReadP return connection.executeCommand(commandObjects.xread(xReadParams, streams)); } + @Override + public Map> xreadAsMap(final XReadParams xReadParams, final Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadAsMap(xReadParams, streams)); + } + @Override public long xack(final String key, final String group, final StreamEntryID... ids) { checkIsInMultiOrPipeline(); @@ -9450,13 +9456,19 @@ public long xtrim(final String key, final XTrimParams params) { } @Override - public List>> xreadGroup(final String groupName, - final String consumer, final XReadGroupParams xReadGroupParams, - final Map streams) { + public List>> xreadGroup(final String groupName, final String consumer, + final XReadGroupParams xReadGroupParams, final Map streams) { checkIsInMultiOrPipeline(); return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Map> xreadGroupAsMap(final String groupName, final String consumer, + final XReadGroupParams xReadGroupParams, final Map streams) { + checkIsInMultiOrPipeline(); + return connection.executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public StreamPendingSummary xpending(final String key, final String groupName) { checkIsInMultiOrPipeline(); diff --git a/src/main/java/redis/clients/jedis/PipeliningBase.java b/src/main/java/redis/clients/jedis/PipeliningBase.java index b35d1fd423..2dac5af712 100644 --- a/src/main/java/redis/clients/jedis/PipeliningBase.java +++ b/src/main/java/redis/clients/jedis/PipeliningBase.java @@ -1536,11 +1536,21 @@ public Response>>> xread(XReadParams xR return appendCommand(commandObjects.xread(xReadParams, streams)); } + @Override + public Response>> xreadAsMap(XReadParams xReadParams, Map streams) { + return appendCommand(commandObjects.xreadAsMap(xReadParams, streams)); + } + @Override public Response>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Response>> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { + return appendCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public Response eval(String script) { return appendCommand(commandObjects.eval(script)); diff --git a/src/main/java/redis/clients/jedis/StreamEntryID.java b/src/main/java/redis/clients/jedis/StreamEntryID.java index 9644010d7c..5bd48393da 100644 --- a/src/main/java/redis/clients/jedis/StreamEntryID.java +++ b/src/main/java/redis/clients/jedis/StreamEntryID.java @@ -80,9 +80,7 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN /** * Should be used only with XADD * - * - * XADD mystream * field1 value1 - * + * {@code XADD mystream * field1 value1} */ public static final StreamEntryID NEW_ENTRY = new StreamEntryID() { @@ -97,11 +95,30 @@ public String toString() { /** * Should be used only with XGROUP CREATE * - * - * XGROUP CREATE mystream consumer-group-name $ - * + * {@code XGROUP CREATE mystream consumer-group-name $} */ - public static final StreamEntryID LAST_ENTRY = new StreamEntryID() { + public static final StreamEntryID XGROUP_LAST_ENTRY = new StreamEntryID() { + + private static final long serialVersionUID = 1L; + + @Override + public String toString() { + return "$"; + } + }; + + /** + * @deprecated Use {@link StreamEntryID#XGROUP_LAST_ID} or {@link StreamEntryID#XREAD_NEW_ENTRY}. + */ + @Deprecated + public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY; + + /** + * Should be used only with XREAD + * + * {@code XREAD BLOCK 5000 COUNT 100 STREAMS mystream $} + */ + public static final StreamEntryID XREAD_NEW_ENTRY = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -129,6 +146,7 @@ public String toString() { /** * Can be used in XRANGE, XREVRANGE and XPENDING commands. */ + // TODO: FIRST_ID ? public static final StreamEntryID MINIMUM_ID = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -142,6 +160,8 @@ public String toString() { /** * Can be used in XRANGE, XREVRANGE and XPENDING commands. */ + // TODO: LAST_ID ? + // TODO: unify with XREAD_LAST_ENTRY ?? public static final StreamEntryID MAXIMUM_ID = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -151,4 +171,20 @@ public String toString() { return "+"; } }; + + /** + * Should be used only with XREAD + * + * {@code XREAD STREAMS mystream +} + */ + // TODO: unify with MAXIMUM_ID ?? + public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() { + + private static final long serialVersionUID = 1L; + + @Override + public String toString() { + return "+"; + } + }; } diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index b8137d3536..42aa67ae74 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -3066,12 +3066,23 @@ public List>> xread(XReadParams xReadParams, return executeCommand(commandObjects.xread(xReadParams, streams)); } + @Override + public Map> xreadAsMap(XReadParams xReadParams, Map streams) { + return executeCommand(commandObjects.xreadAsMap(xReadParams, streams)); + } + @Override public List>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams) { return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams)); } + @Override + public Map> xreadGroupAsMap(String groupName, String consumer, + XReadGroupParams xReadGroupParams, Map streams) { + return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams)); + } + @Override public byte[] xadd(byte[] key, XAddParams params, Map hash) { return executeCommand(commandObjects.xadd(key, params, hash)); diff --git a/src/main/java/redis/clients/jedis/commands/StreamCommands.java b/src/main/java/redis/clients/jedis/commands/StreamCommands.java index 9c34cbc6a6..163e11050e 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamCommands.java @@ -250,7 +250,19 @@ List>> xread(XReadParams xReadParams, /** * XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] */ + Map> xreadAsMap(XReadParams xReadParams, + Map streams); + + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ List>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams); + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ + Map> xreadGroupAsMap(String groupName, String consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java index e435c02341..d4bda0fb98 100644 --- a/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java +++ b/src/main/java/redis/clients/jedis/commands/StreamPipelineCommands.java @@ -243,7 +243,19 @@ Response>>> xread(XReadParams xReadPara /** * XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] */ + Response>> xreadAsMap(XReadParams xReadParams, + Map streams); + + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ Response>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map streams); + /** + * XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] + */ + Response>> xreadGroupAsMap(String groupName, String consumer, + XReadGroupParams xReadGroupParams, Map streams); + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index acadc0b75d..d0af32edc5 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -1,6 +1,8 @@ package redis.clients.jedis.commands.jedis; +import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -15,7 +17,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; -import org.hamcrest.MatcherAssert; + import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; @@ -273,7 +275,10 @@ public void xrangeExclusive() { @Test public void xreadWithParams() { - Map streamQeury1 = singletonMap("xread-stream1", new StreamEntryID()); + final String key1 = "xread-stream1"; + final String key2 = "xread-stream2"; + + Map streamQeury1 = singletonMap(key1, new StreamEntryID()); // Before creating Stream assertNull(jedis.xread(XReadParams.xReadParams().block(1), streamQeury1)); @@ -281,28 +286,78 @@ public void xreadWithParams() { Map map = new HashMap<>(); map.put("f1", "v1"); - StreamEntryID id1 = jedis.xadd("xread-stream1", (StreamEntryID) null, map); - StreamEntryID id2 = jedis.xadd("xread-stream2", (StreamEntryID) null, map); + StreamEntryID id1 = jedis.xadd(key1, (StreamEntryID) null, map); + StreamEntryID id2 = jedis.xadd(key2, (StreamEntryID) null, map); // Read only a single Stream List>> streams1 = jedis.xread(XReadParams.xReadParams().count(1).block(1), streamQeury1); assertEquals(1, streams1.size()); - assertEquals("xread-stream1", streams1.get(0).getKey()); + assertEquals(key1, streams1.get(0).getKey()); assertEquals(1, streams1.get(0).getValue().size()); assertEquals(id1, streams1.get(0).getValue().get(0).getID()); assertEquals(map, streams1.get(0).getValue().get(0).getFields()); - assertNull(jedis.xread(XReadParams.xReadParams().block(1), singletonMap("xread-stream1", id1))); - assertNull(jedis.xread(XReadParams.xReadParams(), singletonMap("xread-stream1", id1))); + assertNull(jedis.xread(XReadParams.xReadParams().block(1), singletonMap(key1, id1))); + assertNull(jedis.xread(XReadParams.xReadParams(), singletonMap(key1, id1))); // Read from two Streams - Map streamQuery23 = new LinkedHashMap<>(); - streamQuery23.put("xread-stream1", new StreamEntryID()); - streamQuery23.put("xread-stream2", new StreamEntryID()); - List>> streams2 = jedis.xread(XReadParams.xReadParams().count(2).block(1), streamQuery23); + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put(key1, new StreamEntryID()); + streamQuery2.put(key2, new StreamEntryID()); + List>> streams2 = jedis.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2); assertEquals(2, streams2.size()); } + @Test + public void xreadAsMap() { + + final String key1 = "xread-stream1"; + final String key2 = "xread-stream2"; + + Map streamQeury1 = singletonMap(key1, new StreamEntryID()); + + // Before creating Stream + assertNull(jedis.xreadAsMap(XReadParams.xReadParams().block(1), streamQeury1)); + assertNull(jedis.xreadAsMap(XReadParams.xReadParams(), streamQeury1)); + + Map map = new HashMap<>(); + map.put("f1", "v1"); + StreamEntryID id1 = new StreamEntryID(1); + StreamEntryID id2 = new StreamEntryID(2); + StreamEntryID id3 = new StreamEntryID(3); + + assertEquals(id1, jedis.xadd(key1, id1, map)); + assertEquals(id2, jedis.xadd(key2, id2, map)); + assertEquals(id3, jedis.xadd(key1, id3, map)); + + // Read only a single Stream + Map> streams1 = jedis.xreadAsMap(XReadParams.xReadParams().count(2), streamQeury1); + assertEquals(singleton(key1), streams1.keySet()); + List list1 = streams1.get(key1); + assertEquals(2, list1.size()); + assertEquals(id1, list1.get(0).getID()); + assertEquals(map, list1.get(0).getFields()); + assertEquals(id3, list1.get(1).getID()); + assertEquals(map, list1.get(1).getFields()); + + // Read from two Streams + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put(key1, new StreamEntryID()); + streamQuery2.put(key2, new StreamEntryID()); + Map> streams2 = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQuery2); + assertEquals(2, streams2.size()); + assertEquals(id1, streams2.get(key1).get(0).getID()); + assertEquals(id2, streams2.get(key2).get(0).getID()); + + // Read from last entry + Map streamQueryLE = singletonMap(key1, StreamEntryID.XREAD_LAST_ENTRY); + Map> streamsLE = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQueryLE); + assertEquals(singleton(key1), streamsLE.keySet()); + assertEquals(1, streamsLE.get(key1).size()); + assertEquals(id3, streamsLE.get(key1).get(0).getID()); + assertEquals(map, streamsLE.get(key1).get(0).getFields()); + } + @Test public void xreadBlockZero() throws InterruptedException { final AtomicReference readId = new AtomicReference<>(); @@ -411,13 +466,13 @@ public void xrevrangeExclusive() { @Test public void xgroup() { - Map map = new HashMap(); + Map map = new HashMap<>(); map.put("f1", "v1"); StreamEntryID id1 = jedis.xadd("xgroup-stream", (StreamEntryID) null, map); assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name", null, false)); assertEquals("OK", jedis.xgroupSetID("xgroup-stream", "consumer-group-name", id1)); - assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.LAST_ENTRY, false)); + assertEquals("OK", jedis.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.XGROUP_LAST_ENTRY, false)); jedis.xgroupDestroy("xgroup-stream", "consumer-group-name"); assertEquals(0L, jedis.xgroupDelConsumer("xgroup-stream", "consumer-group-name1","myconsumer1")); @@ -451,11 +506,11 @@ public void xreadGroupWithParams() { assertEquals(1, streams1.get(0).getValue().size()); // Read from two Streams - Map streamQuery23 = new LinkedHashMap<>(); - streamQuery23.put("xreadGroup-stream1", new StreamEntryID()); - streamQuery23.put("xreadGroup-stream2", new StreamEntryID()); + Map streamQuery2 = new LinkedHashMap<>(); + streamQuery2.put("xreadGroup-stream1", new StreamEntryID()); + streamQuery2.put("xreadGroup-stream2", new StreamEntryID()); List>> streams2 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", - XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery23); + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQuery2); assertEquals(2, streams2.size()); // Read only fresh messages @@ -802,7 +857,7 @@ public void xinfo() throws InterruptedException { StreamInfo streamInfo = jedis.xinfoStream(STREAM_NAME); assertNotNull(id2); - jedis.xgroupCreate(STREAM_NAME, G1, StreamEntryID.LAST_ENTRY, false); + jedis.xgroupCreate(STREAM_NAME, G1, StreamEntryID.XGROUP_LAST_ENTRY, false); Map streamQeury11 = singletonMap( STREAM_NAME, new StreamEntryID("0-0")); jedis.xreadGroup(G1, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); @@ -854,8 +909,8 @@ public void xinfo() throws InterruptedException { // Using getters assertEquals(MY_CONSUMER, consumersInfo.get(0).getName()); assertEquals(0L, consumersInfo.get(0).getPending()); - MatcherAssert.assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); - MatcherAssert.assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class)); + assertThat(consumersInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumersInfo.get(0).getInactive(), Matchers.any(Long.class)); // Consumer info test assertEquals(MY_CONSUMER, @@ -866,11 +921,11 @@ public void xinfo() throws InterruptedException { // Using getters assertEquals(MY_CONSUMER, consumerInfo.get(0).getName()); assertEquals(0L, consumerInfo.get(0).getPending()); - MatcherAssert.assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); - MatcherAssert.assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class)); + assertThat(consumerInfo.get(0).getIdle(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class)); // test with more groups and consumers - jedis.xgroupCreate(STREAM_NAME, G2, StreamEntryID.LAST_ENTRY, false); + jedis.xgroupCreate(STREAM_NAME, G2, StreamEntryID.XGROUP_LAST_ENTRY, false); jedis.xreadGroup(G1, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); jedis.xreadGroup(G2, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); jedis.xreadGroup(G2, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQeury11); @@ -940,8 +995,8 @@ public void xinfoStreamFullWithPending() { assertEquals(1, group.getConsumers().size()); StreamConsumerFullInfo consumer = group.getConsumers().get(0); assertEquals("xreadGroup-consumer", consumer.getName()); - MatcherAssert.assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L)); - MatcherAssert.assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L)); + assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L)); assertEquals(1, consumer.getPending().size()); List consumerPendingEntry = consumer.getPending().get(0); assertEquals(id1, consumerPendingEntry.get(0)); diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java index 119f233198..ada186e8ed 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java @@ -396,7 +396,11 @@ public void xrangeExclusive() { @Test public void xreadWithParams() { - Map streamQuery1 = singletonMap("xread-stream1", new StreamEntryID()); + + final String key1 = "xread-stream1"; + final String key2 = "xread-stream2"; + + Map streamQuery1 = singletonMap(key1, new StreamEntryID()); // Before creating Stream pipe.xread(XReadParams.xReadParams().block(1), streamQuery1); @@ -408,25 +412,25 @@ public void xreadWithParams() { )); Map map1 = singletonMap("f1", "v1"); - StreamEntryID id1 = jedis.xadd("xread-stream1", (StreamEntryID) null, map1); + StreamEntryID id1 = jedis.xadd(key1, (StreamEntryID) null, map1); Map map2 = singletonMap("f2", "v2"); - StreamEntryID id2 = jedis.xadd("xread-stream2", (StreamEntryID) null, map2); + StreamEntryID id2 = jedis.xadd(key2, (StreamEntryID) null, map2); // Read only a single Stream Response>>> streams1 = pipe.xread(XReadParams.xReadParams().count(1).block(1), streamQuery1); Response>>> streams2 = - pipe.xread(XReadParams.xReadParams().block(1), singletonMap("xread-stream1", id1)); + pipe.xread(XReadParams.xReadParams().block(1), singletonMap(key1, id1)); Response>>> streams3 = - pipe.xread(XReadParams.xReadParams(), singletonMap("xread-stream1", id1)); + pipe.xread(XReadParams.xReadParams(), singletonMap(key1, id1)); pipe.sync(); assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()), - contains("xread-stream1")); + contains(key1)); assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1)); @@ -440,8 +444,8 @@ public void xreadWithParams() { // Read from two Streams Map streamQuery2 = new LinkedHashMap<>(); - streamQuery2.put("xread-stream1", new StreamEntryID()); - streamQuery2.put("xread-stream2", new StreamEntryID()); + streamQuery2.put(key1, new StreamEntryID()); + streamQuery2.put(key2, new StreamEntryID()); Response>>> streams4 = pipe.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2); @@ -449,7 +453,7 @@ public void xreadWithParams() { pipe.sync(); assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()), - contains("xread-stream1", "xread-stream2")); + contains(key1, key2)); assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream) .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2)); @@ -618,7 +622,7 @@ public void xgroup() { pipe.xgroupCreate("xgroup-stream", "consumer-group-name", null, false); pipe.xgroupSetID("xgroup-stream", "consumer-group-name", id1); - pipe.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.LAST_ENTRY, false); + pipe.xgroupCreate("xgroup-stream", "consumer-group-name1", StreamEntryID.XGROUP_LAST_ENTRY, false); pipe.xgroupDestroy("xgroup-stream", "consumer-group-name"); pipe.xgroupDelConsumer("xgroup-stream", "consumer-group-name1", "myconsumer1"); @@ -1140,7 +1144,7 @@ public void xinfo() throws InterruptedException { Response streamInfoResponse = pipe.xinfoStream(STREAM_NAME); - pipe.xgroupCreate(STREAM_NAME, G1, StreamEntryID.LAST_ENTRY, false); + pipe.xgroupCreate(STREAM_NAME, G1, StreamEntryID.XGROUP_LAST_ENTRY, false); Map streamQuery1 = singletonMap(STREAM_NAME, new StreamEntryID("0-0")); @@ -1221,7 +1225,7 @@ public void xinfo() throws InterruptedException { assertThat(consumerInfo.get(0).getInactive(), Matchers.any(Long.class)); // test with more groups and consumers - pipe.xgroupCreate(STREAM_NAME, G2, StreamEntryID.LAST_ENTRY, false); + pipe.xgroupCreate(STREAM_NAME, G2, StreamEntryID.XGROUP_LAST_ENTRY, false); pipe.xreadGroup(G1, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); pipe.xreadGroup(G2, MY_CONSUMER, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); pipe.xreadGroup(G2, MY_CONSUMER2, XReadGroupParams.xReadGroupParams().count(1), streamQuery1); diff --git a/src/test/java/redis/clients/jedis/params/XPendingParamsTest.java b/src/test/java/redis/clients/jedis/params/XPendingParamsTest.java index b6698d875f..c7c95da424 100644 --- a/src/test/java/redis/clients/jedis/params/XPendingParamsTest.java +++ b/src/test/java/redis/clients/jedis/params/XPendingParamsTest.java @@ -24,7 +24,7 @@ public void checkHashCodeIdenticalParams() { @Test public void checkEqualsVariousParams() { XPendingParams firstParam = getDefaultValue(); - firstParam.start(StreamEntryID.LAST_ENTRY); + firstParam.start(StreamEntryID.XGROUP_LAST_ENTRY); XPendingParams secondParam = getDefaultValue(); secondParam.start(StreamEntryID.NEW_ENTRY); assertFalse(firstParam.equals(secondParam)); @@ -33,7 +33,7 @@ public void checkEqualsVariousParams() { @Test public void checkHashCodeVariousParams() { XPendingParams firstParam = getDefaultValue(); - firstParam.start(StreamEntryID.LAST_ENTRY); + firstParam.start(StreamEntryID.XGROUP_LAST_ENTRY); XPendingParams secondParam = getDefaultValue(); secondParam.start(StreamEntryID.NEW_ENTRY); assertNotEquals(firstParam.hashCode(), secondParam.hashCode()); From 6e21552fb673e6905b32f6543a225c8b172d2a07 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 26 Mar 2024 22:48:21 +0600 Subject: [PATCH 2/5] Rename builder --- src/main/java/redis/clients/jedis/BuilderFactory.java | 2 +- src/main/java/redis/clients/jedis/CommandObjects.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 2f9f5aa56c..83e552de83 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1436,7 +1436,7 @@ public String toString() { } }; - public static final Builder>> STREAM_READ_RESPONSE_MAP + public static final Builder>> STREAM_READ_MAP_RESPONSE = new Builder>>() { @Override public Map> build(Object data) { diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 1c56c4576c..139fa78ae8 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2672,7 +2672,7 @@ public final CommandObject>> xreadAsMap( Set> entrySet = streams.entrySet(); entrySet.forEach(entry -> args.key(entry.getKey())); entrySet.forEach(entry -> args.add(entry.getValue())); - return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE_MAP); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE); } public final CommandObject>>> xreadGroup( @@ -2696,7 +2696,7 @@ public final CommandObject>> xreadGroupAsMap( Set> entrySet = streams.entrySet(); entrySet.forEach(entry -> args.key(entry.getKey())); entrySet.forEach(entry -> args.add(entry.getValue())); - return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE_MAP); + return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE); } public final CommandObject> xread(XReadParams xReadParams, Map.Entry... streams) { From 3d233dfecd3762a1446cef6cc8ef0d2ded70eb76 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 26 Mar 2024 22:51:10 +0600 Subject: [PATCH 3/5] Fix javadoc reference --- src/main/java/redis/clients/jedis/StreamEntryID.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/StreamEntryID.java b/src/main/java/redis/clients/jedis/StreamEntryID.java index 5bd48393da..fcab14d760 100644 --- a/src/main/java/redis/clients/jedis/StreamEntryID.java +++ b/src/main/java/redis/clients/jedis/StreamEntryID.java @@ -108,7 +108,8 @@ public String toString() { }; /** - * @deprecated Use {@link StreamEntryID#XGROUP_LAST_ID} or {@link StreamEntryID#XREAD_NEW_ENTRY}. + * @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XREADGROUP command or + * {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command. */ @Deprecated public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY; From 9140c3d26bff6da3d0cd6a6608d7e5b65c10e5f7 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 27 Mar 2024 13:09:35 +0600 Subject: [PATCH 4/5] more stream id changes --- .../redis/clients/jedis/StreamEntryID.java | 16 +++++----- .../commands/jedis/StreamsCommandsTest.java | 31 +++++++++---------- .../pipeline/StreamsPipelineCommandsTest.java | 30 +++++++++--------- 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/src/main/java/redis/clients/jedis/StreamEntryID.java b/src/main/java/redis/clients/jedis/StreamEntryID.java index fcab14d760..5381638b5a 100644 --- a/src/main/java/redis/clients/jedis/StreamEntryID.java +++ b/src/main/java/redis/clients/jedis/StreamEntryID.java @@ -108,7 +108,7 @@ public String toString() { }; /** - * @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XREADGROUP command or + * @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XGROUP CREATE command or * {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command. */ @Deprecated @@ -132,9 +132,9 @@ public String toString() { /** * Should be used only with XREADGROUP *

- * {@code XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >} + * {@code XREADGROUP GROUP mygroup myconsumer STREAMS mystream >} */ - public static final StreamEntryID UNRECEIVED_ENTRY = new StreamEntryID() { + public static final StreamEntryID XREADGROUP_UNDELIVERED_ENTRY = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -144,10 +144,15 @@ public String toString() { } }; + /** + * @deprecated Use {@link StreamEntryID#XREADGROUP_UNDELIVERED_ENTRY}. + */ + @Deprecated + public static final StreamEntryID UNRECEIVED_ENTRY = XREADGROUP_UNDELIVERED_ENTRY; + /** * Can be used in XRANGE, XREVRANGE and XPENDING commands. */ - // TODO: FIRST_ID ? public static final StreamEntryID MINIMUM_ID = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -161,8 +166,6 @@ public String toString() { /** * Can be used in XRANGE, XREVRANGE and XPENDING commands. */ - // TODO: LAST_ID ? - // TODO: unify with XREAD_LAST_ENTRY ?? public static final StreamEntryID MAXIMUM_ID = new StreamEntryID() { private static final long serialVersionUID = 1L; @@ -178,7 +181,6 @@ public String toString() { * * {@code XREAD STREAMS mystream +} */ - // TODO: unify with MAXIMUM_ID ?? public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() { private static final long serialVersionUID = 1L; diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index d0af32edc5..8641db7e8e 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -488,7 +488,7 @@ public void xreadGroupWithParams() { map.put("f1", "v1"); jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map); jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false); - Map streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQeury1); assertEquals(1, range.size()); @@ -499,7 +499,7 @@ public void xreadGroupWithParams() { jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false); // Read only a single Stream - Map streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury11); assertEquals(1, streams1.size()); @@ -515,7 +515,7 @@ public void xreadGroupWithParams() { // Read only fresh messages StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map); - Map streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh); assertEquals(1, streams3.size()); @@ -536,7 +536,7 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() { jedis.xadd("xreadGroup-discard-stream1", xAddParams, map2); jedis.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false); - Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1), streamQuery1); assertEquals(1, range.size()); @@ -569,8 +569,7 @@ public void xack() { jedis.xgroupCreate("xack-stream", "xack-group", null, false); - Map streamQeury1 = singletonMap( - "xack-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Empty Stream List>> range = jedis.xreadGroup("xack-group", "xack-consumer", @@ -589,8 +588,7 @@ public void xpendingWithParams() { assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); - Map streamQeury1 = singletonMap( - "xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Read the event from Stream put it on pending List>> range = jedis.xreadGroup("xpendeing-group", @@ -630,8 +628,7 @@ public void xpendingRange() { jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false); // read 1 message from the group with each consumer - Map streamQeury = singletonMap( - "xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); @@ -662,7 +659,7 @@ public void xclaimWithParams() { // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", @@ -693,7 +690,7 @@ public void xclaimJustId() { // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", @@ -722,7 +719,7 @@ public void xautoclaim() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -752,7 +749,7 @@ public void xautoclaimBinary() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -784,7 +781,7 @@ public void xautoclaimJustId() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -814,7 +811,7 @@ public void xautoclaimJustIdBinary() { // Read the event from Stream put it on pending jedis.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event List pendingRange = jedis.xpending("xpending-stream", "xpending-group", @@ -976,7 +973,7 @@ public void xinfoStreamFullWithPending() { StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map); jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false); - Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1), streamQeury1); assertEquals(1, range.size()); diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java index ada186e8ed..f2e77663d1 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java @@ -648,7 +648,7 @@ public void xreadGroupWithParams() { jedis.xgroupCreate("xreadGroup-stream1", "xreadGroup-group", null, false); - Map streamQuery1 = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQuery1 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); Response>>> streams1 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", @@ -679,8 +679,8 @@ public void xreadGroupWithParams() { // Read from two Streams Map streamQuery2 = new LinkedHashMap<>(); - streamQuery2.put("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); - streamQuery2.put("xreadGroup-stream2", StreamEntryID.UNRECEIVED_ENTRY); + streamQuery2.put("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + streamQuery2.put("xreadGroup-stream2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); Response>>> streams3 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1).noAck(), streamQuery2); @@ -709,7 +709,7 @@ public void xreadGroupWithParams() { Map map4 = singletonMap("f4", "v4"); StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map4); - Map streamQueryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQueryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); Response>>> streams4 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQueryFresh); @@ -737,7 +737,7 @@ public void xreadGroupWithParamsWhenPendingMessageIsDiscarded() { pipe.xgroupCreate("xreadGroup-discard-stream1", "xreadGroup-group", null, false); - Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQuery1 = singletonMap("xreadGroup-discard-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); Response>>> streams1 = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", @@ -781,7 +781,7 @@ public void xack() { pipe.xgroupCreate("xack-stream", "xack-group", null, false); - Map streamQuery1 = singletonMap("xack-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQuery1 = singletonMap("xack-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Empty Stream Response>>> streams1 = @@ -809,7 +809,7 @@ public void xpendingWithParams() { assertEquals("OK", jedis.xgroupCreate("xpending-stream", "xpending-group", null, false)); - Map streamQeury1 = singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Read the event from Stream put it on pending Response>>> range = pipe.xreadGroup("xpending-group", @@ -861,7 +861,7 @@ public void xpendingRange() { pipe.xgroupCreate("xpending-stream", "xpending-group", null, false); // read 1 message from the group with each consumer - Map streamQeury = singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury = singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); pipe.xreadGroup("xpending-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); pipe.xreadGroup("xpending-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); @@ -896,7 +896,7 @@ public void xclaimWithParams() throws InterruptedException { // Read the event from Stream put it on pending pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event Response> pending = @@ -938,7 +938,7 @@ public void xclaimJustId() throws InterruptedException { // Read the event from Stream put it on pending pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event Response> pending = @@ -976,7 +976,7 @@ public void xautoclaim() throws InterruptedException { // Read the event from Stream put it on pending pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event Response> pending = pipe.xpending("xpending-stream", "xpending-group", @@ -1016,7 +1016,7 @@ public void xautoclaimBinary() throws InterruptedException { // Read the event from Stream put it on pending pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event Response> pending = pipe.xpending("xpending-stream", "xpending-group", @@ -1059,7 +1059,7 @@ public void xautoclaimJustId() throws InterruptedException { // Read the event from Stream put it on pending pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event Response> pending = pipe.xpending("xpending-stream", "xpending-group", @@ -1095,7 +1095,7 @@ public void xautoclaimJustIdBinary() throws InterruptedException { // Read the event from Stream put it on pending pipe.xreadGroup("xpending-group", "xpending-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpending-stream", StreamEntryID.UNRECEIVED_ENTRY)); + singletonMap("xpending-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event Response> pending = pipe.xpending("xpending-stream", "xpending-group", @@ -1281,7 +1281,7 @@ public void xinfoStreamFullWithPending() { StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map); jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false); - Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY); + Map streamQeury1 = singletonMap("streamfull2", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); Response>>> pending = pipe.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(1), streamQeury1); From 7e54283db0eea6b7d55c10485de666d277ab3b65 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 27 Mar 2024 17:40:49 +0600 Subject: [PATCH 5/5] xreadgroup command test --- .../commands/jedis/StreamsCommandsTest.java | 61 ++++++++++++------- .../pipeline/StreamsPipelineCommandsTest.java | 22 +++---- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 8641db7e8e..552c8ac1f6 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -311,10 +311,10 @@ public void xreadWithParams() { @Test public void xreadAsMap() { - final String key1 = "xread-stream1"; - final String key2 = "xread-stream2"; + final String stream1 = "xread-stream1"; + final String stream2 = "xread-stream2"; - Map streamQeury1 = singletonMap(key1, new StreamEntryID()); + Map streamQeury1 = singletonMap(stream1, new StreamEntryID()); // Before creating Stream assertNull(jedis.xreadAsMap(XReadParams.xReadParams().block(1), streamQeury1)); @@ -326,14 +326,14 @@ public void xreadAsMap() { StreamEntryID id2 = new StreamEntryID(2); StreamEntryID id3 = new StreamEntryID(3); - assertEquals(id1, jedis.xadd(key1, id1, map)); - assertEquals(id2, jedis.xadd(key2, id2, map)); - assertEquals(id3, jedis.xadd(key1, id3, map)); + assertEquals(id1, jedis.xadd(stream1, id1, map)); + assertEquals(id2, jedis.xadd(stream2, id2, map)); + assertEquals(id3, jedis.xadd(stream1, id3, map)); // Read only a single Stream Map> streams1 = jedis.xreadAsMap(XReadParams.xReadParams().count(2), streamQeury1); - assertEquals(singleton(key1), streams1.keySet()); - List list1 = streams1.get(key1); + assertEquals(singleton(stream1), streams1.keySet()); + List list1 = streams1.get(stream1); assertEquals(2, list1.size()); assertEquals(id1, list1.get(0).getID()); assertEquals(map, list1.get(0).getFields()); @@ -342,20 +342,20 @@ public void xreadAsMap() { // Read from two Streams Map streamQuery2 = new LinkedHashMap<>(); - streamQuery2.put(key1, new StreamEntryID()); - streamQuery2.put(key2, new StreamEntryID()); + streamQuery2.put(stream1, new StreamEntryID()); + streamQuery2.put(stream2, new StreamEntryID()); Map> streams2 = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQuery2); assertEquals(2, streams2.size()); - assertEquals(id1, streams2.get(key1).get(0).getID()); - assertEquals(id2, streams2.get(key2).get(0).getID()); + assertEquals(id1, streams2.get(stream1).get(0).getID()); + assertEquals(id2, streams2.get(stream2).get(0).getID()); // Read from last entry - Map streamQueryLE = singletonMap(key1, StreamEntryID.XREAD_LAST_ENTRY); + Map streamQueryLE = singletonMap(stream1, StreamEntryID.XREAD_LAST_ENTRY); Map> streamsLE = jedis.xreadAsMap(XReadParams.xReadParams().count(1), streamQueryLE); - assertEquals(singleton(key1), streamsLE.keySet()); - assertEquals(1, streamsLE.get(key1).size()); - assertEquals(id3, streamsLE.get(key1).get(0).getID()); - assertEquals(map, streamsLE.get(key1).get(0).getFields()); + assertEquals(singleton(stream1), streamsLE.keySet()); + assertEquals(1, streamsLE.get(stream1).size()); + assertEquals(id3, streamsLE.get(stream1).get(0).getID()); + assertEquals(map, streamsLE.get(stream1).get(0).getFields()); } @Test @@ -499,9 +499,8 @@ public void xreadGroupWithParams() { jedis.xgroupCreate("xreadGroup-stream2", "xreadGroup-group", null, false); // Read only a single Stream - Map streamQeury11 = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); List>> streams1 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", - XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury11); + XReadGroupParams.xReadGroupParams().count(1).block(1).noAck(), streamQeury1); assertEquals(1, streams1.size()); assertEquals(1, streams1.get(0).getValue().size()); @@ -516,10 +515,28 @@ public void xreadGroupWithParams() { // Read only fresh messages StreamEntryID id4 = jedis.xadd("xreadGroup-stream1", (StreamEntryID) null, map); Map streamQeuryFresh = singletonMap("xreadGroup-stream1", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); - List>> streams3 = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", + List>> streamsFresh = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer", XReadGroupParams.xReadGroupParams().count(4).block(100).noAck(), streamQeuryFresh); - assertEquals(1, streams3.size()); - assertEquals(id4, streams3.get(0).getValue().get(0).getID()); + assertEquals(1, streamsFresh.size()); + assertEquals(id4, streamsFresh.get(0).getValue().get(0).getID()); + } + + @Test + public void xreadGroupAsMap() { + + final String stream1 = "xreadGroup-stream1"; + Map map = singletonMap("f1", "v1"); + + StreamEntryID id1 = jedis.xadd(stream1, StreamEntryID.NEW_ENTRY, map); + jedis.xgroupCreate(stream1, "xreadGroup-group", null, false); + Map streamQeury1 = singletonMap(stream1, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + Map> range = jedis.xreadGroupAsMap("xreadGroup-group", "xreadGroup-consumer", + XReadGroupParams.xReadGroupParams().noAck(), streamQeury1); + assertEquals(singleton(stream1), range.keySet()); + List list = range.get(stream1); + assertEquals(1, list.size()); + assertEquals(id1, list.get(0).getID()); + assertEquals(map, list.get(0).getFields()); } @Test diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java index f2e77663d1..af64b17576 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/StreamsPipelineCommandsTest.java @@ -397,10 +397,10 @@ public void xrangeExclusive() { @Test public void xreadWithParams() { - final String key1 = "xread-stream1"; - final String key2 = "xread-stream2"; + final String stream1 = "xread-stream1"; + final String stream2 = "xread-stream2"; - Map streamQuery1 = singletonMap(key1, new StreamEntryID()); + Map streamQuery1 = singletonMap(stream1, new StreamEntryID()); // Before creating Stream pipe.xread(XReadParams.xReadParams().block(1), streamQuery1); @@ -412,25 +412,25 @@ public void xreadWithParams() { )); Map map1 = singletonMap("f1", "v1"); - StreamEntryID id1 = jedis.xadd(key1, (StreamEntryID) null, map1); + StreamEntryID id1 = jedis.xadd(stream1, (StreamEntryID) null, map1); Map map2 = singletonMap("f2", "v2"); - StreamEntryID id2 = jedis.xadd(key2, (StreamEntryID) null, map2); + StreamEntryID id2 = jedis.xadd(stream2, (StreamEntryID) null, map2); // Read only a single Stream Response>>> streams1 = pipe.xread(XReadParams.xReadParams().count(1).block(1), streamQuery1); Response>>> streams2 = - pipe.xread(XReadParams.xReadParams().block(1), singletonMap(key1, id1)); + pipe.xread(XReadParams.xReadParams().block(1), singletonMap(stream1, id1)); Response>>> streams3 = - pipe.xread(XReadParams.xReadParams(), singletonMap(key1, id1)); + pipe.xread(XReadParams.xReadParams(), singletonMap(stream1, id1)); pipe.sync(); assertThat(streams1.get().stream().map(Entry::getKey).collect(Collectors.toList()), - contains(key1)); + contains(stream1)); assertThat(streams1.get().stream().map(Entry::getValue).flatMap(List::stream) .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1)); @@ -444,8 +444,8 @@ public void xreadWithParams() { // Read from two Streams Map streamQuery2 = new LinkedHashMap<>(); - streamQuery2.put(key1, new StreamEntryID()); - streamQuery2.put(key2, new StreamEntryID()); + streamQuery2.put(stream1, new StreamEntryID()); + streamQuery2.put(stream2, new StreamEntryID()); Response>>> streams4 = pipe.xread(XReadParams.xReadParams().count(2).block(1), streamQuery2); @@ -453,7 +453,7 @@ public void xreadWithParams() { pipe.sync(); assertThat(streams4.get().stream().map(Entry::getKey).collect(Collectors.toList()), - contains(key1, key2)); + contains(stream1, stream2)); assertThat(streams4.get().stream().map(Entry::getValue).flatMap(List::stream) .map(StreamEntry::getID).collect(Collectors.toList()), contains(id1, id2));