From 13b2df20cf045c069b8b581751e117722681dcd4 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 1 Sep 2019 10:36:47 +0200 Subject: [PATCH] fix: do not block if blockTime is zero --- src/classes/queue-scheduler.ts | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/classes/queue-scheduler.ts b/src/classes/queue-scheduler.ts index 1ebd71963b..d3c0fd1c42 100644 --- a/src/classes/queue-scheduler.ts +++ b/src/classes/queue-scheduler.ts @@ -46,9 +46,8 @@ export class QueueScheduler extends QueueBase { this.run(); } - private async run() { + private async run(streamLastId = '0-0') { const key = this.delayStreamKey(); - let streamLastId = '0-0'; // TODO: updateDelaySet should also return the last event id while (!this.closing) { // Check if at least the min stalled check time has passed. @@ -63,13 +62,18 @@ export class QueueScheduler extends QueueBase { ), ); - const data = await this.client.xread( - 'BLOCK', - blockTime, - 'STREAMS', - key, - streamLastId, - ); + let data; + if (blockTime) { + data = await this.client.xread( + 'BLOCK', + blockTime, + 'STREAMS', + key, + streamLastId, + ); + } else { + data = await this.client.xread('STREAMS', key, streamLastId); + } if (data && data[0]) { const stream = data[0]; @@ -118,7 +122,5 @@ export class QueueScheduler extends QueueBase { stalled.forEach((jobId: string) => { this.emit('stalled', jobId); }); - - console.log({ failed, stalled }); } }