Skip to content

Commit

Permalink
fix: reworked initialization of redis clients
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 12, 2019
1 parent 90bd891 commit c17d4be
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 215 deletions.
33 changes: 16 additions & 17 deletions src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class Queue3<T = any> extends EventEmitter {
* This replaces the `ready` event emitted on Queue in previous verisons.
*/
async isReady(): Promise<this> {
await this.queue.waitUntilReady();
await this.queue.client;
return this;
}

Expand Down Expand Up @@ -99,7 +99,7 @@ export class Queue3<T = any> extends EventEmitter {

this.worker = new Worker(this.name, processor, this.opts);
this.queueScheduler = new QueueScheduler(this.name, this.opts);
await this.worker.waitUntilReady();
await this.worker.client;
}

add(jobName: string, data: any, opts?: JobsOptions): Promise<Job> {
Expand Down Expand Up @@ -239,12 +239,13 @@ export class Queue3<T = any> extends EventEmitter {
* Returns JobInformation of repeatable jobs (ordered descending). Provide a start and/or an end
* index to limit the number of results. Start defaults to 0, end to -1 and asc to false.
*/
getRepeatableJobs(
async getRepeatableJobs(
start = 0,
end = -1,
asc = false,
): Promise<JobInformation3[]> {
return this.queue.repeat.getRepeatableJobs(start, end, asc);
const repeat = await this.queue.repeat;
return repeat.getRepeatableJobs(start, end, asc);
}

/**
Expand All @@ -256,12 +257,8 @@ export class Queue3<T = any> extends EventEmitter {
opts?: JobsOptions,
skipCheckExists?: boolean,
): Promise<Job> {
return this.queue.repeat.addNextRepeatableJob(
name,
data,
opts,
skipCheckExists,
);
const repeat = await this.queue.repeat;
return repeat.addNextRepeatableJob(name, data, opts, skipCheckExists);
}

/**
Expand All @@ -270,16 +267,17 @@ export class Queue3<T = any> extends EventEmitter {
*
* name: The name of the to be removed job
*/
async removeRepeatable(name: string, repeat: RepeatOptions): Promise<void> {
return this.queue.repeat.removeRepeatable(name, repeat, repeat.jobId);
async removeRepeatable(name: string, opts: RepeatOptions): Promise<void> {
const repeat = await this.queue.repeat;
return repeat.removeRepeatable(name, opts, opts.jobId);
}

/**
* Removes a given repeatable job by key.
*/
async removeRepeatableByKey(repeatJobKey: string): Promise<void> {
const repeat = this.queue.repeat;
await repeat.waitUntilReady();
const repeat = await this.queue.repeat;
const client = await repeat.client;

const tokens = repeatJobKey.split(':');
const data = {
Expand All @@ -292,7 +290,7 @@ export class Queue3<T = any> extends EventEmitter {
};

const queueKey = repeat.toKey('');
return (<any>repeat.client).removeRepeatable(
return (<any>client).removeRepeatable(
repeat.keys.repeat,
repeat.keys.delayed,
data.id,
Expand Down Expand Up @@ -389,8 +387,9 @@ export class Queue3<T = any> extends EventEmitter {
/**
* Returns a promise that resolves to the quantity of repeatable jobs.
*/
getRepeatableCount(): Promise<number> {
return this.queue.repeat.getRepeatableCount();
async getRepeatableCount(): Promise<number> {
const repeat = await this.queue.repeat;
return repeat.getRepeatableCount();
}

/**
Expand Down
43 changes: 20 additions & 23 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ export class Job {
data: any,
opts?: JobsOptions,
) {
await queue.waitUntilReady();
const client = await queue.client;

const job = new Job(queue, name, data, opts, opts && opts.jobId);

job.id = await job.addJob(queue.client);
job.id = await job.addJob(client);

return job;
}
Expand All @@ -83,13 +83,13 @@ export class Job {
opts?: JobsOptions;
}[],
) {
await queue.waitUntilReady();
const client = await queue.client;

const jobInstances = jobs.map(
job => new Job(queue, job.name, job.data, job.opts),
);

const multi = queue.client.multi();
const multi = client.multi();

for (const job of jobInstances) {
job.addJob(<IORedis.Redis>(multi as unknown));
Expand Down Expand Up @@ -138,8 +138,8 @@ export class Job {
static async fromId(queue: QueueBase, jobId: string) {
// jobId can be undefined if moveJob returns undefined
if (jobId) {
await queue.waitUntilReady();
const jobData = await queue.client.hgetall(queue.toKey(jobId));
const client = await queue.client;
const jobData = await client.hgetall(queue.toKey(jobId));
return isEmpty(jobData) ? null : Job.fromJSON(queue, jobData, jobId);
}
}
Expand All @@ -162,13 +162,10 @@ export class Job {
}

async update(data: any) {
await this.queue.waitUntilReady();
const client = await this.queue.client;

this.data = data;
await this.queue.client.hset(
this.queue.toKey(this.id),
'data',
JSON.stringify(data),
);
await client.hset(this.queue.toKey(this.id), 'data', JSON.stringify(data));
}

async updateProgress(progress: number | object) {
Expand All @@ -182,9 +179,10 @@ export class Job {
* @params logRow: string String with log data to be logged.
*
*/
log(logRow: string) {
async log(logRow: string) {
const client = await this.queue.client;
const logsKey = this.toKey(this.id) + ':logs';
return this.queue.client.rpush(logsKey, logRow);
return client.rpush(logsKey, logRow);
}

async remove() {
Expand Down Expand Up @@ -239,13 +237,13 @@ export class Job {
* @returns void
*/
async moveToFailed(err: Error, fetchNext = false) {
await this.queue.waitUntilReady();
const client = await this.queue.client;

const queue = this.queue;
this.failedReason = err.message;

let command: string;
const multi = queue.client.multi();
const multi = client.multi();
this.saveAttempt(multi, err);

//
Expand Down Expand Up @@ -405,13 +403,13 @@ export class Job {
* rejects, it indicates that the script failed to execute
*/
async retry(state: 'completed' | 'failed' = 'failed') {
await this.queue.waitUntilReady();
const client = await this.queue.client;

this.failedReason = null;
this.finishedOn = null;
this.processedOn = null;

await this.queue.client.hdel(
await client.hdel(
this.queue.toKey(this.id),
'finishedOn',
'processedOn',
Expand All @@ -433,16 +431,15 @@ export class Job {
}

private async isInZSet(set: string) {
const score = await this.queue.client.zscore(
this.queue.toKey(set),
this.id,
);
const client = await this.queue.client;

const score = await client.zscore(this.queue.toKey(set), this.id);
return score !== null;
}

private async isInList(list: string) {
return Scripts.isJobInList(
this.queue.client,
await this.queue.client,
this.queue.toKey(list),
this.id,
);
Expand Down
20 changes: 8 additions & 12 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { EventEmitter } from 'events';
import IORedis from 'ioredis';
import { QueueBaseOptions } from '../interfaces';
import { RedisConnection } from './redis-connection';

export class QueueBase extends EventEmitter {
keys: { [index: string]: string };
client: IORedis.Redis;
closing: Promise<void>;

protected connection: RedisConnection;
closing: Promise<void>;
private initializing: Promise<IORedis.Redis>;

constructor(protected name: string, public opts: QueueBaseOptions = {}) {
super();
Expand All @@ -20,6 +17,7 @@ export class QueueBase extends EventEmitter {
};

this.connection = new RedisConnection(opts.connection);
this.connection.on('error', this.emit.bind(this));

const keys: { [index: string]: string } = {};
[
Expand Down Expand Up @@ -48,21 +46,19 @@ export class QueueBase extends EventEmitter {
keys[key] = this.toKey(key);
});
this.keys = keys;

this.initializing = this.connection.init();

this.initializing
.then(client => client.on('error', this.emit.bind(this)))
.then(client => (this.client = client))
.catch(err => this.emit('error', err));
}

toKey(type: string) {
return `${this.opts.prefix}:${this.name}:${type}`;
}

get client() {
return this.connection.client;
}

// TO BE DEPRECATED
async waitUntilReady() {
return this.initializing;
return this.client;
}

protected base64Name() {
Expand Down
4 changes: 2 additions & 2 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class QueueEvents extends QueueBase {
}

private async consumeEvents() {
await this.waitUntilReady();
const client = await this.client;

const opts: QueueEventsOptions = this.opts;

Expand All @@ -30,7 +30,7 @@ export class QueueEvents extends QueueBase {

while (!this.closing) {
try {
const data = await this.client.xread(
const data = await client.xread(
'BLOCK',
opts.blockingTimeout,
'STREAMS',
Expand Down
16 changes: 8 additions & 8 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ export class QueueGetters extends QueueBase {
*
*/
async getJobCounts(...types: string[]) {
await this.waitUntilReady();

const multi = this.client.multi();
const client = await this.client;
const multi = client.multi();

this.commandByType(types, true, function(key, command) {
(<any>multi)[command](key);
Expand Down Expand Up @@ -113,7 +112,8 @@ export class QueueGetters extends QueueBase {
}

async getRanges(types: string[], start = 0, end = 1, asc = false) {
const multi = this.client.multi();
const client = await this.client;
const multi = client.multi();
const multiCommands: string[] = [];

this.commandByType(types, false, (key, command) => {
Expand Down Expand Up @@ -153,7 +153,6 @@ export class QueueGetters extends QueueBase {
}

async getJobs(types: string[] | string, start = 0, end = -1, asc = false) {
await this.waitUntilReady();
types = Array.isArray(types) ? types : [types];

if (types.indexOf('waiting') !== -1) {
Expand All @@ -165,7 +164,8 @@ export class QueueGetters extends QueueBase {
}

async getJobLogs(jobId: string, start = 0, end = -1) {
const multi = this.client.multi();
const client = await this.client;
const multi = client.multi();

const logsKey = this.toKey(jobId + ':logs');
multi.lrange(logsKey, -(end + 1), -(start + 1));
Expand All @@ -177,8 +177,8 @@ export class QueueGetters extends QueueBase {
}

async getWorkers() {
await this.waitUntilReady();
const clients = await this.client.client('list');
const client = await this.client;
const clients = await client.client('list');
try {
const list = await this.parseClientList(clients);
return list;
Expand Down
8 changes: 5 additions & 3 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class QueueScheduler extends QueueBase {
}

while (!this.closing) {
const client = await this.waitUntilReady();

// Check if at least the min stalled check time has passed.
await this.moveStalledJobsToWait();

Expand All @@ -58,15 +60,15 @@ export class QueueScheduler extends QueueBase {

let data;
if (blockTime) {
data = await this.client.xread(
data = await client.xread(
'BLOCK',
blockTime,
'STREAMS',
key,
streamLastId,
);
} else {
data = await this.client.xread('STREAMS', key, streamLastId);
data = await client.xread('STREAMS', key, streamLastId);
}

if (data && data[0]) {
Expand All @@ -87,7 +89,7 @@ export class QueueScheduler extends QueueBase {
// We trim to a length of 100, which should be a very safe value
// for all kind of scenarios.
//
this.client.xtrim(key, 'MAXLEN', '~', 100);
client.xtrim(key, 'MAXLEN', '~', 100);
}

const now = Date.now();
Expand Down
Loading

0 comments on commit c17d4be

Please sign in to comment.