From d3df615d37b1114c02eacb45f23643ee2f05374d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Soma=20Erd=C3=A9lyi?= <6604988+ersoma@users.noreply.github.com> Date: Tue, 15 Dec 2020 12:58:54 +0100 Subject: [PATCH] fix: promote jobs to the right "list" when paused --- src/classes/scripts.ts | 1 + src/commands/{promote-4.lua => promote-5.lua} | 15 ++++++++++----- src/test/test_job.ts | 18 ++++++++++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) rename src/commands/{promote-4.lua => promote-5.lua} (74%) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index f92c4fd2bd..ec0575512b 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -430,6 +430,7 @@ export class Scripts { const keys = [ queue.keys.delayed, queue.keys.wait, + queue.keys.paused, queue.keys.priority, queue.keys.events, ]; diff --git a/src/commands/promote-4.lua b/src/commands/promote-5.lua similarity index 74% rename from src/commands/promote-4.lua rename to src/commands/promote-5.lua index 5379bb4206..de5e2cab14 100644 --- a/src/commands/promote-4.lua +++ b/src/commands/promote-5.lua @@ -4,8 +4,9 @@ Input: KEYS[1] 'delayed' KEYS[2] 'wait' - KEYS[3] 'priority' - KEYS[4] 'event stream' + KEYS[3] 'paused' + KEYS[4] 'priority' + KEYS[5] 'event stream' ARGV[1] queue.toKey('') ARGV[2] jobId @@ -21,13 +22,17 @@ if redis.call("ZREM", KEYS[1], jobId) == 1 then local target = KEYS[2]; + if rcall("EXISTS", KEYS[3]) == 1 then + target = KEYS[3] + end + if priority == 0 then -- LIFO or FIFO rcall("LPUSH", target, jobId) else -- Priority add - rcall("ZADD", KEYS[3], priority, jobId) - local count = rcall("ZCOUNT", KEYS[3], 0, priority) + rcall("ZADD", KEYS[4], priority, jobId) + local count = rcall("ZCOUNT", KEYS[4], 0, priority) local len = rcall("LLEN", target) local id = rcall("LINDEX", target, len - (count - 1)) @@ -39,7 +44,7 @@ if redis.call("ZREM", KEYS[1], jobId) == 1 then end -- Emit waiting event (wait..ing@token) - rcall("XADD", KEYS[4], "*", "event", "waiting", "jobId", jobId, "prev", "delayed"); + rcall("XADD", KEYS[5], "*", "event", "waiting", "jobId", jobId, "prev", "delayed"); rcall("HSET", ARGV[1] .. jobId, "delay", 0) diff --git a/src/test/test_job.ts b/src/test/test_job.ts index 02eee0c752..31f116414e 100644 --- a/src/test/test_job.ts +++ b/src/test/test_job.ts @@ -338,6 +338,24 @@ describe('Job', function() { throw new Error('Job should not be promoted!'); } catch (err) {} }); + + it('should promote delayed job to the right queue if queue is paused', async () => { + const normalJob = await queue.add('normal', { foo: 'bar' }); + const delayedJob = await queue.add( + 'delayed', + { foo: 'bar' }, + { delay: 1 }, + ); + + await queue.pause(); + await delayedJob.promote(); + await queue.resume(); + + const waitingJobsCount = await queue.getWaitingCount(); + expect(waitingJobsCount).to.be.equal(2); + const delayedJobsNewState = await delayedJob.getState(); + expect(delayedJobsNewState).to.be.equal('waiting'); + }); }); // TODO: