From 1b6b1927c7e8e6b6f1bf0bbd6c74eb59cc17deb6 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 20 Jul 2019 10:41:49 +0200 Subject: [PATCH] feat: ported tests and functionality from bull 3 --- src/classes/index.ts | 16 +- src/classes/job.ts | 15 +- src/classes/queue-base.ts | 11 +- src/classes/queue-events.ts | 1 + src/classes/queue-keeper.ts | 15 +- src/classes/redis-connection.ts | 6 +- src/classes/scripts.ts | 80 ++--- src/classes/worker.ts | 113 ++++--- src/commands/addJob-8.lua | 2 +- ...{moveToActive-8.lua => moveToActive-9.lua} | 34 +- src/commands/moveToFinished-6.lua | 15 - src/commands/pause-4.lua | 4 +- src/commands/updateDelaySet-7.lua | 2 +- src/interfaces/rate-limiter-opts.ts | 3 - src/test/test_delay.ts | 1 - src/test/test_pause.ts | 309 ++++++++++++++++++ src/test/test_rate_limiter.ts | 113 +++++++ src/test/test_repeat.ts | 125 ++++--- 18 files changed, 645 insertions(+), 220 deletions(-) rename src/commands/{moveToActive-8.lua => moveToActive-9.lua} (65%) create mode 100644 src/test/test_pause.ts create mode 100644 src/test/test_rate_limiter.ts diff --git a/src/classes/index.ts b/src/classes/index.ts index 2d2ddb8e9b..54bd261938 100644 --- a/src/classes/index.ts +++ b/src/classes/index.ts @@ -1,5 +1,11 @@ -export * from './queue'; -export * from './job'; -export * from './redis-connection'; -export * from './scripts'; -export * from './backoffs'; +export * from './backoffs'; +export * from './job'; +export * from './queue-base'; +export * from './queue-events'; +export * from './queue-getters'; +export * from './queue-keeper'; +export * from './queue'; +export * from './redis-connection'; +export * from './repeat'; +export * from './scripts'; +export * from './worker'; diff --git a/src/classes/job.ts b/src/classes/job.ts index d814ec52ae..d53675e727 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -171,12 +171,12 @@ export class Job { * Moves a job to the completed queue. * Returned job to be used with Queue.prototype.nextJobFromJobData. * @param returnValue {string} The jobs success message. - * @param ignoreLock {boolean} True when wanting to ignore the redis lock on this job. + * @param fetchNext {boolean} True when wanting to fetch the next job * @returns {Promise} Returns the jobData of the next job in the waiting queue. */ async moveToCompleted( returnValue: any, - ignoreLock = true, + fetchNext = true, ): Promise<[JobJson, string]> { await this.queue.waitUntilReady(); @@ -192,17 +192,17 @@ export class Job { this, returnValue, this.opts.removeOnComplete, - ignoreLock, + fetchNext, ); } /** * Moves a job to the failed queue. * @param err {Error} The jobs error message. - * @param ignoreLock {boolean} True when wanting to ignore the redis lock on this job. + * @param fetchNext {boolean} True when wanting to fetch the next job * @returns void */ - async moveToFailed(err: Error, ignoreLock = true) { + async moveToFailed(err: Error, fetchNext = false) { await this.queue.waitUntilReady(); const queue = this.queue; @@ -235,13 +235,12 @@ export class Job { queue, this.id, Date.now() + delay, - ignoreLock, ); (multi).moveToDelayed(args); command = 'delayed'; } else { // If not, retry immediately - (multi).retryJob(Scripts.retryJobArgs(queue, this, ignoreLock)); + (multi).retryJob(Scripts.retryJobArgs(queue, this)); command = 'retry'; } } else { @@ -255,7 +254,7 @@ export class Job { this, err.message, this.opts.removeOnFail, - ignoreLock, + fetchNext, ); (multi).moveToFinished(args); command = 'failed'; diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 46484e0f77..3d256dcb52 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -1,12 +1,7 @@ -import { RedisConnection } from './redis-connection'; -import IORedis from 'ioredis'; -import { - QueueBaseOptions, - QueueEventsOptions, - QueueOptions, -} from '@src/interfaces'; +import { QueueBaseOptions } from '@src/interfaces'; import { EventEmitter } from 'events'; -import { WorkerOptions } from '@src/interfaces/worker-opts'; +import IORedis from 'ioredis'; +import { RedisConnection } from './redis-connection'; export class QueueBase extends EventEmitter { keys: { [index: string]: string }; diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 8c7d1663b2..39fefa7e87 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -1,6 +1,7 @@ import { QueueEventsOptions } from '@src/interfaces'; import { QueueBase } from './queue-base'; import { array2obj } from '../utils'; +import { Job } from './job'; export class QueueEvents extends QueueBase { constructor(name: string, opts?: QueueEventsOptions) { diff --git a/src/classes/queue-keeper.ts b/src/classes/queue-keeper.ts index f2e7d635dc..11b10b0228 100644 --- a/src/classes/queue-keeper.ts +++ b/src/classes/queue-keeper.ts @@ -36,7 +36,7 @@ export class QueueKeeper extends QueueBase { async init() { await this.waitUntilReady(); - // TODO: updateDelaySet should also retun the lastDelayStreamTimestamp + // TODO: updateDelaySet should also return the lastDelayStreamTimestamp const timestamp = await Scripts.updateDelaySet(this, Date.now()); if (timestamp) { @@ -53,10 +53,12 @@ export class QueueKeeper extends QueueBase { while (!this.closing) { // Listen to the delay event stream from lastDelayStreamTimestamp // Can we use XGROUPS to reduce redundancy? - const blockTime = Math.round(Math.min( - (this.opts).stalledInterval, - Math.max(this.nextTimestamp - Date.now(), 0), - )); + const blockTime = Math.round( + Math.min( + (this.opts).stalledInterval, + Math.max(this.nextTimestamp - Date.now(), 0), + ), + ); const data = await this.client.xread( 'BLOCK', @@ -83,6 +85,9 @@ export class QueueKeeper extends QueueBase { const now = Date.now(); const delay = this.nextTimestamp - now; + + console.log('DELAY', delay, this.nextTimestamp); + if (delay <= 0) { const nextTimestamp = await Scripts.updateDelaySet(this, now); if (nextTimestamp) { diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 0a919edddc..1c064fc542 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -57,7 +57,7 @@ export class RedisConnection { await RedisConnection.waitUntilReady(this.client); - this.client.on('error', err => { + this.client.on('error', (err: Error) => { console.error(err); }); @@ -65,9 +65,7 @@ export class RedisConnection { const version = await this.getRedisVersion(); if (semver.lt(version, RedisConnection.minimumVersion)) { throw new Error( - `Redis version needs to be greater than ${ - RedisConnection.minimumVersion - } Current: ${version}`, + `Redis version needs to be greater than ${RedisConnection.minimumVersion} Current: ${version}`, ); } } diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index e77d4bd142..07c5278026 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -5,19 +5,16 @@ /*eslint-env node */ 'use strict'; -import { debuglog } from 'util'; +import { QueueKeeperOptions } from '@src/interfaces'; +import { WorkerOptions } from '@src/interfaces/worker-opts'; import IORedis from 'ioredis'; -import { Queue } from './queue'; -import { Job, JobJson } from './job'; import { JobsOpts } from '../interfaces'; -import { QueueBase } from './queue-base'; -import { Worker } from './worker'; -import { WorkerOptions } from '@src/interfaces/worker-opts'; import { array2obj } from '../utils'; +import { Job, JobJson } from './job'; +import { Queue } from './queue'; +import { QueueBase } from './queue-base'; import { QueueKeeper } from './queue-keeper'; -import { QueueKeeperOptions } from '@src/interfaces'; - -const logger = debuglog('bull'); +import { Worker } from './worker'; export class Scripts { static async isJobInList( @@ -71,10 +68,12 @@ export class Scripts { dst = 'wait'; } - const keys = [src, dst, 'meta-paused', pause ? 'paused' : 'resumed'].map( - (name: string) => queue.toKey(name), + const keys = [src, dst, 'meta-paused'].map((name: string) => + queue.toKey(name), ); + keys.push(queue.eventStreamKey()); + return (queue.client).pause( keys.concat([pause ? 'paused' : 'resumed']), ); @@ -123,8 +122,7 @@ export class Scripts { propVal: string, shouldRemove: boolean, target: string, - ignoreLock: boolean, - notFetch?: boolean, + fetchNext = true, ) { const queueKeys = queue.keys; @@ -145,7 +143,9 @@ export class Scripts { target, shouldRemove ? '1' : '0', JSON.stringify({ jobId: job.id, val: val }), - notFetch || queue.closing || (queue.opts).limiter ? 0 : 1, + !fetchNext || queue.closing || (queue.opts).limiter + ? 0 + : 1, queueKeys[''], ]; @@ -159,7 +159,7 @@ export class Scripts { propVal: string, shouldRemove: boolean, target: string, - ignoreLock: boolean, + fetchNext: boolean, ) { const args = this.moveToFinishedArgs( queue, @@ -168,7 +168,7 @@ export class Scripts { propVal, shouldRemove, target, - ignoreLock, + fetchNext, ); const result = await (queue.client).moveToFinished(args); if (result < 0) { @@ -193,7 +193,7 @@ export class Scripts { job: Job, returnvalue: any, removeOnComplete: boolean, - ignoreLock: boolean, + fetchNext: boolean, ): Promise<[JobJson, string]> { return this.moveToFinished( queue, @@ -202,7 +202,7 @@ export class Scripts { 'returnvalue', removeOnComplete, 'completed', - ignoreLock, + fetchNext, ); } @@ -211,7 +211,7 @@ export class Scripts { job: Job, failedReason: string, removeOnFailed: boolean, - ignoreLock: boolean, + fetchNext = false, ) { return this.moveToFinishedArgs( queue, @@ -220,8 +220,7 @@ export class Scripts { 'failedReason', removeOnFailed, 'failed', - ignoreLock, - true, + fetchNext, ); } @@ -234,12 +233,7 @@ export class Scripts { } // Note: We have an issue here with jobs using custom job ids - static moveToDelayedArgs( - queue: QueueBase, - jobId: string, - timestamp: number, - ignoreLock: boolean, - ) { + static moveToDelayedArgs(queue: QueueBase, jobId: string, timestamp: number) { // // Bake in the job id first 12 bits into the timestamp // to guarantee correct execution order of delayed jobs @@ -258,20 +252,11 @@ export class Scripts { const keys = ['active', 'delayed', jobId].map(function(name) { return queue.toKey(name); }); - return keys.concat([ - JSON.stringify(timestamp), - jobId, - ignoreLock ? '0' : 'queue.token', - ]); + return keys.concat([JSON.stringify(timestamp), jobId]); } - static async moveToDelayed( - queue: Queue, - jobId: string, - timestamp: number, - ignoreLock: boolean, - ) { - const args = this.moveToDelayedArgs(queue, jobId, timestamp, ignoreLock); + static async moveToDelayed(queue: Queue, jobId: string, timestamp: number) { + const args = this.moveToDelayedArgs(queue, jobId, timestamp); const result = await (queue.client).moveToDelayed(args); switch (result) { case -1: @@ -280,16 +265,10 @@ export class Scripts { jobId + ' when trying to move from active to delayed', ); - case -2: - throw new Error( - 'Job ' + - jobId + - ' was locked when trying to move from active to delayed', - ); } } - static retryJobArgs(queue: QueueBase, job: Job, ignoreLock: boolean) { + static retryJobArgs(queue: QueueBase, job: Job) { const jobId = job.id; const keys = ['active', 'wait', jobId].map(function(name) { @@ -298,7 +277,7 @@ export class Scripts { const pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH'; - return keys.concat([pushCmd, jobId, ignoreLock ? '0' : 'queue.token']); + return keys.concat([pushCmd, jobId]); } static moveToActive(queue: Worker, jobId: string) { @@ -310,6 +289,7 @@ export class Scripts { keys[5] = queueKeys.limiter; keys[6] = queueKeys.delayed; keys[7] = queue.eventStreamKey(); + keys[8] = queue.delayStreamKey(); const args: (string | number | boolean)[] = [ queueKeys[''], @@ -320,11 +300,7 @@ export class Scripts { const opts: WorkerOptions = queue.opts; if (opts.limiter) { - args.push( - opts.limiter.max, - opts.limiter.duration, - !!opts.limiter.bounceBack, - ); + args.push(opts.limiter.max, opts.limiter.duration); } return (queue.client) .moveToActive((<(string | number | boolean)[]>keys).concat(args)) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 1e3c10b7bb..5b0633c9eb 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -12,6 +12,7 @@ import { Repeat } from './repeat'; export class Worker extends QueueBase { private drained: boolean; + private waiting = false; private processFn: Processor; private resumeWorker: () => void; @@ -110,18 +111,14 @@ export class Worker extends QueueBase { return; } + if (this.paused) { + await this.paused; + } + if (this.drained) { - // - // Waiting for new jobs to arrive - // try { - const opts: WorkerOptions = this.opts; + const jobId = await this.waitForJob(); - const jobId = await this.client.brpoplpush( - this.keys.wait, - this.keys.active, - opts.drainDelay, - ); if (jobId) { return this.moveToActive(jobId); } @@ -141,6 +138,23 @@ export class Worker extends QueueBase { return this.nextJobFromJobData(jobData, id); } + private async waitForJob() { + let jobId; + const opts: WorkerOptions = this.opts; + + try { + this.waiting = true; + jobId = await this.client.brpoplpush( + this.keys.wait, + this.keys.active, + opts.drainDelay, + ); + } finally { + this.waiting = false; + } + return jobId; + } + private async nextJobFromJobData(jobData: any, jobId: string) { if (jobData) { this.drained = false; @@ -158,12 +172,14 @@ export class Worker extends QueueBase { } async processJob(job: Job) { - if (!job) { + if (!job || this.closing || this.paused) { return; } - const handleCompleted = async (result: any) => { - const jobData = await job.moveToCompleted(result); + const jobData = await job.moveToCompleted( + result, + !(this.closing || this.paused), + ); this.emit('completed', job, result, 'active'); return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null; }; @@ -174,14 +190,22 @@ export class Worker extends QueueBase { error instanceof Bluebird.OperationalError && (error).cause instanceof Error ) { - error = (error).cause; //Handle explicit rejection + error = (error).cause; // Handle explicit rejection } await job.moveToFailed(err); this.emit('failed', job, error, 'active'); }; - const jobPromise = this.processFn(job); + // TODO: how to cancel the processing? (null -> job.cancel() => throw CancelError()void) + this.emit('active', job, null, 'waiting'); + + try { + const result = await this.processFn(job); + return handleCompleted(result); + } catch (err) { + return handleFailed(err); + } /* var timeoutMs = job.opts.timeout; @@ -190,11 +214,10 @@ export class Worker extends QueueBase { jobPromise = jobPromise.timeout(timeoutMs); } */ - // Local event with jobPromise so that we can cancel job. - this.emit('active', job, jobPromise, 'waiting'); + // this.emit('active', job, jobPromise, 'waiting'); - return jobPromise.then(handleCompleted).catch(handleFailed); + // return jobPromise.then(handleCompleted).catch(handleFailed); } /** @@ -221,6 +244,10 @@ export class Worker extends QueueBase { } } + isPaused() { + return !!this.paused; + } + /** * Returns a promise that resolves when active jobs are cleared * @@ -230,37 +257,31 @@ export class Worker extends QueueBase { // // Force reconnection of blocking connection to abort blocking redis call immediately. // - await redisClientDisconnect(this.client); - await Promise.all(Object.values(this.processing)); - - this.client.connect(); + this.waiting && (await redisClientDisconnect(this.client)); + const processingPromises = Object.values(this.processing); + await Promise.all(processingPromises); + this.waiting && (await this.client.connect()); } } -function redisClientDisconnect(client: IORedis.Redis) { - if (client.status === 'end') { - return Promise.resolve(); +async function redisClientDisconnect(client: IORedis.Redis) { + if (client.status !== 'end') { + let _resolve, _reject; + + const disconnecting = new Promise((resolve, reject) => { + client.once('end', resolve); + client.once('error', reject); + _resolve = resolve; + _reject = reject; + }); + + client.disconnect(); + + try { + await disconnecting; + } finally { + client.removeListener('end', _resolve); + client.removeListener('error', _reject); + } } - let _resolve: any, _reject: any; - return new Promise(function(resolve, reject) { - _resolve = resolve; - _reject = reject; - client.once('end', resolve); - client.once('error', reject); - - client - .quit() - .catch(function(err) { - if (err.message !== 'Connection is closed.') { - throw err; - } - }) - // .timeout(500) - .catch(function() { - client.disconnect(); - }); - }).finally(function() { - client.removeListener('end', _resolve); - client.removeListener('error', _reject); - }); } diff --git a/src/commands/addJob-8.lua b/src/commands/addJob-8.lua index cc14f22a59..d1ced023b5 100644 --- a/src/commands/addJob-8.lua +++ b/src/commands/addJob-8.lua @@ -82,7 +82,7 @@ else -- LIFO or FIFO rcall(ARGV[10], target, jobId) - -- Emit waiting event (wait..ing@token) + -- Emit waiting event rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId); else -- Priority add diff --git a/src/commands/moveToActive-8.lua b/src/commands/moveToActive-9.lua similarity index 65% rename from src/commands/moveToActive-8.lua rename to src/commands/moveToActive-9.lua index 5ce58b1e68..adff1a9acd 100644 --- a/src/commands/moveToActive-8.lua +++ b/src/commands/moveToActive-9.lua @@ -2,10 +2,6 @@ Move next job to be processed to active, lock it and fetch its data. The job may be delayed, in that case we need to move it to the delayed set instead. - This operation guarantees that the worker owns the job during the locks - expiration time. The worker is responsible of keeping the lock fresh - so that no other worker picks this job again. - Input: KEYS[1] wait key KEYS[2] active key @@ -17,16 +13,17 @@ KEYS[6] rate limiter key KEYS[7] delayed key - -- + -- Events KEYS[8] events stream key + KEYS[9] delay stream key + -- Arguments ARGV[1] key prefix ARGV[2] timestamp ARGV[3] optional jobid ARGV[4] optional jobs per time unit (rate limiter) ARGV[5] optional time unit (rate limiter) - ARGV[6] optional do not do anything with job if rate limit hit ]] local jobId @@ -48,25 +45,22 @@ if jobId then if(maxJobs) then local rateLimiterKey = KEYS[6]; - local jobCounter = tonumber(rcall("GET", rateLimiterKey)) - local bounceBack = ARGV[6] - - -- rate limit hit - if jobCounter ~= nil and jobCounter >= maxJobs then - local delay = tonumber(rcall("PTTL", rateLimiterKey)) + local jobCounter = tonumber(rcall("INCR", rateLimiterKey)) + -- check if rate limit hit + if jobCounter > maxJobs then + local exceedingJobs = jobCounter - maxJobs + local delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs - 1) * ARGV[5]) / maxJobs; local timestamp = delay + tonumber(ARGV[2]) - - if bounceBack == 'false' then - -- put job into delayed queue - rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId) - rcall("PUBLISH", KEYS[7], timestamp) - end + + -- put job into delayed queue + rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId); + rcall("XADD", KEYS[8], "*", "event", "delayed", "jobId", jobId, "delay", timestamp); + rcall("XADD", KEYS[9], "*", "nextTimestamp", timestamp); -- remove from active queue rcall("LREM", KEYS[2], 1, jobId) return else - jobCounter = rcall("INCR", rateLimiterKey) - if tonumber(jobCounter) == 1 then + if jobCounter == 1 then rcall("PEXPIRE", rateLimiterKey, ARGV[5]) end end diff --git a/src/commands/moveToFinished-6.lua b/src/commands/moveToFinished-6.lua index 225f866850..5319361701 100644 --- a/src/commands/moveToFinished-6.lua +++ b/src/commands/moveToFinished-6.lua @@ -1,8 +1,6 @@ --[[ Move job from active to a finished status (completed o failed) A job can only be moved to completed if it was active. - The job must be locked before it can be moved to a finished status, - and the lock must be released in this script. Input: KEYS[1] active key @@ -26,7 +24,6 @@ Output: 0 OK -1 Missing key. - -2 Missing lock. Events: 'completed/failed' @@ -34,14 +31,6 @@ local rcall = redis.call if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists ---[[ if ARGV[5] ~= "0" then - local lockKey = KEYS[3] .. ':lock' - if rcall("GET", lockKey) == ARGV[5] then - rcall("DEL", lockKey) - else - return -2 - end - end ]] -- Remove from active list (if not active we shall return error) local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[1]) @@ -70,12 +59,8 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1]) if jobId then local jobKey = ARGV[9] .. jobId - -- local lockKey = jobKey .. ':lock' - -- get a lock - -- rcall("SET", lockKey, ARGV[11], "PX", ARGV[10]) rcall("ZREM", KEYS[5], jobId) -- remove from priority - -- rcall("PUBLISH", KEYS[6], jobId) rcall("XADD", KEYS[6], "*", "event", "active", "jobId", jobId, "prev", "waiting"); rcall("HSET", jobKey, "processedOn", ARGV[2]) diff --git a/src/commands/pause-4.lua b/src/commands/pause-4.lua index bea4bbcdf5..991e3f6316 100644 --- a/src/commands/pause-4.lua +++ b/src/commands/pause-4.lua @@ -5,7 +5,7 @@ KEYS[1] 'wait' or 'paused'' KEYS[2] 'paused' or 'wait' KEYS[3] 'meta-paused' - KEYS[4] 'paused' o 'resumed' event. + KEYS[4] events stream key ARGV[1] 'paused' or 'resumed' @@ -24,4 +24,4 @@ else rcall("DEL", KEYS[3]) end -rcall("PUBLISH", KEYS[4], ARGV[1]) +rcall("XADD", KEYS[4], "*", "event", ARGV[1]); diff --git a/src/commands/updateDelaySet-7.lua b/src/commands/updateDelaySet-7.lua index 97428e3aa7..20629f4f3f 100644 --- a/src/commands/updateDelaySet-7.lua +++ b/src/commands/updateDelaySet-7.lua @@ -16,7 +16,7 @@ ARGV[2] delayed timestamp Events: - 'removed' + 'waiting' ]] local rcall = redis.call; diff --git a/src/interfaces/rate-limiter-opts.ts b/src/interfaces/rate-limiter-opts.ts index a9f1b6f22a..fab6b6f4f2 100644 --- a/src/interfaces/rate-limiter-opts.ts +++ b/src/interfaces/rate-limiter-opts.ts @@ -5,7 +5,4 @@ export interface RateLimiterOpts { // per duration in milliseconds duration: number; - // When jobs get rate limited, they stay in the waiting - // queue and are not moved to the delayed queue - bounceBack?: boolean; } diff --git a/src/test/test_delay.ts b/src/test/test_delay.ts index d4ebd7280e..ed35074757 100644 --- a/src/test/test_delay.ts +++ b/src/test/test_delay.ts @@ -207,5 +207,4 @@ describe('Delayed jobs', function() { return promise; }); - }); diff --git a/src/test/test_pause.ts b/src/test/test_pause.ts new file mode 100644 index 0000000000..8a9287029d --- /dev/null +++ b/src/test/test_pause.ts @@ -0,0 +1,309 @@ +import { Job, Queue } from '@src/classes'; +import { QueueEvents, QueueKeeper } from '../classes'; +import { Worker } from '@src/classes/worker'; +import { expect } from 'chai'; +import IORedis from 'ioredis'; +import { beforeEach, describe, it } from 'mocha'; +import { v4 } from 'node-uuid'; + +const delay = require('delay'); + +describe('Pause', function() { + let queue: Queue; + let queueName: string; + let queueEvents: QueueEvents; + let client: IORedis.Redis; + + beforeEach(function() { + client = new IORedis(); + return client.flushdb(); + }); + + beforeEach(async function() { + queueName = 'test-' + v4(); + queue = new Queue(queueName); + queueEvents = new QueueEvents(queueName); + await queueEvents.init(); + }); + + afterEach(async function() { + await queue.close(); + await queueEvents.close(); + return client.quit(); + }); + + it('should pause a queue until resumed', async () => { + let process; + let isPaused = false; + let counter = 2; + const processPromise = new Promise(resolve => { + process = async (job: Job) => { + expect(isPaused).to.be.eql(false); + expect(job.data.foo).to.be.equal('paused'); + counter--; + if (counter === 0) { + resolve(); + } + }; + }); + + const worker = new Worker(queueName, process); + await worker.waitUntilReady(); + + await queue.pause(); + isPaused = true; + await queue.append('test', { foo: 'paused' }); + await queue.append('test', { foo: 'paused' }); + isPaused = false; + await queue.resume(); + + await processPromise; + return worker.close(); + }); + + it('should be able to pause a running queue and emit relevant events', async () => { + let process; + + let isPaused = false, + isResumed = true, + first = true; + + const processPromise = new Promise((resolve, reject) => { + process = async (job: Job) => { + try { + expect(isPaused).to.be.eql(false); + expect(job.data.foo).to.be.equal('paused'); + + if (first) { + first = false; + isPaused = true; + return queue.pause(); + } else { + expect(isResumed).to.be.eql(true); + await queue.close(); + resolve(); + } + } catch (err) { + reject(err); + } + }; + }); + + const worker = new Worker(queueName, process); + await worker.waitUntilReady(); + + queue.append('test', { foo: 'paused' }); + queue.append('test', { foo: 'paused' }); + + queueEvents.on('paused', async () => { + isPaused = false; + await queue.resume(); + }); + + queueEvents.on('resumed', () => { + isResumed = true; + }); + + return processPromise; + }); + + it('should pause the queue locally', async () => { + let worker: Worker; + let counter = 2; + let process; + const processPromise = new Promise(resolve => { + process = async (job: Job) => { + expect(worker.isPaused()).to.be.eql(false); + counter--; + if (counter === 0) { + await queue.close(); + resolve(); + } + }; + }); + + worker = new Worker(queueName, process); + await worker.waitUntilReady(); + + await worker.pause(); + + // Add the worker after the queue is in paused mode since the normal behavior is to pause + // it after the current lock expires. This way, we can ensure there isn't a lock already + // to test that pausing behavior works. + + await queue.append('test', { foo: 'paused' }); + await queue.append('test', { foo: 'paused' }); + + expect(counter).to.be.eql(2); + expect(worker.isPaused()).to.be.eql(true); + + await worker.resume(); + + return processPromise; + }); + + it('should wait until active jobs are finished before resolving pause', async () => { + let process; + + const startProcessing = new Promise(resolve => { + process = async () => { + resolve(); + return delay(1000); + }; + }); + + const worker = new Worker(queueName, process); + await worker.waitUntilReady(); + + const jobs = []; + for (let i = 0; i < 10; i++) { + jobs.push(queue.append('test', i)); + } + + // + // Add start processing so that we can test that pause waits for this job to be completed. + // + jobs.push(startProcessing); + await Promise.all(jobs); + await worker.pause(); + + let active = await queue.getJobCountByTypes('active'); + expect(active).to.be.eql(0); + expect(worker.isPaused()).to.be.eql(true); + + // One job from the 10 posted above will be processed, so we expect 9 jobs pending + let paused = await queue.getJobCountByTypes('delayed', 'waiting'); + expect(paused).to.be.eql(9); + await Promise.all([active, paused]); + + await queue.append('test', {}); + + active = await queue.getJobCountByTypes('active'); + expect(active).to.be.eql(0); + + paused = await queue.getJobCountByTypes('paused', 'waiting', 'delayed'); + expect(paused).to.be.eql(10); + + await Promise.all([active, paused]); + await worker.close(); + }); + + it('should pause the queue locally when more than one worker is active', async () => { + let process1, process2; + + const startProcessing1 = new Promise(resolve => { + process1 = async () => { + resolve(); + return delay(200); + }; + }); + + const startProcessing2 = new Promise(resolve => { + process2 = async () => { + resolve(); + return delay(200); + }; + }); + + const worker1 = new Worker(queueName, process1); + await worker1.waitUntilReady(); + + const worker2 = new Worker(queueName, process2); + await worker2.waitUntilReady(); + + queue.append('test', 1); + queue.append('test', 2); + queue.append('test', 3); + queue.append('test', 4); + + await Promise.all([startProcessing1, startProcessing2]); + await Promise.all([worker1.pause(), worker2.pause()]); + + const count = await queue.getJobCounts('active', 'waiting', 'completed'); + expect(count.active).to.be.eql(0); + expect(count.waiting).to.be.eql(2); + expect(count.completed).to.be.eql(2); + + return Promise.all([worker1.close(), worker2.close()]); + }); + + it('should wait for blocking job retrieval to complete before pausing locally', async () => { + let process; + + const startProcessing = new Promise(resolve => { + process = async () => { + resolve(); + return delay(200); + }; + }); + + const worker = new Worker(queueName, process); + await worker.waitUntilReady(); + + await queue.append('test', 1); + await startProcessing; + await worker.pause(); + await queue.append('test', 2); + + const count = await queue.getJobCounts('active', 'waiting', 'completed'); + expect(count.active).to.be.eql(0); + expect(count.waiting).to.be.eql(1); + expect(count.completed).to.be.eql(1); + + return worker.close(); + }); + + it('pauses fast when queue is drained', async function() { + const worker = new Worker(queueName, async () => {}); + await worker.waitUntilReady(); + + await queue.append('test', {}); + + return new Promise((resolve, reject) => { + queueEvents.on('drained', async () => { + try { + const start = new Date().getTime(); + await queue.pause(); + + const finish = new Date().getTime(); + expect(finish - start).to.be.lt(1000); + } catch (err) { + reject(err); + } finally { + await worker.close(); + } + resolve(); + }); + }); + }); + + it('should not processed delayed jobs', async function() { + this.timeout(5000); + + const queueKeeper = new QueueKeeper(queueName); + await queueKeeper.init(); + + let processed = false; + + const worker = new Worker(queueName, async () => { + processed = true; + }); + await worker.waitUntilReady(); + + await queue.pause(); + await queue.append('test', {}, { delay: 200 }); + const counts = await queue.getJobCounts('waiting', 'delayed'); + + expect(counts).to.have.property('waiting', 0); + expect(counts).to.have.property('delayed', 1); + + await delay(1000); + if (processed) { + throw new Error('should not process delayed jobs in paused queue.'); + } + const counts2 = await queue.getJobCounts('waiting', 'paused', 'delayed'); + expect(counts2).to.have.property('waiting', 0); + expect(counts2).to.have.property('paused', 1); + expect(counts2).to.have.property('delayed', 0); + }); +}); diff --git a/src/test/test_rate_limiter.ts b/src/test/test_rate_limiter.ts new file mode 100644 index 0000000000..bf05db6384 --- /dev/null +++ b/src/test/test_rate_limiter.ts @@ -0,0 +1,113 @@ +import { Queue } from '@src/classes'; +import { describe, beforeEach, it } from 'mocha'; +import { expect, assert } from 'chai'; +import IORedis from 'ioredis'; +import { v4 } from 'node-uuid'; +import { Worker } from '@src/classes/worker'; +import { after } from 'lodash'; +import { QueueEvents } from '@src/classes/queue-events'; +import { QueueKeeper } from '@src/classes/queue-keeper'; + +describe('Rate Limiter', function() { + let queue: Queue; + let queueName: string; + let queueEvents: QueueEvents; + let client: IORedis.Redis; + + beforeEach(function() { + client = new IORedis(); + return client.flushdb(); + }); + + beforeEach(async function() { + queueName = 'test-' + v4(); + queue = new Queue(queueName); + queueEvents = new QueueEvents(queueName); + await queueEvents.init(); + }); + + afterEach(async function() { + await queue.close(); + await queueEvents.close(); + return client.quit(); + }); + + it('should put a job into the delayed queue when limit is hit', async () => { + const worker = new Worker(queueName, async job => {}, { + limiter: { + max: 1, + duration: 1000, + }, + }); + await worker.waitUntilReady(); + + queueEvents.on('failed', err => { + assert.fail(err); + }); + + await Promise.all([ + queue.append('test', {}), + queue.append('test', {}), + queue.append('test', {}), + queue.append('test', {}), + ]); + + await Promise.all([ + worker.getNextJob(), + worker.getNextJob(), + worker.getNextJob(), + worker.getNextJob(), + ]); + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.eq(3); + }); + + it('should obey the rate limit', async function() { + this.timeout(20000); + + const numJobs = 4; + const startTime = new Date().getTime(); + + const queueKeeper = new QueueKeeper(queueName); + await queueKeeper.init(); + + const worker = new Worker(queueName, async job => {}, { + limiter: { + max: 1, + duration: 1000, + }, + }); + + const result = new Promise((resolve, reject) => { + queueEvents.on( + 'completed', + // after every job has been completed + after(numJobs, async () => { + await worker.close(); + + try { + const timeDiff = new Date().getTime() - startTime; + expect(timeDiff).to.be.above((numJobs - 1) * 1000); + resolve(); + } catch (err) { + reject(err); + } + }), + ); + + queueEvents.on('failed', async err => { + await worker.close(); + reject(err); + }); + }); + + for (let i = 0; i < numJobs; i++) { + await queue.append('rate test', {}); + } + + await result; + await worker.close(); + await queueKeeper.close(); + }); +}); diff --git a/src/test/test_repeat.ts b/src/test/test_repeat.ts index 73a65f702e..ee8d2e7b11 100644 --- a/src/test/test_repeat.ts +++ b/src/test/test_repeat.ts @@ -156,6 +156,7 @@ describe('repeat', function() { const date = new Date('2017-02-07 9:24:00'); this.clock.tick(date.getTime()); + const nextTick = 2 * ONE_SECOND + 500; await queue.append( @@ -171,7 +172,6 @@ describe('repeat', function() { return new Promise(resolve => { worker.on('completed', async job => { - console.log('Completed'); this.clock.tick(nextTick); if (prev) { expect(prev.timestamp).to.be.lt(job.timestamp); @@ -180,7 +180,7 @@ describe('repeat', function() { prev = job; counter++; console.log('COUNTER', counter); - if (counter == 20) { + if (counter == 5) { await worker.close(); resolve(); } @@ -188,7 +188,7 @@ describe('repeat', function() { }); }); - it('should repeat every 2 seconds with startDate in future', async function(done) { + it('should repeat every 2 seconds with startDate in future', async function() { this.timeout(200000); const queueKeeper = new QueueKeeper(queueName); await queueKeeper.init(); @@ -207,7 +207,7 @@ describe('repeat', function() { { foo: 'bar' }, { repeat: { - cron: '* /2 * * * * *', + cron: '*/2 * * * * *', startDate: new Date('2017-02-07 9:24:05'), }, }, @@ -217,25 +217,34 @@ describe('repeat', function() { let prev: Job; let counter = 0; - worker.on('completed', job => { - console.log('COMPLETED'); - this.clock.tick(nextTick); - if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); - } - prev = job; - counter++; - if (counter == 20) { - done(); - } + + return new Promise((resolve, reject) => { + worker.on('completed', async job => { + this.clock.tick(nextTick); + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + console.log('COUNTER', counter); + if (counter == 5) { + resolve(); + await queueKeeper.close(); + } + }); }); }); - it('should repeat every 2 seconds with startDate in past', async function(done) { + it('should repeat every 2 seconds with startDate in past', async function() { + this.timeout(200000); + const queueKeeper = new QueueKeeper(queueName); + await queueKeeper.init(); + const date = new Date('2017-02-07 9:24:00'); this.clock.tick(date.getTime()); const nextTick = 2 * ONE_SECOND + 500; + const delay = 5 * ONE_SECOND + 500; const worker = new Worker(queueName, async job => { console.log('Working...'); @@ -246,30 +255,40 @@ describe('repeat', function() { { foo: 'bar' }, { repeat: { - cron: '* /2 * * * * *', + cron: '*/2 * * * * *', startDate: new Date('2017-02-07 9:22:00'), }, }, ); - this.clock.tick(nextTick); + + this.clock.tick(nextTick + delay); let prev: Job; let counter = 0; - worker.on('completed', job => { - this.clock.tick(nextTick); - if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); - } - prev = job; - counter++; - if (counter == 20) { - done(); - } + + return new Promise((resolve, reject) => { + worker.on('completed', async job => { + this.clock.tick(nextTick); + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + console.log('COUNTER', counter); + if (counter == 5) { + resolve(); + await queueKeeper.close(); + } + }); }); }); - it.skip('should repeat once a day for 5 days', async function(done) { + // Skipped until we find a way of simulating time to avoid waiting 5 days + it.skip('should repeat once a day for 5 days', async function() { + const queueKeeper = new QueueKeeper(queueName); + await queueKeeper.init(); + const date = new Date('2017-05-05 13:12:00'); this.clock.tick(date.getTime()); const nextTick = ONE_DAY; @@ -292,26 +311,34 @@ describe('repeat', function() { let prev: Job; let counter = 0; - queue.on('completed', async job => { - this.clock.tick(nextTick); - if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(ONE_DAY); - } - prev = job; + return new Promise((resolve, reject) => { + queue.on('completed', async job => { + this.clock.tick(nextTick); + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(ONE_DAY); + } + prev = job; - counter++; - if (counter == 5) { - const waitingJobs = await queue.getWaiting(); - expect(waitingJobs.length).to.be.eql(0); - const delayedJobs = await queue.getDelayed(); - expect(delayedJobs.length).to.be.eql(0); - done(); - } + counter++; + if (counter == 5) { + const waitingJobs = await queue.getWaiting(); + expect(waitingJobs.length).to.be.eql(0); + const delayedJobs = await queue.getDelayed(); + expect(delayedJobs.length).to.be.eql(0); + await queueKeeper.close(); + await worker.close(); + resolve(); + } + }); }); }); - it('should repeat 7:th day every month at 9:25', async function(done) { + // Skipped until we find a way of simulating time to avoid waiting a month + it.skip('should repeat 7:th day every month at 9:25', async function(done) { + const queueKeeper = new QueueKeeper(queueName); + await queueKeeper.init(); + const date = new Date('2017-02-02 7:21:42'); this.clock.tick(date.getTime()); @@ -377,7 +404,7 @@ describe('repeat', function() { this.clock.tick(date.getTime()); const nextTick = 2 * ONE_SECOND; - const repeat = { cron: '* /2 * * * * *' }; + const repeat = { cron: '*/2 * * * * *' }; const worker = new Worker(queueName, async job => { counter++; @@ -413,7 +440,7 @@ describe('repeat', function() { this.clock.tick(date.getTime()); const nextTick = 2 * ONE_SECOND; - const repeat = { cron: '* /2 * * * * *' }; + const repeat = { cron: '*/2 * * * * *' }; await queue.append( 'test',