Skip to content
This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Added support to specify the chunk size in Bytes of a time-series on create, add, incrby, and decrby methods #61

Merged
89 changes: 70 additions & 19 deletions redistimeseries/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, args):
self.max_samples_per_chunk = response['maxSamplesPerChunk']
self.chunk_size = self.max_samples_per_chunk * 16 # backward compatible changes
if 'chunkSize' in response:
self.chunkSize = response['chunkSize']
self.chunk_size = response['chunkSize']

def list_to_dict(aList):
return {nativestr(aList[i][0]):nativestr(aList[i][1])
Expand Down Expand Up @@ -158,14 +158,30 @@ def appendAggregation(params, aggregation_type,
params.append('AGGREGATION')
params.extend([aggregation_type, bucket_size_msec])

def create(self, key, retention_msecs=None, uncompressed=False, labels={}):
@staticmethod
def appendChunkSize(params, chunk_size):
if chunk_size is not None:
params.extend(['CHUNK_SIZE', chunk_size])

def create(self, key, retention_msecs=None, uncompressed=False, labels={}, chunk_size=None):
"""
Creates a new time-series ``key`` with ``retention_msecs`` in
milliseconds and ``labels``.
Create a new time-series.

Args:
key: time-series key
retention_msecs: Maximum age for samples compared to last event time (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
Adding this flag will keep data in an uncompressed form. Compression not only saves
memory but usually improve performance due to lower number of memory accesses
labels: Set of label-value pairs that represent metadata labels of the key.
chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
"""
params = [key]
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendChunkSize(params, chunk_size)
self.appendLabels(params, labels)

return self.redis.execute_command(self.CREATE_CMD, *params)
Expand All @@ -182,16 +198,27 @@ def alter(self, key, retention_msecs=None, labels={}):
return self.redis.execute_command(self.ALTER_CMD, *params)

def add(self, key, timestamp, value, retention_msecs=None,
uncompressed=False, labels={}):
uncompressed=False, labels={}, chunk_size=None):
"""
Appends (or creates and appends) a new ``value`` to series
``key`` with ``timestamp``. If ``key`` is created,
``retention_msecs`` and ``labels`` are applied. Return value
is timestamp of insertion.
Append (or create and append) a new sample to the series.

Args:
key: time-series key
timestamp: timestamp of the sample. * can be used for automatic timestamp (using the system clock).
value: numeric data value of the sample
retention_msecs: Maximum age for samples compared to last event time (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
Adding this flag will keep data in an uncompressed form. Compression not only saves
memory but usually improve performance due to lower number of memory accesses
labels: Set of label-value pairs that represent metadata labels of the key.
chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
"""
params = [key, timestamp, value]
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendChunkSize(params, chunk_size)
self.appendLabels(params, labels)

return self.redis.execute_command(self.ADD_CMD, *params)
Expand All @@ -211,33 +238,57 @@ def madd(self, ktv_tuples):
return self.redis.execute_command(self.MADD_CMD, *params)

def incrby(self, key, value, timestamp=None, retention_msecs=None,
uncompressed=False, labels={}):
uncompressed=False, labels={}, chunk_size=None):
"""
Increases latest value in ``key`` by ``value``.
``timestamp`` can be set or system time will be used.
If ``key`` is created, ``retention_msecs`` and ``labels`` are
applied.
Increment (or create an time-series and increment) the latest sample's of a series.
This command can be used as a counter or gauge that automatically gets history as a time series.

Args:
key: time-series key
value: numeric data value of the sample
timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock).
retention_msecs: Maximum age for samples compared to last event time (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
Adding this flag will keep data in an uncompressed form. Compression not only saves
memory but usually improve performance due to lower number of memory accesses
labels: Set of label-value pairs that represent metadata labels of the key.
chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
"""
params = [key, value]
self.appendTimestamp(params, timestamp)
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendChunkSize(params, chunk_size)
self.appendLabels(params, labels)

return self.redis.execute_command(self.INCRBY_CMD, *params)

def decrby(self, key, value, timestamp=None, retention_msecs=None,
uncompressed=False, labels={}):
uncompressed=False, labels={}, chunk_size=None):
"""
Decreases latest value in ``key`` by ``value``.
``timestamp` can be set or system time will be used.
If ``key`` is created, ``retention_msecs`` and ``labels`` are
applied.
Decrement (or create an time-series and decrement) the latest sample's of a series.
This command can be used as a counter or gauge that automatically gets history as a time series.

Args:
key: time-series key
value: numeric data value of the sample
timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock).
retention_msecs: Maximum age for samples compared to last event time (in milliseconds).
If None or 0 is passed then the series is not trimmed at all.
uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default.
Adding this flag will keep data in an uncompressed form. Compression not only saves
memory but usually improve performance due to lower number of memory accesses
labels: Set of label-value pairs that represent metadata labels of the key.
chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes).
"""
params = [key, value]
self.appendTimestamp(params, timestamp)
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendChunkSize(params, chunk_size)
self.appendLabels(params, labels)

return self.redis.execute_command(self.DECRBY_CMD, *params)
Expand Down
33 changes: 31 additions & 2 deletions test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def setUp(self):
global version
rts = RedisTimeSeries(port=port)
rts.redis.flushdb()
modules = rts.redis.execute_command("module","list")
modules = rts.redis.execute_command("module", "list")
if modules is not None:
for module_info in modules:
if module_info[1] == b'timeseries':
Expand All @@ -32,6 +32,15 @@ def testCreate(self):
self.assertEqual(20, info.retention_msecs)
self.assertEqual('Series', info.labels['Time'])

if version is None or version < 14000:
return

# Test for a chunk size of 128 Bytes
self.assertTrue(rts.create("time-serie-1",chunk_size=128))
info = rts.info("time-serie-1")
self.assertEqual(128, info.chunk_size)


def testAlter(self):
'''Test TS.ALTER calls'''

Expand Down Expand Up @@ -59,6 +68,14 @@ def testAdd(self):
self.assertEqual(10, info.retention_msecs)
self.assertEqual('Labs', info.labels['Redis'])

if version is None or version < 14000:
return

# Test for a chunk size of 128 Bytes on TS.ADD
self.assertTrue(rts.add("time-serie-1", 1, 10.0, chunk_size=128))
info = rts.info("time-serie-1")
self.assertEqual(128, info.chunk_size)

def testMAdd(self):
'''Test TS.MADD calls'''

Expand All @@ -83,6 +100,18 @@ def testIncrbyDecrby(self):
self.assertEqual((7, 3.75), rts.get(2))
self.assertTrue(rts.decrby(2, 1.5, timestamp=15))
self.assertEqual((15, 2.25), rts.get(2))
if version is None or version < 14000:
return

# Test for a chunk size of 128 Bytes on TS.INCRBY
self.assertTrue(rts.incrby("time-serie-1", 10, chunk_size=128))
info = rts.info("time-serie-1")
self.assertEqual(128, info.chunk_size)

# Test for a chunk size of 128 Bytes on TS.DECRBY
self.assertTrue(rts.decrby("time-serie-2", 10, chunk_size=128))
info = rts.info("time-serie-2")
self.assertEqual(128, info.chunk_size)

def testCreateRule(self):
'''Test TS.CREATERULE and TS.DELETERULE calls'''
Expand Down Expand Up @@ -273,4 +302,4 @@ def testPool(self):
self.assertEqual(1, info.total_samples)

if __name__ == '__main__':
unittest.main()
unittest.main()