From b62bddc054b266a809b4b1646558a095a276d6d1 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Mon, 26 Aug 2019 23:09:54 +0200 Subject: [PATCH] feat: add support for adding jobs in bulk --- src/classes/job.ts | 30 ++++++++++++++++++++- src/classes/queue.ts | 17 ++++++++++++ src/classes/scripts.ts | 4 +-- src/test/test_bulk.ts | 59 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 src/test/test_bulk.ts diff --git a/src/classes/job.ts b/src/classes/job.ts index fa2c9506e7..9084a6dce2 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -75,10 +75,38 @@ export class Job { job.id = await job.addJob(queue.client); - logger('Job added', job.id); return job; } + static async createBulk( + queue: QueueBase, + jobs: { + name: string; + data: any; + opts?: JobsOpts; + }[], + ) { + await queue.waitUntilReady(); + + const jobInstances = jobs.map( + job => new Job(queue, job.name, job.data, job.opts), + ); + + const multi = queue.client.multi(); + + for (const job of jobInstances) { + job.addJob((multi as unknown)); + } + + const result = (await multi.exec()) as [null | Error, string][]; + result.forEach((res, index: number) => { + const [err, id] = res; + jobInstances[index].id = id; + }); + + return jobInstances; + } + static fromJSON(queue: QueueBase, json: any, jobId?: string) { const data = JSON.parse(json.data || '{}'); const opts = JSON.parse(json.opts || '{}'); diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 53ea9855cf..e606452797 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -48,6 +48,23 @@ export class Queue extends QueueGetters { } } + /** + Adds an array of jobs to the queue. + @method add + @param jobs: [] The array of jobs to add to the queue. Each job is defined by 3 + properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'. +*/ + async addBulk(jobs: { name: string; data: any; opts?: JobsOpts }[]) { + return Job.createBulk( + this, + jobs.map(job => ({ + name: job.name, + data: job.data, + opts: { ...job.opts, ...this.jobsOpts }, + })), + ); + } + /** Pauses the processing of this queue globally. diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index a7ba1544b6..6d068e123d 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -27,7 +27,7 @@ export class Scripts { } static addJob( - client: IORedis.Redis, + client: any, queue: QueueBase, job: JobJson, opts: JobsOpts, @@ -59,7 +59,7 @@ export class Scripts { ]; keys = keys.concat(args); - return (client).addJob(keys); + return client.addJob(keys); } static pause(queue: Queue, pause: boolean) { diff --git a/src/test/test_bulk.ts b/src/test/test_bulk.ts new file mode 100644 index 0000000000..f4b9ded702 --- /dev/null +++ b/src/test/test_bulk.ts @@ -0,0 +1,59 @@ +import { Queue, Worker, Job } from '@src/classes'; +import { expect } from 'chai'; +import IORedis from 'ioredis'; +import { beforeEach, describe, it } from 'mocha'; +import { v4 } from 'node-uuid'; + +describe('bulk jobs', () => { + let queue: Queue; + let queueName: string; + let client: IORedis.Redis; + + beforeEach(function() { + client = new IORedis(); + return client.flushdb(); + }); + + beforeEach(async function() { + queueName = 'test-' + v4(); + queue = new Queue(queueName); + }); + + afterEach(async function() { + await queue.close(); + return client.quit(); + }); + + it('should process jobs', async () => { + const name = 'test'; + let processor; + const processing = new Promise(resolve => [ + (processor = async (job: Job) => { + if (job.data.idx === 0) { + expect(job.data.foo).to.be.equal('bar'); + } else { + expect(job.data.idx).to.be.equal(1); + expect(job.data.foo).to.be.equal('baz'); + resolve(); + } + }), + ]); + const worker = new Worker(queueName, processor); + await worker.waitUntilReady(); + + const jobs = await queue.addBulk([ + { name, data: { idx: 0, foo: 'bar' } }, + { name, data: { idx: 1, foo: 'baz' } }, + ]); + expect(jobs).to.have.length(2); + + expect(jobs[0].id).to.be.ok; + expect(jobs[0].data.foo).to.be.eql('bar'); + expect(jobs[1].id).to.be.ok; + expect(jobs[1].data.foo).to.be.eql('baz'); + + await processing; + + await worker.close(); + }); +});