Skip to content

Commit

Permalink
feat: add support for adding jobs in bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 26, 2019
1 parent 4bba23a commit b62bddc
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 3 deletions.
30 changes: 29 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,38 @@ export class Job {

job.id = await job.addJob(queue.client);

logger('Job added', job.id);
return job;
}

static async createBulk(
queue: QueueBase,
jobs: {
name: string;
data: any;
opts?: JobsOpts;
}[],
) {
await queue.waitUntilReady();

const jobInstances = jobs.map(
job => new Job(queue, job.name, job.data, job.opts),
);

const multi = queue.client.multi();

for (const job of jobInstances) {
job.addJob(<IORedis.Redis>(multi as unknown));
}

const result = (await multi.exec()) as [null | Error, string][];
result.forEach((res, index: number) => {
const [err, id] = res;
jobInstances[index].id = id;
});

return jobInstances;
}

static fromJSON(queue: QueueBase, json: any, jobId?: string) {
const data = JSON.parse(json.data || '{}');
const opts = JSON.parse(json.opts || '{}');
Expand Down
17 changes: 17 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ export class Queue extends QueueGetters {
}
}

/**
Adds an array of jobs to the queue.
@method add
@param jobs: [] The array of jobs to add to the queue. Each job is defined by 3
properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
*/
async addBulk(jobs: { name: string; data: any; opts?: JobsOpts }[]) {
return Job.createBulk(
this,
jobs.map(job => ({
name: job.name,
data: job.data,
opts: { ...job.opts, ...this.jobsOpts },
})),
);
}

/**
Pauses the processing of this queue globally.
Expand Down
4 changes: 2 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class Scripts {
}

static addJob(
client: IORedis.Redis,
client: any,
queue: QueueBase,
job: JobJson,
opts: JobsOpts,
Expand Down Expand Up @@ -59,7 +59,7 @@ export class Scripts {
];

keys = keys.concat(<string[]>args);
return (<any>client).addJob(keys);
return client.addJob(keys);
}

static pause(queue: Queue, pause: boolean) {
Expand Down
59 changes: 59 additions & 0 deletions src/test/test_bulk.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { Queue, Worker, Job } from '@src/classes';
import { expect } from 'chai';
import IORedis from 'ioredis';
import { beforeEach, describe, it } from 'mocha';
import { v4 } from 'node-uuid';

describe('bulk jobs', () => {
let queue: Queue;
let queueName: string;
let client: IORedis.Redis;

beforeEach(function() {
client = new IORedis();
return client.flushdb();
});

beforeEach(async function() {
queueName = 'test-' + v4();
queue = new Queue(queueName);
});

afterEach(async function() {
await queue.close();
return client.quit();
});

it('should process jobs', async () => {
const name = 'test';
let processor;
const processing = new Promise(resolve => [
(processor = async (job: Job) => {
if (job.data.idx === 0) {
expect(job.data.foo).to.be.equal('bar');
} else {
expect(job.data.idx).to.be.equal(1);
expect(job.data.foo).to.be.equal('baz');
resolve();
}
}),
]);
const worker = new Worker(queueName, processor);
await worker.waitUntilReady();

const jobs = await queue.addBulk([
{ name, data: { idx: 0, foo: 'bar' } },
{ name, data: { idx: 1, foo: 'baz' } },
]);
expect(jobs).to.have.length(2);

expect(jobs[0].id).to.be.ok;
expect(jobs[0].data.foo).to.be.eql('bar');
expect(jobs[1].id).to.be.ok;
expect(jobs[1].data.foo).to.be.eql('baz');

await processing;

await worker.close();
});
});

0 comments on commit b62bddc

Please sign in to comment.