Skip to content

Commit

Permalink
fix: improve disconnection for queue events
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 2, 2019
1 parent 2b0d8f3 commit 56b53a1
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 52 deletions.
4 changes: 4 additions & 0 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ export class QueueBase extends EventEmitter {
this.closing = this.connection.close();
return this.closing;
}

disconnect() {
return this.connection.disconnect();
}
}
68 changes: 40 additions & 28 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { QueueEventsOptions } from '@src/interfaces';
import { QueueBase } from './queue-base';
import { delay } from 'bluebird';
import { array2obj } from '../utils';
import { Job } from './job';
import { QueueBase } from './queue-base';

export class QueueEvents extends QueueBase {
constructor(name: string, opts?: QueueEventsOptions) {
Expand All @@ -27,38 +27,50 @@ export class QueueEvents extends QueueBase {
let id = opts.lastEventId || '0-0';

while (!this.closing) {
const data = await this.client.xread(
'BLOCK',
opts.blockingTimeout,
'STREAMS',
key,
id,
);
try {
const data = await this.client.xread(
'BLOCK',
opts.blockingTimeout,
'STREAMS',
key,
id,
);

if (data) {
const stream = data[0];
const events = stream[1];
if (data) {
const stream = data[0];
const events = stream[1];

for (let i = 0; i < events.length; i++) {
id = events[i][0];
const args = array2obj(events[i][1]);
for (let i = 0; i < events.length; i++) {
id = events[i][0];
const args = array2obj(events[i][1]);

//
// TODO: we may need to have a separate xtream for progress data
// to avoid this hack.
switch (args.event) {
case 'progress':
args.data = JSON.parse(args.data);
break;
case 'completed':
args.returnvalue = JSON.parse(args.returnvalue);
break;
}
//
// TODO: we may need to have a separate xtream for progress data
// to avoid this hack.
switch (args.event) {
case 'progress':
args.data = JSON.parse(args.data);
break;
case 'completed':
args.returnvalue = JSON.parse(args.returnvalue);
break;
}

this.emit(args.event, args, id);
this.emit(`${args.event}:${args.jobId}`, args, id);
this.emit(args.event, args, id);
this.emit(`${args.event}:${args.jobId}`, args, id);
}
}
} catch (err) {
if (err.message !== 'Connection is closed.') {
await delay(5000);
throw err;
}
}
}
}

async close() {
await super.close();
return this.disconnect();
}
}
23 changes: 23 additions & 0 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,29 @@ export class RedisConnection {
return this.client;
}

async disconnect() {
const client = this.client;
if (client.status !== 'end') {
let _resolve, _reject;

const disconnecting = new Promise((resolve, reject) => {
client.once('end', resolve);
client.once('error', reject);
_resolve = resolve;
_reject = reject;
});

client.disconnect();

try {
await disconnecting;
} finally {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
}
}
}

async close() {}

private async getRedisVersion() {
Expand Down
26 changes: 2 additions & 24 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ export class Worker extends QueueBase {
//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
this.waiting && (await redisClientDisconnect(this.client));
this.waiting && (await this.client.disconnect());

// If we are disconnected, how are we going to update the completed/failed sets?
if (this.processing) {
Expand All @@ -282,33 +282,11 @@ export class Worker extends QueueBase {
if (!force) {
await this.whenCurrentJobsFinished(false);
} else {
await redisClientDisconnect(this.client);
await this.disconnect();
}
} finally {
this.childPool && this.childPool.clean();
}
this.emit('closed');
}
}

async function redisClientDisconnect(client: IORedis.Redis) {
if (client.status !== 'end') {
let _resolve, _reject;

const disconnecting = new Promise((resolve, reject) => {
client.once('end', resolve);
client.once('error', reject);
_resolve = resolve;
_reject = reject;
});

client.disconnect();

try {
await disconnecting;
} finally {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
}
}
}

0 comments on commit 56b53a1

Please sign in to comment.