Skip to content

Commit

Permalink
fix: minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stansv committed Aug 16, 2019
1 parent 51f75a4 commit 7791cda
Showing 1 changed file with 41 additions and 59 deletions.
100 changes: 41 additions & 59 deletions src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ export class Queue3<T = any> extends EventEmitter {
* Empties a queue deleting all the input lists and associated jobs.
*/
empty(): Promise<void> {
throw new Error('Not supported');
return this.getQueue().drain(true);
}

/**
Expand Down Expand Up @@ -992,8 +992,8 @@ export class Queue3<T = any> extends EventEmitter {
return (this.getQueue() as any).parseClientList(list);
}

retryJob(job: Job3) {
throw new Error('Not supported');
retryJob(job: Job3): Promise<void> {
return job.retry();
}

private getQueueScheduler() {
Expand Down Expand Up @@ -1087,14 +1087,14 @@ export class Queue3<T = any> extends EventEmitter {
case 'completed':
if (once) {
this.onWorkerInit(worker => {
worker.once('completed', (job, result, prev) => {
listener(job, result, prev);
worker.once('completed', (job, returnvalue, prev) => {
listener(job, returnvalue, prev);
});
});
} else {
this.onWorkerInit(worker => {
worker.on('completed', (job, result, prev) => {
listener(job, result, prev);
worker.on('completed', (job, returnvalue, prev) => {
listener(job, returnvalue, prev);
});
});
}
Expand All @@ -1120,14 +1120,14 @@ export class Queue3<T = any> extends EventEmitter {
case 'failed':
if (once) {
this.onWorkerInit(worker => {
worker.once('failed', (job, error, prev) => {
listener(job, error, prev);
worker.once('failed', (job, failedReason, prev) => {
listener(job, failedReason, prev);
});
});
} else {
this.onWorkerInit(worker => {
worker.on('failed', (job, error, prev) => {
listener(job, error, prev);
worker.on('failed', (job, failedReason, prev) => {
listener(job, failedReason, prev);
});
});
}
Expand Down Expand Up @@ -1176,22 +1176,12 @@ export class Queue3<T = any> extends EventEmitter {
break;
case 'progress':
if (once) {
this.onWorkerInit(worker => {
worker.once('progress', (job, progress) => {
listener(job, progress);
});
});
this.getQueue().once('progress', () => {
listener();
this.getQueue().once('progress', (job, progress) => {
listener(job, progress);
});
} else {
this.onWorkerInit(worker => {
worker.on('progress', (job, progress) => {
listener(job, progress);
});
});
this.getQueue().on('progress', () => {
listener();
this.getQueue().on('progress', (job, progress) => {
listener(job, progress);
});
}
break;
Expand All @@ -1200,22 +1190,12 @@ export class Queue3<T = any> extends EventEmitter {
break;
case 'waiting':
if (once) {
this.onWorkerInit(worker => {
worker.once('waiting', (job, progress) => {
listener(job, progress);
});
});
this.getQueue().once('waiting', () => {
listener();
this.getQueue().once('waiting', job => {
listener(job.id, null);
});
} else {
this.onWorkerInit(worker => {
worker.on('waiting', (job, progress) => {
listener(job, progress);
});
});
this.getQueue().on('waiting', () => {
listener();
this.getQueue().on('waiting', job => {
listener(job.id, null);
});
}
break;
Expand Down Expand Up @@ -1638,8 +1618,20 @@ export class Job3<T = any> {
* it atomic. If your queue does have a very large quantity of jobs, you may want to
* avoid using this method.
*/
getState(): Promise<JobStatus3> {
throw new Error('Not supported');
async getState(): Promise<JobStatus3> {
const result = await this.job.getState();
switch (result) {
case 'completed':
return 'completed';
case 'failed':
return 'failed';
case 'delayed':
return 'delayed';
case 'active':
return 'active';
case 'waiting':
return 'waiting';
}
}

/**
Expand All @@ -1662,7 +1654,7 @@ export class Job3<T = any> {
* has been scheduled for retry.
*/
retry(): Promise<void> {
throw new Error('Not supported');
return this.job.retry();
}

/**
Expand Down Expand Up @@ -1721,14 +1713,17 @@ export class Job3<T = any> {
}

moveToDelayed(timestamp?: number, ignoreLock = false): Promise<void> {
throw new Error('Not supported');
if (ignoreLock) {
console.warn('ignoreLock is not supported');
}
return this.job.moveToDelayed(timestamp);
}

/**
* Promotes a job that is currently "delayed" to the "waiting" state and executed as soon as possible.
*/
promote(): Promise<void> {
throw new Error('Not supported');
return this.job.promote();
}

/**
Expand Down Expand Up @@ -2373,21 +2368,8 @@ class Utils {
target.jobId = Utils.convertToJobId(source.jobId);
}

if (source.removeOnComplete !== undefined) {
if (typeof source.removeOnComplete === 'number') {
console.warn('numeric removeOnComplete option is not supported');
} else {
target.removeOnComplete = source.removeOnComplete;
}
}

if (source.removeOnFail !== undefined) {
if (typeof source.removeOnFail === 'number') {
console.warn('numeric removeOnFail option is not supported');
} else {
target.removeOnFail = source.removeOnFail;
}
}
target.removeOnComplete = source.removeOnComplete;
target.removeOnFail = source.removeOnFail;
target.stackTraceLimit = source.stackTraceLimit;
return target;
}
Expand Down

0 comments on commit 7791cda

Please sign in to comment.