Skip to content

Commit

Permalink
Improve offline consumers handling & message recovery strategy
Browse files Browse the repository at this point in the history
The HeartbeatMonitorWorker should trigger message recovery when handling offline consumers. We don't need anymore a dedicated worker to inspect processing queues.
  • Loading branch information
weyoss committed Feb 12, 2022
1 parent b73bd37 commit 4072939
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 227 deletions.
16 changes: 6 additions & 10 deletions src/system/app/consumer/consumer-heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { redisKeys } from '../../common/redis-keys/redis-keys';
import { EventEmitter } from 'events';
import { EmptyCallbackReplyError } from '../../common/errors/empty-callback-reply.error';
import { InvalidCallbackReplyError } from '../../common/errors/invalid-callback-reply.error';
import { consumerQueues } from './consumer-queues';
import { Consumer } from './consumer';
import { each, waterfall } from '../../lib/async';

Expand Down Expand Up @@ -250,25 +249,22 @@ export class ConsumerHeartbeat extends EventEmitter {
}

static handleExpiredHeartbeatIds(
client: RedisClient,
redisClient: RedisClient,
consumerIds: string[],
cb: ICallback<void>,
): void {
if (consumerIds.length) {
const { keyHeartbeats, keyHeartbeatInstanceIds } =
redisKeys.getMainKeys();
const multi = client.multi();
const multi = redisClient.multi();
each(
consumerIds,
(consumerId, _, done) => {
consumerQueues.getConsumerQueues(client, consumerId, (err, reply) => {
consumerQueues.removeConsumer(multi, consumerId, reply ?? []);
multi.hdel(keyHeartbeats, consumerId);
multi.zrem(keyHeartbeatInstanceIds, consumerId);
done();
});
multi.hdel(keyHeartbeats, consumerId);
multi.zrem(keyHeartbeatInstanceIds, consumerId);
Consumer.handleOfflineConsumer(multi, redisClient, consumerId, done);
},
() => client.execMulti(multi, (err) => cb(err)),
() => redisClient.execMulti(multi, (err) => cb(err)),
);
} else cb();
}
Expand Down
134 changes: 123 additions & 11 deletions src/system/app/consumer/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
THeartbeatRegistryPayload,
TQueueParams,
TUnaryFunction,
TRedisClientMulti,
EMessageUnacknowledgedCause,
} from '../../../../types';
import { ConsumerMessageRate } from './consumer-message-rate';
import { events } from '../../common/events';
Expand All @@ -23,7 +25,12 @@ import { consumerQueues } from './consumer-queues';
import { GenericError } from '../../common/errors/generic.error';
import { queueManager } from '../queue-manager/queue-manager';
import { WorkerPool } from '../../common/worker/worker-runner/worker-pool';
import { each } from '../../lib/async';
import { each, waterfall } from '../../lib/async';
import { Message } from '../message/message';
import { broker } from '../../common/broker';
import { TimeSeries } from '../../common/time-series/time-series';
import { GlobalDeadLetteredTimeSeries } from './consumer-time-series/global-dead-lettered-time-series';
import { QueueDeadLetteredTimeSeries } from './consumer-time-series/queue-dead-lettered-time-series';

export class Consumer extends Base {
private heartbeat: ConsumerHeartbeat | null = null;
Expand Down Expand Up @@ -357,16 +364,6 @@ export class Consumer extends Base {
}));
}

static isAlive(
redisClient: RedisClient,
queue: TQueueParams,
id: string,
cb: ICallback<boolean>,
): void {
const { keyQueueConsumers } = redisKeys.getQueueConsumerKeys(queue, id);
consumerQueues.exists(redisClient, keyQueueConsumers, id, cb);
}

static getOnlineConsumers(
redisClient: RedisClient,
queue: TQueueParams,
Expand All @@ -391,4 +388,119 @@ export class Consumer extends Base {
): void {
consumerQueues.countQueueConsumers(redisClient, queue, cb);
}

static handleOfflineConsumer(
multi: TRedisClientMulti, // pending transaction
redisClient: RedisClient, // for readonly operations
consumerId: string,
cb: ICallback<void>,
): void {
waterfall(
[
(cb: ICallback<TQueueParams[]>) =>
consumerQueues.getConsumerQueues(redisClient, consumerId, cb),
(queues: TQueueParams[], cb: ICallback<TQueueParams[]>) => {
each(
queues,
(queue, _, done) => {
Consumer.handleConsumerProcessingQueue(
multi,
redisClient,
consumerId,
queue,
done,
);
},
(err) => {
if (err) cb(err);
else cb(null, queues);
},
);
},
(queues: TQueueParams[], cb: ICallback<void>) => {
if (queues.length)
consumerQueues.removeConsumer(multi, consumerId, queues);
cb();
},
],
cb,
);
}

protected static fetchProcessingQueueMessage(
redisClient: RedisClient,
consumerId: string,
keyQueueProcessing: string,
cb: ICallback<Message>,
): void {
redisClient.lrange(
keyQueueProcessing,
0,
0,
(err?: Error | null, range?: string[] | null) => {
if (err) cb(err);
else if (range && range.length) {
const msg = Message.createFromMessage(range[0]);
cb(null, msg);
} else cb();
},
);
}

protected static handleConsumerProcessingQueue(
multi: TRedisClientMulti,
redisClient: RedisClient,
consumerId: string,
queue: TQueueParams,
cb: ICallback<void>,
): void {
const { keyQueueProcessing } = redisKeys.getQueueConsumerKeys(
queue,
consumerId,
);
waterfall(
[
(cb: ICallback<void>) => {
Consumer.fetchProcessingQueueMessage(
redisClient,
consumerId,
keyQueueProcessing,
(err, msg) => {
if (err) cb(err);
else if (msg) {
broker.retry(
multi,
keyQueueProcessing,
msg,
EMessageUnacknowledgedCause.RECOVERY,
(err, deadLetteredCause) => {
if (err) cb(err);
else if (deadLetteredCause) {
const timestamp = TimeSeries.getCurrentTimestamp();
GlobalDeadLetteredTimeSeries(redisClient).add(
timestamp,
1,
multi,
);
QueueDeadLetteredTimeSeries(redisClient, queue).add(
timestamp,
1,
multi,
);
cb();
} else cb();
},
);
} else cb();
},
);
},
(cb: ICallback<void>) => {
queueManager.deleteProcessingQueue(multi, queue, keyQueueProcessing);
cb();
},
],
cb,
);
}
}
5 changes: 1 addition & 4 deletions src/system/app/queue-manager/queue-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,15 @@ export const queueManager = {
},

deleteProcessingQueue(
redisClient: RedisClient,
multi: TRedisClientMulti,
queue: TQueueParams,
processingQueue: string,
cb: ICallback<void>,
): void {
const multi = redisClient.multi();
const { keyProcessingQueues, keyQueueProcessingQueues } =
redisKeys.getQueueKeys(queue);
multi.srem(keyProcessingQueues, processingQueue);
multi.hdel(keyQueueProcessingQueues, processingQueue);
multi.del(processingQueue);
multi.exec((err) => cb(err));
},

///
Expand Down
68 changes: 42 additions & 26 deletions src/system/common/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ELuaScriptName } from './redis-client/lua-scripts';
import { getConfiguration } from './configuration';

function deadLetterMessage(
redisClient: RedisClient,
mixed: RedisClient | TRedisClientMulti,
message: Message,
keyQueueProcessing: string,
unacknowledgedCause: EMessageUnacknowledgedCause,
Expand All @@ -20,43 +20,59 @@ function deadLetterMessage(
): void {
const queue = message.getRequiredQueue();
const { storeMessages } = getConfiguration();
if (storeMessages) {
const { keyQueueDL } = redisKeys.getQueueKeys(queue);
redisClient.lpoprpush(keyQueueProcessing, keyQueueDL, (err) => {
if (err) cb(err);
else cb();
});
const { keyQueueDL } = redisKeys.getQueueKeys(queue);
if (mixed instanceof RedisClient) {
if (storeMessages) {
mixed.lpoprpush(keyQueueProcessing, keyQueueDL, (err) => {
if (err) cb(err);
else cb();
});
} else {
mixed.rpop(keyQueueProcessing, (err) => cb(err));
}
} else {
redisClient.rpop(keyQueueProcessing, (err) => cb(err));
if (storeMessages) {
mixed.lpop(keyQueueProcessing);
mixed.rpush(keyQueueDL, JSON.stringify(message));
} else {
mixed.rpop(keyQueueProcessing);
}
cb();
}
}

function delayUnacknowledgedMessageBeforeRequeuing(
redisClient: RedisClient,
function delayMessageBeforeRequeuing(
mixed: RedisClient | TRedisClientMulti,
message: Message,
keyQueueProcessing: string,
unacknowledgedCause: EMessageUnacknowledgedCause,
cb: ICallback<void>,
): void {
const queue = message.getRequiredQueue();
const { keyDelayedMessages } = redisKeys.getQueueKeys(queue);
redisClient.rpoplpush(keyQueueProcessing, keyDelayedMessages, (err) =>
cb(err),
);
if (mixed instanceof RedisClient) {
mixed.rpoplpush(keyQueueProcessing, keyDelayedMessages, (err) => cb(err));
} else {
mixed.rpoplpush(keyQueueProcessing, keyDelayedMessages);
cb();
}
}

function requeueUnacknowledgedMessage(
redisClient: RedisClient,
function requeueMessage(
mixed: RedisClient | TRedisClientMulti,
message: Message,
keyQueueProcessing: string,
unacknowledgedCause: EMessageUnacknowledgedCause,
cb: ICallback<void>,
): void {
const queue = message.getRequiredQueue();
const { keyRequeueMessages } = redisKeys.getQueueKeys(queue);
redisClient.rpoplpush(keyQueueProcessing, keyRequeueMessages, (err) =>
cb(err),
);
if (mixed instanceof RedisClient) {
mixed.rpoplpush(keyQueueProcessing, keyRequeueMessages, (err) => cb(err));
} else {
mixed.rpoplpush(keyQueueProcessing, keyRequeueMessages);
cb();
}
}

export const broker = {
Expand Down Expand Up @@ -133,7 +149,7 @@ export const broker = {
},

retry(
redisClient: RedisClient,
mixed: RedisClient | TRedisClientMulti,
processingQueue: string,
message: Message,
unacknowledgedCause: EMessageUnacknowledgedCause,
Expand All @@ -145,7 +161,7 @@ export const broker = {
) {
//consumer.emit(events.MESSAGE_EXPIRED, message);
deadLetterMessage(
redisClient,
mixed,
message,
processingQueue,
unacknowledgedCause,
Expand All @@ -159,7 +175,7 @@ export const broker = {
// Only non-periodic messages are re-queued. Failure of periodic messages is ignored since such
// messages are periodically scheduled for delivery.
deadLetterMessage(
redisClient,
mixed,
message,
processingQueue,
unacknowledgedCause,
Expand All @@ -172,16 +188,16 @@ export const broker = {
} else if (!message.hasRetryThresholdExceeded()) {
const delay = message.getRetryDelay();
if (delay) {
delayUnacknowledgedMessageBeforeRequeuing(
redisClient,
delayMessageBeforeRequeuing(
mixed,
message,
processingQueue,
unacknowledgedCause,
(err) => cb(err),
);
} else {
requeueUnacknowledgedMessage(
redisClient,
requeueMessage(
mixed,
message,
processingQueue,
unacknowledgedCause,
Expand All @@ -190,7 +206,7 @@ export const broker = {
}
} else {
deadLetterMessage(
redisClient,
mixed,
message,
processingQueue,
unacknowledgedCause,
Expand Down
Loading

0 comments on commit 4072939

Please sign in to comment.