Skip to content

Commit

Permalink
Add last entry id for XREAD variants and support XREAD reply as map
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Mar 26, 2024
1 parent bad18b9 commit a6666c1
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 53 deletions.
37 changes: 33 additions & 4 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1420,10 +1420,10 @@ public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
.collect(Collectors.toList());
} else {
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(list.size());
for (Object streamObj : list) {
List<Object> stream = (List<Object>) streamObj;
String streamKey = STRING.build(stream.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(stream.get(1));
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
String streamKey = STRING.build(streamObj.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
result.add(KeyValue.of(streamKey, streamEntries));
}
return result;
Expand All @@ -1436,6 +1436,35 @@ public String toString() {
}
};

public static final Builder<Map<String, List<StreamEntry>>> STREAM_READ_RESPONSE_MAP
= new Builder<Map<String, List<StreamEntry>>>() {
@Override
public Map<String, List<StreamEntry>> build(Object data) {
if (data == null) return null;
List list = (List) data;
if (list.isEmpty()) return Collections.emptyMap();

if (list.get(0) instanceof KeyValue) {
return ((List<KeyValue>) list).stream()
.collect(Collectors.toMap(kv -> STRING.build(kv.getKey()), kv -> STREAM_ENTRY_LIST.build(kv.getValue())));
} else {
Map<String, List<StreamEntry>> result = new HashMap<>(list.size());
for (Object anObj : list) {
List<Object> streamObj = (List<Object>) anObj;
String streamKey = STRING.build(streamObj.get(0));
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
result.put(streamKey, streamEntries);
}
return result;
}
}

@Override
public String toString() {
return "Map<String, List<StreamEntry>>";
}
};

public static final Builder<List<StreamPendingEntry>> STREAM_PENDING_ENTRY_LIST = new Builder<List<StreamPendingEntry>>() {
@Override
@SuppressWarnings("unchecked")
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,15 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xread(
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<Map<String, List<StreamEntry>>> xreadAsMap(
XReadParams xReadParams, Map<String, StreamEntryID> streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE_MAP);
}

public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(
String groupName, String consumer, XReadGroupParams xReadGroupParams,
Map<String, StreamEntryID> streams) {
Expand All @@ -2678,6 +2687,18 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGrou
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
}

public final CommandObject<Map<String, List<StreamEntry>>> xreadGroupAsMap(
String groupName, String consumer, XReadGroupParams xReadGroupParams,
Map<String, StreamEntryID> streams) {
CommandArguments args = commandArguments(XREADGROUP)
.add(GROUP).add(groupName).add(consumer)
.addParams(xReadGroupParams).add(STREAMS);
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
entrySet.forEach(entry -> args.key(entry.getKey()));
entrySet.forEach(entry -> args.add(entry.getValue()));
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE_MAP);
}

public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
for (Map.Entry<byte[], byte[]> entry : streams) {
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -9394,6 +9394,12 @@ public List<Map.Entry<String, List<StreamEntry>>> xread(final XReadParams xReadP
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadAsMap(final XReadParams xReadParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public long xack(final String key, final String group, final StreamEntryID... ids) {
checkIsInMultiOrPipeline();
Expand Down Expand Up @@ -9450,13 +9456,19 @@ public long xtrim(final String key, final XTrimParams params) {
}

@Override
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName,
final String consumer, final XReadGroupParams xReadGroupParams,
final Map<String, StreamEntryID> streams) {
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName, final String consumer,
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadGroupAsMap(final String groupName, final String consumer,
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
checkIsInMultiOrPipeline();
return connection.executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public StreamPendingSummary xpending(final String key, final String groupName) {
checkIsInMultiOrPipeline();
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1536,11 +1536,21 @@ public Response<List<Map.Entry<String, List<StreamEntry>>>> xread(XReadParams xR
return appendCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return appendCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Response<Object> eval(String script) {
return appendCommand(commandObjects.eval(script));
Expand Down
50 changes: 43 additions & 7 deletions src/main/java/redis/clients/jedis/StreamEntryID.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
/**
* Should be used only with XADD
*
* <code>
* XADD mystream * field1 value1
* </code>
* {@code XADD mystream * field1 value1}
*/
public static final StreamEntryID NEW_ENTRY = new StreamEntryID() {

Expand All @@ -97,11 +95,30 @@ public String toString() {
/**
* Should be used only with XGROUP CREATE
*
* <code>
* XGROUP CREATE mystream consumer-group-name $
* </code>
* {@code XGROUP CREATE mystream consumer-group-name $}
*/
public static final StreamEntryID LAST_ENTRY = new StreamEntryID() {
public static final StreamEntryID XGROUP_LAST_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "$";
}
};

/**
* @deprecated Use {@link StreamEntryID#XGROUP_LAST_ID} or {@link StreamEntryID#XREAD_NEW_ENTRY}.
*/
@Deprecated
public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY;

/**
* Should be used only with XREAD
*
* {@code XREAD BLOCK 5000 COUNT 100 STREAMS mystream $}
*/
public static final StreamEntryID XREAD_NEW_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -129,6 +146,7 @@ public String toString() {
/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
// TODO: FIRST_ID ?
public static final StreamEntryID MINIMUM_ID = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand All @@ -142,6 +160,8 @@ public String toString() {
/**
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
*/
// TODO: LAST_ID ?
// TODO: unify with XREAD_LAST_ENTRY ??
public static final StreamEntryID MAXIMUM_ID = new StreamEntryID() {

private static final long serialVersionUID = 1L;
Expand All @@ -151,4 +171,20 @@ public String toString() {
return "+";
}
};

/**
* Should be used only with XREAD
*
* {@code XREAD STREAMS mystream +}
*/
// TODO: unify with MAXIMUM_ID ??
public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() {

private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "+";
}
};
}
11 changes: 11 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3066,12 +3066,23 @@ public List<Map.Entry<String, List<StreamEntry>>> xread(XReadParams xReadParams,
return executeCommand(commandObjects.xread(xReadParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
}

@Override
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
}

@Override
public Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
}

@Override
public byte[] xadd(byte[] key, XAddParams params, Map<byte[], byte[]> hash) {
return executeCommand(commandObjects.xadd(key, params, hash));
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/commands/StreamCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,19 @@ List<Map.Entry<String, List<StreamEntry>>> xread(XReadParams xReadParams,
/**
* XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
*/
Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams,
Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,19 @@ Response<List<Map.Entry<String, List<StreamEntry>>>> xread(XReadParams xReadPara
/**
* XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
*/
Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams,
Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

/**
* XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
*/
Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer,
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams);

}
Loading

0 comments on commit a6666c1

Please sign in to comment.