From a5c404e11bf3324abc083dc433686744aa689df3 Mon Sep 17 00:00:00 2001 From: Weyoss Date: Tue, 8 Feb 2022 11:44:10 +0300 Subject: [PATCH] Implement TimeSeriesWorker --- examples/javascript/config.js | 1 - .../consumer/consumer-message-rate-writer.ts | 31 +--- .../consumer-acknowledged-time-series.ts | 13 +- .../consumer-dead-lettered-time-series.ts | 13 +- .../global-acknowledged-time-series.ts | 26 +-- .../global-dead-lettered-time-series.ts | 26 +-- .../queue-acknowledged-time-series.ts | 22 +-- .../queue-dead-lettered-time-series.ts | 22 +-- .../producer/producer-message-rate-writer.ts | 28 +--- .../global-published-time-series.ts | 26 +-- .../queue-published-time-series.ts | 22 +-- src/system/app/queue-manager/queue-manager.ts | 2 +- src/system/common/configuration.ts | 2 +- src/system/common/message-rate-writer.ts | 15 +- .../common/time-series/hash-time-series.ts | 66 +++----- .../time-series/sorted-set-time-series.ts | 3 +- src/system/common/time-series/time-series.ts | 78 +++++---- src/system/workers/time-series.worker.ts | 153 ++++++++++++++++++ tests/common.ts | 39 +++++ tests/shared-components/test00003.test.ts | 16 +- tests/shared-components/test00004.test.ts | 27 ++-- tests/shared-components/test00005.test.ts | 16 +- tests/shared-components/test00006.test.ts | 13 +- tests/shared-components/test00007.test.ts | 26 ++- tests/shared-components/test00008.test.ts | 19 +-- tests/shared-components/test00009.test.ts | 16 +- tests/shared-components/test00016.test.ts | 17 ++ types/index.ts | 11 ++ 28 files changed, 390 insertions(+), 359 deletions(-) create mode 100644 src/system/workers/time-series.worker.ts create mode 100644 tests/shared-components/test00016.test.ts diff --git a/examples/javascript/config.js b/examples/javascript/config.js index 8ec07f6f..ff29e8d7 100755 --- a/examples/javascript/config.js +++ b/examples/javascript/config.js @@ -1,5 +1,4 @@ 'use strict'; -const path = require('path'); module.exports = { namespace: 'ns1', diff --git a/src/system/app/consumer/consumer-message-rate-writer.ts b/src/system/app/consumer/consumer-message-rate-writer.ts index 347fc5d8..584eb837 100644 --- a/src/system/app/consumer/consumer-message-rate-writer.ts +++ b/src/system/app/consumer/consumer-message-rate-writer.ts @@ -12,7 +12,6 @@ import { TRedisClientMulti, } from '../../../../types'; import { MessageRateWriter } from '../../common/message-rate-writer'; -import { waterfall } from '../../lib/async'; export class ConsumerMessageRateWriter extends MessageRateWriter { protected redisClient: RedisClient; @@ -41,35 +40,27 @@ export class ConsumerMessageRateWriter extends MessageRateWriter cb()); else cb(); } - - onQuit(cb: ICallback): void { - waterfall( - [ - (cb: ICallback) => this.acknowledgedTimeSeries.quit(cb), - (cb: ICallback) => this.queueAcknowledgedRateTimeSeries.quit(cb), - (cb: ICallback) => this.globalAcknowledgedRateTimeSeries.quit(cb), - (cb: ICallback) => this.deadLetteredTimeSeries.quit(cb), - (cb: ICallback) => this.queueDeadLetteredTimeSeries.quit(cb), - (cb: ICallback) => this.globalDeadLetteredTimeSeries.quit(cb), - ], - cb, - ); - } } diff --git a/src/system/app/consumer/consumer-time-series/consumer-acknowledged-time-series.ts b/src/system/app/consumer/consumer-time-series/consumer-acknowledged-time-series.ts index 4b5648f8..412d9c1f 100644 --- a/src/system/app/consumer/consumer-time-series/consumer-acknowledged-time-series.ts +++ b/src/system/app/consumer/consumer-time-series/consumer-acknowledged-time-series.ts @@ -7,18 +7,13 @@ export const ConsumerAcknowledgedTimeSeries = ( redisClient: RedisClient, consumerId: string, queue: TQueueParams, - isMaster?: boolean, ) => { const { keyRateConsumerAcknowledged } = redisKeys.getQueueConsumerKeys( queue, consumerId, ); - return new SortedSetTimeSeries( - redisClient, - keyRateConsumerAcknowledged, - 30, - undefined, - undefined, - isMaster, - ); + return new SortedSetTimeSeries(redisClient, { + key: keyRateConsumerAcknowledged, + expireAfterInSeconds: 30, + }); }; diff --git a/src/system/app/consumer/consumer-time-series/consumer-dead-lettered-time-series.ts b/src/system/app/consumer/consumer-time-series/consumer-dead-lettered-time-series.ts index efed3ef9..efd9b3c2 100644 --- a/src/system/app/consumer/consumer-time-series/consumer-dead-lettered-time-series.ts +++ b/src/system/app/consumer/consumer-time-series/consumer-dead-lettered-time-series.ts @@ -7,18 +7,13 @@ export const ConsumerDeadLetteredTimeSeries = ( redisClient: RedisClient, consumerId: string, queue: TQueueParams, - isMaster?: boolean, ) => { const { keyRateConsumerDeadLettered } = redisKeys.getQueueConsumerKeys( queue, consumerId, ); - return new SortedSetTimeSeries( - redisClient, - keyRateConsumerDeadLettered, - 30, - undefined, - undefined, - isMaster, - ); + return new SortedSetTimeSeries(redisClient, { + key: keyRateConsumerDeadLettered, + expireAfterInSeconds: 30, + }); }; diff --git a/src/system/app/consumer/consumer-time-series/global-acknowledged-time-series.ts b/src/system/app/consumer/consumer-time-series/global-acknowledged-time-series.ts index a665ae09..2aed9dbd 100644 --- a/src/system/app/consumer/consumer-time-series/global-acknowledged-time-series.ts +++ b/src/system/app/consumer/consumer-time-series/global-acknowledged-time-series.ts @@ -2,23 +2,11 @@ import { RedisClient } from '../../../common/redis-client/redis-client'; import { redisKeys } from '../../../common/redis-keys/redis-keys'; import { HashTimeSeries } from '../../../common/time-series/hash-time-series'; -export const GlobalAcknowledgedTimeSeries = ( - redisClient: RedisClient, - isMaster?: boolean, -) => { - const { - keyRateGlobalAcknowledged, - keyRateGlobalAcknowledgedIndex, - keyLockRateGlobalAcknowledged, - } = redisKeys.getMainKeys(); - return new HashTimeSeries( - redisClient, - keyRateGlobalAcknowledged, - keyRateGlobalAcknowledgedIndex, - keyLockRateGlobalAcknowledged, - undefined, - undefined, - undefined, - isMaster, - ); +export const GlobalAcknowledgedTimeSeries = (redisClient: RedisClient) => { + const { keyRateGlobalAcknowledged, keyRateGlobalAcknowledgedIndex } = + redisKeys.getMainKeys(); + return new HashTimeSeries(redisClient, { + key: keyRateGlobalAcknowledged, + indexKey: keyRateGlobalAcknowledgedIndex, + }); }; diff --git a/src/system/app/consumer/consumer-time-series/global-dead-lettered-time-series.ts b/src/system/app/consumer/consumer-time-series/global-dead-lettered-time-series.ts index 05eeae8e..62fa3ca8 100644 --- a/src/system/app/consumer/consumer-time-series/global-dead-lettered-time-series.ts +++ b/src/system/app/consumer/consumer-time-series/global-dead-lettered-time-series.ts @@ -2,23 +2,11 @@ import { RedisClient } from '../../../common/redis-client/redis-client'; import { redisKeys } from '../../../common/redis-keys/redis-keys'; import { HashTimeSeries } from '../../../common/time-series/hash-time-series'; -export const GlobalDeadLetteredTimeSeries = ( - redisClient: RedisClient, - isMaster?: boolean, -) => { - const { - keyRateGlobalDeadLettered, - keyRateGlobalDeadLetteredIndex, - keyLockRateGlobalDeadLettered, - } = redisKeys.getMainKeys(); - return new HashTimeSeries( - redisClient, - keyRateGlobalDeadLettered, - keyRateGlobalDeadLetteredIndex, - keyLockRateGlobalDeadLettered, - undefined, - undefined, - undefined, - isMaster, - ); +export const GlobalDeadLetteredTimeSeries = (redisClient: RedisClient) => { + const { keyRateGlobalDeadLettered, keyRateGlobalDeadLetteredIndex } = + redisKeys.getMainKeys(); + return new HashTimeSeries(redisClient, { + key: keyRateGlobalDeadLettered, + indexKey: keyRateGlobalDeadLetteredIndex, + }); }; diff --git a/src/system/app/consumer/consumer-time-series/queue-acknowledged-time-series.ts b/src/system/app/consumer/consumer-time-series/queue-acknowledged-time-series.ts index 675ba7d2..fb903eb6 100644 --- a/src/system/app/consumer/consumer-time-series/queue-acknowledged-time-series.ts +++ b/src/system/app/consumer/consumer-time-series/queue-acknowledged-time-series.ts @@ -6,21 +6,11 @@ import { TQueueParams } from '../../../../../types'; export const QueueAcknowledgedTimeSeries = ( redisClient: RedisClient, queue: TQueueParams, - isMaster?: boolean, ) => { - const { - keyRateQueueAcknowledged, - keyRateQueueAcknowledgedIndex, - keyLockRateQueueAcknowledged, - } = redisKeys.getQueueKeys(queue); - return new HashTimeSeries( - redisClient, - keyRateQueueAcknowledged, - keyRateQueueAcknowledgedIndex, - keyLockRateQueueAcknowledged, - undefined, - undefined, - undefined, - isMaster, - ); + const { keyRateQueueAcknowledged, keyRateQueueAcknowledgedIndex } = + redisKeys.getQueueKeys(queue); + return new HashTimeSeries(redisClient, { + key: keyRateQueueAcknowledged, + indexKey: keyRateQueueAcknowledgedIndex, + }); }; diff --git a/src/system/app/consumer/consumer-time-series/queue-dead-lettered-time-series.ts b/src/system/app/consumer/consumer-time-series/queue-dead-lettered-time-series.ts index d799826a..a5e248b7 100644 --- a/src/system/app/consumer/consumer-time-series/queue-dead-lettered-time-series.ts +++ b/src/system/app/consumer/consumer-time-series/queue-dead-lettered-time-series.ts @@ -6,21 +6,11 @@ import { TQueueParams } from '../../../../../types'; export const QueueDeadLetteredTimeSeries = ( redisClient: RedisClient, queue: TQueueParams, - isMaster?: boolean, ) => { - const { - keyRateQueueDeadLettered, - keyRateQueueDeadLetteredIndex, - keyLockRateQueueDeadLettered, - } = redisKeys.getQueueKeys(queue); - return new HashTimeSeries( - redisClient, - keyRateQueueDeadLettered, - keyRateQueueDeadLetteredIndex, - keyLockRateQueueDeadLettered, - undefined, - undefined, - undefined, - isMaster, - ); + const { keyRateQueueDeadLettered, keyRateQueueDeadLetteredIndex } = + redisKeys.getQueueKeys(queue); + return new HashTimeSeries(redisClient, { + key: keyRateQueueDeadLettered, + indexKey: keyRateQueueDeadLetteredIndex, + }); }; diff --git a/src/system/app/producer/producer-message-rate-writer.ts b/src/system/app/producer/producer-message-rate-writer.ts index 6662894b..0ccfabbd 100644 --- a/src/system/app/producer/producer-message-rate-writer.ts +++ b/src/system/app/producer/producer-message-rate-writer.ts @@ -7,7 +7,6 @@ import { import { RedisClient } from '../../common/redis-client/redis-client'; import { QueuePublishedTimeSeries } from './producer-time-series/queue-published-time-series'; import { GlobalPublishedTimeSeries } from './producer-time-series/global-published-time-series'; -import { each, waterfall } from '../../lib/async'; export class ProducerMessageRateWriter extends MessageRateWriter { protected redisClient: RedisClient; @@ -22,10 +21,7 @@ export class ProducerMessageRateWriter extends MessageRateWriter): void { - waterfall( - [ - (cb: ICallback) => this.globalPublishedTimeSeries.quit(cb), - (cb: ICallback) => { - each( - this.queuePublishedTimeSeries, - (item, key, done) => { - item.quit(() => done()); - }, - cb, - ); - }, - ], - cb, - ); - } } diff --git a/src/system/app/producer/producer-time-series/global-published-time-series.ts b/src/system/app/producer/producer-time-series/global-published-time-series.ts index e286fd19..4e90d492 100644 --- a/src/system/app/producer/producer-time-series/global-published-time-series.ts +++ b/src/system/app/producer/producer-time-series/global-published-time-series.ts @@ -2,23 +2,11 @@ import { RedisClient } from '../../../common/redis-client/redis-client'; import { redisKeys } from '../../../common/redis-keys/redis-keys'; import { HashTimeSeries } from '../../../common/time-series/hash-time-series'; -export const GlobalPublishedTimeSeries = ( - redisClient: RedisClient, - isMaster?: boolean, -) => { - const { - keyRateGlobalPublished, - keyRateGlobalInputIndex, - keyLockRateGlobalPublished, - } = redisKeys.getMainKeys(); - return new HashTimeSeries( - redisClient, - keyRateGlobalPublished, - keyRateGlobalInputIndex, - keyLockRateGlobalPublished, - undefined, - undefined, - undefined, - isMaster, - ); +export const GlobalPublishedTimeSeries = (redisClient: RedisClient) => { + const { keyRateGlobalPublished, keyRateGlobalInputIndex } = + redisKeys.getMainKeys(); + return new HashTimeSeries(redisClient, { + key: keyRateGlobalPublished, + indexKey: keyRateGlobalInputIndex, + }); }; diff --git a/src/system/app/producer/producer-time-series/queue-published-time-series.ts b/src/system/app/producer/producer-time-series/queue-published-time-series.ts index a4ca0ac2..4f26882c 100644 --- a/src/system/app/producer/producer-time-series/queue-published-time-series.ts +++ b/src/system/app/producer/producer-time-series/queue-published-time-series.ts @@ -6,21 +6,11 @@ import { TQueueParams } from '../../../../../types'; export const QueuePublishedTimeSeries = ( redisClient: RedisClient, queue: TQueueParams, - isMaster?: boolean, ) => { - const { - keyRateQueuePublished, - keyRateQueuePublishedIndex, - keyLockRateQueuePublished, - } = redisKeys.getQueueKeys(queue); - return new HashTimeSeries( - redisClient, - keyRateQueuePublished, - keyRateQueuePublishedIndex, - keyLockRateQueuePublished, - undefined, - undefined, - undefined, - isMaster, - ); + const { keyRateQueuePublished, keyRateQueuePublishedIndex } = + redisKeys.getQueueKeys(queue); + return new HashTimeSeries(redisClient, { + key: keyRateQueuePublished, + indexKey: keyRateQueuePublishedIndex, + }); }; diff --git a/src/system/app/queue-manager/queue-manager.ts b/src/system/app/queue-manager/queue-manager.ts index d34de0a4..0ecbd585 100644 --- a/src/system/app/queue-manager/queue-manager.ts +++ b/src/system/app/queue-manager/queue-manager.ts @@ -275,7 +275,7 @@ export const queueManager = { redisClient.smembers(keyNamespaces, (err, reply) => { if (err) cb(err); else if (!reply) cb(new EmptyCallbackReplyError()); - else cb(null, reply ?? []); + else cb(null, reply); }); }, diff --git a/src/system/common/configuration.ts b/src/system/common/configuration.ts index 3091f6de..05dad018 100644 --- a/src/system/common/configuration.ts +++ b/src/system/common/configuration.ts @@ -27,7 +27,7 @@ export function setConfiguration(configuration: IConfig = {}): IRequiredConfig { throw new ConfigurationError( 'Configuration has been already initialized. Possible configuration overwrite.', ); - currentConfig = merge(currentConfig ?? {}, defaultConfig, configuration); + currentConfig = merge({}, defaultConfig, configuration); redisKeys.setNamespace(currentConfig.namespace); return currentConfig; } diff --git a/src/system/common/message-rate-writer.ts b/src/system/common/message-rate-writer.ts index 51b2e4f1..08a0d0fd 100644 --- a/src/system/common/message-rate-writer.ts +++ b/src/system/common/message-rate-writer.ts @@ -1,7 +1,6 @@ import { Ticker } from './ticker/ticker'; import { ICallback, TMessageRateFields } from '../../../types'; import { events } from './events'; -import { waterfall } from '../lib/async'; export abstract class MessageRateWriter< TRateFields extends TMessageRateFields, @@ -32,22 +31,12 @@ export abstract class MessageRateWriter< cb: ICallback, ): void; - abstract onQuit(cb: ICallback): void; - onRateTick = (ts: number, rates: TRateFields): void => { this.rateStack.push([ts, rates]); }; quit(cb: ICallback): void { - waterfall( - [ - (cb: ICallback) => { - this.writerTicker.on(events.DOWN, cb); - this.writerTicker.quit(); - }, - (cb: ICallback) => this.onQuit(cb), - ], - cb, - ); + this.writerTicker.on(events.DOWN, cb); + this.writerTicker.quit(); } } diff --git a/src/system/common/time-series/hash-time-series.ts b/src/system/common/time-series/hash-time-series.ts index e66b953c..aaa15cb4 100644 --- a/src/system/common/time-series/hash-time-series.ts +++ b/src/system/common/time-series/hash-time-series.ts @@ -1,36 +1,19 @@ import { TimeSeries } from './time-series'; import { ICallback, + IHashTimeSeriesParams, TRedisClientMulti, TTimeSeriesRange, } from '../../../../types'; import { ArgumentError } from '../errors/argument.error'; import { RedisClient } from '../redis-client/redis-client'; -import { LockManager } from '../lock-manager/lock-manager'; -export class HashTimeSeries extends TimeSeries { +export class HashTimeSeries extends TimeSeries { protected indexKey: string; - protected lockManager: LockManager; - constructor( - redisClient: RedisClient, - key: string, - indexKey: string, - lockKey: string, - expireAfterInSeconds = 0, - retentionTimeInSeconds = 24 * 60 * 60, - windowSizeInSeconds = 60, - isMaster = false, - ) { - super( - redisClient, - key, - expireAfterInSeconds, - retentionTimeInSeconds, - windowSizeInSeconds, - isMaster, - ); - this.lockManager = new LockManager(redisClient, lockKey, 60000); + constructor(redisClient: RedisClient, params: IHashTimeSeriesParams) { + super(redisClient, params); + const { indexKey } = params; this.indexKey = indexKey; } @@ -55,29 +38,22 @@ export class HashTimeSeries extends TimeSeries { } cleanUp(cb: ICallback): void { - const process = (cb: ICallback) => { - const ts = TimeSeries.getCurrentTimestamp(); - const max = ts - this.retentionTime; - this.redisClient.zrangebyscore( - this.indexKey, - '-inf', - `${max}`, - (err, reply) => { - if (err) cb(err); - else if (reply && reply.length) { - const multi = this.redisClient.multi(); - multi.zrem(this.indexKey, ...reply); - multi.hdel(this.key, ...reply); - this.redisClient.execMulti(multi, (err) => cb(err)); - } else cb(); - }, - ); - }; - this.lockManager.acquireLock((err, locked) => { - if (err) cb(err); - else if (locked) process(cb); - else cb(); - }); + const ts = TimeSeries.getCurrentTimestamp(); + const max = ts - this.retentionTime; + this.redisClient.zrangebyscore( + this.indexKey, + '-inf', + `${max}`, + (err, reply) => { + if (err) cb(err); + else if (reply && reply.length) { + const multi = this.redisClient.multi(); + multi.zrem(this.indexKey, ...reply); + multi.hdel(this.key, ...reply); + this.redisClient.execMulti(multi, (err) => cb(err)); + } else cb(); + }, + ); } getRange(from: number, to: number, cb: ICallback): void { diff --git a/src/system/common/time-series/sorted-set-time-series.ts b/src/system/common/time-series/sorted-set-time-series.ts index 1900b0fc..3db2b32c 100644 --- a/src/system/common/time-series/sorted-set-time-series.ts +++ b/src/system/common/time-series/sorted-set-time-series.ts @@ -1,12 +1,13 @@ import { ICallback, TRedisClientMulti, + TTimeSeriesParams, TTimeSeriesRange, } from '../../../../types'; import { TimeSeries } from './time-series'; import { ArgumentError } from '../errors/argument.error'; -export class SortedSetTimeSeries extends TimeSeries { +export class SortedSetTimeSeries extends TimeSeries { add( ts: number, value: number, diff --git a/src/system/common/time-series/time-series.ts b/src/system/common/time-series/time-series.ts index b0a0da08..7d877de4 100644 --- a/src/system/common/time-series/time-series.ts +++ b/src/system/common/time-series/time-series.ts @@ -1,46 +1,65 @@ import { ICallback, TRedisClientMulti, + TTimeSeriesParams, TTimeSeriesRange, } from '../../../../types'; import { RedisClient } from '../redis-client/redis-client'; -import { Ticker } from '../ticker/ticker'; -import { events } from '../events'; import { EventEmitter } from 'events'; +import { ArgumentError } from '../errors/argument.error'; -export abstract class TimeSeries extends EventEmitter { - private readonly ticker: Ticker | null = null; - protected retentionTime: number; +export abstract class TimeSeries< + TimeSeriesParams extends TTimeSeriesParams, +> extends EventEmitter { + protected retentionTime = 24 * 60 * 60; + protected expireAfter = 0; + protected windowSize = 60; protected redisClient: RedisClient; - protected expireAfter: number | null = null; protected key: string; - protected windowSize: number; - constructor( - redisClient: RedisClient, - key: string, - expireAfterInSeconds = 0, - retentionTimeInSeconds = 24 * 60 * 60, - windowSizeInSeconds = 60, - isMaster = false, - ) { + constructor(redisClient: RedisClient, params: TimeSeriesParams) { super(); - this.retentionTime = retentionTimeInSeconds; + const { + key, + expireAfterInSeconds, + retentionTimeInSeconds, + windowSizeInSeconds, + } = params; this.redisClient = redisClient; - this.expireAfter = expireAfterInSeconds; - this.windowSize = windowSizeInSeconds; + if (expireAfterInSeconds !== undefined) + this.setExpiration(expireAfterInSeconds); + if (retentionTimeInSeconds !== undefined) + this.setRetentionTime(retentionTimeInSeconds); + if (windowSizeInSeconds !== undefined) + this.setWindowSize(windowSizeInSeconds); this.key = key; - if (isMaster) { - this.ticker = new Ticker(() => this.onTick(), 10000); - this.ticker.nextTick(); + } + + setWindowSize(windowSizeInSeconds: number): void { + if (windowSizeInSeconds < 1) { + throw new ArgumentError( + 'Expected a positive integer value in milliseconds >= 1', + ); + } + this.windowSize = windowSizeInSeconds; + } + + setRetentionTime(retentionTimeInSeconds: number): void { + if (retentionTimeInSeconds < 1) { + throw new ArgumentError( + 'Expected a positive integer value in milliseconds >= 1', + ); } + this.retentionTime = retentionTimeInSeconds; } - private onTick(): void { - this.cleanUp((err) => { - if (err) this.emit(events.ERROR, err); - else this.ticker?.nextTick(); - }); + setExpiration(expireAfterInSeconds: number): void { + if (expireAfterInSeconds < 0) { + throw new ArgumentError( + 'Expected a positive integer value in milliseconds', + ); + } + this.expireAfter = expireAfterInSeconds; } abstract add( @@ -63,13 +82,6 @@ export abstract class TimeSeries extends EventEmitter { this.getRange(min, max, cb); } - quit(cb: ICallback): void { - if (this.ticker) { - this.ticker.on(events.DOWN, cb); - this.ticker.quit(); - } else cb(); - } - static getCurrentTimestamp(): number { return Math.ceil(Date.now() / 1000); } diff --git a/src/system/workers/time-series.worker.ts b/src/system/workers/time-series.worker.ts new file mode 100644 index 00000000..77f80442 --- /dev/null +++ b/src/system/workers/time-series.worker.ts @@ -0,0 +1,153 @@ +import { RedisClient } from '../common/redis-client/redis-client'; +import { + ICallback, + IConsumerWorkerParameters, + TQueueParams, +} from '../../../types'; +import { EmptyCallbackReplyError } from '../common/errors/empty-callback-reply.error'; +import { Worker } from '../common/worker/worker'; +import { setConfiguration } from '../common/configuration'; +import { queueManager } from '../app/queue-manager/queue-manager'; +import { eachOf, waterfall } from '../lib/async'; +import { QueueAcknowledgedTimeSeries } from '../app/consumer/consumer-time-series/queue-acknowledged-time-series'; +import { QueueDeadLetteredTimeSeries } from '../app/consumer/consumer-time-series/queue-dead-lettered-time-series'; +import { QueuePublishedTimeSeries } from '../app/producer/producer-time-series/queue-published-time-series'; +import { GlobalAcknowledgedTimeSeries } from '../app/consumer/consumer-time-series/global-acknowledged-time-series'; +import { GlobalPublishedTimeSeries } from '../app/producer/producer-time-series/global-published-time-series'; +import { GlobalDeadLetteredTimeSeries } from '../app/consumer/consumer-time-series/global-dead-lettered-time-series'; +import { consumerQueues } from '../app/consumer/consumer-queues'; +import { ConsumerAcknowledgedTimeSeries } from '../app/consumer/consumer-time-series/consumer-acknowledged-time-series'; +import { ConsumerDeadLetteredTimeSeries } from '../app/consumer/consumer-time-series/consumer-dead-lettered-time-series'; + +export class TimeSeriesWorker extends Worker { + protected enabled: boolean; + + constructor( + redisClient: RedisClient, + params: IConsumerWorkerParameters, + managed: boolean, + ) { + super(redisClient, params, managed); + this.enabled = params.config.monitor.enabled; + } + + protected cleanUpGlobalTimeSeries = (cb: ICallback): void => { + waterfall( + [ + (cb: ICallback) => + GlobalAcknowledgedTimeSeries(this.redisClient).cleanUp(cb), + (cb: ICallback) => + GlobalPublishedTimeSeries(this.redisClient).cleanUp(cb), + (cb: ICallback) => + GlobalDeadLetteredTimeSeries(this.redisClient).cleanUp(cb), + ], + cb, + ); + }; + + protected cleanUpQueueTimeSeries = ( + queues: TQueueParams[], + cb: ICallback, + ): void => { + eachOf( + queues, + (queue, _, done) => { + waterfall( + [ + (cb: ICallback) => + QueueAcknowledgedTimeSeries(this.redisClient, queue).cleanUp(cb), + (cb: ICallback) => + QueueDeadLetteredTimeSeries(this.redisClient, queue).cleanUp(cb), + (cb: ICallback) => + QueuePublishedTimeSeries(this.redisClient, queue).cleanUp(cb), + ], + done, + ); + }, + cb, + ); + }; + + protected cleanUpConsumerTimeSeries = ( + consumerIds: string[], + queue: TQueueParams, + cb: ICallback, + ): void => { + eachOf( + consumerIds, + (consumerId, _, done) => { + waterfall( + [ + (cb: ICallback) => + ConsumerAcknowledgedTimeSeries( + this.redisClient, + consumerId, + queue, + ).cleanUp(cb), + (cb: ICallback) => + ConsumerDeadLetteredTimeSeries( + this.redisClient, + consumerId, + queue, + ).cleanUp(cb), + ], + done, + ); + }, + cb, + ); + }; + + work = (cb: ICallback): void => { + if (this.enabled) { + waterfall( + [ + (cb: ICallback) => this.cleanUpGlobalTimeSeries(cb), + (cb: ICallback) => + queueManager.getQueues(this.redisClient, (err, reply) => { + if (err) cb(err); + else { + const queues = reply ?? []; + this.cleanUpQueueTimeSeries(queues, (err) => { + if (err) cb(err); + else cb(null, queues); + }); + } + }), + (queues: TQueueParams[], cb: ICallback) => { + eachOf( + queues, + (queue, _, done) => { + consumerQueues.getQueueConsumerIds( + this.redisClient, + queue, + (err, reply) => { + if (err) done(err); + else { + const consumerIds = reply ?? []; + this.cleanUpConsumerTimeSeries(consumerIds, queue, done); + } + }, + ); + }, + cb, + ); + }, + ], + cb, + ); + } else cb(); + }; +} + +export default TimeSeriesWorker; + +process.on('message', (payload: string) => { + const params: IConsumerWorkerParameters = JSON.parse(payload); + setConfiguration(params.config); + RedisClient.getNewInstance((err, client) => { + if (err) throw err; + else if (!client) throw new EmptyCallbackReplyError(); + else new TimeSeriesWorker(client, params, false).run(); + }); +}); diff --git a/tests/common.ts b/tests/common.ts index 703b498d..c56a0232 100644 --- a/tests/common.ts +++ b/tests/common.ts @@ -25,7 +25,9 @@ import { WebsocketOnlineStreamWorker } from '../src/monitor-server/workers/webso import { TimeSeriesResponseBodyDTO } from '../src/monitor-server/controllers/common/dto/time-series/time-series-response.DTO'; import * as configuration from '../src/system/common/configuration'; import ScheduleWorker from '../src/system/workers/schedule.worker'; +import TimeSeriesWorker from '../src/system/workers/time-series.worker'; import { merge } from 'lodash'; +import { queueManager } from '../src/system/app/queue-manager/queue-manager'; export const config = configuration.setConfiguration(testConfig); @@ -65,6 +67,7 @@ let websocketHeartbeatStreamWorker: WebsocketHeartbeatStreamWorker | null = null; let websocketOnlineStreamWorker: WebsocketOnlineStreamWorker | null = null; let scheduleWorker: ScheduleWorker | null = null; +let timeSeriesWorker: TimeSeriesWorker | null = null; let messageManager: MessageManager | null = null; let queueManagerFrontend: QueueManagerFrontend | null = null; @@ -100,6 +103,7 @@ export async function shutdown(): Promise { await stopWebsocketHeartbeatStreamWorker(); await stopWebsocketOnlineStreamWorker(); await stopScheduleWorker(); + await stopTimeSeriesWorker(); if (messageManager) { const m = promisifyAll(messageManager); @@ -238,6 +242,33 @@ export async function stopScheduleWorker(): Promise { }); } +export async function startTimeSeriesWorker(): Promise { + if (!timeSeriesWorker) { + const redisClient = await getRedisInstance(); + timeSeriesWorker = new TimeSeriesWorker( + redisClient, + { + timeout: 1000, + config, + consumerId: 'abc', + }, + false, + ); + timeSeriesWorker.run(); + } +} + +export async function stopTimeSeriesWorker(): Promise { + return new Promise((resolve) => { + if (timeSeriesWorker) { + timeSeriesWorker.quit(() => { + timeSeriesWorker = null; + resolve(); + }); + } else resolve(); + }); +} + export async function startWebsocketRateStreamWorker(): Promise { if (!websocketRateStreamWorker) { const redisClient = await getRedisInstance(); @@ -488,3 +519,11 @@ export async function validateTimeSeriesFrom(url: string) { value: 0, }); } + +export async function setUpMessageQueue(queue: TQueueParams = defaultQueue) { + const qm = promisifyAll(queueManager); + const redisClient = await getRedisInstance(); + const multi = redisClient.multi(); + qm.setUpMessageQueue(multi, queue); + await redisClient.execMultiAsync(multi); +} diff --git a/tests/shared-components/test00003.test.ts b/tests/shared-components/test00003.test.ts index c7a93bda..0b19f490 100644 --- a/tests/shared-components/test00003.test.ts +++ b/tests/shared-components/test00003.test.ts @@ -6,16 +6,11 @@ import { TimeSeries } from '../../src/system/common/time-series/time-series'; test('HashTimeSeries: Case 1', async () => { const redisClient = await getRedisInstance(); const hashTimeSeries = promisifyAll( - new HashTimeSeries( - redisClient, - 'my-key', - 'my-key-index', - 'my-key-lock', - 20, - undefined, - undefined, - true, - ), + new HashTimeSeries(redisClient, { + key: 'my-key', + indexKey: 'my-key-index', + expireAfterInSeconds: 20, + }), ); const multi = redisClient.multi(); const ts = TimeSeries.getCurrentTimestamp(); @@ -31,5 +26,4 @@ test('HashTimeSeries: Case 1', async () => { expect(range[23]).toEqual({ timestamp: ts + 3, value: 56 }); expect(range[30]).toEqual({ timestamp: ts + 10, value: 70 }); expect(range[39]).toEqual({ timestamp: ts + 20 - 1, value: 0 }); - await hashTimeSeries.quitAsync(); }); diff --git a/tests/shared-components/test00004.test.ts b/tests/shared-components/test00004.test.ts index 0b5e421b..d08d075d 100644 --- a/tests/shared-components/test00004.test.ts +++ b/tests/shared-components/test00004.test.ts @@ -6,16 +6,12 @@ import { TimeSeries } from '../../src/system/common/time-series/time-series'; test('HashTimeSeries: Case 2', async () => { const redisClient = await getRedisInstance(); const hashTimeSeries = promisifyAll( - new HashTimeSeries( - redisClient, - 'my-key', - 'my-key-index', - 'my-key-lock', - undefined, - 5, - 60, - true, - ), + new HashTimeSeries(redisClient, { + key: 'key-123', + indexKey: 'key-index-123', + retentionTimeInSeconds: 5, + windowSizeInSeconds: 60, + }), ); const multi = redisClient.multi(); const ts = TimeSeries.getCurrentTimestamp(); @@ -26,12 +22,9 @@ test('HashTimeSeries: Case 2', async () => { const range1 = await hashTimeSeries.getRangeAsync(ts, ts + 10); - // Time series cleanup ticker run once each 10s. - // It needs 2 rounds to clean up data (saved 10 seconds time range). - // In each round, data older than 5 seconds from run time, get deleted. - // Waiting extra 10 seconds to exclude errors due to javascript time drift. - // In the end, we expect that range2 is filled with 0 values (after 30s all data should be expired and deleted) - await delay(30000); + // extra 5s to exclude js time drift related errors + await delay(15000); + await hashTimeSeries.cleanUpAsync(); const range2 = await hashTimeSeries.getRangeAsync(ts, ts + 10); expect(range1.length).toEqual(10); @@ -57,6 +50,4 @@ test('HashTimeSeries: Case 2', async () => { expect(range2[7]).toEqual({ timestamp: ts + 7, value: 0 }); expect(range2[8]).toEqual({ timestamp: ts + 8, value: 0 }); expect(range2[9]).toEqual({ timestamp: ts + 9, value: 0 }); - - await hashTimeSeries.quitAsync(); }); diff --git a/tests/shared-components/test00005.test.ts b/tests/shared-components/test00005.test.ts index 10ebccee..fbce7d84 100644 --- a/tests/shared-components/test00005.test.ts +++ b/tests/shared-components/test00005.test.ts @@ -6,16 +6,11 @@ import { TimeSeries } from '../../src/system/common/time-series/time-series'; test('HashTimeSeries: Case 3', async () => { const redisClient = await getRedisInstance(); const hashTimeSeries = promisifyAll( - new HashTimeSeries( - redisClient, - 'my-key', - 'my-key-index', - 'my-key-lock', - undefined, - 5, - undefined, - true, - ), + new HashTimeSeries(redisClient, { + key: 'my-key', + indexKey: 'my-key-index', + retentionTimeInSeconds: 5, + }), ); const ts = TimeSeries.getCurrentTimestamp(); for (let i = 0; i < 10; i += 1) { @@ -29,5 +24,4 @@ test('HashTimeSeries: Case 3', async () => { expect(range1[1]).toEqual({ timestamp: ts + 1, value: 71 }); expect(range1[5]).toEqual({ timestamp: ts + 5, value: 105 }); expect(range1[9]).toEqual({ timestamp: ts + 9, value: 9 }); - await hashTimeSeries.quitAsync(); }); diff --git a/tests/shared-components/test00006.test.ts b/tests/shared-components/test00006.test.ts index fd801213..6fc258ea 100644 --- a/tests/shared-components/test00006.test.ts +++ b/tests/shared-components/test00006.test.ts @@ -6,14 +6,10 @@ import { SortedSetTimeSeries } from '../../src/system/common/time-series/sorted- test('SortedSetTimeSeries: Case 1', async () => { const redisClient = await getRedisInstance(); const sortedSetTimeSeries = promisifyAll( - new SortedSetTimeSeries( - redisClient, - 'my-key', - undefined, - 20, - undefined, - true, - ), + new SortedSetTimeSeries(redisClient, { + key: 'my-key', + retentionTimeInSeconds: 20, + }), ); const multi = redisClient.multi(); const ts = TimeSeries.getCurrentTimestamp(); @@ -29,5 +25,4 @@ test('SortedSetTimeSeries: Case 1', async () => { expect(range[23]).toEqual({ timestamp: ts + 3, value: 56 }); expect(range[30]).toEqual({ timestamp: ts + 10, value: 70 }); expect(range[39]).toEqual({ timestamp: ts + 20 - 1, value: 0 }); - await sortedSetTimeSeries.quitAsync(); }); diff --git a/tests/shared-components/test00007.test.ts b/tests/shared-components/test00007.test.ts index 8a14a28b..c88677b5 100644 --- a/tests/shared-components/test00007.test.ts +++ b/tests/shared-components/test00007.test.ts @@ -1,19 +1,16 @@ import { delay, promisifyAll } from 'bluebird'; -import { getRedisInstance } from '../common'; +import { getRedisInstance, startTimeSeriesWorker } from '../common'; import { TimeSeries } from '../../src/system/common/time-series/time-series'; import { SortedSetTimeSeries } from '../../src/system/common/time-series/sorted-set-time-series'; test('SortedSetTimeSeries: Case 2', async () => { + await startTimeSeriesWorker(); const redisClient = await getRedisInstance(); const sortedSetTimeSeries = promisifyAll( - new SortedSetTimeSeries( - redisClient, - 'my-key', - undefined, - 5, - undefined, - true, - ), + new SortedSetTimeSeries(redisClient, { + key: 'my-key', + retentionTimeInSeconds: 5, + }), ); const ts = TimeSeries.getCurrentTimestamp(); for (let i = 0; i < 10; i += 1) { @@ -22,12 +19,9 @@ test('SortedSetTimeSeries: Case 2', async () => { const range1 = await sortedSetTimeSeries.getRangeAsync(ts, ts + 10); - // Time series cleanup ticker run once each 10s. - // It needs 2 rounds to clean up data (saved 10 seconds time range). - // In each round, data older than 5 seconds from run time, get deleted. - // Waiting extra 10 seconds to exclude errors due to javascript time drift. - // In the end, we expect that range2 is filled with 0 values (after 30s all data should be expired and deleted) - await delay(30000); + // extra 5s to exclude js time drift related errors + await delay(15000); + await sortedSetTimeSeries.cleanUpAsync(); const range2 = await sortedSetTimeSeries.getRangeAsync(ts, ts + 10); expect(range1.length).toEqual(10); @@ -53,6 +47,4 @@ test('SortedSetTimeSeries: Case 2', async () => { expect(range2[7]).toEqual({ timestamp: ts + 7, value: 0 }); expect(range2[8]).toEqual({ timestamp: ts + 8, value: 0 }); expect(range2[9]).toEqual({ timestamp: ts + 9, value: 0 }); - - await sortedSetTimeSeries.quitAsync(); }); diff --git a/tests/shared-components/test00008.test.ts b/tests/shared-components/test00008.test.ts index 168391be..fbdb3e85 100644 --- a/tests/shared-components/test00008.test.ts +++ b/tests/shared-components/test00008.test.ts @@ -6,16 +6,12 @@ import { TimeSeries } from '../../src/system/common/time-series/time-series'; test('HashTimeSeries: Case 4', async () => { const redisClient = await getRedisInstance(); const hashTimeSeries = promisifyAll( - new HashTimeSeries( - redisClient, - 'my-key', - 'my-key-index', - 'my-key-lock', - 5, // data will expire after 5s of inactivity - 20, - undefined, - true, - ), + new HashTimeSeries(redisClient, { + key: 'my-key', + indexKey: 'my-key-index', + expireAfterInSeconds: 5, // data will expire after 5s of inactivity + retentionTimeInSeconds: 20, + }), ); const ts = TimeSeries.getCurrentTimestamp(); for (let i = 0; i < 10; i += 1) { @@ -25,6 +21,7 @@ test('HashTimeSeries: Case 4', async () => { // Retention time is 20 but as data will be expired after 5s // we just wait 10 seconds. After which, we expect time series data filled with 0 values. await delay(10000); + await hashTimeSeries.cleanUpAsync(); const range1 = await hashTimeSeries.getRangeAsync(ts, ts + 10); expect(range1.length).toEqual(10); @@ -38,6 +35,4 @@ test('HashTimeSeries: Case 4', async () => { expect(range1[7]).toEqual({ timestamp: ts + 7, value: 0 }); expect(range1[8]).toEqual({ timestamp: ts + 8, value: 0 }); expect(range1[9]).toEqual({ timestamp: ts + 9, value: 0 }); - - await hashTimeSeries.quitAsync(); }); diff --git a/tests/shared-components/test00009.test.ts b/tests/shared-components/test00009.test.ts index c6f4ae71..2e95ce0a 100644 --- a/tests/shared-components/test00009.test.ts +++ b/tests/shared-components/test00009.test.ts @@ -6,14 +6,11 @@ import { SortedSetTimeSeries } from '../../src/system/common/time-series/sorted- test('SortedSetTimeSeries: Case 3', async () => { const redisClient = await getRedisInstance(); const sortedSetSeries = promisifyAll( - new SortedSetTimeSeries( - redisClient, - 'my-key', - 5, // data will expire after 5s of inactivity - 20, - undefined, - true, - ), + new SortedSetTimeSeries(redisClient, { + key: 'my-key', + expireAfterInSeconds: 5, // data will expire after 5s of inactivity + retentionTimeInSeconds: 20, + }), ); const ts = TimeSeries.getCurrentTimestamp(); @@ -24,6 +21,7 @@ test('SortedSetTimeSeries: Case 3', async () => { // Retention time is 20 but as data will be expired after 5s // we just wait 10 seconds. After which, we expect time series data filled with 0 values. await delay(10000); + await sortedSetSeries.cleanUpAsync(); const range1 = await sortedSetSeries.getRangeAsync(ts, ts + 10); expect(range1.length).toEqual(10); @@ -37,6 +35,4 @@ test('SortedSetTimeSeries: Case 3', async () => { expect(range1[7]).toEqual({ timestamp: ts + 7, value: 0 }); expect(range1[8]).toEqual({ timestamp: ts + 8, value: 0 }); expect(range1[9]).toEqual({ timestamp: ts + 9, value: 0 }); - - await sortedSetSeries.quitAsync(); }); diff --git a/tests/shared-components/test00016.test.ts b/tests/shared-components/test00016.test.ts new file mode 100644 index 00000000..ccdac2f2 --- /dev/null +++ b/tests/shared-components/test00016.test.ts @@ -0,0 +1,17 @@ +import { delay } from 'bluebird'; +import { + defaultQueue, + getConsumer, + setUpMessageQueue, + startTimeSeriesWorker, +} from '../common'; + +test('TimeSeriesWorker', async () => { + await startTimeSeriesWorker(); + await delay(5000); + await setUpMessageQueue(defaultQueue); + await delay(5000); + const consumer = await getConsumer({ queue: defaultQueue }); + await consumer.runAsync(); + await delay(5000); +}); diff --git a/types/index.ts b/types/index.ts index 80c73327..ba713625 100644 --- a/types/index.ts +++ b/types/index.ts @@ -342,3 +342,14 @@ export type TMessageMetadataJSON = { nextScheduledDelay: number; nextRetryDelay: number; }; + +export type TTimeSeriesParams = { + key: string; + expireAfterInSeconds?: number; + retentionTimeInSeconds?: number; + windowSizeInSeconds?: number; +}; + +export interface IHashTimeSeriesParams extends TTimeSeriesParams { + indexKey: string; +}