Skip to content

Commit

Permalink
fix: promote jobs to the right "list" when paused
Browse files Browse the repository at this point in the history
  • Loading branch information
ersoma authored Dec 15, 2020
1 parent 9aba722 commit d3df615
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ export class Scripts {
const keys = [
queue.keys.delayed,
queue.keys.wait,
queue.keys.paused,
queue.keys.priority,
queue.keys.events,
];
Expand Down
15 changes: 10 additions & 5 deletions src/commands/promote-4.lua → src/commands/promote-5.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
Input:
KEYS[1] 'delayed'
KEYS[2] 'wait'
KEYS[3] 'priority'
KEYS[4] 'event stream'
KEYS[3] 'paused'
KEYS[4] 'priority'
KEYS[5] 'event stream'
ARGV[1] queue.toKey('')
ARGV[2] jobId
Expand All @@ -21,13 +22,17 @@ if redis.call("ZREM", KEYS[1], jobId) == 1 then

local target = KEYS[2];

if rcall("EXISTS", KEYS[3]) == 1 then
target = KEYS[3]
end

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[3], priority, jobId)
local count = rcall("ZCOUNT", KEYS[3], 0, priority)
rcall("ZADD", KEYS[4], priority, jobId)
local count = rcall("ZCOUNT", KEYS[4], 0, priority)

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
Expand All @@ -39,7 +44,7 @@ if redis.call("ZREM", KEYS[1], jobId) == 1 then
end

-- Emit waiting event (wait..ing@token)
rcall("XADD", KEYS[4], "*", "event", "waiting", "jobId", jobId, "prev", "delayed");
rcall("XADD", KEYS[5], "*", "event", "waiting", "jobId", jobId, "prev", "delayed");

rcall("HSET", ARGV[1] .. jobId, "delay", 0)

Expand Down
18 changes: 18 additions & 0 deletions src/test/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,24 @@ describe('Job', function() {
throw new Error('Job should not be promoted!');
} catch (err) {}
});

it('should promote delayed job to the right queue if queue is paused', async () => {
const normalJob = await queue.add('normal', { foo: 'bar' });
const delayedJob = await queue.add(
'delayed',
{ foo: 'bar' },
{ delay: 1 },
);

await queue.pause();
await delayedJob.promote();
await queue.resume();

const waitingJobsCount = await queue.getWaitingCount();
expect(waitingJobsCount).to.be.equal(2);
const delayedJobsNewState = await delayedJob.getState();
expect(delayedJobsNewState).to.be.equal('waiting');
});
});

// TODO:
Expand Down

0 comments on commit d3df615

Please sign in to comment.