Skip to content

Commit

Permalink
feat: ported tests and functionality from bull 3
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jul 20, 2019
1 parent 5f07618 commit 1b6b192
Show file tree
Hide file tree
Showing 18 changed files with 645 additions and 220 deletions.
16 changes: 11 additions & 5 deletions src/classes/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
export * from './queue';
export * from './job';
export * from './redis-connection';
export * from './scripts';
export * from './backoffs';
export * from './backoffs';
export * from './job';
export * from './queue-base';
export * from './queue-events';
export * from './queue-getters';
export * from './queue-keeper';
export * from './queue';
export * from './redis-connection';
export * from './repeat';
export * from './scripts';
export * from './worker';
15 changes: 7 additions & 8 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ export class Job {
* Moves a job to the completed queue.
* Returned job to be used with Queue.prototype.nextJobFromJobData.
* @param returnValue {string} The jobs success message.
* @param ignoreLock {boolean} True when wanting to ignore the redis lock on this job.
* @param fetchNext {boolean} True when wanting to fetch the next job
* @returns {Promise} Returns the jobData of the next job in the waiting queue.
*/
async moveToCompleted(
returnValue: any,
ignoreLock = true,
fetchNext = true,
): Promise<[JobJson, string]> {
await this.queue.waitUntilReady();

Expand All @@ -192,17 +192,17 @@ export class Job {
this,
returnValue,
this.opts.removeOnComplete,
ignoreLock,
fetchNext,
);
}

/**
* Moves a job to the failed queue.
* @param err {Error} The jobs error message.
* @param ignoreLock {boolean} True when wanting to ignore the redis lock on this job.
* @param fetchNext {boolean} True when wanting to fetch the next job
* @returns void
*/
async moveToFailed(err: Error, ignoreLock = true) {
async moveToFailed(err: Error, fetchNext = false) {
await this.queue.waitUntilReady();

const queue = this.queue;
Expand Down Expand Up @@ -235,13 +235,12 @@ export class Job {
queue,
this.id,
Date.now() + delay,
ignoreLock,
);
(<any>multi).moveToDelayed(args);
command = 'delayed';
} else {
// If not, retry immediately
(<any>multi).retryJob(Scripts.retryJobArgs(queue, this, ignoreLock));
(<any>multi).retryJob(Scripts.retryJobArgs(queue, this));
command = 'retry';
}
} else {
Expand All @@ -255,7 +254,7 @@ export class Job {
this,
err.message,
this.opts.removeOnFail,
ignoreLock,
fetchNext,
);
(<any>multi).moveToFinished(args);
command = 'failed';
Expand Down
11 changes: 3 additions & 8 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { RedisConnection } from './redis-connection';
import IORedis from 'ioredis';
import {
QueueBaseOptions,
QueueEventsOptions,
QueueOptions,
} from '@src/interfaces';
import { QueueBaseOptions } from '@src/interfaces';
import { EventEmitter } from 'events';
import { WorkerOptions } from '@src/interfaces/worker-opts';
import IORedis from 'ioredis';
import { RedisConnection } from './redis-connection';

export class QueueBase extends EventEmitter {
keys: { [index: string]: string };
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { QueueEventsOptions } from '@src/interfaces';
import { QueueBase } from './queue-base';
import { array2obj } from '../utils';
import { Job } from './job';

export class QueueEvents extends QueueBase {
constructor(name: string, opts?: QueueEventsOptions) {
Expand Down
15 changes: 10 additions & 5 deletions src/classes/queue-keeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class QueueKeeper extends QueueBase {
async init() {
await this.waitUntilReady();

// TODO: updateDelaySet should also retun the lastDelayStreamTimestamp
// TODO: updateDelaySet should also return the lastDelayStreamTimestamp
const timestamp = await Scripts.updateDelaySet(this, Date.now());

if (timestamp) {
Expand All @@ -53,10 +53,12 @@ export class QueueKeeper extends QueueBase {
while (!this.closing) {
// Listen to the delay event stream from lastDelayStreamTimestamp
// Can we use XGROUPS to reduce redundancy?
const blockTime = Math.round(Math.min(
(<QueueKeeperOptions>this.opts).stalledInterval,
Math.max(this.nextTimestamp - Date.now(), 0),
));
const blockTime = Math.round(
Math.min(
(<QueueKeeperOptions>this.opts).stalledInterval,
Math.max(this.nextTimestamp - Date.now(), 0),
),
);

const data = await this.client.xread(
'BLOCK',
Expand All @@ -83,6 +85,9 @@ export class QueueKeeper extends QueueBase {

const now = Date.now();
const delay = this.nextTimestamp - now;

console.log('DELAY', delay, this.nextTimestamp);

if (delay <= 0) {
const nextTimestamp = await Scripts.updateDelaySet(this, now);
if (nextTimestamp) {
Expand Down
6 changes: 2 additions & 4 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,15 @@ export class RedisConnection {

await RedisConnection.waitUntilReady(this.client);

this.client.on('error', err => {
this.client.on('error', (err: Error) => {
console.error(err);
});

if ((<RedisOpts>this.opts).skipVersionCheck !== true) {
const version = await this.getRedisVersion();
if (semver.lt(version, RedisConnection.minimumVersion)) {
throw new Error(
`Redis version needs to be greater than ${
RedisConnection.minimumVersion
} Current: ${version}`,
`Redis version needs to be greater than ${RedisConnection.minimumVersion} Current: ${version}`,
);
}
}
Expand Down
80 changes: 28 additions & 52 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@
/*eslint-env node */
'use strict';

import { debuglog } from 'util';
import { QueueKeeperOptions } from '@src/interfaces';
import { WorkerOptions } from '@src/interfaces/worker-opts';
import IORedis from 'ioredis';
import { Queue } from './queue';
import { Job, JobJson } from './job';
import { JobsOpts } from '../interfaces';
import { QueueBase } from './queue-base';
import { Worker } from './worker';
import { WorkerOptions } from '@src/interfaces/worker-opts';
import { array2obj } from '../utils';
import { Job, JobJson } from './job';
import { Queue } from './queue';
import { QueueBase } from './queue-base';
import { QueueKeeper } from './queue-keeper';
import { QueueKeeperOptions } from '@src/interfaces';

const logger = debuglog('bull');
import { Worker } from './worker';

export class Scripts {
static async isJobInList(
Expand Down Expand Up @@ -71,10 +68,12 @@ export class Scripts {
dst = 'wait';
}

const keys = [src, dst, 'meta-paused', pause ? 'paused' : 'resumed'].map(
(name: string) => queue.toKey(name),
const keys = [src, dst, 'meta-paused'].map((name: string) =>
queue.toKey(name),
);

keys.push(queue.eventStreamKey());

return (<any>queue.client).pause(
keys.concat([pause ? 'paused' : 'resumed']),
);
Expand Down Expand Up @@ -123,8 +122,7 @@ export class Scripts {
propVal: string,
shouldRemove: boolean,
target: string,
ignoreLock: boolean,
notFetch?: boolean,
fetchNext = true,
) {
const queueKeys = queue.keys;

Expand All @@ -145,7 +143,9 @@ export class Scripts {
target,
shouldRemove ? '1' : '0',
JSON.stringify({ jobId: job.id, val: val }),
notFetch || queue.closing || (<WorkerOptions>queue.opts).limiter ? 0 : 1,
!fetchNext || queue.closing || (<WorkerOptions>queue.opts).limiter
? 0
: 1,
queueKeys[''],
];

Expand All @@ -159,7 +159,7 @@ export class Scripts {
propVal: string,
shouldRemove: boolean,
target: string,
ignoreLock: boolean,
fetchNext: boolean,
) {
const args = this.moveToFinishedArgs(
queue,
Expand All @@ -168,7 +168,7 @@ export class Scripts {
propVal,
shouldRemove,
target,
ignoreLock,
fetchNext,
);
const result = await (<any>queue.client).moveToFinished(args);
if (result < 0) {
Expand All @@ -193,7 +193,7 @@ export class Scripts {
job: Job,
returnvalue: any,
removeOnComplete: boolean,
ignoreLock: boolean,
fetchNext: boolean,
): Promise<[JobJson, string]> {
return this.moveToFinished(
queue,
Expand All @@ -202,7 +202,7 @@ export class Scripts {
'returnvalue',
removeOnComplete,
'completed',
ignoreLock,
fetchNext,
);
}

Expand All @@ -211,7 +211,7 @@ export class Scripts {
job: Job,
failedReason: string,
removeOnFailed: boolean,
ignoreLock: boolean,
fetchNext = false,
) {
return this.moveToFinishedArgs(
queue,
Expand All @@ -220,8 +220,7 @@ export class Scripts {
'failedReason',
removeOnFailed,
'failed',
ignoreLock,
true,
fetchNext,
);
}

Expand All @@ -234,12 +233,7 @@ export class Scripts {
}

// Note: We have an issue here with jobs using custom job ids
static moveToDelayedArgs(
queue: QueueBase,
jobId: string,
timestamp: number,
ignoreLock: boolean,
) {
static moveToDelayedArgs(queue: QueueBase, jobId: string, timestamp: number) {
//
// Bake in the job id first 12 bits into the timestamp
// to guarantee correct execution order of delayed jobs
Expand All @@ -258,20 +252,11 @@ export class Scripts {
const keys = ['active', 'delayed', jobId].map(function(name) {
return queue.toKey(name);
});
return keys.concat([
JSON.stringify(timestamp),
jobId,
ignoreLock ? '0' : 'queue.token',
]);
return keys.concat([JSON.stringify(timestamp), jobId]);
}

static async moveToDelayed(
queue: Queue,
jobId: string,
timestamp: number,
ignoreLock: boolean,
) {
const args = this.moveToDelayedArgs(queue, jobId, timestamp, ignoreLock);
static async moveToDelayed(queue: Queue, jobId: string, timestamp: number) {
const args = this.moveToDelayedArgs(queue, jobId, timestamp);
const result = await (<any>queue.client).moveToDelayed(args);
switch (result) {
case -1:
Expand All @@ -280,16 +265,10 @@ export class Scripts {
jobId +
' when trying to move from active to delayed',
);
case -2:
throw new Error(
'Job ' +
jobId +
' was locked when trying to move from active to delayed',
);
}
}

static retryJobArgs(queue: QueueBase, job: Job, ignoreLock: boolean) {
static retryJobArgs(queue: QueueBase, job: Job) {
const jobId = job.id;

const keys = ['active', 'wait', jobId].map(function(name) {
Expand All @@ -298,7 +277,7 @@ export class Scripts {

const pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH';

return keys.concat([pushCmd, jobId, ignoreLock ? '0' : 'queue.token']);
return keys.concat([pushCmd, jobId]);
}

static moveToActive(queue: Worker, jobId: string) {
Expand All @@ -310,6 +289,7 @@ export class Scripts {
keys[5] = queueKeys.limiter;
keys[6] = queueKeys.delayed;
keys[7] = queue.eventStreamKey();
keys[8] = queue.delayStreamKey();

const args: (string | number | boolean)[] = [
queueKeys[''],
Expand All @@ -320,11 +300,7 @@ export class Scripts {
const opts: WorkerOptions = <WorkerOptions>queue.opts;

if (opts.limiter) {
args.push(
opts.limiter.max,
opts.limiter.duration,
!!opts.limiter.bounceBack,
);
args.push(opts.limiter.max, opts.limiter.duration);
}
return (<any>queue.client)
.moveToActive((<(string | number | boolean)[]>keys).concat(args))
Expand Down
Loading

0 comments on commit 1b6b192

Please sign in to comment.