Skip to content

Commit

Permalink
Aggregation bucket align (for RedisTimeSeries >= 1.6.0)
Browse files Browse the repository at this point in the history
  • Loading branch information
xdev-developer committed Dec 18, 2021
1 parent 4ebd561 commit 54d197c
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 dengliming.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.dengliming.redismodule.redistimeseries;

/**
* Aggregation Align type
*
* @author xdev.developer
*/
public enum Align {
START("start"), END("end");

private String key;

Align(String key) {
this.key = key;
}

public String getKey() {
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class RangeOptions {

private int count;
private Aggregation aggregationType;
private Align aggregationAlign;
private long timeBucket;
private boolean withLabels;

Expand All @@ -41,6 +42,13 @@ public RangeOptions aggregationType(Aggregation aggregationType, long timeBucket
return this;
}

public RangeOptions aggregationType(Aggregation aggregationType, long timeBucket, Align align) {
this.aggregationType = aggregationType;
this.timeBucket = timeBucket;
this.aggregationAlign = align;
return this;
}

public RangeOptions withLabels() {
this.withLabels = true;
return this;
Expand All @@ -55,6 +63,10 @@ public void build(List<Object> args) {
args.add(Keywords.AGGREGATION);
args.add(aggregationType.getKey());
args.add(timeBucket);
if (aggregationAlign != null) {
args.add(Keywords.ALIGN);
args.add(aggregationAlign.getKey());
}
}
if (withLabels) {
args.add(Keywords.WITHLABELS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
*/
public enum Keywords {

RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE;
RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package io.github.dengliming.redismodule.redistimeseries;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.redisson.client.RedisException;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -152,6 +155,76 @@ public void testRange() {
assertThat(timeSeries).isEmpty();
}

@Test
public void testAggregations() {
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
long timestamp = System.currentTimeMillis();
String sensor = "temperature:sum";

assertThat(redisTimeSeries.incrBy(sensor, 10, timestamp, new TimeSeriesOptions()
.retentionTime(6000L)
.unCompressed()).longValue()).isEqualTo(timestamp);

assertThat(redisTimeSeries.incrBy(sensor, 20, timestamp + 1).longValue()).isEqualTo(timestamp + 1);

List<Value> values = redisTimeSeries.range(sensor, timestamp, timestamp + 1);
assertThat(values).hasSize(2);

List<Value> sum = redisTimeSeries.range(sensor, timestamp, timestamp + 10, new RangeOptions()
.aggregationType(Aggregation.SUM, 60000L));

assertThat(sum).hasSize(1);
// Timestamp trimmed to timeBucket (minutes)
assertThat(sum.get(0).getTimestamp()).isEqualTo(Instant.ofEpochMilli(timestamp).truncatedTo(ChronoUnit.MINUTES).toEpochMilli());
assertThat(sum.get(0).getValue()).isEqualTo(40.0d);
}

@Ignore("Only for redis timeseries > 1.6.0")
public void testAggregationsAlign() {
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
long from = 1L;
long to = 10000L;
long timeBucket = 3000L;

String sensor = "temperature:sum:align";
TimeSeriesOptions options = new TimeSeriesOptions().unCompressed();

/*
TS: 1000 | 2000 | 3000 | 4000
VAL: 1 | 1 | 10 | 10
BT : -------------------|-----
*/

assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(1000L, 1.0d)), options).longValue()).isEqualTo(1000L);
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(2000L, 1.0d)), options).longValue()).isEqualTo(2000L);
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(3000L, 10.0d)), options).longValue()).isEqualTo(3000L);
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(4000L, 10.0d)), options).longValue()).isEqualTo(4000L);

List<Value> values = redisTimeSeries.range(sensor, from, to);
assertThat(values).hasSize(4);

List<Value> start = redisTimeSeries.range(sensor, from, to, new RangeOptions()
.aggregationType(Aggregation.SUM, timeBucket, Align.START));

assertThat(start).hasSize(2);
assertThat(start.get(0).getTimestamp()).isEqualTo(from);
assertThat(start.get(0).getValue()).isEqualTo(12.0d);

assertThat(start.get(1).getTimestamp()).isEqualTo(from + timeBucket);
assertThat(start.get(1).getValue()).isEqualTo(10.0d);

List<Value> end = redisTimeSeries.range(sensor, from, to, new RangeOptions()
.aggregationType(Aggregation.SUM, timeBucket, Align.END));

assertThat(end).hasSize(2);

assertThat(end.get(0).getTimestamp()).isEqualTo(1000L);
assertThat(end.get(0).getValue()).isEqualTo(12.0d);

assertThat(end.get(1).getTimestamp()).isEqualTo(4000L);
assertThat(end.get(1).getValue()).isEqualTo(10.0d);
}

@Test
public void testQueryIndex() {
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
Expand Down

0 comments on commit 54d197c

Please sign in to comment.