Skip to content

Commit

Permalink
Implement TimeSeriesWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Feb 8, 2022
1 parent 28d0be0 commit a5c404e
Show file tree
Hide file tree
Showing 28 changed files with 390 additions and 359 deletions.
1 change: 0 additions & 1 deletion examples/javascript/config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
'use strict';
const path = require('path');

module.exports = {
namespace: 'ns1',
Expand Down
31 changes: 4 additions & 27 deletions src/system/app/consumer/consumer-message-rate-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
TRedisClientMulti,
} from '../../../../types';
import { MessageRateWriter } from '../../common/message-rate-writer';
import { waterfall } from '../../lib/async';

export class ConsumerMessageRateWriter extends MessageRateWriter<IConsumerMessageRateFields> {
protected redisClient: RedisClient;
Expand Down Expand Up @@ -41,35 +40,27 @@ export class ConsumerMessageRateWriter extends MessageRateWriter<IConsumerMessag
) {
super();
this.redisClient = redisClient;
this.globalAcknowledgedRateTimeSeries = GlobalAcknowledgedTimeSeries(
redisClient,
true,
);
this.globalDeadLetteredTimeSeries = GlobalDeadLetteredTimeSeries(
redisClient,
true,
);
this.globalAcknowledgedRateTimeSeries =
GlobalAcknowledgedTimeSeries(redisClient);
this.globalDeadLetteredTimeSeries =
GlobalDeadLetteredTimeSeries(redisClient);
this.acknowledgedTimeSeries = ConsumerAcknowledgedTimeSeries(
redisClient,
consumerId,
queue,
true,
);
this.deadLetteredTimeSeries = ConsumerDeadLetteredTimeSeries(
redisClient,
consumerId,
queue,
true,
);
this.queueAcknowledgedRateTimeSeries = QueueAcknowledgedTimeSeries(
redisClient,
queue,
true,
);
this.queueDeadLetteredTimeSeries = QueueDeadLetteredTimeSeries(
redisClient,
queue,
true,
);
}

Expand Down Expand Up @@ -97,18 +88,4 @@ export class ConsumerMessageRateWriter extends MessageRateWriter<IConsumerMessag
if (multi) this.redisClient.execMulti(multi, () => cb());
else cb();
}

onQuit(cb: ICallback<void>): void {
waterfall(
[
(cb: ICallback<void>) => this.acknowledgedTimeSeries.quit(cb),
(cb: ICallback<void>) => this.queueAcknowledgedRateTimeSeries.quit(cb),
(cb: ICallback<void>) => this.globalAcknowledgedRateTimeSeries.quit(cb),
(cb: ICallback<void>) => this.deadLetteredTimeSeries.quit(cb),
(cb: ICallback<void>) => this.queueDeadLetteredTimeSeries.quit(cb),
(cb: ICallback<void>) => this.globalDeadLetteredTimeSeries.quit(cb),
],
cb,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@ export const ConsumerAcknowledgedTimeSeries = (
redisClient: RedisClient,
consumerId: string,
queue: TQueueParams,
isMaster?: boolean,
) => {
const { keyRateConsumerAcknowledged } = redisKeys.getQueueConsumerKeys(
queue,
consumerId,
);
return new SortedSetTimeSeries(
redisClient,
keyRateConsumerAcknowledged,
30,
undefined,
undefined,
isMaster,
);
return new SortedSetTimeSeries(redisClient, {
key: keyRateConsumerAcknowledged,
expireAfterInSeconds: 30,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@ export const ConsumerDeadLetteredTimeSeries = (
redisClient: RedisClient,
consumerId: string,
queue: TQueueParams,
isMaster?: boolean,
) => {
const { keyRateConsumerDeadLettered } = redisKeys.getQueueConsumerKeys(
queue,
consumerId,
);
return new SortedSetTimeSeries(
redisClient,
keyRateConsumerDeadLettered,
30,
undefined,
undefined,
isMaster,
);
return new SortedSetTimeSeries(redisClient, {
key: keyRateConsumerDeadLettered,
expireAfterInSeconds: 30,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,11 @@ import { RedisClient } from '../../../common/redis-client/redis-client';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { HashTimeSeries } from '../../../common/time-series/hash-time-series';

export const GlobalAcknowledgedTimeSeries = (
redisClient: RedisClient,
isMaster?: boolean,
) => {
const {
keyRateGlobalAcknowledged,
keyRateGlobalAcknowledgedIndex,
keyLockRateGlobalAcknowledged,
} = redisKeys.getMainKeys();
return new HashTimeSeries(
redisClient,
keyRateGlobalAcknowledged,
keyRateGlobalAcknowledgedIndex,
keyLockRateGlobalAcknowledged,
undefined,
undefined,
undefined,
isMaster,
);
export const GlobalAcknowledgedTimeSeries = (redisClient: RedisClient) => {
const { keyRateGlobalAcknowledged, keyRateGlobalAcknowledgedIndex } =
redisKeys.getMainKeys();
return new HashTimeSeries(redisClient, {
key: keyRateGlobalAcknowledged,
indexKey: keyRateGlobalAcknowledgedIndex,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,11 @@ import { RedisClient } from '../../../common/redis-client/redis-client';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { HashTimeSeries } from '../../../common/time-series/hash-time-series';

export const GlobalDeadLetteredTimeSeries = (
redisClient: RedisClient,
isMaster?: boolean,
) => {
const {
keyRateGlobalDeadLettered,
keyRateGlobalDeadLetteredIndex,
keyLockRateGlobalDeadLettered,
} = redisKeys.getMainKeys();
return new HashTimeSeries(
redisClient,
keyRateGlobalDeadLettered,
keyRateGlobalDeadLetteredIndex,
keyLockRateGlobalDeadLettered,
undefined,
undefined,
undefined,
isMaster,
);
export const GlobalDeadLetteredTimeSeries = (redisClient: RedisClient) => {
const { keyRateGlobalDeadLettered, keyRateGlobalDeadLetteredIndex } =
redisKeys.getMainKeys();
return new HashTimeSeries(redisClient, {
key: keyRateGlobalDeadLettered,
indexKey: keyRateGlobalDeadLetteredIndex,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,11 @@ import { TQueueParams } from '../../../../../types';
export const QueueAcknowledgedTimeSeries = (
redisClient: RedisClient,
queue: TQueueParams,
isMaster?: boolean,
) => {
const {
keyRateQueueAcknowledged,
keyRateQueueAcknowledgedIndex,
keyLockRateQueueAcknowledged,
} = redisKeys.getQueueKeys(queue);
return new HashTimeSeries(
redisClient,
keyRateQueueAcknowledged,
keyRateQueueAcknowledgedIndex,
keyLockRateQueueAcknowledged,
undefined,
undefined,
undefined,
isMaster,
);
const { keyRateQueueAcknowledged, keyRateQueueAcknowledgedIndex } =
redisKeys.getQueueKeys(queue);
return new HashTimeSeries(redisClient, {
key: keyRateQueueAcknowledged,
indexKey: keyRateQueueAcknowledgedIndex,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,11 @@ import { TQueueParams } from '../../../../../types';
export const QueueDeadLetteredTimeSeries = (
redisClient: RedisClient,
queue: TQueueParams,
isMaster?: boolean,
) => {
const {
keyRateQueueDeadLettered,
keyRateQueueDeadLetteredIndex,
keyLockRateQueueDeadLettered,
} = redisKeys.getQueueKeys(queue);
return new HashTimeSeries(
redisClient,
keyRateQueueDeadLettered,
keyRateQueueDeadLetteredIndex,
keyLockRateQueueDeadLettered,
undefined,
undefined,
undefined,
isMaster,
);
const { keyRateQueueDeadLettered, keyRateQueueDeadLetteredIndex } =
redisKeys.getQueueKeys(queue);
return new HashTimeSeries(redisClient, {
key: keyRateQueueDeadLettered,
indexKey: keyRateQueueDeadLetteredIndex,
});
};
28 changes: 2 additions & 26 deletions src/system/app/producer/producer-message-rate-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
import { RedisClient } from '../../common/redis-client/redis-client';
import { QueuePublishedTimeSeries } from './producer-time-series/queue-published-time-series';
import { GlobalPublishedTimeSeries } from './producer-time-series/global-published-time-series';
import { each, waterfall } from '../../lib/async';

export class ProducerMessageRateWriter extends MessageRateWriter<IProducerMessageRateFields> {
protected redisClient: RedisClient;
Expand All @@ -22,10 +21,7 @@ export class ProducerMessageRateWriter extends MessageRateWriter<IProducerMessag
constructor(redisClient: RedisClient) {
super();
this.redisClient = redisClient;
this.globalPublishedTimeSeries = GlobalPublishedTimeSeries(
redisClient,
true,
);
this.globalPublishedTimeSeries = GlobalPublishedTimeSeries(redisClient);
}

onUpdate(
Expand All @@ -35,9 +31,8 @@ export class ProducerMessageRateWriter extends MessageRateWriter<IProducerMessag
): void {
const { publishedRate, queuePublishedRate } = rates;
if (Object.keys(queuePublishedRate).length) {
let multi: TRedisClientMulti | null = null;
if (publishedRate) {
multi = multi ?? this.redisClient.multi();
const multi = this.redisClient.multi();
this.globalPublishedTimeSeries.add(ts, publishedRate, multi);
for (const key in queuePublishedRate) {
const value = queuePublishedRate[key];
Expand All @@ -47,7 +42,6 @@ export class ProducerMessageRateWriter extends MessageRateWriter<IProducerMessag
this.queuePublishedTimeSeries[key] = QueuePublishedTimeSeries(
this.redisClient,
{ ns, name },
true,
);
}
this.queuePublishedTimeSeries[key].add(ts, value, multi);
Expand All @@ -57,22 +51,4 @@ export class ProducerMessageRateWriter extends MessageRateWriter<IProducerMessag
} else cb();
} else cb();
}

onQuit(cb: ICallback<void>): void {
waterfall(
[
(cb: ICallback<void>) => this.globalPublishedTimeSeries.quit(cb),
(cb: ICallback<void>) => {
each(
this.queuePublishedTimeSeries,
(item, key, done) => {
item.quit(() => done());
},
cb,
);
},
],
cb,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,11 @@ import { RedisClient } from '../../../common/redis-client/redis-client';
import { redisKeys } from '../../../common/redis-keys/redis-keys';
import { HashTimeSeries } from '../../../common/time-series/hash-time-series';

export const GlobalPublishedTimeSeries = (
redisClient: RedisClient,
isMaster?: boolean,
) => {
const {
keyRateGlobalPublished,
keyRateGlobalInputIndex,
keyLockRateGlobalPublished,
} = redisKeys.getMainKeys();
return new HashTimeSeries(
redisClient,
keyRateGlobalPublished,
keyRateGlobalInputIndex,
keyLockRateGlobalPublished,
undefined,
undefined,
undefined,
isMaster,
);
export const GlobalPublishedTimeSeries = (redisClient: RedisClient) => {
const { keyRateGlobalPublished, keyRateGlobalInputIndex } =
redisKeys.getMainKeys();
return new HashTimeSeries(redisClient, {
key: keyRateGlobalPublished,
indexKey: keyRateGlobalInputIndex,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,11 @@ import { TQueueParams } from '../../../../../types';
export const QueuePublishedTimeSeries = (
redisClient: RedisClient,
queue: TQueueParams,
isMaster?: boolean,
) => {
const {
keyRateQueuePublished,
keyRateQueuePublishedIndex,
keyLockRateQueuePublished,
} = redisKeys.getQueueKeys(queue);
return new HashTimeSeries(
redisClient,
keyRateQueuePublished,
keyRateQueuePublishedIndex,
keyLockRateQueuePublished,
undefined,
undefined,
undefined,
isMaster,
);
const { keyRateQueuePublished, keyRateQueuePublishedIndex } =
redisKeys.getQueueKeys(queue);
return new HashTimeSeries(redisClient, {
key: keyRateQueuePublished,
indexKey: keyRateQueuePublishedIndex,
});
};
2 changes: 1 addition & 1 deletion src/system/app/queue-manager/queue-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ export const queueManager = {
redisClient.smembers(keyNamespaces, (err, reply) => {
if (err) cb(err);
else if (!reply) cb(new EmptyCallbackReplyError());
else cb(null, reply ?? []);
else cb(null, reply);
});
},

Expand Down
2 changes: 1 addition & 1 deletion src/system/common/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export function setConfiguration(configuration: IConfig = {}): IRequiredConfig {
throw new ConfigurationError(
'Configuration has been already initialized. Possible configuration overwrite.',
);
currentConfig = merge(currentConfig ?? {}, defaultConfig, configuration);
currentConfig = merge({}, defaultConfig, configuration);
redisKeys.setNamespace(currentConfig.namespace);
return currentConfig;
}
Expand Down
Loading

0 comments on commit a5c404e

Please sign in to comment.