From f8aa946e9890b6a1b4c6eae1c15c401edd13e542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Mar=C3=A9chal?= Date: Wed, 20 Nov 2019 17:35:09 -0500 Subject: [PATCH] process: add onClose event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `onClose` convenience event on processes, fired when all the streams are supposed to be closed. Signed-off-by: Paul Maréchal --- dev-packages/electron/scripts/post-install.js | 2 +- packages/process/src/node/dev-null-stream.ts | 13 +++++++++ .../process/src/node/multi-ring-buffer.ts | 24 +++++++++++---- packages/process/src/node/process.ts | 19 ++++++++++++ packages/process/src/node/raw-process.ts | 29 +++++++++++++------ packages/process/src/node/terminal-process.ts | 15 ++++++++-- .../task/src/node/process/process-task.ts | 4 +-- 7 files changed, 86 insertions(+), 20 deletions(-) diff --git a/dev-packages/electron/scripts/post-install.js b/dev-packages/electron/scripts/post-install.js index c3414bb08b28e..e1d9352c52971 100644 --- a/dev-packages/electron/scripts/post-install.js +++ b/dev-packages/electron/scripts/post-install.js @@ -32,7 +32,7 @@ async function fork(script, args = [], options = {}, callback) { return new Promise((resolve, reject) => { const subprocess = cp.fork(script, args, options); subprocess.once('error', reject); - subprocess.once('exit', (code, signal) => { + subprocess.once('close', (code, signal) => { if (signal || code) reject(new Error(`"${script}" exited with ${signal || code}`)); else resolve(); }); diff --git a/packages/process/src/node/dev-null-stream.ts b/packages/process/src/node/dev-null-stream.ts index f18d17fe683f2..024b1774b76d2 100644 --- a/packages/process/src/node/dev-null-stream.ts +++ b/packages/process/src/node/dev-null-stream.ts @@ -22,6 +22,19 @@ import stream = require('stream'); * Writing goes to a black hole, reading returns `EOF`. */ export class DevNullStream extends stream.Duplex { + + constructor(options: { + /** + * Makes this stream call `destroy` on itself, emitting the `close` event. + */ + autoDestroy?: boolean, + } = {}) { + super(); + if (options.autoDestroy) { + this.destroy(); + } + } + // tslint:disable-next-line:no-any _write(chunk: any, encoding: string, callback: (err?: Error) => void): void { callback(); diff --git a/packages/process/src/node/multi-ring-buffer.ts b/packages/process/src/node/multi-ring-buffer.ts index ccc7a1712ed5d..ebda711a22ef0 100644 --- a/packages/process/src/node/multi-ring-buffer.ts +++ b/packages/process/src/node/multi-ring-buffer.ts @@ -44,6 +44,14 @@ export class MultiRingBufferReadableStream extends stream.Readable implements Di this.deq(size); } + _destroy(err: Error | undefined, callback: (err?: Error) => void): void { + this.ringBuffer.closeStream(this); + this.ringBuffer.closeReader(this.reader); + this.disposed = true; + this.removeAllListeners(); + callback(err); + } + onData(): void { if (this.more === true) { this.deq(-1); @@ -66,10 +74,7 @@ export class MultiRingBufferReadableStream extends stream.Readable implements Di } dispose(): void { - this.ringBuffer.closeStream(this); - this.ringBuffer.closeReader(this.reader); - this.disposed = true; - this.removeAllListeners(); + this.destroy(); } } @@ -82,7 +87,7 @@ export interface MultiRingBufferOptions { export interface WrappedPosition { newPos: number, wrap: boolean } @injectable() -export class MultiRingBuffer { +export class MultiRingBuffer implements Disposable { protected readonly buffer: Buffer; protected head: number = -1; @@ -276,6 +281,15 @@ export class MultiRingBuffer { return this.readers.size; } + /** + * Dispose all the attached readers/streams. + */ + dispose(): void { + for (const astream of this.streams.keys()) { + astream.dispose(); + } + } + /* Position should be incremented if it goes pass end. */ protected shouldIncPos(pos: number, end: number, size: number): boolean { const { newPos: newHead, wrap } = this.inc(end, size); diff --git a/packages/process/src/node/process.ts b/packages/process/src/node/process.ts index 14c9873ae2b41..25faf89334831 100644 --- a/packages/process/src/node/process.ts +++ b/packages/process/src/node/process.ts @@ -85,6 +85,7 @@ export abstract class Process { readonly id: number; protected readonly startEmitter: Emitter = new Emitter(); protected readonly exitEmitter: Emitter = new Emitter(); + protected readonly closeEmitter: Emitter = new Emitter(); protected readonly errorEmitter: Emitter = new Emitter(); protected _killed = false; @@ -128,6 +129,9 @@ export abstract class Process { return this.startEmitter.event; } + /** + * Wait for the process to exit, streams can still emit data. + */ get onExit(): Event { return this.exitEmitter.event; } @@ -136,6 +140,13 @@ export abstract class Process { return this.errorEmitter.event; } + /** + * Waits for both process exit and for all the streams to be closed. + */ + get onClose(): Event { + return this.closeEmitter.event; + } + protected emitOnStarted(): void { this.startEmitter.fire({}); } @@ -150,6 +161,14 @@ export abstract class Process { this.exitEmitter.fire(exitEvent); } + /** + * Emit the onClose event for this process. Only one of code and signal + * should be defined. + */ + protected emitOnClose(code?: number, signal?: string): void { + this.closeEmitter.fire({ code, signal }); + } + protected handleOnExit(event: IProcessExitEvent): void { this._killed = true; const signalSuffix = event.signal ? `, signal: ${event.signal}` : ''; diff --git a/packages/process/src/node/raw-process.ts b/packages/process/src/node/raw-process.ts index ab1a09c1f2248..9986a4e7f163c 100644 --- a/packages/process/src/node/raw-process.ts +++ b/packages/process/src/node/raw-process.ts @@ -115,18 +115,29 @@ export class RawProcess extends Process { error.code = error.code || 'Unknown error'; this.emitOnError(error as ProcessErrorEvent); }); - this.process.on('exit', (exitCode: number, signal: string) => { + + // When no stdio option is passed, it is null by default. + this.outputStream = this.process.stdout || new DevNullStream({ autoDestroy: true }); + this.inputStream = this.process.stdin || new DevNullStream({ autoDestroy: true }); + this.errorStream = this.process.stderr || new DevNullStream({ autoDestroy: true }); + + this.process.on('exit', (exitCode, signal) => { // node's child_process exit sets the unused parameter to null, // but we want it to be undefined instead. this.emitOnExit( - exitCode !== null ? exitCode : undefined, - signal !== null ? signal : undefined, + typeof exitCode === 'number' ? exitCode : undefined, + typeof signal === 'string' ? signal : undefined, ); }); - this.outputStream = this.process.stdout || new DevNullStream(); - this.inputStream = this.process.stdin || new DevNullStream(); - this.errorStream = this.process.stderr || new DevNullStream(); + this.process.on('close', (exitCode, signal) => { + // node's child_process exit sets the unused parameter to null, + // but we want it to be undefined instead. + this.emitOnClose( + typeof exitCode === 'number' ? exitCode : undefined, + typeof signal === 'string' ? signal : undefined, + ); + }); if (this.process.pid !== undefined) { process.nextTick(this.emitOnStarted.bind(this)); @@ -134,9 +145,9 @@ export class RawProcess extends Process { } catch (error) { /* When an error is thrown, set up some fake streams, so the client code doesn't break because these field are undefined. */ - this.outputStream = new DevNullStream(); - this.inputStream = new DevNullStream(); - this.errorStream = new DevNullStream(); + this.outputStream = new DevNullStream({ autoDestroy: true }); + this.inputStream = new DevNullStream({ autoDestroy: true }); + this.errorStream = new DevNullStream({ autoDestroy: true }); /* Call the client error handler, but first give them a chance to register it. */ this.emitOnErrorAsync(error); diff --git a/packages/process/src/node/terminal-process.ts b/packages/process/src/node/terminal-process.ts index d243638e19dd7..b557709df98f2 100644 --- a/packages/process/src/node/terminal-process.ts +++ b/packages/process/src/node/terminal-process.ts @@ -178,7 +178,7 @@ export class TerminalProcess extends Process { protected readonly terminal: IPty | undefined; readonly outputStream = this.createOutputStream(); - readonly errorStream = new DevNullStream(); + readonly errorStream = new DevNullStream({ autoDestroy: true }); readonly inputStream: Writable; constructor( @@ -210,7 +210,9 @@ export class TerminalProcess extends Process { } }); - this.terminal.on('exit', (code: number, signal?: number) => { + // node-pty actually wait for the underlying streams to be closed before emitting exit. + // We should emulate the `exit` and `close` sequence. + this.terminal.on('exit', (code, signal) => { // Make sure to only pass either code or signal as !undefined, not // both. // @@ -224,6 +226,13 @@ export class TerminalProcess extends Process { } else { this.emitOnExit(undefined, signame(signal)); } + process.nextTick(() => { + if (signal === undefined || signal === 0) { + this.emitOnClose(code, undefined); + } else { + this.emitOnClose(undefined, signame(signal)); + } + }); }); this.terminal.on('data', (data: string) => { @@ -237,7 +246,7 @@ export class TerminalProcess extends Process { }); } catch (error) { - this.inputStream = new DevNullStream(); + this.inputStream = new DevNullStream({ autoDestroy: true }); // Normalize the error to make it as close as possible as what // node's child_process.spawn would generate in the same diff --git a/packages/task/src/node/process/process-task.ts b/packages/task/src/node/process/process-task.ts index bed546979804e..427e665e6c57c 100644 --- a/packages/task/src/node/process/process-task.ts +++ b/packages/task/src/node/process/process-task.ts @@ -64,7 +64,7 @@ export class ProcessTask extends Task { ) { super(taskManager, logger, options); - const toDispose = this.process.onExit(async event => { + const toDispose = this.process.onClose(async event => { toDispose.dispose(); this.fireTaskExited(await this.getTaskExitedEvent(event)); }); @@ -100,7 +100,7 @@ export class ProcessTask extends Task { if (this.process.killed) { resolve(); } else { - const toDispose = this.process.onExit(event => { + const toDispose = this.process.onClose(event => { toDispose.dispose(); resolve(); });