Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PENDING entries of xinfoStreamFull method #2988

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ private Map<String, Builder> createDecoderMap() {
tempMappingFunctions.put(StreamConsumerFullInfo.NAME, STRING);
tempMappingFunctions.put(StreamConsumerFullInfo.SEEN_TIME, LONG);
tempMappingFunctions.put(StreamConsumerFullInfo.PEL_COUNT, LONG);
tempMappingFunctions.put(StreamConsumerFullInfo.PENDING, LONG_LIST);
tempMappingFunctions.put(StreamConsumerFullInfo.PENDING, ENCODED_OBJECT_LIST);

return tempMappingFunctions;
}
Expand Down Expand Up @@ -1354,7 +1354,7 @@ private Map<String, Builder> createDecoderMap() {
Map<String, Builder> tempMappingFunctions = new HashMap<>();
tempMappingFunctions.put(StreamGroupFullInfo.NAME, STRING);
tempMappingFunctions.put(StreamGroupFullInfo.CONSUMERS, STREAM_CONSUMER_FULL_INFO_LIST);
tempMappingFunctions.put(StreamGroupFullInfo.PENDING, STRING_LIST);
tempMappingFunctions.put(StreamGroupFullInfo.PENDING, ENCODED_OBJECT_LIST);
tempMappingFunctions.put(StreamGroupFullInfo.LAST_DELIVERED, STREAM_ENTRY_ID);
tempMappingFunctions.put(StreamGroupFullInfo.PEL_COUNT, LONG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.StreamEntryID;

/**
* This class holds information about a stream consumer with command <code>xinfo stream mystream full<code/>.
Expand All @@ -19,16 +20,18 @@ public class StreamConsumerFullInfo implements Serializable {
private final String name;
private final Long seenTime;
private final Long pelCount;
private final List<Long> pending;
private final List<List<Object>> pending;
private final Map<String, Object> consumerInfo;

@SuppressWarnings("unchecked")
public StreamConsumerFullInfo(Map<String, Object> map) {
consumerInfo = map;
name = (String) map.get(NAME);
seenTime = (Long) map.get(SEEN_TIME);
pending = (List<Long>) map.get(PENDING);
pending = (List<List<Object>>) map.get(PENDING);
pelCount = (Long) map.get(PEL_COUNT);

pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
}

public String getName() {
Expand All @@ -43,7 +46,7 @@ public Long getPelCount() {
return pelCount;
}

public List<Long> getPending() {
public List<List<Object>> getPending() {
return pending;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class StreamGroupFullInfo implements Serializable {

private final String name;
private final List<StreamConsumerFullInfo> consumers;
private final List<String> pending;
private final List<List<Object>> pending;
private final Long pelCount;
private final StreamEntryID lastDeliveredId;
private final Map<String, Object> groupFullInfo;
Expand All @@ -35,10 +35,11 @@ public StreamGroupFullInfo(Map<String, Object> map) {
groupFullInfo = map;
name = (String) map.get(NAME);
consumers = (List<StreamConsumerFullInfo>) map.get(CONSUMERS);
pending = (List<String>) map.get(PENDING);
pending = (List<List<Object>>) map.get(PENDING);
lastDeliveredId = (StreamEntryID) map.get(LAST_DELIVERED);
pelCount = (Long) map.get(PEL_COUNT);

pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
}

public String getName() {
Expand All @@ -49,7 +50,7 @@ public List<StreamConsumerFullInfo> getConsumers() {
return consumers;
}

public List<String> getPending() {
public List<List<Object>> getPending() {
return pending;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,38 @@ public void xinfo() throws InterruptedException {
} catch (JedisException e) {
assertEquals("ERR no such key", e.getMessage());
}
}

@Test
public void xinfoStreamFullWithPending() {

Map<String, String> map = singletonMap("f1", "v1");
StreamEntryID id1 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);

Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY);
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
XReadGroupParams.xReadGroupParams().count(1), streamQeury1);
assertEquals(1, range.size());
assertEquals(1, range.get(0).getValue().size());

StreamFullInfo full = jedis.xinfoStreamFull("streamfull2");
assertEquals(1, full.getGroups().size());
StreamGroupFullInfo group = full.getGroups().get(0);
assertEquals("xreadGroup-group", group.getName());

assertEquals(1, group.getPending().size());
List<Object> groupPendingEntry = group.getPending().get(0);
assertEquals(id1, groupPendingEntry.get(0));
assertEquals("xreadGroup-consumer", groupPendingEntry.get(1));

assertEquals(1, group.getConsumers().size());
StreamConsumerFullInfo consumer = group.getConsumers().get(0);
assertEquals("xreadGroup-consumer", consumer.getName());
assertEquals(1, consumer.getPending().size());
List<Object> consumerPendingEntry = consumer.getPending().get(0);
assertEquals(id1, consumerPendingEntry.get(0));
}

@Test
Expand Down