Skip to content

Commit

Permalink
Support RedisTimeSeries (redis#2854)
Browse files Browse the repository at this point in the history
* Support RedisTimeSeries

* remove commented codes and format

* one more little test

* Add TimeSeries pipeline/transaction commands

* Rename to KeyedTSElements

* Reorder imports

* Prepare test
  • Loading branch information
sazzad16 authored and zeekling committed Feb 2, 2022
1 parent be5f0ee commit 8ca7d4f
Show file tree
Hide file tree
Showing 21 changed files with 2,490 additions and 142 deletions.
43 changes: 43 additions & 0 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import redis.clients.jedis.resps.LCSMatchResult.Position;
import redis.clients.jedis.resps.*;
import redis.clients.jedis.search.aggr.AggregationResult;
import redis.clients.jedis.timeseries.KeyedTSElements;
import redis.clients.jedis.timeseries.TSElement;
import redis.clients.jedis.util.JedisByteHashMap;
import redis.clients.jedis.util.SafeEncoder;

Expand Down Expand Up @@ -1492,6 +1494,47 @@ public Map<String, List<String>> build(Object data) {
}
};

public static final Builder<TSElement> TIMESERIES_ELEMENT = new Builder<TSElement>() {
@Override
public TSElement build(Object data) {
List<Object> list = (List<Object>) data;
if (list == null || list.isEmpty()) return null;
return new TSElement(LONG.build(list.get(0)), DOUBLE.build(list.get(1)));
}
};

public static final Builder<List<TSElement>> TIMESERIES_ELEMENT_LIST = new Builder<List<TSElement>>() {
@Override
public List<TSElement> build(Object data) {
return ((List<Object>) data).stream().map((pairObject) -> (List<Object>) pairObject)
.map((pairList)
-> new TSElement(LONG.build(pairList.get(0)), DOUBLE.build(pairList.get(1))))
.collect(Collectors.toList());
}
};

public static final Builder<List<KeyedTSElements>> TIMESERIES_MRANGE_RESPONSE = new Builder<List<KeyedTSElements>>() {
@Override
public List<KeyedTSElements> build(Object data) {
return ((List<Object>) data).stream().map((tsObject) -> (List<Object>) tsObject)
.map((tsList) -> new KeyedTSElements(STRING.build(tsList.get(0)),
STRING_MAP_FROM_PAIRS.build(tsList.get(1)),
TIMESERIES_ELEMENT_LIST.build(tsList.get(2))))
.collect(Collectors.toList());
}
};

public static final Builder<List<KeyedTSElements>> TIMESERIES_MGET_RESPONSE = new Builder<List<KeyedTSElements>>() {
@Override
public List<KeyedTSElements> build(Object data) {
return ((List<Object>) data).stream().map((tsObject) -> (List<Object>) tsObject)
.map((tsList) -> new KeyedTSElements(STRING.build(tsList.get(0)),
STRING_MAP_FROM_PAIRS.build(tsList.get(1)),
TIMESERIES_ELEMENT.build(tsList.get(2))))
.collect(Collectors.toList());
}
};

/**
* A decorator to implement Set from List. Assume that given List do not contains duplicated
* values. The resulting set displays the same ordering, concurrency, and performance
Expand Down
106 changes: 103 additions & 3 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.args.*;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.json.*;
import redis.clients.jedis.json.JsonProtocol.JsonCommand;
import redis.clients.jedis.json.JsonSetParams;
import redis.clients.jedis.json.Path;
import redis.clients.jedis.json.Path2;
import redis.clients.jedis.params.*;
import redis.clients.jedis.resps.*;
import redis.clients.jedis.search.*;
Expand All @@ -28,6 +26,9 @@
import redis.clients.jedis.search.SearchResult.SearchResultBuilder;
import redis.clients.jedis.search.aggr.AggregationBuilder;
import redis.clients.jedis.search.aggr.AggregationResult;
import redis.clients.jedis.timeseries.*;
import redis.clients.jedis.timeseries.TimeSeriesProtocol.TimeSeriesCommand;
import redis.clients.jedis.timeseries.TimeSeriesProtocol.TimeSeriesKeyword;

public class CommandObjects {

Expand Down Expand Up @@ -3066,6 +3067,105 @@ public final CommandObject<Long> jsonArrTrim(String key, Path path, int start, i
}
// RedisJSON commands

// RedisTimeSeries commands
public final CommandObject<String> tsCreate(String key) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.CREATE).key(key), BuilderFactory.STRING);
}

public final CommandObject<String> tsCreate(String key, TSCreateParams createParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.CREATE).key(key).addParams(createParams), BuilderFactory.STRING);
}

public final CommandObject<Long> tsDel(String key, long fromTimestamp, long toTimestamp) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.DEL).key(key)
.add(fromTimestamp).add(toTimestamp), BuilderFactory.LONG);
}

public final CommandObject<String> tsAlter(String key, TSAlterParams alterParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.ALTER).key(key).addParams(alterParams), BuilderFactory.STRING);
}

public final CommandObject<Long> tsAdd(String key, double value) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key)
.add(Protocol.BYTES_ASTERISK).add(value), BuilderFactory.LONG);
}

public final CommandObject<Long> tsAdd(String key, long timestamp, double value) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key)
.add(timestamp).add(value), BuilderFactory.LONG);
}

public final CommandObject<Long> tsAdd(String key, long timestamp, double value, TSCreateParams createParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key)
.add(timestamp).add(value).addParams(createParams), BuilderFactory.LONG);
}

public final CommandObject<List<TSElement>> tsRange(String key, long fromTimestamp, long toTimestamp) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.RANGE).key(key)
.add(fromTimestamp).add(toTimestamp), BuilderFactory.TIMESERIES_ELEMENT_LIST);
}

public final CommandObject<List<TSElement>> tsRange(String key, TSRangeParams rangeParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.RANGE).key(key)
.addParams(rangeParams), BuilderFactory.TIMESERIES_ELEMENT_LIST);
}

public final CommandObject<List<TSElement>> tsRevRange(String key, long fromTimestamp, long toTimestamp) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.REVRANGE).key(key)
.add(fromTimestamp).add(toTimestamp), BuilderFactory.TIMESERIES_ELEMENT_LIST);
}

public final CommandObject<List<TSElement>> tsRevRange(String key, TSRangeParams rangeParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.REVRANGE).key(key)
.addParams(rangeParams), BuilderFactory.TIMESERIES_ELEMENT_LIST);
}

public final CommandObject<List<KeyedTSElements>> tsMRange(long fromTimestamp, long toTimestamp, String... filters) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.MRANGE).add(fromTimestamp)
.add(toTimestamp).add(TimeSeriesKeyword.FILTER).addObjects((Object[]) filters),
BuilderFactory.TIMESERIES_MRANGE_RESPONSE);
}

public final CommandObject<List<KeyedTSElements>> tsMRange(TSMRangeParams multiRangeParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.MRANGE)
.addParams(multiRangeParams), BuilderFactory.TIMESERIES_MRANGE_RESPONSE);
}

public final CommandObject<List<KeyedTSElements>> tsMRevRange(long fromTimestamp, long toTimestamp, String... filters) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.MREVRANGE).add(fromTimestamp)
.add(toTimestamp).add(TimeSeriesKeyword.FILTER).addObjects((Object[]) filters),
BuilderFactory.TIMESERIES_MRANGE_RESPONSE);
}

public final CommandObject<List<KeyedTSElements>> tsMRevRange(TSMRangeParams multiRangeParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.MREVRANGE).addParams(multiRangeParams),
BuilderFactory.TIMESERIES_MRANGE_RESPONSE);
}

public final CommandObject<TSElement> tsGet(String key) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.GET).key(key), BuilderFactory.TIMESERIES_ELEMENT);
}

public final CommandObject<List<KeyedTSElements>> tsMGet(TSMGetParams multiGetParams, String... filters) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.MGET).addParams(multiGetParams)
.add(TimeSeriesKeyword.FILTER).addObjects((Object[]) filters), BuilderFactory.TIMESERIES_MGET_RESPONSE);
}

public final CommandObject<String> tsCreateRule(String sourceKey, String destKey,
AggregationType aggregationType, long timeBucket) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.CREATERULE).key(sourceKey).key(destKey)
.add(TimeSeriesKeyword.AGGREGATION).add(aggregationType).add(timeBucket), BuilderFactory.STRING);
}

public final CommandObject<String> tsDeleteRule(String sourceKey, String destKey) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.DELETERULE).key(sourceKey).key(destKey), BuilderFactory.STRING);
}

public final CommandObject<List<String>> tsQueryIndex(String... filters) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.QUERYINDEX).addObjects((Object[]) filters), BuilderFactory.STRING_LIST);
}
// RedisTimeSeries commands

private static final Gson GSON = new Gson();

private class GsonObjectBuilder<T> extends Builder<T> {
Expand Down
Loading

0 comments on commit 8ca7d4f

Please sign in to comment.