Skip to content

Commit

Permalink
Support IGNORE and other optional arguments for timeseries commands (#…
Browse files Browse the repository at this point in the history
…3860)

* Re-implement TS.ADD command with optional arguments
* Implement TS.INCRBY and TS.DECRBY commands with optional arguments
* Support IGNORE argument for TS.[ CREATE | ALTER | ADD | INCRBY | DECRBY] commands

---

* Cover optional arguments for timeseries commands
   - Re-implement TS.ADD command with optional arguments
   - Implement TS.INCRBY and TS.DECRBY commands with optional arguments

* Introduce EncodingFormat enum for <COMPRESSED|UNCOMPRESSED>

* Support IGNORE option
   and rename to TSIncrOrDecrByParams
  • Loading branch information
sazzad16 committed Jun 14, 2024
1 parent ef79d54 commit ac18fd0
Show file tree
Hide file tree
Showing 14 changed files with 600 additions and 37 deletions.
20 changes: 18 additions & 2 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -3946,9 +3946,15 @@ public final CommandObject<Long> tsAdd(String key, long timestamp, double value)
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key).add(timestamp).add(value), BuilderFactory.LONG);
}

@Deprecated
public final CommandObject<Long> tsAdd(String key, long timestamp, double value, TSCreateParams createParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key)
.add(timestamp).add(value).addParams(createParams), BuilderFactory.LONG);
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key).add(timestamp).add(value)
.addParams(createParams), BuilderFactory.LONG);
}

public final CommandObject<Long> tsAdd(String key, long timestamp, double value, TSAddParams addParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.ADD).key(key).add(timestamp).add(value)
.addParams(addParams), BuilderFactory.LONG);
}

public final CommandObject<List<Long>> tsMAdd(Map.Entry<String, TSElement>... entries) {
Expand All @@ -3968,6 +3974,11 @@ public final CommandObject<Long> tsIncrBy(String key, double value, long timesta
.add(TimeSeriesKeyword.TIMESTAMP).add(timestamp), BuilderFactory.LONG);
}

public final CommandObject<Long> tsIncrBy(String key, double addend, TSIncrOrDecrByParams incrByParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.INCRBY).key(key).add(addend)
.addParams(incrByParams), BuilderFactory.LONG);
}

public final CommandObject<Long> tsDecrBy(String key, double value) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.DECRBY).key(key).add(value), BuilderFactory.LONG);
}
Expand All @@ -3977,6 +3988,11 @@ public final CommandObject<Long> tsDecrBy(String key, double value, long timesta
.add(TimeSeriesKeyword.TIMESTAMP).add(timestamp), BuilderFactory.LONG);
}

public final CommandObject<Long> tsDecrBy(String key, double subtrahend, TSIncrOrDecrByParams decrByParams) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.DECRBY).key(key).add(subtrahend)
.addParams(decrByParams), BuilderFactory.LONG);
}

public final CommandObject<List<TSElement>> tsRange(String key, long fromTimestamp, long toTimestamp) {
return new CommandObject<>(commandArguments(TimeSeriesCommand.RANGE).key(key)
.add(fromTimestamp).add(toTimestamp), TimeSeriesBuilderFactory.TIMESERIES_ELEMENT_LIST);
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/PipeliningBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3948,6 +3948,11 @@ public Response<Long> tsAdd(String key, long timestamp, double value, TSCreatePa
return appendCommand(commandObjects.tsAdd(key, timestamp, value, createParams));
}

@Override
public Response<Long> tsAdd(String key, long timestamp, double value, TSAddParams addParams) {
return appendCommand(commandObjects.tsAdd(key, timestamp, value, addParams));
}

@Override
public Response<List<Long>> tsMAdd(Map.Entry<String, TSElement>... entries) {
return appendCommand(commandObjects.tsMAdd(entries));
Expand All @@ -3963,6 +3968,11 @@ public Response<Long> tsIncrBy(String key, double value, long timestamp) {
return appendCommand(commandObjects.tsIncrBy(key, value, timestamp));
}

@Override
public Response<Long> tsIncrBy(String key, double addend, TSIncrOrDecrByParams incrByParams) {
return appendCommand(commandObjects.tsIncrBy(key, addend, incrByParams));
}

@Override
public Response<Long> tsDecrBy(String key, double value) {
return appendCommand(commandObjects.tsDecrBy(key, value));
Expand All @@ -3973,6 +3983,11 @@ public Response<Long> tsDecrBy(String key, double value, long timestamp) {
return appendCommand(commandObjects.tsDecrBy(key, value, timestamp));
}

@Override
public Response<Long> tsDecrBy(String key, double subtrahend, TSIncrOrDecrByParams decrByParams) {
return appendCommand(commandObjects.tsDecrBy(key, subtrahend, decrByParams));
}

@Override
public Response<List<TSElement>> tsRange(String key, long fromTimestamp, long toTimestamp) {
return appendCommand(commandObjects.tsRange(key, fromTimestamp, toTimestamp));
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4513,6 +4513,11 @@ public long tsAdd(String key, long timestamp, double value, TSCreateParams creat
return executeCommand(commandObjects.tsAdd(key, timestamp, value, createParams));
}

@Override
public long tsAdd(String key, long timestamp, double value, TSAddParams addParams) {
return executeCommand(commandObjects.tsAdd(key, timestamp, value, addParams));
}

@Override
public List<Long> tsMAdd(Map.Entry<String, TSElement>... entries) {
return executeCommand(commandObjects.tsMAdd(entries));
Expand All @@ -4528,6 +4533,11 @@ public long tsIncrBy(String key, double value, long timestamp) {
return executeCommand(commandObjects.tsIncrBy(key, value, timestamp));
}

@Override
public long tsIncrBy(String key, double addend, TSIncrOrDecrByParams incrByParams) {
return executeCommand(commandObjects.tsIncrBy(key, addend, incrByParams));
}

@Override
public long tsDecrBy(String key, double value) {
return executeCommand(commandObjects.tsDecrBy(key, value));
Expand All @@ -4538,6 +4548,11 @@ public long tsDecrBy(String key, double value, long timestamp) {
return executeCommand(commandObjects.tsDecrBy(key, value, timestamp));
}

@Override
public long tsDecrBy(String key, double subtrahend, TSIncrOrDecrByParams decrByParams) {
return executeCommand(commandObjects.tsDecrBy(key, subtrahend, decrByParams));
}

@Override
public List<TSElement> tsRange(String key, long fromTimestamp, long toTimestamp) {
return checkAndClientSideCacheCommand(commandObjects.tsRange(key, fromTimestamp, toTimestamp), key);
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/redis/clients/jedis/timeseries/EncodingFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package redis.clients.jedis.timeseries;

import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.util.SafeEncoder;

/**
* Specifies the series samples encoding format.
*/
public enum EncodingFormat implements Rawable {

COMPRESSED,
UNCOMPRESSED;

private final byte[] raw;

private EncodingFormat() {
raw = SafeEncoder.encode(name());
}

@Override
public byte[] getRaw() {
return raw;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,33 @@ public interface RedisTimeSeriesCommands {
long tsAdd(String key, long timestamp, double value);

/**
* {@code TS.ADD key timestamp value [RETENTION retentionTime] [ENCODING [COMPRESSED|UNCOMPRESSED]] [CHUNK_SIZE size] [ON_DUPLICATE policy] [LABELS label value..]}
*
* @param key
* @param timestamp
* @param value
* @param createParams
* @return timestamp
* @deprecated Use {@link RedisTimeSeriesCommands#tsAdd(java.lang.String, long, double, redis.clients.jedis.timeseries.TSAddParams)}.
*/
@Deprecated
long tsAdd(String key, long timestamp, double value, TSCreateParams createParams);

/**
* {@code TS.ADD key timestamp value
* [RETENTION retentionTime]
* [ENCODING <COMPRESSED|UNCOMPRESSED>]
* [CHUNK_SIZE size]
* [DUPLICATE_POLICY policy]
* [ON_DUPLICATE policy_ovr]
* [LABELS label value..]}
*
* @param key
* @param timestamp
* @param value
* @param addParams
* @return timestamp
*/
long tsAdd(String key, long timestamp, double value, TSAddParams addParams);

/**
* {@code TS.MADD key timestamp value [key timestamp value ...]}
*
Expand All @@ -81,10 +98,44 @@ public interface RedisTimeSeriesCommands {

long tsIncrBy(String key, double value, long timestamp);

/**
* {@code TS.INCRBY key addend
* [TIMESTAMP timestamp]
* [RETENTION retentionPeriod]
* [ENCODING <COMPRESSED|UNCOMPRESSED>]
* [CHUNK_SIZE size]
* [DUPLICATE_POLICY policy]
* [IGNORE ignoreMaxTimediff ignoreMaxValDiff]
* [LABELS [label value ...]]}
*
* @param key
* @param addend
* @param incrByParams
* @return timestamp
*/
long tsIncrBy(String key, double addend, TSIncrOrDecrByParams incrByParams);

long tsDecrBy(String key, double value);

long tsDecrBy(String key, double value, long timestamp);

/**
* {@code TS.DECRBY key subtrahend
* [TIMESTAMP timestamp]
* [RETENTION retentionPeriod]
* [ENCODING <COMPRESSED|UNCOMPRESSED>]
* [CHUNK_SIZE size]
* [DUPLICATE_POLICY policy]
* [IGNORE ignoreMaxTimediff ignoreMaxValDiff]
* [LABELS [label value ...]]}
*
* @param key
* @param subtrahend
* @param decrByParams
* @return timestamp
*/
long tsDecrBy(String key, double subtrahend, TSIncrOrDecrByParams decrByParams);

/**
* {@code TS.RANGE key fromTimestamp toTimestamp}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@ public interface RedisTimeSeriesPipelineCommands {

Response<Long> tsAdd(String key, long timestamp, double value);

@Deprecated
Response<Long> tsAdd(String key, long timestamp, double value, TSCreateParams createParams);

Response<Long> tsAdd(String key, long timestamp, double value, TSAddParams addParams);

Response<List<Long>> tsMAdd(Map.Entry<String, TSElement>... entries);

Response<Long> tsIncrBy(String key, double value);

Response<Long> tsIncrBy(String key, double value, long timestamp);

Response<Long> tsIncrBy(String key, double addend, TSIncrOrDecrByParams incrByParams);

Response<Long> tsDecrBy(String key, double value);

Response<Long> tsDecrBy(String key, double value, long timestamp);

Response<Long> tsDecrBy(String key, double subtrahend, TSIncrOrDecrByParams decrByParams);

Response<List<TSElement>> tsRange(String key, long fromTimestamp, long toTimestamp);

Response<List<TSElement>> tsRange(String key, TSRangeParams rangeParams);
Expand Down
128 changes: 128 additions & 0 deletions src/main/java/redis/clients/jedis/timeseries/TSAddParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package redis.clients.jedis.timeseries;

import static redis.clients.jedis.Protocol.toByteArray;
import static redis.clients.jedis.timeseries.TimeSeriesProtocol.TimeSeriesKeyword.*;

import java.util.LinkedHashMap;
import java.util.Map;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.params.IParams;

/**
* Represents optional arguments of TS.ADD command.
*/
public class TSAddParams implements IParams {

private Long retentionPeriod;
private EncodingFormat encoding;
private Long chunkSize;
private DuplicatePolicy duplicatePolicy;
private DuplicatePolicy onDuplicate;

private boolean ignore;
private long ignoreMaxTimediff;
private double ignoreMaxValDiff;

private Map<String, String> labels;

public TSAddParams() {
}

public static TSAddParams addParams() {
return new TSAddParams();
}

public TSAddParams retention(long retentionPeriod) {
this.retentionPeriod = retentionPeriod;
return this;
}

public TSAddParams encoding(EncodingFormat encoding) {
this.encoding = encoding;
return this;
}

public TSAddParams chunkSize(long chunkSize) {
this.chunkSize = chunkSize;
return this;
}

public TSAddParams duplicatePolicy(DuplicatePolicy duplicatePolicy) {
this.duplicatePolicy = duplicatePolicy;
return this;
}

public TSAddParams onDuplicate(DuplicatePolicy onDuplicate) {
this.onDuplicate = onDuplicate;
return this;
}

public TSAddParams ignore(long maxTimediff, double maxValDiff) {
this.ignore = true;
this.ignoreMaxTimediff = maxTimediff;
this.ignoreMaxValDiff = maxValDiff;
return this;
}

/**
* Set label-value pairs
*
* @param labels label-value pairs
* @return the object itself
*/
public TSAddParams labels(Map<String, String> labels) {
this.labels = labels;
return this;
}

/**
* Add label-value pair. Multiple pairs can be added through chaining.
* @param label
* @param value
* @return the object itself
*/
public TSAddParams label(String label, String value) {
if (this.labels == null) {
this.labels = new LinkedHashMap<>();
}
this.labels.put(label, value);
return this;
}

@Override
public void addParams(CommandArguments args) {

if (retentionPeriod != null) {
args.add(RETENTION).add(toByteArray(retentionPeriod));
}

if (encoding != null) {
args.add(ENCODING).add(encoding);
}

if (chunkSize != null) {
args.add(CHUNK_SIZE).add(toByteArray(chunkSize));
}

if (duplicatePolicy != null) {
args.add(DUPLICATE_POLICY).add(duplicatePolicy);
}

if (duplicatePolicy != null) {
args.add(DUPLICATE_POLICY).add(duplicatePolicy);
}

if (onDuplicate != null) {
args.add(ON_DUPLICATE).add(onDuplicate);
}

if (ignore) {
args.add(IGNORE).add(ignoreMaxTimediff).add(ignoreMaxValDiff);
}

if (labels != null) {
args.add(LABELS);
labels.entrySet().forEach((entry) -> args.add(entry.getKey()).add(entry.getValue()));
}
}
}
Loading

0 comments on commit ac18fd0

Please sign in to comment.