Skip to content

Commit

Permalink
feat(queue): add getCountsPerPriority method (#2746)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 12, 2024
1 parent 66ffd9d commit 0376dcc
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 35 deletions.
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,13 @@ declare namespace Bull {
*/
close(doNotWaitJobs?: boolean): Promise<void>;

/**
* Returns the number of jobs per priority.
*/
getCountsPerPriority(priorities: number[]): Promise<{
[index: string]: number;
}>;

/**
* Returns a promise that will return the job instance associated with the jobId parameter.
* If the specified job cannot be located, the promise callback parameter will be set to null.
Expand Down
25 changes: 25 additions & 0 deletions lib/commands/getCountsPerPriority-2.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
--[[
Get counts per provided states
Input:
KEYS[1] wait key
KEYS[2] priority key
ARGV[1...] priorities
]]
local rcall = redis.call
local results = {}
local waitKey = KEYS[1]
local prioritizedKey = KEYS[2]

for i = 1, #ARGV do
local priority = tonumber(ARGV[i])
if priority == 0 then
results[#results+1] = rcall("LLEN", waitKey) - rcall("ZCARD", prioritizedKey)
else
results[#results+1] = rcall("ZCOUNT", prioritizedKey,
priority, priority)
end
end

return results
16 changes: 16 additions & 0 deletions lib/getters.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,29 @@

const _ = require('lodash');
const Job = require('./job');
const scripts = require('./scripts');

module.exports = function(Queue) {
Queue.prototype.getJob = async function(jobId) {
await this.isReady();
return Job.fromId(this, jobId);
};

Queue.prototype.getCountsPerPriority = async function(priorities) {
const uniquePriorities = [...new Set(priorities)];
const responses = await scripts.getCountsPerPriority(
this,
uniquePriorities
);

const counts = {};
responses.forEach((res, index) => {
counts[`${uniquePriorities[index]}`] = res || 0;
});

return counts;
};

Queue.prototype._commandByType = function(types, count, callback) {
return _.map(types, type => {
type = type === 'waiting' ? 'wait' : type; // alias
Expand Down
23 changes: 21 additions & 2 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ const scripts = {
return result;
},

getCountsPerPriorityArgs(queue, priorities) {
const keys = [queue.keys.wait, queue.keys.priority];

const args = priorities;

return keys.concat(args);
},

async getCountsPerPriority(queue, priorities) {
const client = await queue.client;
const args = this.getCountsPerPriorityArgs(queue, priorities);

return client.getCountsPerPriority(args);
},

moveToActive(queue, jobId) {
const queueKeys = queue.keys;
const keys = [queueKeys.wait, queueKeys.active, queueKeys.priority];
Expand Down Expand Up @@ -253,9 +268,13 @@ const scripts = {
case -2:
return new Error('Missing lock for job ' + jobId + ' ' + command);
case -3:
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
return new Error(
`Job ${jobId} is not in the ${state} state. ${command}`
);
case -6:
return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
return new Error(
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`
);
}
},

Expand Down
23 changes: 23 additions & 0 deletions test/test_getters.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,29 @@ describe('Jobs getters', function() {
queue.add({ baz: 'qux' });
});

describe('.getCountsPerPriority', () => {
it('returns job counts per priority', done => {
const jobsArray = Array.from(Array(42).keys()).map(index => ({
name: 'test',
data: {},
opts: {
priority: index % 4
}
}));
queue.addBulk(jobsArray).then(() => {
queue.getCountsPerPriority([0, 1, 2, 3]).then(counts => {
expect(counts).to.be.eql({
'0': 11,
'1': 11,
'2': 10,
'3': 10
});
done();
});
});
});
});

it('fails jobs that exceed their specified timeout', done => {
queue.process((job, jobDone) => {
setTimeout(jobDone, 200);
Expand Down
64 changes: 32 additions & 32 deletions test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -759,35 +759,35 @@ describe('Job', () => {

it('applies stacktrace limit on failure', () => {
const stackTraceLimit = 1;
return Job.create(queue, { foo: 'bar' }, { stackTraceLimit, attempts: 2 }).then(
job => {
return job
.isFailed()
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job
.moveToFailed(new Error('test error'), true)
.then(() => {
return job.isFailed().then(isFailed => {
expect(isFailed).to.be(true);
expect(job.stacktrace).not.be(null);
expect(job.stacktrace.length).to.be(stackTraceLimit);
});
});
return Job.create(
queue,
{ foo: 'bar' },
{ stackTraceLimit, attempts: 2 }
).then(job => {
return job
.isFailed()
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true).then(() => {
return job.isFailed().then(isFailed => {
expect(isFailed).to.be(true);
expect(job.stacktrace).not.be(null);
expect(job.stacktrace.length).to.be(stackTraceLimit);
});
});
}
);
});
});
});
});

Expand Down Expand Up @@ -914,8 +914,8 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('completed');
return client.zrem(queue.toKey('completed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
return client.zrem(queue.toKey('completed'), job.id).then(() => {
return client.lpush(queue.toKey('active'), job.id);
});
})
.then(() => {
Expand All @@ -930,8 +930,8 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('delayed');
return client.zrem(queue.toKey('delayed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
return client.zrem(queue.toKey('delayed'), job.id).then(() => {
return client.lpush(queue.toKey('active'), job.id);
});
})
.then(() => {
Expand Down
2 changes: 1 addition & 1 deletion test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,7 @@ describe('Queue', () => {
});

queue2
.add({ foo: 'bar' }, {removeOnFail: true})
.add({ foo: 'bar' }, { removeOnFail: true })
.then(job => {
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
Expand Down

0 comments on commit 0376dcc

Please sign in to comment.