Skip to content

Commit

Permalink
fix(scheduler): divide timestamp by 4096 in update set fixes #168
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored May 28, 2020
1 parent e126c25 commit 0c5db83
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 50 deletions.
10 changes: 6 additions & 4 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ export class QueueScheduler extends QueueBase {

const key = this.keys.delay;
const opts = this.opts as QueueSchedulerOptions;
const delaySet = await this.updateDelaySet(Date.now());

const [nextTimestamp] = delaySet;
let streamLastId = delaySet[1] || '0-0';
const [nextTimestamp, streamId = '0-0'] = await this.updateDelaySet(
Date.now(),
);
let streamLastId = streamId;

if (nextTimestamp) {
this.nextTimestamp = nextTimestamp / 4096;

This comment has been minimized.

Copy link
@nodkz

nodkz May 29, 2020

Should we keep / 4096 here? While updateDelaySet already return it divided by 4096 in /updateDelaySet-7.lua So it looks like it divided by 4096 two times.

PS. I think that 4096 logic should be kept only in Lua scripts. While this logic is placed in Lua and JS we will have such problems.

This comment has been minimized.

Copy link
@manast

manast May 29, 2020

Author Contributor

No, we should not keep it twice, I did a search and replace but somehow I missed this division, thanks!

This comment has been minimized.

Copy link
@manast

manast May 29, 2020

Author Contributor
Expand All @@ -64,6 +65,7 @@ export class QueueScheduler extends QueueBase {
// Listen to the delay event stream from lastDelayStreamTimestamp
// Can we use XGROUPS to reduce redundancy?
const nextDelay = this.nextTimestamp - Date.now();

const blockTime = Math.round(
Math.min(opts.stalledInterval, Math.max(nextDelay, 0)),
);
Expand Down Expand Up @@ -104,7 +106,7 @@ export class QueueScheduler extends QueueBase {
if (delay <= 0) {
const [nextTimestamp, id] = await this.updateDelaySet(now);
if (nextTimestamp) {
this.nextTimestamp = nextTimestamp / 4096;
this.nextTimestamp = nextTimestamp;
streamLastId = id;
} else {
this.nextTimestamp = Number.MAX_VALUE;
Expand Down
3 changes: 2 additions & 1 deletion src/commands/updateDelaySet-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ end
local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]
local id
if (nextTimestamp ~= nil) then
id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp / 0x1000)
nextTimestamp = nextTimestamp / 0x1000
id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp)
end

return {nextTimestamp, id}
91 changes: 46 additions & 45 deletions src/test/test_delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ describe('Delayed jobs', function() {
let publishHappened = false;

queueEvents.on('delayed', () => {
console.log('delayed!');
publishHappened = true;
});

Expand Down Expand Up @@ -71,6 +70,7 @@ describe('Delayed jobs', function() {
});

it('should process delayed jobs in correct order', async function() {
this.timeout(20000);
let order = 0;
const queueScheduler = new QueueScheduler(queueName);
await queueScheduler.waitUntilReady();
Expand Down Expand Up @@ -113,52 +113,53 @@ describe('Delayed jobs', function() {
await queueScheduler.close();
});

/*
it('should process delayed jobs in correct order even in case of restart', function(done) {
this.timeout(15000);
var QUEUE_NAME = 'delayed queue multiple' + uuid();
var order = 1;
queue = new Queue(QUEUE_NAME);
var fn = function(job, jobDone) {
expect(order).to.be.equal(job.data.order);
jobDone();
if (order === 4) {
queue.close().then(done, done);
}
order++;
};
Bluebird.join(
queue.add({ order: 2 }, { delay: 300 }),
queue.add({ order: 4 }, { delay: 500 }),
queue.add({ order: 1 }, { delay: 200 }),
queue.add({ order: 3 }, { delay: 400 }),
)
.then(function() {
//
// Start processing so that jobs get into the delay set.
//
queue.process(fn);
return Bluebird.delay(20);
})
.then(function() {
//We simulate a restart
// console.log('RESTART');
// return queue.close().then(function () {
// console.log('CLOSED');
// return Promise.delay(100).then(function () {
// queue = new Queue(QUEUE_NAME);
// queue.process(fn);
// });
// });
it('should process delayed jobs in correct order even in case of restart', async function() {
this.timeout(5000);

let worker: Worker;
const queueName = 'delayed queue multiple' + v4();
let order = 1;

let secondQueueScheduler: QueueScheduler;
const firstQueueScheduler = new QueueScheduler(queueName);
await firstQueueScheduler.waitUntilReady();

queue = new Queue(queueName);

const processing = new Promise((resolve, reject) => {
worker = new Worker(queueName, async (job: Job) => {
try {
expect(order).to.be.equal(job.data.order);

if (order === 1) {
await firstQueueScheduler.close();
secondQueueScheduler = new QueueScheduler(queueName);
await secondQueueScheduler.waitUntilReady();
}

if (order === 4) {
resolve();
}
order++;
} catch (err) {
reject(err);
}
});
});

await Promise.all([
queue.add('test', { order: 2 }, { delay: 500 }),
queue.add('test', { order: 4 }, { delay: 1500 }),
queue.add('test', { order: 1 }, { delay: 200 }),
queue.add('test', { order: 3 }, { delay: 800 }),
]);

await processing;

await queue.close();
worker && (await worker.close());
secondQueueScheduler && (await secondQueueScheduler.close());
});
*/

it('should process delayed jobs with exact same timestamps in correct order (FIFO)', async function() {
let order = 1;
Expand Down

0 comments on commit 0c5db83

Please sign in to comment.