Skip to content

Commit

Permalink
feat: add some new tests for compat class, more minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stansv committed Aug 21, 2019
1 parent 7590cea commit bc0f653
Show file tree
Hide file tree
Showing 2 changed files with 760 additions and 70 deletions.
204 changes: 134 additions & 70 deletions src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ export class Queue3<T = any> extends EventEmitter {
}
}

isPaused(): boolean {
return this.worker && this.worker.isPaused();
}

/**
* Returns a promise that returns the number of jobs in the queue, waiting or paused.
* Since there may be other processes adding or processing jobs,
Expand Down Expand Up @@ -677,7 +681,7 @@ export class Queue3<T = any> extends EventEmitter {
* Optional parameters for range and ordering are provided.
*/
getJobs(
types: string[],
types: string[] | string,
start = 0,
end = -1,
asc = false,
Expand Down Expand Up @@ -705,16 +709,14 @@ export class Queue3<T = any> extends EventEmitter {
/**
* Returns a promise that resolves with the job counts for the given queue.
*/
getJobCounts(
types?: string[] | string,
): Promise<{ [index: string]: number }> {
getJobCounts(...types: string[]): Promise<{ [index: string]: number }> {
return this.getQueue().getJobCounts(...Utils.parseTypeArg(types));
}

/**
* Returns a promise that resolves with the job counts for the given queue of the given types.
*/
async getJobCountByTypes(types?: string[] | string): Promise<number> {
async getJobCountByTypes(...types: string[]): Promise<number> {
return this.getQueue().getJobCountByTypes(...Utils.parseTypeArg(types));
}

Expand Down Expand Up @@ -1020,6 +1022,7 @@ export class Queue3<T = any> extends EventEmitter {
this.name,
Utils.convertToQueueEventsOptions(this.opts),
);
this.queueEvents.init();
}
return this.queueEvents;
}
Expand Down Expand Up @@ -1089,15 +1092,11 @@ export class Queue3<T = any> extends EventEmitter {
case 'drained':
if (once) {
this.onWorkerInit(worker => {
worker.once('drained', () => {
listener();
});
worker.once('drained', listener);
});
} else {
this.onWorkerInit(worker => {
worker.on('drained', () => {
listener();
});
worker.on('drained', listener);
});
}
break;
Expand Down Expand Up @@ -1216,13 +1215,9 @@ export class Queue3<T = any> extends EventEmitter {
break;
case 'global:drained':
if (once) {
this.getQueueEvents().once('drained', () => {
listener();
});
this.getQueueEvents().once('drained', listener);
} else {
this.getQueueEvents().on('drained', () => {
listener();
});
this.getQueueEvents().on('drained', listener);
}
break;
case 'global:failed':
Expand Down Expand Up @@ -1612,13 +1607,15 @@ class Utils {

const target: QueueBaseOptions = {};

if (source.redis) {
if (source.redis !== undefined) {
const client = new IORedis(source.redis);
target.connection = client;
target.client = client;
}

target.prefix = source.prefix;
if (source.prefix !== undefined) {
target.prefix = source.prefix;
}

return target;
}
Expand All @@ -1630,13 +1627,18 @@ class Utils {

const target: QueueOptions = Utils.convertToQueueBaseOptions(source);

target.defaultJobOptions = Utils.convertToJobsOpts(
source.defaultJobOptions,
);
target.createClient = Utils.adaptToCreateClient(
source.createClient,
source.redis,
);
if (source.defaultJobOptions) {
target.defaultJobOptions = Utils.convertToJobsOpts(
source.defaultJobOptions,
);
}

if (source.createClient) {
target.createClient = Utils.adaptToCreateClient(
source.createClient,
source.redis,
);
}

return target;
}
Expand All @@ -1650,8 +1652,8 @@ class Utils {

const target: QueueEventsOptions = Utils.convertToQueueBaseOptions(source);

target.lastEventId = undefined;
target.blockingTimeout = undefined;
// target.lastEventId = undefined;
// target.blockingTimeout = undefined;

return target;
}
Expand All @@ -1666,8 +1668,13 @@ class Utils {
const target: QueueKeeperOptions = Utils.convertToQueueBaseOptions(source);

if (source.settings) {
target.maxStalledCount = source.settings.maxStalledCount;
target.stalledInterval = source.settings.stalledInterval;
if (source.settings.maxStalledCount !== undefined) {
target.maxStalledCount = source.settings.maxStalledCount;
}

if (source.settings.stalledInterval !== undefined) {
target.stalledInterval = source.settings.stalledInterval;
}
}

return target;
Expand All @@ -1679,12 +1686,21 @@ class Utils {
}

const target: JobsOpts = {};

target.timestamp = (source as any).timestamp;
target.priority = source.priority;
target.delay = source.delay;
target.attempts = source.attempts;
target.repeat = Utils.convertToRepeatOpts(source.repeat);
if ((source as any).timestamp !== undefined) {
target.timestamp = (source as any).timestamp;
}
if (source.priority !== undefined) {
target.priority = source.priority;
}
if (source.delay !== undefined) {
target.delay = source.delay;
}
if (source.attempts !== undefined) {
target.attempts = source.attempts;
}
if (source.repeat !== undefined) {
target.repeat = Utils.convertToRepeatOpts(source.repeat);
}

if (source.backoff !== undefined) {
if (typeof source.backoff === 'number') {
Expand All @@ -1694,16 +1710,26 @@ class Utils {
}
}

target.lifo = source.lifo;
target.timeout = source.timeout;

if (source.lifo !== undefined) {
target.lifo = source.lifo;
}
if (source.timeout !== undefined) {
target.timeout = source.timeout;
}
if (source.jobId !== undefined) {
target.jobId = source.jobId;
}

target.removeOnComplete = source.removeOnComplete;
target.removeOnFail = source.removeOnFail;
target.stackTraceLimit = source.stackTraceLimit;
if (source.removeOnComplete !== undefined) {
target.removeOnComplete = source.removeOnComplete;
}
if (source.removeOnFail !== undefined) {
target.removeOnFail = source.removeOnFail;
}
if (source.stackTraceLimit !== undefined) {
target.stackTraceLimit = source.stackTraceLimit;
}

return target;
}

Expand All @@ -1716,14 +1742,26 @@ class Utils {

const target: RepeatOpts = {};

target.cron = (source as CronRepeatOptions3).cron;
target.tz = (source as CronRepeatOptions3).tz;
target.startDate = (source as CronRepeatOptions3).startDate;
target.endDate = (source as CronRepeatOptions3).endDate;
target.limit = (source as EveryRepeatOptions3).limit;
target.every = (source as EveryRepeatOptions3).every;
target.count = undefined;
target.prevMillis = undefined;
if ((source as CronRepeatOptions3).cron !== undefined) {
target.cron = (source as CronRepeatOptions3).cron;
}
if ((source as CronRepeatOptions3).tz !== undefined) {
target.tz = (source as CronRepeatOptions3).tz;
}
if ((source as CronRepeatOptions3).startDate !== undefined) {
target.startDate = (source as CronRepeatOptions3).startDate;
}
if ((source as CronRepeatOptions3).endDate !== undefined) {
target.endDate = (source as CronRepeatOptions3).endDate;
}
if ((source as EveryRepeatOptions3).limit !== undefined) {
target.limit = (source as EveryRepeatOptions3).limit;
}
if ((source as EveryRepeatOptions3).every !== undefined) {
target.every = (source as EveryRepeatOptions3).every;
}
// target.count = undefined;
// target.prevMillis = undefined;

return target;
}
Expand All @@ -1735,8 +1773,12 @@ class Utils {

const target: BackoffOpts = { type: undefined, delay: undefined };

target.type = source.type;
target.delay = source.delay;
if (source.type !== undefined) {
target.type = source.type;
}
if (source.delay !== undefined) {
target.delay = source.delay;
}

return target;
}
Expand All @@ -1747,14 +1789,19 @@ class Utils {
}
const target: WorkerOptions = Utils.convertToQueueBaseOptions(source);

target.concurrency = undefined;
target.limiter = Utils.convertToRateLimiterOpts(source.limiter);
target.skipDelayCheck = undefined;
target.drainDelay = source.settings
? source.settings.drainDelay
: undefined;
target.visibilityWindow = undefined;
target.settings = Utils.convertToAdvancedOpts(source.settings);
// target.concurrency = undefined;
if (source.limiter !== undefined) {
target.limiter = Utils.convertToRateLimiterOpts(source.limiter);
}
// target.skipDelayCheck = undefined;
if (source.settings && source.settings.drainDelay !== undefined) {
target.drainDelay = source.settings.drainDelay;
}

// target.visibilityWindow = undefined;
if (source.settings) {
target.settings = Utils.convertToAdvancedOpts(source.settings);
}

return target;
}
Expand All @@ -1766,8 +1813,12 @@ class Utils {

const target: RateLimiterOpts = { max: undefined, duration: undefined };

target.max = source.max;
target.duration = source.duration;
if (source.max !== undefined) {
target.max = source.max;
}
if (source.duration !== undefined) {
target.duration = source.duration;
}

if (source.bounceBack !== undefined) {
console.warn('bounceBack option is not supported');
Expand All @@ -1783,14 +1834,27 @@ class Utils {

const target: AdvancedOpts = {};

target.lockDuration = source.lockDuration;
target.stalledInterval = source.stalledInterval;
target.maxStalledCount = source.maxStalledCount;
target.guardInterval = source.guardInterval;
target.retryProcessDelay = source.retryProcessDelay;
target.backoffStrategies = source.backoffStrategies;
target.drainDelay = source.drainDelay;

if (source.lockDuration !== undefined) {
target.lockDuration = source.lockDuration;
}
if (source.stalledInterval !== undefined) {
target.stalledInterval = source.stalledInterval;
}
if (source.maxStalledCount !== undefined) {
target.maxStalledCount = source.maxStalledCount;
}
if (source.guardInterval !== undefined) {
target.guardInterval = source.guardInterval;
}
if (source.retryProcessDelay !== undefined) {
target.retryProcessDelay = source.retryProcessDelay;
}
if (source.backoffStrategies !== undefined) {
target.backoffStrategies = source.backoffStrategies;
}
if (source.drainDelay !== undefined) {
target.drainDelay = source.drainDelay;
}
if (source.lockRenewTime !== undefined) {
console.warn('lockRenewTime option is not supported');
}
Expand Down
Loading

0 comments on commit bc0f653

Please sign in to comment.