Skip to content
This repository has been archived by the owner on Dec 13, 2018. It is now read-only.

Commit

Permalink
Replace ProcessResource with a process stream factory
Browse files Browse the repository at this point in the history
Reviewed By: peterhal

Differential Revision: D3073782

fb-gh-sync-id: 645c94f0e3604c9ea9050e51a1c3ef332f5a9a72
shipit-source-id: 645c94f0e3604c9ea9050e51a1c3ef332f5a9a72
  • Loading branch information
matthewwithanm authored and Facebook Github Bot 5 committed Mar 22, 2016
1 parent 5b402fa commit 58e8778
Showing 1 changed file with 48 additions and 37 deletions.
85 changes: 48 additions & 37 deletions pkg/nuclide-commons/lib/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import type {Observer} from 'rx';
import type {ProcessMessage, process$asyncExecuteRet} from '..';

import {observeStream, splitStream} from './stream';
import {Observable, ReplaySubject} from 'rx';
import {CompositeDisposable, Disposable} from 'event-kit';
import {Observable} from 'rx';
import invariant from 'assert';

let platformPathPromise: ?Promise<string>;
Expand Down Expand Up @@ -254,34 +255,50 @@ function scriptSafeSpawnAndObserveOutput(
});
}

class ProcessResource {
process$: Observable<child_process$ChildProcess>;
disposed$: ReplaySubject<boolean>;

constructor(promiseOrProcess: child_process$ChildProcess | Promise<child_process$ChildProcess>) {
this.disposed$ = new ReplaySubject(1);
this.process$ = Observable.fromPromise(Promise.resolve(promiseOrProcess));

Observable.combineLatest(this.process$, this.disposed$)
.takeUntil(
this.process$.flatMap(process => Observable.fromEvent(process, 'exit')),
)
.subscribe(([process, disposed]) => {
if (disposed && (process != null)) {
process.kill();
}
});
}

getStream(): Observable<child_process$ChildProcess> {
return this.process$.takeUntil(this.disposed$);
}
/**
* Creates an observable with the following properties:
*
* 1. It contains a process that's created using the provided factory upon subscription.
* 2. It doesn't complete until the process exits.
* 3. The process is killed when there are no more subscribers.
*/
function createProcessStream(
createProcess: () => child_process$ChildProcess | Promise<child_process$ChildProcess>,
): Observable<child_process$ChildProcess> {
return Observable.create(observer => {
const promise = Promise.resolve(createProcess());
let process;
let disposed = false;
let exited = false;
const maybeKill = () => {
if (process != null && disposed && !exited) {
process.kill();
process = null;
}
};

dispose(): void {
this.disposed$.onNext(true);
this.disposed$.onCompleted();
}
promise.then(p => {
process = p;
maybeKill();
});

const processStream = Observable.fromPromise(promise);

const exit = processStream
.flatMap(p => Observable.fromEvent(p, 'exit', (code, signal) => signal))
// An exit signal from SIGUSR1 doesn't actually exit the process, so skip that.
.filter(signal => signal !== 'SIGUSR1')
.tap(() => { exited = true; });

return new CompositeDisposable(
// A version of processStream that never completes...
Observable.merge(processStream, Observable.create(() => {}))
// ...which we take until the process exits.
.takeUntil(exit)
.subscribe(observer),
new Disposable(() => { disposed = true; maybeKill(); }),
);
}).share();
}

/**
Expand All @@ -291,12 +308,8 @@ class ProcessResource {
function observeProcessExit(
createProcess: () => child_process$ChildProcess | Promise<child_process$ChildProcess>,
): Observable<number> {
return Observable.using(
() => new ProcessResource(createProcess()),
processResource => (
processResource.getStream().flatMap(process => Observable.fromEvent(process, 'exit').take(1))
),
);
return createProcessStream(createProcess)
.flatMap(process => Observable.fromEvent(process, 'exit').take(1));
}

function getOutputStream(
Expand Down Expand Up @@ -327,10 +340,7 @@ function getOutputStream(
function observeProcess(
createProcess: () => child_process$ChildProcess | Promise<child_process$ChildProcess>,
): Observable<ProcessMessage> {
return Observable.using(
() => new ProcessResource(createProcess()),
processResource => processResource.getStream().flatMap(getOutputStream),
);
return createProcessStream(createProcess).flatMap(getOutputStream);
}

/**
Expand Down Expand Up @@ -483,6 +493,7 @@ async function asyncExecute(
module.exports = {
asyncExecute,
createArgsForScriptCommand,
createProcessStream,
checkOutput,
forkWithExecEnvironment,
getOutputStream,
Expand Down

0 comments on commit 58e8778

Please sign in to comment.