Skip to content

Commit

Permalink
feat(sandbox): kill child workers gracefully (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
GabrielCastro authored Jul 23, 2020
1 parent 30aa033 commit 4262837
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 55 deletions.
35 changes: 22 additions & 13 deletions src/classes/child-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import { forEach, values, flatten } from 'lodash';
import * as getPort from 'get-port';
import * as fs from 'fs';
import { promisify } from 'util';
import { killAsync } from './process-utils';

const stat = promisify(fs.stat);

const CHILD_KILL_TIMEOUT = 30_000;

export interface ChildProcessExt extends ChildProcess {
processFile?: string;
}
Expand All @@ -28,11 +31,21 @@ const convertExecArgv = async (execArgv: string[]): Promise<string[]> => {
return standard.concat(convertedArgs);
};

const initChild = function(child: ChildProcess, processFile: string) {
return new Promise(resolve => {
child.send({ cmd: 'init', value: processFile }, resolve);
async function initChild(child: ChildProcess, processFile: string) {
const onComplete = new Promise(resolve => {
const onMessageHandler = (msg: any) => {
if (msg.cmd === 'init-complete') {
resolve();
child.off('message', onMessageHandler);
}
};
child.on('message', onMessageHandler);
});
};
await new Promise(resolve =>
child.send({ cmd: 'init', value: processFile }, resolve),
);
await onComplete;
}

export class ChildPool {
retained: { [key: number]: ChildProcessExt } = {};
Expand Down Expand Up @@ -89,21 +102,17 @@ export class ChildPool {
}
}

kill(child: ChildProcess, signal?: NodeJS.Signals) {
child.kill(signal || 'SIGKILL');
async kill(child: ChildProcess, signal: 'SIGTERM' | 'SIGKILL' = 'SIGKILL') {
this.remove(child);
await killAsync(child, signal, CHILD_KILL_TIMEOUT);
}

clean() {
async clean() {
const children = values(this.retained).concat(this.getAllFree());

children.forEach(child => {
// TODO: We may want to use SIGKILL if the process does not die after some time.
this.kill(child, 'SIGTERM');
});

this.retained = {};
this.free = {};

await Promise.all(children.map(c => this.kill(c, 'SIGTERM')));
}

getFree(id: string): ChildProcessExt[] {
Expand Down
61 changes: 44 additions & 17 deletions src/classes/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import { SandboxedJob } from '../interfaces/sandboxed-job';

let status: any;
let processor: any;
let currentJobPromise: Promise<unknown> | undefined;

// same as process.send but waits until the send is complete
// the async version is used below because otherwise
// the termination handler may exit before the parent
// process has received the messages it requires
const processSendAsync = promisify(process.send.bind(process)) as (
msg: any,
) => Promise<void>;

// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify
if (!('toJSON' in Error.prototype)) {
Expand All @@ -29,7 +38,19 @@ if (!('toJSON' in Error.prototype)) {
});
}

process.on('message', async msg => {
async function waitForCurrentJobAndExit() {
status = 'TERMINATING';
try {
await currentJobPromise;
} finally {
process.exit(process.exitCode || 0);
}
}

process.on('SIGTERM', waitForCurrentJobAndExit);
process.on('SIGINT', waitForCurrentJobAndExit);

process.on('message', msg => {
switch (msg.cmd) {
case 'init':
processor = require(msg.value);
Expand All @@ -50,6 +71,9 @@ process.on('message', async msg => {
};
}
status = 'IDLE';
process.send({
cmd: 'init-complete',
});
break;

case 'start':
Expand All @@ -60,23 +84,26 @@ process.on('message', async msg => {
});
}
status = 'STARTED';
try {
const result = await Promise.resolve(processor(wrapJob(msg.job)) || {});
process.send({
cmd: 'completed',
value: result,
});
} catch (err) {
if (!err.message) {
err = new Error(err);
currentJobPromise = (async () => {
try {
const result = (await processor(wrapJob(msg.job))) || {};
await processSendAsync({
cmd: 'completed',
value: result,
});
} catch (err) {
if (!err.message) {
err = new Error(err);
}
await processSendAsync({
cmd: 'failed',
value: err,
});
} finally {
status = 'IDLE';
currentJobPromise = undefined;
}
process.send({
cmd: 'failed',
value: err,
});
} finally {
status = 'IDLE';
}
})();
break;
case 'stop':
break;
Expand Down
41 changes: 41 additions & 0 deletions src/classes/process-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict';

import { ChildProcess } from 'child_process';

function onExitOnce(child: ChildProcess): Promise<void> {
return new Promise(resolve => {
child.once('exit', () => resolve());
});
}

function hasProcessExited(child: ChildProcess): boolean {
return !!(child.exitCode !== null || child.signalCode);
}

/**
* Sends a kill signal to a child resolving when the child has exited,
* resorting to SIGKILL if the given timeout is reached
*/
export async function killAsync(
child: ChildProcess,
signal: 'SIGTERM' | 'SIGKILL' = 'SIGKILL',
timeoutMs: number = undefined,
): Promise<void> {
if (hasProcessExited(child)) {
return;
}

const onExit = onExitOnce(child);
child.kill(signal);

if (timeoutMs === 0 || isFinite(timeoutMs)) {
const timeoutHandle = setTimeout(() => {
if (!hasProcessExited(child)) {
child.kill('SIGKILL');
}
}, timeoutMs);
await onExit;
clearTimeout(timeoutHandle);
}
await onExit;
}
49 changes: 28 additions & 21 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,30 +364,37 @@ export class Worker<T = any> extends QueueBase {
this.waiting && reconnect && (await this.blockingConnection.reconnect());
}

async close(force = false) {
if (!this.closing) {
close(force = false) {
if (this.closing) {
return this.closing;
}
this.closing = (async () => {
this.emit('closing', 'closing queue');
this.closing = new Promise(async (resolve, reject) => {
try {
const client = await this.blockingConnection.client;

await this.resume();

if (!force) {
await this.whenCurrentJobsFinished(false);
} else {
await client.disconnect();
const client = await this.blockingConnection.client;

this.resume();
await Promise.resolve()
.finally(() => {
return force || this.whenCurrentJobsFinished(false);
})
.finally(() => {
const closePoolPromise = this.childPool?.clean();

if (force) {
// since we're not waiting for the job to end attach
// an error handler to avoid crashing the whole process
closePoolPromise?.catch(err => {
console.error(err);
});
return;
}
} catch (err) {
reject(err);
} finally {
this.timerManager.clearAllTimers();
this.childPool && this.childPool.clean();
}
this.emit('closed');
resolve();
});
}
return closePoolPromise;
})
.finally(() => client.disconnect())
.finally(() => this.timerManager.clearAllTimers())
.finally(() => this.emit('closed'));
})();
return this.closing;
}
}
6 changes: 2 additions & 4 deletions src/test/test_child-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ describe('Child pool', () => {
pool = new ChildPool();
});

afterEach(() => {
pool.clean();
});
afterEach(() => pool.clean());

it('should return same child if free', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
Expand Down Expand Up @@ -48,7 +46,7 @@ describe('Child pool', () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
const child = await pool.retain(processor);
expect(child).to.be.ok;
pool.kill(child);
await pool.kill(child);
expect(pool.retained).to.be.empty;
const newChild = await pool.retain(processor);
expect(child).to.not.be.eql(newChild);
Expand Down
40 changes: 40 additions & 0 deletions src/test/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,44 @@ describe('sandboxed process', () => {

await worker.close();
});

it('should allow the job to complete and then exit on worker close', async function() {
this.timeout(1500000);
const processFile = __dirname + '/fixtures/fixture_processor_slow.js';
const worker = new Worker(queueName, processFile);

// aquire and release a child here so we know it has it's full termination handler setup
const initalizedChild = await worker['childPool'].retain(processFile);
await worker['childPool'].release(initalizedChild);

// await this After we've added the job
const onJobActive = new Promise<void>(resolve => {
worker.on('active', resolve);
});

const jobAdd = queue.add('foo', {});
await onJobActive;

expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(1);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
const child = Object.values(worker['childPool'].retained)[0];

expect(child).to.equal(initalizedChild);
expect(child.exitCode).to.equal(null);
expect(child.killed).to.equal(false);

// at this point the job should be active and running on the child
// trigger a close while we know it's doing work
await worker.close();

// ensure the child did get cleaned up
expect(!!child.killed).to.eql(true);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);

const job = await jobAdd;
// check that the job did finish successfully
const jobResult = await job.waitUntilFinished(queueEvents);
expect(jobResult).to.equal(42);
});
});

0 comments on commit 4262837

Please sign in to comment.