Skip to content

Commit

Permalink
feat: port more features from bull 3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 13, 2019
1 parent e67a911 commit 75bd261
Show file tree
Hide file tree
Showing 18 changed files with 1,176 additions and 116 deletions.
90 changes: 78 additions & 12 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { BackoffOpts } from '@src/interfaces/backoff-opts';
import { WorkerOptions } from '@src/interfaces/worker-opts';
import IORedis from 'ioredis';
import { JobsOpts } from '../interfaces';
import { debuglog } from 'util';
import { Scripts } from './scripts';
import { JobsOpts } from '../interfaces';
import { errorObject, isEmpty, tryCatch } from '../utils';
import { Backoffs } from './backoffs';
import { tryCatch, errorObject, isEmpty } from '../utils';
import { BackoffOpts } from '@src/interfaces/backoff-opts';
import { QueueEvents } from './queue-events';
import { QueueBase } from './queue-base';
import { WorkerOptions } from '@src/interfaces/worker-opts';
import { QueueEvents } from './queue-events';
import { Scripts } from './scripts';

const logger = debuglog('bull');

Expand All @@ -27,7 +27,6 @@ export interface JobJson {
}

export class Job {
id: string;
progress: number | object = 0;
returnvalue: any = null;
stacktrace: string[] = null;
Expand All @@ -46,6 +45,7 @@ export class Job {
public name: string,
public data: any,
public opts: JobsOpts = {},
public id?: string,
) {
this.opts = Object.assign(
{
Expand All @@ -67,24 +67,24 @@ export class Job {
name: string,
data: any,
opts?: JobsOpts,
jobId?: string,
) {
await queue.waitUntilReady();

const job = new Job(queue, name, data, opts);
const job = new Job(queue, name, data, opts, jobId);

job.id = await job.addJob(queue.client);

logger('Job added', job.id);
return job;
}

static fromJSON(queue: QueueBase, json: any, jobId: string) {
static fromJSON(queue: QueueBase, json: any, jobId?: string) {
const data = JSON.parse(json.data || '{}');
const opts = JSON.parse(json.opts || '{}');

const job = new Job(queue, json.name, data, opts);
const job = new Job(queue, json.name, data, opts, json.id || jobId);

job.id = json.id || jobId;
job.progress = JSON.parse(json.progress || 0);

// job.delay = parseInt(json.delay);
Expand Down Expand Up @@ -151,6 +151,17 @@ export class Job {
return Scripts.updateProgress(this.queue, this, progress);
}

/**
* Logs one row of log data.
*
* @params logRow: string String with log data to be logged.
*
*/
log(logRow: string) {
const logsKey = this.toKey(this.id) + ':logs';
return this.queue.client.rpush(logsKey, logRow);
}

async remove() {
await this.queue.waitUntilReady();

Expand Down Expand Up @@ -285,6 +296,15 @@ export class Job {
return (await this.isInList('wait')) || (await this.isInList('paused'));
}

async getState() {
if (await this.isCompleted()) return 'completed';
if (await this.isFailed()) return 'failed';
if (await this.isDelayed()) return 'delayed';
if (await this.isActive()) return 'active';
if (await this.isWaiting()) return 'waiting';
return 'unknown';
}

/**
* Returns a promise the resolves when the job has finished. (completed or failed).
*/
Expand Down Expand Up @@ -354,6 +374,51 @@ export class Job {
}
}

moveToDelayed(timestamp: number) {
return Scripts.moveToDelayed(this.queue, this.id, timestamp);
}

async promote() {
const queue = this.queue;
const jobId = this.id;

const result = await Scripts.promote(queue, jobId);
if (result === -1) {
throw new Error('Job ' + jobId + ' is not in a delayed state');
}
}

/**
* Attempts to retry the job. Only a job that has failed can be retried.
*
* @return {Promise} If resolved and return code is 1, then the queue emits a waiting event
* otherwise the operation was not a success and throw the corresponding error. If the promise
* rejects, it indicates that the script failed to execute
*/
async retry(state: 'completed' | 'failed' = 'failed') {
await this.queue.waitUntilReady();

this.failedReason = null;
this.finishedOn = null;
this.processedOn = null;

await this.queue.client.hdel(
this.queue.toKey(this.id),
'finishedOn',
'processedOn',
'failedReason',
);

const result = await Scripts.reprocessJob(this.queue, this, state);
if (result === 1) {
return;
} else if (result === 0) {
throw new Error('Retried job does not exist');
} else if (result === -2) {
throw new Error('Retried job not failed');
}
}

private async isInZSet(set: string) {
const score = await this.queue.client.zscore(
this.queue.toKey(set),
Expand All @@ -374,7 +439,8 @@ export class Job {
const queue = this.queue;

const jobData = this.toJSON();
return Scripts.addJob(client, queue, jobData, this.opts);

return Scripts.addJob(client, queue, jobData, this.opts, this.id);
}

private saveAttempt(multi: IORedis.Pipeline, err: Error) {
Expand Down
22 changes: 21 additions & 1 deletion src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { QueueBase } from './queue-base';
import { Job } from './job';
import { clientCommandMessageReg } from './worker';

export class QueueGetters extends QueueBase {
getJob(jobId: string) {
Expand Down Expand Up @@ -163,10 +164,29 @@ export class QueueGetters extends QueueBase {
return Promise.all(jobIds.map(jobId => Job.fromId(this, jobId)));
}

async getJobLogs(jobId: string, start = 0, end = -1) {
const multi = this.client.multi();

const logsKey = this.toKey(jobId + ':logs');
multi.lrange(logsKey, -(end + 1), -(start + 1));
multi.llen(logsKey);
return multi.exec().then(result => ({
logs: result[0][1],
count: result[1][1],
}));
}

async getWorkers() {
await this.waitUntilReady();
const clients = await this.client.client('list');
return this.parseClientList(clients);
try {
const list = await this.parseClientList(clients);
return list;
} catch (err) {
if (!clientCommandMessageReg.test(err.message)) {
throw err;
}
}
}

private parseClientList(list: string) {
Expand Down
93 changes: 88 additions & 5 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,35 @@ export class Queue extends QueueGetters {
token = v4();
limiter: RateLimiterOpts = null;
repeat: Repeat;
jobsOpts: JobsOpts;

constructor(name: string, opts?: QueueBaseOptions) {
constructor(
name: string,
opts?: QueueBaseOptions & { defaultJobOptions?: JobsOpts },
) {
super(name, opts);

this.repeat = new Repeat(name, {
...opts,
connection: this.client,
});

this.jobsOpts = opts && opts.defaultJobOptions;
}

async append(jobName: string, data: any, opts?: JobsOpts) {
if (opts && opts.repeat) {
return this.repeat.addNextRepeatableJob(
jobName,
data,
opts,
opts.jobId,
{ ...opts, ...this.jobsOpts },
true,
);
} else {
const job = await Job.create(this, jobName, data, opts);
const job = await Job.create(this, jobName, data, {
...opts,
...this.jobsOpts,
});
this.emit('waiting', job);
return job;
}
Expand Down Expand Up @@ -63,5 +71,80 @@ export class Queue extends QueueGetters {
return this.repeat.removeRepeatable(name, repeatOpts, jobId);
}

async drain() {}
removeRepeatableByKey(key: string) {
return this.repeat.removeRepeatableByKey(key);
}

/**
* Drains the queue, i.e., removes all jobs that are waiting
* or delayed, but not active, completed or failed.
*
* TODO: Convert to an atomic LUA script.
*/
async drain(delayed = false) {
// Get all jobids and empty all lists atomically.
let multi = this.client.multi();

multi.lrange(this.toKey('wait'), 0, -1);
multi.lrange(this.toKey('paused'), 0, -1);
if (delayed) {
// TODO: get delayed jobIds too!
multi.del(this.toKey('delayed'));
}
multi.del(this.toKey('wait'));
multi.del(this.toKey('paused'));
multi.del(this.toKey('meta-paused'));
multi.del(this.toKey('priority'));

const [waiting, paused] = await multi.exec();
const waitingjobs = waiting[1];
const pausedJobs = paused[1];

const jobKeys = pausedJobs.concat(waitingjobs).map(this.toKey, this);

if (jobKeys.length) {
multi = this.client.multi();

multi.del.apply(multi, jobKeys);
return multi.exec();
}
}

/*@function clean
*
* Cleans jobs from a queue. Similar to remove but keeps jobs within a certain
* grace period.
*
* @param {int} grace - The grace period
* @param {string} [type=completed] - The type of job to clean
* Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed.
* @param {int} The max number of jobs to clean
*/
async clean(grace: number, type = 'completed', limit: number) {
await this.waitUntilReady();

if (grace === undefined || grace === null) {
throw new Error('You must define a grace period.');
}

if (!type) {
type = 'completed';
}

if (
['completed', 'wait', 'active', 'paused', 'delayed', 'failed'].indexOf(
type,
) === -1
) {
throw new Error('Cannot clean unknown queue type ' + type);
}

const jobs = await Scripts.cleanJobsInSet(
this,
type,
Date.now() - grace,
limit,
);
return jobs;
}
}
Loading

0 comments on commit 75bd261

Please sign in to comment.