Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for getting Summary info by XPENDING #2417

Merged
merged 3 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,10 @@ public void xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int
}
}

public void xpendingSummary(final byte[] key, final byte[] groupname) {
sendCommand(XPENDING, key, groupname);
}

public void xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[][] ids) {

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4497,6 +4497,13 @@ public List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[]
return client.getObjectMultiBulkReply();
}

@Override
public Object xpendingSummary(final byte[] key, final byte[] groupname) {
checkIsInMultiOrPipeline();
client.xpendingSummary(key, groupname);
return client.getOne();
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[]... ids) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,16 @@ public List<Object> execute(Jedis connection) {
}.runBinary(key);
}

@Override
public Object xpendingSummary(final byte[] key, final byte[] groupname) {
return new JedisClusterCommand<Object>(connectionHandler, maxAttempts) {
@Override
public Object execute(Jedis connection) {
return connection.xpendingSummary(key, groupname);
}
}.runBinary(key);
}

@Override
public List<byte[]> xclaim(final byte[] key, final byte[] groupname, final byte[] consumername,
final long minIdleTime, final long newIdleTime, final int retries, final boolean force,
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/BinaryShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,12 @@ public List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[]
return j.xpending(key, groupname, start, end, count, consumername);
}

@Override
public Object xpendingSummary(final byte[] key, final byte[] groupname) {
Jedis j = getShard(key);
return j.xpendingSummary(key, groupname);
}

@Override
public List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, byte[]... ids) {
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,32 @@ public String toString() {
}
};

public static final Builder<StreamPendingSummary> STREAM_PENDING_SUMMARY = new Builder<StreamPendingSummary>() {
@Override
@SuppressWarnings("unchecked")
public StreamPendingSummary build(Object data) {
if (null == data) {
return null;
}

List<Object> objectList = (List<Object>) data;
long total = BuilderFactory.LONG.build(objectList.get(0));
String minId = SafeEncoder.encode((byte[]) objectList.get(1));
String maxId = SafeEncoder.encode((byte[]) objectList.get(2));
List<List<Object>> consumerObjList = (List<List<Object>>) objectList.get(3);
Map<String, Long> map = new HashMap<>(consumerObjList.size());
for (List<Object> consumerObj : consumerObjList) {
map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1))));
}
return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map);
}

@Override
public String toString() {
return "StreamPendingSummary";
}
};

private static Map<String, Object> createMapFromDecodingFunctions(Iterator<Object> iterator,
Map<String, Builder> mappingFunctions) {

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,11 @@ public void xpending(String key, String groupname, StreamEntryID start, StreamEn
SafeEncoder.encode(end==null ? "+" : end.toString()), count, consumername == null? null : SafeEncoder.encode(consumername));
}

@Override
public void xpendingSummary(String key, String groupname) {
xpendingSummary(SafeEncoder.encode(key), SafeEncoder.encode(groupname));
}

@Override
public void xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4117,6 +4117,13 @@ public List<StreamPendingEntry> xpending(final String key, final String groupnam
return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(client.getObjectMultiBulkReply());
}

@Override
public StreamPendingSummary xpendingSummary(final String key, final String groupname) {
checkIsInMultiOrPipeline();
client.xpendingSummary(key, groupname);
return BuilderFactory.STREAM_PENDING_SUMMARY.build(client.getObjectMultiBulkReply());
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2522,6 +2522,16 @@ public List<StreamPendingEntry> execute(Jedis connection) {
}.run(key);
}

@Override
public StreamPendingSummary xpendingSummary(final String key, final String groupname) {
return new JedisClusterCommand<StreamPendingSummary>(connectionHandler, maxAttempts) {
@Override
public StreamPendingSummary execute(Jedis connection) {
return connection.xpendingSummary(key, groupname);
}
}.run(key);
}

@Override
public Long xdel(final String key, final StreamEntryID... ids) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/redis/clients/jedis/ShardedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,12 @@ public List<StreamPendingEntry> xpending(String key, String groupname, StreamEnt
return j.xpending(key, groupname, start, end, count, consumername);
}

@Override
public StreamPendingSummary xpendingSummary(String key, String groupname) {
Jedis j = getShard(key);
return j.xpendingSummary(key, groupname);
}

@Override
public List<StreamEntry> xclaim(String key, String group, String consumername, long minIdleTime,
long newIdleTime, int retries, boolean force, StreamEntryID... ids) {
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/StreamPendingSummary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package redis.clients.jedis;

import java.io.Serializable;
import java.util.Map;

public class StreamPendingSummary implements Serializable {

private static final long serialVersionUID = 1L;

private final long total;
private final StreamEntryID minId;
private final StreamEntryID maxId;
private final Map<String, Long> consumerMessageCount;

public StreamPendingSummary(long total, StreamEntryID minId, StreamEntryID maxId,
Map<String, Long> consumerMessageCount) {
this.total = total;
this.minId = minId;
this.maxId = maxId;
this.consumerMessageCount = consumerMessageCount;
}

public long getTotal() {
return total;
}

public StreamEntryID getMinId() {
return minId;
}

public StreamEntryID getMaxId() {
return maxId;
}

public Map<String, Long> getConsumerMessageCount() {
return consumerMessageCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(byte[] key, byte[] member, dou

List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

Object xpendingSummary(final byte[] key, final byte[] groupname);

List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[][] ids);

Long waitReplicas(byte[] key, int replicas, long timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(byte[] key, byte[] member, dou
* @return lenth of the value for key
*/
Long hstrlen(byte[] key, byte[] field);


byte[] xadd(byte[] key, byte[] id, Map<byte[], byte[]> hash, long maxLen, boolean approximateLength);

Long xlen(byte[] key);
Expand All @@ -409,21 +409,23 @@ default List<byte[]> xrange(byte[] key, byte[] start, byte[] end, long count) {
List<byte[]> xrevrange(byte[] key, byte[] end, byte[] start, int count);

Long xack(byte[] key, byte[] group, byte[]... ids);

String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream);

String xgroupSetID(byte[] key, byte[] consumer, byte[] id);

Long xgroupDestroy(byte[] key, byte[] consumer);

Long xgroupDelConsumer(byte[] key, byte[] consumer, byte[] consumerName);

Long xdel(byte[] key, byte[]... ids);

Long xtrim(byte[] key, long maxLen, boolean approximateLength);

List<Object> xpending(byte[] key, byte[] groupname, byte[] start, byte[] end, int count, byte[] consumername);

Object xpendingSummary(byte[] key, byte[] groupname);

List<byte[]> xclaim(byte[] key, byte[] groupname, byte[] consumername, long minIdleTime, long newIdleTime, int retries, boolean force, byte[]... ids);

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/redis/clients/jedis/commands/Commands.java
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ default void restoreReplace(String key, int ttl, byte[] serializedValue) {

void xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

void xpendingSummary(String key, String groupname);

void xclaim(String key, String group, String consumername, long minIdleTime, long newIdleTime,
int retries, boolean force, StreamEntryID... ids);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.SortingParams;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamPendingSummary;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.SetParams;
Expand Down Expand Up @@ -517,6 +518,15 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(String key, String member, dou
*/
List<StreamPendingEntry> xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

/**
* XPENDING key group
*
* @param key
* @param groupname
* @return
*/
StreamPendingSummary xpendingSummary(String key, String groupname);

/**
* XDEL key ID [ID ...]
* @param key
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/commands/JedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.SortingParams;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamPendingSummary;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.SetParams;
Expand Down Expand Up @@ -518,6 +519,15 @@ List<GeoRadiusResponse> georadiusByMemberReadonly(String key, String member, dou
*/
List<StreamPendingEntry> xpending(String key, String groupname, StreamEntryID start, StreamEntryID end, int count, String consumername);

/**
* XPENDING key group
*
* @param key
* @param groupname
* @return
*/
StreamPendingSummary xpendingSummary(String key, String groupname);

/**
* XDEL key ID [ID ...]
* @param key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ public void xpendeing() {
assertEquals(1, range.get(0).getValue().size());
assertEquals(map, range.get(0).getValue().get(0).getFields());

// Get the summary about the pending messages
StreamPendingSummary pendingSummary = jedis.xpendingSummary("xpendeing-stream", "xpendeing-group");
assertEquals(1, pendingSummary.getTotal());
assertEquals(id1, pendingSummary.getMinId());
assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue());

// Get the pending event
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
null, null, 3, "xpendeing-consumer");
Expand Down