diff --git a/src/classes/compat.ts b/src/classes/compat.ts index 84500e0421..c83fa25857 100644 --- a/src/classes/compat.ts +++ b/src/classes/compat.ts @@ -497,6 +497,10 @@ export class Queue3 extends EventEmitter { } } + isPaused(): boolean { + return this.worker && this.worker.isPaused(); + } + /** * Returns a promise that returns the number of jobs in the queue, waiting or paused. * Since there may be other processes adding or processing jobs, @@ -677,7 +681,7 @@ export class Queue3 extends EventEmitter { * Optional parameters for range and ordering are provided. */ getJobs( - types: string[], + types: string[] | string, start = 0, end = -1, asc = false, @@ -705,16 +709,14 @@ export class Queue3 extends EventEmitter { /** * Returns a promise that resolves with the job counts for the given queue. */ - getJobCounts( - types?: string[] | string, - ): Promise<{ [index: string]: number }> { + getJobCounts(...types: string[]): Promise<{ [index: string]: number }> { return this.getQueue().getJobCounts(...Utils.parseTypeArg(types)); } /** * Returns a promise that resolves with the job counts for the given queue of the given types. */ - async getJobCountByTypes(types?: string[] | string): Promise { + async getJobCountByTypes(...types: string[]): Promise { return this.getQueue().getJobCountByTypes(...Utils.parseTypeArg(types)); } @@ -1020,6 +1022,7 @@ export class Queue3 extends EventEmitter { this.name, Utils.convertToQueueEventsOptions(this.opts), ); + this.queueEvents.init(); } return this.queueEvents; } @@ -1089,15 +1092,11 @@ export class Queue3 extends EventEmitter { case 'drained': if (once) { this.onWorkerInit(worker => { - worker.once('drained', () => { - listener(); - }); + worker.once('drained', listener); }); } else { this.onWorkerInit(worker => { - worker.on('drained', () => { - listener(); - }); + worker.on('drained', listener); }); } break; @@ -1216,13 +1215,9 @@ export class Queue3 extends EventEmitter { break; case 'global:drained': if (once) { - this.getQueueEvents().once('drained', () => { - listener(); - }); + this.getQueueEvents().once('drained', listener); } else { - this.getQueueEvents().on('drained', () => { - listener(); - }); + this.getQueueEvents().on('drained', listener); } break; case 'global:failed': @@ -1612,13 +1607,15 @@ class Utils { const target: QueueBaseOptions = {}; - if (source.redis) { + if (source.redis !== undefined) { const client = new IORedis(source.redis); target.connection = client; target.client = client; } - target.prefix = source.prefix; + if (source.prefix !== undefined) { + target.prefix = source.prefix; + } return target; } @@ -1630,13 +1627,18 @@ class Utils { const target: QueueOptions = Utils.convertToQueueBaseOptions(source); - target.defaultJobOptions = Utils.convertToJobsOpts( - source.defaultJobOptions, - ); - target.createClient = Utils.adaptToCreateClient( - source.createClient, - source.redis, - ); + if (source.defaultJobOptions) { + target.defaultJobOptions = Utils.convertToJobsOpts( + source.defaultJobOptions, + ); + } + + if (source.createClient) { + target.createClient = Utils.adaptToCreateClient( + source.createClient, + source.redis, + ); + } return target; } @@ -1650,8 +1652,8 @@ class Utils { const target: QueueEventsOptions = Utils.convertToQueueBaseOptions(source); - target.lastEventId = undefined; - target.blockingTimeout = undefined; + // target.lastEventId = undefined; + // target.blockingTimeout = undefined; return target; } @@ -1666,8 +1668,13 @@ class Utils { const target: QueueKeeperOptions = Utils.convertToQueueBaseOptions(source); if (source.settings) { - target.maxStalledCount = source.settings.maxStalledCount; - target.stalledInterval = source.settings.stalledInterval; + if (source.settings.maxStalledCount !== undefined) { + target.maxStalledCount = source.settings.maxStalledCount; + } + + if (source.settings.stalledInterval !== undefined) { + target.stalledInterval = source.settings.stalledInterval; + } } return target; @@ -1679,12 +1686,21 @@ class Utils { } const target: JobsOpts = {}; - - target.timestamp = (source as any).timestamp; - target.priority = source.priority; - target.delay = source.delay; - target.attempts = source.attempts; - target.repeat = Utils.convertToRepeatOpts(source.repeat); + if ((source as any).timestamp !== undefined) { + target.timestamp = (source as any).timestamp; + } + if (source.priority !== undefined) { + target.priority = source.priority; + } + if (source.delay !== undefined) { + target.delay = source.delay; + } + if (source.attempts !== undefined) { + target.attempts = source.attempts; + } + if (source.repeat !== undefined) { + target.repeat = Utils.convertToRepeatOpts(source.repeat); + } if (source.backoff !== undefined) { if (typeof source.backoff === 'number') { @@ -1694,16 +1710,26 @@ class Utils { } } - target.lifo = source.lifo; - target.timeout = source.timeout; - + if (source.lifo !== undefined) { + target.lifo = source.lifo; + } + if (source.timeout !== undefined) { + target.timeout = source.timeout; + } if (source.jobId !== undefined) { target.jobId = source.jobId; } - target.removeOnComplete = source.removeOnComplete; - target.removeOnFail = source.removeOnFail; - target.stackTraceLimit = source.stackTraceLimit; + if (source.removeOnComplete !== undefined) { + target.removeOnComplete = source.removeOnComplete; + } + if (source.removeOnFail !== undefined) { + target.removeOnFail = source.removeOnFail; + } + if (source.stackTraceLimit !== undefined) { + target.stackTraceLimit = source.stackTraceLimit; + } + return target; } @@ -1716,14 +1742,26 @@ class Utils { const target: RepeatOpts = {}; - target.cron = (source as CronRepeatOptions3).cron; - target.tz = (source as CronRepeatOptions3).tz; - target.startDate = (source as CronRepeatOptions3).startDate; - target.endDate = (source as CronRepeatOptions3).endDate; - target.limit = (source as EveryRepeatOptions3).limit; - target.every = (source as EveryRepeatOptions3).every; - target.count = undefined; - target.prevMillis = undefined; + if ((source as CronRepeatOptions3).cron !== undefined) { + target.cron = (source as CronRepeatOptions3).cron; + } + if ((source as CronRepeatOptions3).tz !== undefined) { + target.tz = (source as CronRepeatOptions3).tz; + } + if ((source as CronRepeatOptions3).startDate !== undefined) { + target.startDate = (source as CronRepeatOptions3).startDate; + } + if ((source as CronRepeatOptions3).endDate !== undefined) { + target.endDate = (source as CronRepeatOptions3).endDate; + } + if ((source as EveryRepeatOptions3).limit !== undefined) { + target.limit = (source as EveryRepeatOptions3).limit; + } + if ((source as EveryRepeatOptions3).every !== undefined) { + target.every = (source as EveryRepeatOptions3).every; + } + // target.count = undefined; + // target.prevMillis = undefined; return target; } @@ -1735,8 +1773,12 @@ class Utils { const target: BackoffOpts = { type: undefined, delay: undefined }; - target.type = source.type; - target.delay = source.delay; + if (source.type !== undefined) { + target.type = source.type; + } + if (source.delay !== undefined) { + target.delay = source.delay; + } return target; } @@ -1747,14 +1789,19 @@ class Utils { } const target: WorkerOptions = Utils.convertToQueueBaseOptions(source); - target.concurrency = undefined; - target.limiter = Utils.convertToRateLimiterOpts(source.limiter); - target.skipDelayCheck = undefined; - target.drainDelay = source.settings - ? source.settings.drainDelay - : undefined; - target.visibilityWindow = undefined; - target.settings = Utils.convertToAdvancedOpts(source.settings); + // target.concurrency = undefined; + if (source.limiter !== undefined) { + target.limiter = Utils.convertToRateLimiterOpts(source.limiter); + } + // target.skipDelayCheck = undefined; + if (source.settings && source.settings.drainDelay !== undefined) { + target.drainDelay = source.settings.drainDelay; + } + + // target.visibilityWindow = undefined; + if (source.settings) { + target.settings = Utils.convertToAdvancedOpts(source.settings); + } return target; } @@ -1766,8 +1813,12 @@ class Utils { const target: RateLimiterOpts = { max: undefined, duration: undefined }; - target.max = source.max; - target.duration = source.duration; + if (source.max !== undefined) { + target.max = source.max; + } + if (source.duration !== undefined) { + target.duration = source.duration; + } if (source.bounceBack !== undefined) { console.warn('bounceBack option is not supported'); @@ -1783,14 +1834,27 @@ class Utils { const target: AdvancedOpts = {}; - target.lockDuration = source.lockDuration; - target.stalledInterval = source.stalledInterval; - target.maxStalledCount = source.maxStalledCount; - target.guardInterval = source.guardInterval; - target.retryProcessDelay = source.retryProcessDelay; - target.backoffStrategies = source.backoffStrategies; - target.drainDelay = source.drainDelay; - + if (source.lockDuration !== undefined) { + target.lockDuration = source.lockDuration; + } + if (source.stalledInterval !== undefined) { + target.stalledInterval = source.stalledInterval; + } + if (source.maxStalledCount !== undefined) { + target.maxStalledCount = source.maxStalledCount; + } + if (source.guardInterval !== undefined) { + target.guardInterval = source.guardInterval; + } + if (source.retryProcessDelay !== undefined) { + target.retryProcessDelay = source.retryProcessDelay; + } + if (source.backoffStrategies !== undefined) { + target.backoffStrategies = source.backoffStrategies; + } + if (source.drainDelay !== undefined) { + target.drainDelay = source.drainDelay; + } if (source.lockRenewTime !== undefined) { console.warn('lockRenewTime option is not supported'); } diff --git a/src/test/test_compat.ts b/src/test/test_compat.ts new file mode 100644 index 0000000000..0f2249fe9b --- /dev/null +++ b/src/test/test_compat.ts @@ -0,0 +1,626 @@ +/*eslint-env node */ +'use strict'; + +import { Job, Queue, QueueEvents, QueueScheduler, Worker } from '@src/classes'; +import { describe, beforeEach, afterEach, it } from 'mocha'; +import { expect } from 'chai'; +import IORedis from 'ioredis'; +import { v4 } from 'node-uuid'; +import { after } from 'lodash'; + +import { delay } from 'bluebird'; +import { Queue3 } from '@src/classes/compat'; + +describe('Compat', function() { + describe('jobs getters', function() { + this.timeout(4000); + let queue: Queue3; + let queueName: string; + let client: IORedis.Redis; + + beforeEach(function() { + client = new IORedis(); + return client.flushdb(); + }); + + beforeEach(async function() { + queueName = 'test-' + v4(); + queue = new Queue3(queueName, 'redis://127.0.0.1:6379/0'); + }); + + afterEach(async function() { + await queue.close(); + return client.quit(); + }); + + it('should get waiting jobs', async function() { + await queue.add('test', { foo: 'bar' }); + await queue.add('test', { baz: 'qux' }); + + const jobs = await queue.getWaiting(); + expect(jobs).to.be.a('array'); + expect(jobs.length).to.be.equal(2); + expect(jobs[0].data.foo).to.be.equal('bar'); + expect(jobs[1].data.baz).to.be.equal('qux'); + }); + + it('should get paused jobs', async function() { + await queue.pause(); + await Promise.all([ + queue.add('test', { foo: 'bar' }), + queue.add('test', { baz: 'qux' }), + ]); + const jobs = await queue.getWaiting(); + expect(jobs).to.be.a('array'); + expect(jobs.length).to.be.equal(2); + expect(jobs[0].data.foo).to.be.equal('bar'); + expect(jobs[1].data.baz).to.be.equal('qux'); + }); + + it('should get active jobs', async function() { + let processor; + const processing = new Promise(resolve => { + processor = async (job: Job) => { + const jobs = await queue.getActive(); + expect(jobs).to.be.a('array'); + expect(jobs.length).to.be.equal(1); + expect(jobs[0].data.foo).to.be.equal('bar'); + resolve(); + }; + }); + + await queue.add('test', { foo: 'bar' }); + await queue.process('test', processor); + await processing; + }); + + it('should get completed jobs', function(done) { + queue.process('test', async job => {}); + + let counter = 2; + + queue.on('completed', async function() { + counter--; + + if (counter === 0) { + const jobs = await queue.getCompleted(); + expect(jobs).to.be.a('array'); + done(); + } + }); + + queue.add('test', { foo: 'bar' }); + queue.add('test', { baz: 'qux' }); + }); + + it('should get failed jobs', function(done) { + queue.process('test', async job => { + throw new Error('Forced error'); + }); + + let counter = 2; + + queue.on('failed', async function() { + counter--; + + if (counter === 0) { + const jobs = await queue.getFailed(); + expect(jobs).to.be.a('array'); + done(); + } + }); + + queue.add('test', { foo: 'bar' }); + queue.add('test', { baz: 'qux' }); + }); + + it('should return all completed jobs when not setting start/end', function(done) { + queue.process('test', async job => {}); + + queue.on( + 'completed', + after(3, async function() { + try { + const jobs = await queue.getJobs('completed'); + expect(jobs) + .to.be.an('array') + .that.have.length(3); + expect(jobs[0]).to.have.property('finishedOn'); + expect(jobs[1]).to.have.property('finishedOn'); + expect(jobs[2]).to.have.property('finishedOn'); + + expect(jobs[0]).to.have.property('processedOn'); + expect(jobs[1]).to.have.property('processedOn'); + expect(jobs[2]).to.have.property('processedOn'); + done(); + } catch (err) { + done(err); + } + }), + ); + + queue.add('test', { foo: 1 }); + queue.add('test', { foo: 2 }); + queue.add('test', { foo: 3 }); + }); + + it('should return all failed jobs when not setting start/end', function(done) { + queue.process('test', async job => { + throw new Error('error'); + }); + + queue.on( + 'failed', + after(3, async function() { + try { + queue; + const jobs = await queue.getJobs('failed'); + expect(jobs) + .to.be.an('array') + .that.has.length(3); + expect(jobs[0]).to.have.property('finishedOn'); + expect(jobs[1]).to.have.property('finishedOn'); + expect(jobs[2]).to.have.property('finishedOn'); + + expect(jobs[0]).to.have.property('processedOn'); + expect(jobs[1]).to.have.property('processedOn'); + expect(jobs[2]).to.have.property('processedOn'); + done(); + } catch (err) { + done(err); + } + }), + ); + + queue.add('test', { foo: 1 }); + queue.add('test', { foo: 2 }); + queue.add('test', { foo: 3 }); + }); + + it('should return subset of jobs when setting positive range', function(done) { + queue.process('test', async job => {}); + + queue.on( + 'completed', + after(3, async function() { + try { + const jobs = await queue.getJobs('completed', 1, 2, true); + expect(jobs) + .to.be.an('array') + .that.has.length(2); + expect(jobs[0].data.foo).to.be.eql(2); + expect(jobs[1].data.foo).to.be.eql(3); + expect(jobs[0]).to.have.property('finishedOn'); + expect(jobs[1]).to.have.property('finishedOn'); + expect(jobs[0]).to.have.property('processedOn'); + expect(jobs[1]).to.have.property('processedOn'); + done(); + } catch (err) { + done(err); + } + }), + ); + + queue.add('test', { foo: 1 }); + queue.add('test', { foo: 2 }); + queue.add('test', { foo: 3 }); + }); + + it('should return subset of jobs when setting a negative range', function(done) { + queue.process('test', async job => {}); + + queue.on( + 'completed', + after(3, async function() { + try { + const jobs = await queue.getJobs('completed', -3, -1, true); + expect(jobs) + .to.be.an('array') + .that.has.length(3); + expect(jobs[0].data.foo).to.be.equal(1); + expect(jobs[1].data.foo).to.be.eql(2); + expect(jobs[2].data.foo).to.be.eql(3); + done(); + } catch (err) { + done(err); + } + }), + ); + + queue.add('test', { foo: 1 }); + queue.add('test', { foo: 2 }); + queue.add('test', { foo: 3 }); + }); + + it('should return subset of jobs when range overflows', function(done) { + queue.process('test', async job => {}); + + queue.on( + 'completed', + after(3, async function() { + try { + const jobs = await queue.getJobs('completed', -300, 99999, true); + expect(jobs) + .to.be.an('array') + .that.has.length(3); + expect(jobs[0].data.foo).to.be.equal(1); + expect(jobs[1].data.foo).to.be.eql(2); + expect(jobs[2].data.foo).to.be.eql(3); + done(); + } catch (err) { + done(err); + } + }), + ); + + queue.add('test', { foo: 1 }); + queue.add('test', { foo: 2 }); + queue.add('test', { foo: 3 }); + }); + + it('should return jobs for multiple types', function(done) { + let counter = 0; + + queue.process('test', async job => { + counter++; + if (counter == 2) { + await queue.add('test', { foo: 3 }); + return queue.pause(); + } + }); + + queue.on( + 'completed', + after(2, async function() { + try { + const jobs = await queue.getJobs(['completed', 'waiting']); + expect(jobs).to.be.an('array'); + expect(jobs).to.have.length(3); + done(); + } catch (err) { + done(err); + } + }), + ); + + queue.add('test', { foo: 1 }); + queue.add('test', { foo: 2 }); + }); + }); + + describe('events', function() { + this.timeout(4000); + let queue: Queue3; + let queueName: string; + let client: IORedis.Redis; + + beforeEach(function() { + client = new IORedis(); + return client.flushdb(); + }); + + beforeEach(async function() { + queueName = 'test-' + v4(); + queue = new Queue3(queueName); + }); + + afterEach(async function() { + await queue.close(); + return client.quit(); + }); + + it('should emit waiting when a job has been added', function(done) { + queue.on('waiting', function() { + done(); + }); + + queue.add('test', { foo: 'bar' }); + }); + + it('should emit global waiting event when a job has been added', function(done) { + queue.on('waiting', function() { + done(); + }); + + queue.add('test', { foo: 'bar' }); + }); + + it('emits drained and global:drained event when all jobs have been processed', function(done) { + queue.process('test', async job => {}); + + const drainedCallback = after(2, async function() { + const jobs = await queue.getJobCountByTypes('completed'); + expect(jobs).to.be.equal(2); + done(); + }); + + queue.once('drained', drainedCallback); + queue.once('global:drained', drainedCallback); + + queue.add('test', { foo: 'bar' }); + queue.add('test', { foo: 'baz' }); + }); + + it('should emit an event when a job becomes active', function(done) { + queue.add('test', {}); + + queue.once('active', function() { + queue.once('completed', async function() { + done(); + }); + }); + + queue.process('test', async () => {}); + }); + + it('should listen to global events', function(done) { + let state: string; + queue.on('global:waiting', function() { + expect(state).to.be.undefined; + state = 'waiting'; + }); + queue.once('global:active', function() { + expect(state).to.be.equal('waiting'); + state = 'active'; + }); + queue.once('global:completed', async function() { + expect(state).to.be.equal('active'); + done(); + }); + + queue.add('test', {}); + queue.process('test', async () => {}); + }); + }); + + describe('Pause', function() { + let queue: Queue3; + let queueName: string; + let client: IORedis.Redis; + + beforeEach(function() { + client = new IORedis(); + return client.flushdb(); + }); + + beforeEach(async function() { + queueName = 'test-' + v4(); + queue = new Queue3(queueName); + }); + + afterEach(async function() { + await queue.close(); + return client.quit(); + }); + + // it('should pause a queue until resumed', async () => { + // let process; + // let isPaused = false; + // let counter = 2; + // const processPromise = new Promise(resolve => { + // process = async (job: Job) => { + // expect(isPaused).to.be.eql(false); + // expect(job.data.foo).to.be.equal('paused'); + // counter--; + // if (counter === 0) { + // resolve(); + // } + // }; + // }); + // + // await queue.process('test', process); + // + // await queue.pause(); + // isPaused = true; + // await queue.add('test', { foo: 'paused' }); + // await queue.add('test', { foo: 'paused' }); + // isPaused = false; + // await queue.resume(); + // + // await processPromise; + // }); + + it('should be able to pause a running queue and emit relevant events', async () => { + let process; + + let isPaused = false, + isResumed = true, + first = true; + + const processPromise = new Promise((resolve, reject) => { + process = async (job: Job) => { + try { + expect(isPaused).to.be.eql(false); + expect(job.data.foo).to.be.equal('paused'); + + if (first) { + first = false; + isPaused = true; + return queue.pause(); + } else { + expect(isResumed).to.be.eql(true); + await queue.close(); + resolve(); + } + } catch (err) { + reject(err); + } + }; + }); + + await queue.process('test', process); + + queue.add('test', { foo: 'paused' }); + queue.add('test', { foo: 'paused' }); + + queue.on('global:paused', async () => { + isPaused = false; + await queue.resume(); + }); + + queue.on('global:resumed', () => { + isResumed = true; + }); + + return processPromise; + }); + + it('should pause the queue locally', async () => { + let counter = 2; + let process; + const processPromise = new Promise(resolve => { + process = async (job: Job) => { + expect(queue.isPaused()).to.be.eql(false); + counter--; + if (counter === 0) { + await queue.close(); + resolve(); + } + }; + }); + + await queue.process('test', process); + await queue.pause(true); + + // Add the worker after the queue is in paused mode since the normal behavior is to pause + // it after the current lock expires. This way, we can ensure there isn't a lock already + // to test that pausing behavior works. + + await queue.add('test', { foo: 'paused' }); + await queue.add('test', { foo: 'paused' }); + + expect(counter).to.be.eql(2); + expect(queue.isPaused()).to.be.eql(true); + + await queue.resume(true); + return processPromise; + }); + + // it('should wait until active jobs are finished before resolving pause', async () => { + // let process; + // + // const startProcessing = new Promise(resolve => { + // process = async () => { + // resolve(); + // return delay(1000); + // }; + // }); + // + // await queue.process('test', process); + // + // const jobs = []; + // for (let i = 0; i < 10; i++) { + // jobs.push(queue.add('test', i)); + // } + // + // // + // // Add start processing so that we can test that pause waits for this job to be completed. + // // + // jobs.push(startProcessing); + // await Promise.all(jobs); + // await queue.pause(true); + // + // let active = await queue.getJobCountByTypes('active'); + // expect(active).to.be.eql(0); + // expect(queue.isPaused()).to.be.eql(true); + // + // // One job from the 10 posted above will be processed, so we expect 9 jobs pending + // let paused = await queue.getJobCountByTypes('delayed', 'waiting'); + // expect(paused).to.be.eql(9); + // await Promise.all([active, paused]); + // + // await queue.add('test', {}); + // + // active = await queue.getJobCountByTypes('active'); + // expect(active).to.be.eql(0); + // + // paused = await queue.getJobCountByTypes('paused', 'waiting', 'delayed'); + // expect(paused).to.be.eql(10); + // + // await Promise.all([active, paused]); + // }); + + it('should pause the queue locally when more than one worker is active', async () => { + let process1, process2; + + const startProcessing1 = new Promise(resolve => { + process1 = async () => { + resolve(); + return delay(200); + }; + }); + + const startProcessing2 = new Promise(resolve => { + process2 = async () => { + resolve(); + return delay(200); + }; + }); + + const worker1 = new Worker(queueName, process1); + await worker1.waitUntilReady(); + + const worker2 = new Worker(queueName, process2); + await worker2.waitUntilReady(); + + queue.add('test', 1); + queue.add('test', 2); + queue.add('test', 3); + queue.add('test', 4); + + await Promise.all([startProcessing1, startProcessing2]); + await Promise.all([worker1.pause(), worker2.pause()]); + + const count = await queue.getJobCounts('active', 'waiting', 'completed'); + expect(count.active).to.be.eql(0); + expect(count.waiting).to.be.eql(2); + expect(count.completed).to.be.eql(2); + + return Promise.all([worker1.close(), worker2.close()]); + }); + + // it('should wait for blocking job retrieval to complete before pausing locally', async () => { + // let process; + // + // const startProcessing = new Promise(resolve => { + // process = async () => { + // resolve(); + // return delay(200); + // }; + // }); + // + // await queue.process('test', process); + // + // await queue.add('test', 1); + // await startProcessing; + // await queue.pause(true); + // await queue.add('test', 2); + // + // const count = await queue.getJobCounts('active', 'waiting', 'completed'); + // expect(count.active).to.be.eql(0); + // expect(count.waiting).to.be.eql(1); + // expect(count.completed).to.be.eql(1); + // }); + + it('pauses fast when queue is drained', async function() { + await queue.process('test', async () => {}); + + await queue.add('test', {}); + + return new Promise((resolve, reject) => { + queue.on('global:drained', async () => { + try { + const start = new Date().getTime(); + await queue.pause(); + + const finish = new Date().getTime(); + expect(finish - start).to.be.lt(1000); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + }); + }); +});