Skip to content

Commit

Permalink
fix(repeatable): avoid delayed job deletion if next job already exist…
Browse files Browse the repository at this point in the history
…ed (#2778)
  • Loading branch information
roggervalf authored Sep 20, 2024
1 parent e170733 commit 6a851c1
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/commands/addRepeatableJob-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ end
local prevMillis = rcall("ZSCORE", repeatKey, customKey)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. customKey .. ":" .. prevMillis
local nextDelayedJobId = repeatKey .. ":" .. customKey .. ":" .. nextMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false then
if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and rcall("EXISTS", nextDelayedJobId) ~= 1 then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]])
rcall("ZREM", delayedKey, delayedJobId)
end
Expand Down
61 changes: 61 additions & 0 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,67 @@ describe('Job', function () {
const isCompleted = await job.isCompleted();
expect(isCompleted).to.be.equal(true);
});

describe('when re-adding same repeatable job after previous delayed one is promoted', () => {
it('keep one delayed job', async () => {
const job = await queue.add(
'test',
{ foo: 'bar' },
{
repeat: {
pattern: '0 0 7 * * *',
},
},
);
const isDelayed = await job.isDelayed();
expect(isDelayed).to.be.equal(true);

await queue.add(
'test',
{ foo: 'bar' },
{
repeat: {
pattern: '0 0 7 * * *',
},
},
);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await job.promote();
expect(job.delay).to.be.equal(0);

const worker = new Worker(queueName, null, { connection, prefix });
const currentJob1 = (await worker.getNextJob('token')) as Job;
expect(currentJob1).to.not.be.undefined;

await currentJob1.moveToCompleted('succeeded', 'token', true);
const completedCount = await queue.getCompletedCount();
const delayedCountAfterPromote = await queue.getDelayedCount();
expect(completedCount).to.be.equal(1);
expect(delayedCountAfterPromote).to.be.equal(1);

const completedCountAfterRestart = await queue.getCompletedCount();
const delayedCountAfterRestart = await queue.getDelayedCount();
expect(completedCountAfterRestart).to.be.equal(1);
expect(delayedCountAfterRestart).to.be.equal(1);

await queue.add(
'test',
{ foo: 'bar' },
{
repeat: {
pattern: '0 0 7 * * *',
},
},
);

const completedCountAfterReAddition = await queue.getCompletedCount();
const delayedCountAfterReAddition = await queue.getDelayedCount();
expect(completedCountAfterReAddition).to.be.equal(1);
expect(delayedCountAfterReAddition).to.be.equal(1);
});
});
});

describe('when queue is paused', () => {
Expand Down

0 comments on commit 6a851c1

Please sign in to comment.