diff --git a/pkg/nuclide-commons/lib/process.js b/pkg/nuclide-commons/lib/process.js index 4ae9c1f619..7a69817074 100644 --- a/pkg/nuclide-commons/lib/process.js +++ b/pkg/nuclide-commons/lib/process.js @@ -21,7 +21,8 @@ import type {Observer} from 'rx'; import type {ProcessMessage, process$asyncExecuteRet} from '..'; import {observeStream, splitStream} from './stream'; -import {Observable, ReplaySubject} from 'rx'; +import {CompositeDisposable, Disposable} from 'event-kit'; +import {Observable} from 'rx'; import invariant from 'assert'; let platformPathPromise: ?Promise; @@ -254,34 +255,50 @@ function scriptSafeSpawnAndObserveOutput( }); } -class ProcessResource { - process$: Observable; - disposed$: ReplaySubject; - - constructor(promiseOrProcess: child_process$ChildProcess | Promise) { - this.disposed$ = new ReplaySubject(1); - this.process$ = Observable.fromPromise(Promise.resolve(promiseOrProcess)); - - Observable.combineLatest(this.process$, this.disposed$) - .takeUntil( - this.process$.flatMap(process => Observable.fromEvent(process, 'exit')), - ) - .subscribe(([process, disposed]) => { - if (disposed && (process != null)) { - process.kill(); - } - }); - } - - getStream(): Observable { - return this.process$.takeUntil(this.disposed$); - } +/** + * Creates an observable with the following properties: + * + * 1. It contains a process that's created using the provided factory upon subscription. + * 2. It doesn't complete until the process exits. + * 3. The process is killed when there are no more subscribers. + */ +function createProcessStream( + createProcess: () => child_process$ChildProcess | Promise, +): Observable { + return Observable.create(observer => { + const promise = Promise.resolve(createProcess()); + let process; + let disposed = false; + let exited = false; + const maybeKill = () => { + if (process != null && disposed && !exited) { + process.kill(); + process = null; + } + }; - dispose(): void { - this.disposed$.onNext(true); - this.disposed$.onCompleted(); - } + promise.then(p => { + process = p; + maybeKill(); + }); + const processStream = Observable.fromPromise(promise); + + const exit = processStream + .flatMap(p => Observable.fromEvent(p, 'exit', (code, signal) => signal)) + // An exit signal from SIGUSR1 doesn't actually exit the process, so skip that. + .filter(signal => signal !== 'SIGUSR1') + .tap(() => { exited = true; }); + + return new CompositeDisposable( + // A version of processStream that never completes... + Observable.merge(processStream, Observable.create(() => {})) + // ...which we take until the process exits. + .takeUntil(exit) + .subscribe(observer), + new Disposable(() => { disposed = true; maybeKill(); }), + ); + }).share(); } /** @@ -291,12 +308,8 @@ class ProcessResource { function observeProcessExit( createProcess: () => child_process$ChildProcess | Promise, ): Observable { - return Observable.using( - () => new ProcessResource(createProcess()), - processResource => ( - processResource.getStream().flatMap(process => Observable.fromEvent(process, 'exit').take(1)) - ), - ); + return createProcessStream(createProcess) + .flatMap(process => Observable.fromEvent(process, 'exit').take(1)); } function getOutputStream( @@ -327,10 +340,7 @@ function getOutputStream( function observeProcess( createProcess: () => child_process$ChildProcess | Promise, ): Observable { - return Observable.using( - () => new ProcessResource(createProcess()), - processResource => processResource.getStream().flatMap(getOutputStream), - ); + return createProcessStream(createProcess).flatMap(getOutputStream); } /** @@ -483,6 +493,7 @@ async function asyncExecute( module.exports = { asyncExecute, createArgsForScriptCommand, + createProcessStream, checkOutput, forkWithExecEnvironment, getOutputStream,