diff --git a/packages/jest-worker/README.md b/packages/jest-worker/README.md new file mode 100644 index 000000000000..dcbdc04f9819 --- /dev/null +++ b/packages/jest-worker/README.md @@ -0,0 +1,193 @@ +# jest-worker + +Module for executing heavy tasks under forked processes in parallel, by providing a `Promise` based interface, minimum overhead, and bound workers. + +The module works by providing an absolute path of the module to be loaded in all forked processes. Files relative to a node module are also accepted. All methods are exposed on the parent process as promises, so they can be `await`'ed. Child (worker) methods can either be synchronous or asynchronous. + +The module also implements support for bound workers. Binding a worker means that, based on certain parameters, the same task will always be executed by the same worker. The way bound workers work is by using the returned string of the `computeWorkerKey` method. If the string was used before for a task, the call will be queued to the related worker that processed the task earlier; if not, it will be executed by the first available worker, then sticked to the worker that executed it; so the next time it will be processed by the same worker. If you have no preference on the worker executing the task, but you have defined a `computeWorkerKey` method because you want _some_ of the tasks to be sticked, you can return `null` from it. + +The list of exposed methods can be explicitly provided via the `exposedMethods` option. If it is not provided, it will be obtained by requiring the child module into the main process, and analyzed via reflection. Check the "minimal example" section for a valid one. + +## Install + +```sh +$ yarn add jest-worker +``` + + +## API + +The only exposed method is a constructor (`Worker`) that is initialized by passing the worker path, plus an options object. + + +### `workerPath: string` (required) + +Node module name or absolute path of the file to be loaded in the child processes. Use `require.resolve` to transform a relative path into an absolute one. + + +### `options: Object` (optional) + +#### `exposedMethods: $ReadOnlyArray` (optional) + +List of method names that can be called on the child processes from the parent process. You cannot expose any method named like a public `Worker` method, or starting with `_`. If you use method auto-discovery, then these methods will not be exposed, even if they exist. + +#### `numWorkers: number` (optional) + +Amount of workers to spwan. Defaults to the number of CPUs minus 1. + +#### `forkOptions: Object` (optional) + +Allow customizing all options passed to `childProcess.fork`. By default, some values are set (`cwd` and `env`), but you can override them and customize the rest. For a list of valid values, check [the Node documentation](https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options). + +### `computeWorkerKey: (method: string, ...args: Array) => ?string` (optional) + +Every time a method exposed via the API is called, `computeWorkerKey` is also called in order to bound the call to a worker. This is useful for workers that are able to cache the result or part of it. You bound calls to a worker by making `computeWorkerKey` return the same identifier for all different calls. If you do not want to bind the call to any worker, return `null`. + +The callback you provide is called with the method name, plus all the rest of the arguments of the call. Thus, you have full control to decide what to return. Check a practical example on bound workers under the "bound worker usage" section. + +By default, no process is bound to any worker. + + +## Worker + +The returned `Worker` instance has all the exposed methods, plus some additional ones to interact with the workers itself: + + +### `getStdout(): Readable` + +Returns a `ReadableStream` where the standard output of all workers is piped. Note that the `silent` option of the child workers must be set to `true` to make it work. This is the default set by `jest-worker`, but keep it in mind when overriding options through `forkOptions`. + + +### `getStderr(): Readable` + +Returns a `ReadableStream` where the standard error of all workers is piped. Note that the `silent` option of the child workers must be set to `true` to make it work. This is the default set by `jest-worker`, but keep it in mind when overriding options through `forkOptions`. + + +### `end()` + +Finishes the workers by killing all workers. No further calls can be done to the `Worker` instance. + + +## Minimal example + +This example covers the minmal usage: + +### File `parent.js` + +```javascript +import Worker from 'jest-worker'; + +async function main() { + const worker = new Worker(require.resolve('./worker')); + const result = await worker.hello('Alice'); // "Hello, Alice" +} + +main(); +``` + +### File `worker.js` + +```javascript +export function hello(param) { + return 'Hello, ' + param; +} +``` + + +## Standard usage + +This example covers the standard usage: + +### File `parent.js` + +```javascript +import Worker from 'jest-worker'; + +async function main() { + const myWorker = new Worker({ + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + workerPath: require.resolve('./worker'), + }); + + console.log(await myWorker.foo('Alice')); // "Hello from foo: Alice" + console.log(await myWorker.bar('Bob')); // "Hello from bar: Bob" + + myWorker.end(); +} + +main(); +``` + +### File `worker.js` + +```javascript +export function foo(param) { + return 'Hello from foo: ' + param; +} + +export function bar(param) { + return 'Hello from bar: ' + param; +} +``` + + +## Bound worker usage: + +This example covers the usage with a `computeWorkerKey` method: + +### File `parent.js` + +```javascript +import Worker from 'jest-worker'; + +async function main() { + const myWorker = new Worker({ + computeWorkerKey: (method, filename) => filename, + exposedMethods: ['foo', 'bar'], + workerPath: require.resolve('./worker'), + }); + + // Transform the given file, within the first available worker. + console.log(await myWorker.transform('/tmp/foo.js')); + + // Wait a bit. + await sleep(10000); + + // Transform the same file again. Will immediately return because the + // transformed file is cached in the worker, and `computeWorkerKey` ensures + // the same worker that processed the file the first time will process it now. + console.log(await myWorker.transform('/tmp/foo.js')); + + myWorker.end(); +} + +main(); +``` + +### File `worker.js` + +```javascript + +import babel from 'babel-core'; + +const cache = Object.create(null); + +export function transform(filename) { + if (cache[filename]) { + return cache[filename]; + } + + // jest-worker can handle both immediate results and thenables. If a + // thenable is returned, it will be await'ed until it resolves. + return new Promise((resolve, reject) => { + babel.transformFile(filename, (err, result) => { + if (err) { + reject(err); + } else { + resolve(cache[filename] = result); + } + }); + }); +} +``` diff --git a/packages/jest-worker/package.json b/packages/jest-worker/package.json new file mode 100644 index 000000000000..57518305240f --- /dev/null +++ b/packages/jest-worker/package.json @@ -0,0 +1,13 @@ +{ + "name": "jest-worker", + "version": "21.1.0", + "repository": { + "type": "git", + "url": "https://github.com/facebook/jest.git" + }, + "license": "MIT", + "main": "build/index.js", + "dependencies": { + "merge-stream": "^1.0.1" + } +} diff --git a/packages/jest-worker/src/__performance_tests__/test.js b/packages/jest-worker/src/__performance_tests__/test.js new file mode 100644 index 000000000000..aeff514657af --- /dev/null +++ b/packages/jest-worker/src/__performance_tests__/test.js @@ -0,0 +1,173 @@ +'use strict'; + +// eslint-disable-next-line import/no-extraneous-dependencies +const workerFarm = require('worker-farm'); +import JestWorker from '../../build'; + +const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); +const calls = 10000; +const threads = 6; + +function testWorkerFarm() { + return new Promise(async (resolve, reject) => { + const startTime = Date.now(); + let count = 0; + + async function countToFinish() { + if (++count === calls) { + workerFarm.end(api); + const endTime = Date.now(); + + // Let all workers go down. + await sleep(2000); + + resolve({ + globalTime: endTime - startTime - 2000, + processingTime: endTime - startProcess, + }); + } + } + + const api = workerFarm( + { + autoStart: true, + maxConcurrentCallsPerWorker: 1, + maxConcurrentWorkers: threads, + }, + require.resolve('./workers/worker_farm'), + ['loadTest'], + ); + + // Let all workers come up. + await sleep(2000); + + const startProcess = Date.now(); + + for (let i = 0; i < calls; i++) { + const promisified = new Promise((resolve, reject) => { + api.loadTest((err, result) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }); + }); + + promisified.then(countToFinish); + } + }); +} + +function testJestWorker() { + return new Promise(async (resolve, reject) => { + const startTime = Date.now(); + let count = 0; + + async function countToFinish() { + if (++count === calls) { + farm.end(); + const endTime = Date.now(); + + // Let all workers go down. + await sleep(2000); + + resolve({ + globalTime: endTime - startTime - 2000, + processingTime: endTime - startProcess, + }); + } + } + + const farm = new JestWorker(require.resolve('./workers/jest_worker'), { + exposedMethods: ['loadTest'], + forkOptions: {execArgv: []}, + workers: threads, + }); + + farm.getStdout().pipe(process.stdout); + farm.getStderr().pipe(process.stderr); + + // Let all workers come up. + await sleep(2000); + + const startProcess = Date.now(); + + for (let i = 0; i < calls; i++) { + const promisified = farm.loadTest(); + + promisified.then(countToFinish); + } + }); +} + +function profile(x) { + console.profile(x); +} + +function profileEnd(x) { + console.profileEnd(x); +} + +async function main() { + if (!global.gc) { + console.log('GC not present'); + } + + const wFResults = []; + const jWResults = []; + + for (let i = 0; i < 10; i++) { + console.log('-'.repeat(75)); + + profile('worker farm'); + const wF = await testWorkerFarm(); + profileEnd('worker farm'); + await sleep(3000); + // eslint-disable-next-line no-undef + global.gc && gc(); + + profile('jest worker'); + const jW = await testJestWorker(); + profileEnd('jest worker'); + await sleep(3000); + // eslint-disable-next-line no-undef + global.gc && gc(); + + wFResults.push(wF); + jWResults.push(jW); + + console.log('jest-worker:', jW); + console.log('worker-farm:', wF); + } + + let wFGT = 0; + let wFPT = 0; + let jWGT = 0; + let jWPT = 0; + + for (let i = 0; i < 10; i++) { + wFGT += wFResults[i].globalTime; + wFPT += wFResults[i].processingTime; + + jWGT += jWResults[i].globalTime; + jWPT += jWResults[i].processingTime; + } + + console.log('-'.repeat(75)); + console.log('total worker-farm:', {wFGT, wFPT}); + console.log('total jest-worker:', {jWGT, jWPT}); + + console.log('-'.repeat(75)); + console.log( + `% improvement over ${calls} calls (global time):`, + 100 * (wFGT - jWGT) / wFGT, + ); + + console.log( + `% improvement over ${calls} calls (processing time):`, + 100 * (wFPT - jWPT) / wFPT, + ); +} + +main(); diff --git a/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js b/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js new file mode 100644 index 000000000000..e48d8527c4f8 --- /dev/null +++ b/packages/jest-worker/src/__performance_tests__/workers/jest_worker.js @@ -0,0 +1,7 @@ +'use strict'; + +const pi = require('./pi'); + +module.exports.loadTest = function() { + return pi(); +}; diff --git a/packages/jest-worker/src/__performance_tests__/workers/pi.js b/packages/jest-worker/src/__performance_tests__/workers/pi.js new file mode 100644 index 000000000000..76ffd79a4a3d --- /dev/null +++ b/packages/jest-worker/src/__performance_tests__/workers/pi.js @@ -0,0 +1,14 @@ +'use strict'; + +module.exports = function() { + const points = 10000; + let inside = 0; + + for (let i = 0; i < points; i++) { + if (Math.pow(Math.random(), 2) + Math.pow(Math.random(), 2) <= 1) { + inside++; + } + } + + return 4 * inside / points; +}; diff --git a/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js b/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js new file mode 100644 index 000000000000..4213db2463b4 --- /dev/null +++ b/packages/jest-worker/src/__performance_tests__/workers/worker_farm.js @@ -0,0 +1,7 @@ +'use strict'; + +const pi = require('./pi'); + +module.exports.loadTest = function(callback) { + callback(null, pi()); +}; diff --git a/packages/jest-worker/src/__tests__/child.test.js b/packages/jest-worker/src/__tests__/child.test.js new file mode 100644 index 000000000000..2cf358cb9777 --- /dev/null +++ b/packages/jest-worker/src/__tests__/child.test.js @@ -0,0 +1,331 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +const mockError = new TypeError('Booo'); +const mockExtendedError = new ReferenceError('Booo extended'); +const processExit = process.exit; +const processSend = process.send; +const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); + +import { + CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_END, + PARENT_MESSAGE_OK, + PARENT_MESSAGE_ERROR, +} from '../types'; + +let mockCount; + +beforeEach(() => { + mockCount = 0; + + jest.mock( + '../my-fancy-worker', + () => { + mockCount++; + + return { + fooPromiseThrows() { + return new Promise((resolve, reject) => { + setTimeout(() => reject(mockError), 5); + }); + }, + + fooPromiseWorks() { + return new Promise((resolve, reject) => { + setTimeout(() => resolve(1989), 5); + }); + }, + + fooThrows() { + throw mockError; + }, + + fooThrowsANumber() { + // eslint-disable-next-line no-throw-literal + throw 412; + }, + + fooThrowsAnErrorWithExtraProperties() { + mockExtendedError.baz = 123; + mockExtendedError.qux = 456; + + throw mockExtendedError; + }, + + fooThrowsNull() { + // eslint-disable-next-line no-throw-literal + throw null; + }, + + fooWorks() { + return 1989; + }, + }; + }, + {virtual: true}, + ); + + jest.mock( + '../my-fancy-standalone-worker', + () => jest.fn().mockImplementation(() => 12345), + {virtual: true}, + ); + + // This mock emulates a transpiled Babel module that carries a default export + // that corresponds to a method. + jest.mock( + '../my-fancy-babel-worker', + () => ({ + __esModule: true, + default: jest.fn().mockImplementation(() => 67890), + }), + {virtual: true}, + ); + + process.exit = jest.fn(); + process.send = jest.fn(); + + // Require the child! + require('../child'); +}); + +afterEach(() => { + jest.resetModules(); + + process.removeAllListeners('message'); + + process.exit = processExit; + process.send = processSend; +}); + +it('lazily requires the file', () => { + expect(mockCount).toBe(0); + + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + expect(mockCount).toBe(0); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + + expect(mockCount).toBe(1); +}); + +it('returns results immediately when function is synchronous', () => { + process.send = jest.fn(); + + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + + expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 1989]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrows', + [], + ]); + + expect(process.send.mock.calls[1][0]).toEqual([ + PARENT_MESSAGE_ERROR, + 'TypeError', + 'Booo', + mockError.stack, + {}, + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrowsANumber', + [], + ]); + + expect(process.send.mock.calls[2][0]).toEqual([ + PARENT_MESSAGE_ERROR, + 'Number', + void 0, + void 0, + 412, + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrowsAnErrorWithExtraProperties', + [], + ]); + + expect(process.send.mock.calls[3][0]).toEqual([ + PARENT_MESSAGE_ERROR, + 'ReferenceError', + 'Booo extended', + mockExtendedError.stack, + {baz: 123, qux: 456}, + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrowsNull', + [], + ]); + + expect(process.send.mock.calls[4][0][0]).toBe(PARENT_MESSAGE_ERROR); + expect(process.send.mock.calls[4][0][1]).toBe('Error'); + expect(process.send.mock.calls[4][0][2]).toEqual( + '"null" or "undefined" thrown', + ); + + expect(process.send.mock.calls.length).toBe(5); +}); + +it('returns results when it gets resolved if function is asynchronous', async () => { + jest.useRealTimers(); + + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooPromiseWorks', + [], + ]); + + await sleep(10); + + expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 1989]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooPromiseThrows', + [], + ]); + + await sleep(10); + + expect(process.send.mock.calls[1][0]).toEqual([ + PARENT_MESSAGE_ERROR, + 'TypeError', + 'Booo', + mockError.stack, + {}, + ]); + + expect(process.send.mock.calls.length).toBe(2); +}); + +it('calls the main module if the method call is "default"', () => { + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-standalone-worker', + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'default', + [], + ]); + + expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 12345]); +}); + +it('calls the main export if the method call is "default" and it is a Babel transpiled one', () => { + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-babel-worker', + ]); + + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'default', + [], + ]); + + expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 67890]); +}); + +it('finishes the process with exit code 0 if requested', () => { + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + process.emit('message', [ + CHILD_MESSAGE_END, + true, // Not really used here, but for flow type purity. + ]); + + expect(process.exit.mock.calls[0]).toEqual([0]); +}); + +it('throws if an invalid message is detected', () => { + // Type 27 does not exist. + expect(() => { + process.emit('message', [27]); + }).toThrow(TypeError); +}); + +it('throws if child is not forked', () => { + delete process.send; + + process.emit('message', [ + CHILD_MESSAGE_INITIALIZE, + true, // Not really used here, but for flow type purity. + './my-fancy-worker', + ]); + + expect(() => { + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooWorks', + [], + ]); + }).toThrow(); + + expect(() => { + process.emit('message', [ + CHILD_MESSAGE_CALL, + true, // Not really used here, but for flow type purity. + 'fooThrows', + [], + ]); + }).toThrow(); +}); diff --git a/packages/jest-worker/src/__tests__/index.test.js b/packages/jest-worker/src/__tests__/index.test.js new file mode 100644 index 000000000000..4f63ff7746d6 --- /dev/null +++ b/packages/jest-worker/src/__tests__/index.test.js @@ -0,0 +1,325 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +let Farm; +let Worker; +let mockWorkers; + +function workerReply(i, error, result) { + return mockWorkers[i].send.mock.calls[0][1].call( + mockWorkers[i], + error, + result, + ); +} + +beforeEach(() => { + mockWorkers = []; + + // The worker mock returns a worker with custom methods, plus it stores them + // in a global list, so that they can be accessed later. This list is reset in + // every test. + jest.mock('../worker', () => { + const fakeClass = jest.fn(() => { + const fakeWorker = { + getStderr: () => ({once() {}, pipe() {}}), + getStdout: () => ({once() {}, pipe() {}}), + send: jest.fn(), + }; + + mockWorkers.push(fakeWorker); + + return fakeWorker; + }); + + return { + __esModule: true, + default: fakeClass, + }; + }); + + jest.mock( + '/fake-worker.js', + () => { + return { + _shouldNotExist1() {}, + methodA() {}, + methodB() {}, + }; + }, + {virtual: true}, + ); + + jest.mock( + '/fake-worker-with-default-method.js', + () => { + return () => {}; + }, + {virtual: true}, + ); + + Worker = require('../worker').default; + Farm = require('../index').default; +}); + +afterEach(() => { + jest.resetModules(); +}); + +it('exposes the right API', () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + expect(typeof farm.foo).toBe('function'); + expect(typeof farm.bar).toBe('function'); +}); + +it('breaks if any of the forbidden methods is tried to be exposed', () => { + expect( + () => new Farm('/tmp/baz.js', {exposedMethods: ['getStdout']}), + ).toThrow(); + + expect( + () => new Farm('/tmp/baz.js', {exposedMethods: ['getStderr']}), + ).toThrow(); + + expect(() => new Farm('/tmp/baz.js', {exposedMethods: ['end']})).toThrow(); +}); + +it('works with minimal options', () => { + // eslint-disable-next-line no-new + const farm1 = new Farm('/fake-worker.js'); + + expect(Worker.mock.calls.length).toBe(require('os').cpus().length - 1); + expect(typeof farm1.methodA).toBe('function'); + expect(typeof farm1.methodB).toBe('function'); + expect(typeof farm1._shouldNotExist).not.toBe('function'); + + // eslint-disable-next-line no-new + const farm2 = new Farm('/fake-worker-with-default-method.js'); + + expect(typeof farm2.default).toBe('function'); +}); + +it('tries instantiating workers with the right options', () => { + // eslint-disable-next-line no-new + new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + forkOptions: {execArgv: []}, + numWorkers: 4, + }); + + expect(Worker.mock.calls.length).toBe(4); + expect(Worker.mock.calls[0][0]).toEqual({ + forkOptions: {execArgv: []}, + workerPath: '/tmp/baz.js', + }); +}); + +it('makes a non-existing relative worker throw', () => { + expect( + () => + new Farm('./baz.js', { + exposedMethods: [], + numWorkers: 1, + }), + ).toThrow(); +}); + +it('aggregates all stdouts and stderrs from all workers', () => { + const out = []; + const err = []; + + Worker.mockImplementation(() => { + return { + getStderr: () => ({ + once() {}, + pipe(errStream) { + err.push(errStream); + }, + }), + getStdout: () => ({ + once() {}, + pipe(outStream) { + out.push(outStream); + }, + }), + }; + }); + + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 2, + }); + + expect(out.length).toBe(2); + expect(err.length).toBe(2); + + const stdout = jest.fn(); + const stderr = jest.fn(); + + farm.getStdout().on('data', stdout); + farm.getStderr().on('data', stderr); + + out[0].write(Buffer.from('hello')); + out[1].write(Buffer.from('bye')); + err[1].write(Buffer.from('house')); + err[0].write(Buffer.from('tree')); + + expect(stdout.mock.calls[0][0].toString()).toBe('hello'); + expect(stdout.mock.calls[1][0].toString()).toBe('bye'); + expect(stderr.mock.calls[0][0].toString()).toBe('house'); + expect(stderr.mock.calls[1][0].toString()).toBe('tree'); +}); + +it('does not let make calls after the farm is ended', () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + farm.end(); + + expect(() => farm.foo()).toThrow(); + expect(() => farm.bar()).toThrow(); +}); + +it('does not let end the farm after it is ended', () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + farm.end(); + + expect(() => farm.end()).toThrow(); +}); + +it('calls "computeWorkerKey" for each of the calls', () => { + const computeWorkerKey = jest.fn(); + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey, + exposedMethods: ['foo', 'bar'], + numWorkers: 3, + }); + + farm.foo('car', 'plane'); + + expect(computeWorkerKey.mock.calls[0]).toEqual(['foo', 'car', 'plane']); +}); + +it('returns the result if the call worked', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 1, + }); + + const promise = farm.foo('car', 'plane'); + + workerReply(0, null, 34); + expect(await promise).toEqual(34); +}); + +it('throws if the call failed', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 1, + }); + + const promise = farm.foo('car', 'plane'); + let error = null; + + workerReply(0, new TypeError('Massively broken')); + + try { + await promise; + } catch (err) { + error = err; + } + + expect(error).not.toBe(null); + expect(error).toBeInstanceOf(TypeError); +}); + +it('sends non-sticked tasks to all workers', () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 3, + }); + + farm.foo('car', 'plane'); + + expect(mockWorkers[0].send.mock.calls.length).toBe(1); + expect(mockWorkers[1].send.mock.calls.length).toBe(1); + expect(mockWorkers[2].send.mock.calls.length).toBe(1); +}); + +it('sends first-time sticked tasks to all workers', () => { + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + numWorkers: 3, + }); + + farm.foo('car', 'plane'); + + expect(mockWorkers[0].send.mock.calls.length).toBe(1); + expect(mockWorkers[1].send.mock.calls.length).toBe(1); + expect(mockWorkers[2].send.mock.calls.length).toBe(1); +}); + +it('checks that once a sticked task finishes, next time is sent to that worker', async () => { + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + numWorkers: 3, + }); + + // Worker 1 successfully replies with "17" as a result. + const promise = farm.foo('car', 'plane'); + workerReply(1, null, 17); + await promise; + + // Note that the stickiness is not created by the method name or the arguments + // it is solely controlled by the provided "computeWorkerKey" method, which in the + // test example always returns the same key, so all calls should be redirected + // to worker 1 (which is the one that resolved the first call). + farm.bar(); + + // The first time, a call with a "1234567890abcdef" hash had never been done + // earlier ("foo" call), so it got queued to all workers. Later, since the one + // that resolved the call was the one in position 1, all subsequent calls are + // only redirected to that worker. + expect(mockWorkers[0].send.mock.calls.length).toBe(1); // Only "foo". + expect(mockWorkers[1].send.mock.calls.length).toBe(2); // "foo" + "bar". + expect(mockWorkers[2].send.mock.calls.length).toBe(1); // Only "foo". +}); + +it('checks that once a non-sticked task finishes, next time is sent to all workers', async () => { + // Note there is no "computeWorkerKey". + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 3, + }); + + // Worker 1 successfully replies with "17" as a result. + const promise = farm.foo('car', 'plane'); + workerReply(1, null, 17); + await promise; + + farm.bar(); + + // Since "computeWorkerKey" does not return anything, new jobs are sent again to + // all existing workers. + expect(mockWorkers[0].send.mock.calls.length).toBe(2); + expect(mockWorkers[1].send.mock.calls.length).toBe(2); + expect(mockWorkers[2].send.mock.calls.length).toBe(2); +}); diff --git a/packages/jest-worker/src/__tests__/worker.test.js b/packages/jest-worker/src/__tests__/worker.test.js new file mode 100644 index 000000000000..09c5ebb7915b --- /dev/null +++ b/packages/jest-worker/src/__tests__/worker.test.js @@ -0,0 +1,242 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +/* eslint-disable no-new */ + +import {EventEmitter} from 'events'; + +import { + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_INITIALIZE, + PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_OK, +} from '../types'; + +let Worker; +let forkInterface; +let childProcess; + +beforeEach(() => { + forkInterface = Object.assign(new EventEmitter(), { + send: jest.fn(), + stderr: {}, + stdout: {}, + }); + + jest.mock('child_process'); + + childProcess = require('child_process'); + childProcess.fork.mockImplementation(() => forkInterface); + + Worker = require('../worker').default; +}); + +afterEach(() => { + jest.resetModules(); +}); + +it('passes fork options down to child_process.fork, adding the defaults', () => { + const child = require.resolve('../child'); + new Worker({ + forkOptions: { + cwd: '/tmp', + execArgv: ['--no-warnings'], + }, + workerPath: '/tmp/foo/bar/baz.js', + }); + + expect(childProcess.fork.mock.calls[0][0]).toBe(child); + expect(childProcess.fork.mock.calls[0][1]).toEqual({ + cwd: '/tmp', // Overridden default option. + env: process.env, // Default option. + execArgv: ['--no-warnings'], // Added option. + silent: true, // Default option. + }); +}); + +it('initializes the child process with the given workerPath', () => { + new Worker({ + workerPath: '/tmp/foo/bar/baz.js', + }); + + expect(forkInterface.send.mock.calls[0][0]).toEqual([ + CHILD_MESSAGE_INITIALIZE, + false, + '/tmp/foo/bar/baz.js', + ]); +}); + +it('provides stdout and stderr fields from the child process', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + expect(worker.getStdout()).toBe(forkInterface.stdout); + expect(worker.getStderr()).toBe(forkInterface.stderr); +}); + +it('swtiches the processed flag of a task as soon as it is processed', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + const request1 = [CHILD_MESSAGE_CALL, false, 'foo', []]; + const request2 = [CHILD_MESSAGE_CALL, false, 'bar', []]; + + worker.send(request1, () => {}); + worker.send(request2, () => {}); + + // The queue is empty when it got send, so the task is processed. + expect(request1[1]).toBe(true); + + // The previous one is being processed, so that one stays as unprocessed. + expect(request2[1]).toBe(false); +}); + +it('sends the task to the child process', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + const request = [CHILD_MESSAGE_CALL, false, 'foo', []]; + + worker.send(request, () => {}); + + // Skipping call "0" because it corresponds to the "initialize" one. + expect(forkInterface.send.mock.calls[1][0]).toEqual(request); +}); + +it('relates replies to requests, in order', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + const callback1 = jest.fn(); + const request1 = [CHILD_MESSAGE_CALL, false, 'foo', []]; + + const callback2 = jest.fn(); + const request2 = [CHILD_MESSAGE_CALL, false, 'bar', []]; + + worker.send(request1, callback1); + worker.send(request2, callback2); + + // 2nd call waits on the queue... + expect(request2[1]).toBe(false); + + // then first call replies... + forkInterface.emit('message', [PARENT_MESSAGE_OK, 44]); + + expect(callback1.mock.calls[0][0]).toBeFalsy(); + expect(callback1.mock.calls[0][1]).toBe(44); + expect(callback1.mock.instances[0]).toBe(worker); + + // which causes the second call to be processed... + expect(request2[1]).toBe(true); + + // and then the second call replies... + forkInterface.emit('message', [ + PARENT_MESSAGE_ERROR, + 'TypeError', + 'foo', + 'TypeError: foo', + {}, + ]); + + expect(callback2.mock.calls[0][0].message).toBe('foo'); + expect(callback2.mock.instances[0]).toBe(worker); +}); + +it('creates error instances for known errors', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + const callback1 = jest.fn(); + const callback2 = jest.fn(); + const callback3 = jest.fn(); + + // Testing a generic ECMAScript error. + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback1); + + forkInterface.emit('message', [ + PARENT_MESSAGE_ERROR, + 'TypeError', + 'bar', + 'TypeError: bar', + {}, + ]); + + expect(callback1.mock.calls[0][0]).toBeInstanceOf(TypeError); + expect(callback1.mock.calls[0][0].message).toBe('bar'); + expect(callback1.mock.calls[0][0].type).toBe('TypeError'); + expect(callback1.mock.calls[0][0].stack).toBe('TypeError: bar'); + + // Testing a custom error. + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback2); + + forkInterface.emit('message', [ + PARENT_MESSAGE_ERROR, + 'RandomCustomError', + 'bar', + 'RandomCustomError: bar', + {qux: 'extra property'}, + ]); + + expect(callback2.mock.calls[0][0]).toBeInstanceOf(Error); + expect(callback2.mock.calls[0][0].message).toBe('bar'); + expect(callback2.mock.calls[0][0].type).toBe('RandomCustomError'); + expect(callback2.mock.calls[0][0].stack).toBe('RandomCustomError: bar'); + expect(callback2.mock.calls[0][0].qux).toBe('extra property'); + + // Testing a non-object throw. + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback3); + + forkInterface.emit('message', [ + PARENT_MESSAGE_ERROR, + 'Number', + null, + null, + 412, + ]); + + expect(callback3.mock.calls[0][0]).toBe(412); +}); + +it('throws when the child process returns a strange message', () => { + const worker = new Worker({ + workerPath: '/tmp/foo', + }); + + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}); + + // Type 27 does not exist. + expect(() => { + forkInterface.emit('message', [27]); + }).toThrow(TypeError); +}); + +it('does not restart the child if it cleanly exited', () => { + new Worker({ + workerPath: '/tmp/foo', + }); + + expect(childProcess.fork.mock.calls.length).toBe(1); + forkInterface.emit('exit', 0); + expect(childProcess.fork.mock.calls.length).toBe(1); +}); + +it('restarts the child when the child process dies', () => { + new Worker({ + workerPath: '/tmp/foo', + }); + + expect(childProcess.fork.mock.calls.length).toBe(1); + forkInterface.emit('exit', 1); + expect(childProcess.fork.mock.calls.length).toBe(2); +}); diff --git a/packages/jest-worker/src/child.js b/packages/jest-worker/src/child.js new file mode 100644 index 000000000000..dc372bf9a088 --- /dev/null +++ b/packages/jest-worker/src/child.js @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import { + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_END, + CHILD_MESSAGE_INITIALIZE, + PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_OK, +} from './types'; + +let file = null; + +/** + * This file is a small bootstrapper for workers. It sets up the communication + * between the worker and the parent process, interpreting parent messages and + * sending results back. + * + * The file loaded will be lazily initialized the first time any of the workers + * is called. This is done for optimal performance: if the farm is initialized, + * but no call is made to it, child Node processes will be consuming the least + * possible amount of memory. + * + * If an invalid message is detected, the child will exit (by throwing) with a + * non-zero exit code. + */ +process.on('message', (request: any /* Should be ChildMessage */) => { + switch (request[0]) { + case CHILD_MESSAGE_INITIALIZE: + file = request[2]; + break; + + case CHILD_MESSAGE_CALL: + execMethod(request[2], request[3]); + break; + + case CHILD_MESSAGE_END: + process.exit(0); + break; + + default: + throw new TypeError( + 'Unexpected request from parent process: ' + request[0], + ); + } +}); + +function reportSuccess(result: any) { + if (!process || !process.send) { + throw new Error('Child can only be used on a forked process'); + } + + process.send([PARENT_MESSAGE_OK, result]); +} + +function reportError(error: Error) { + if (!process || !process.send) { + throw new Error('Child can only be used on a forked process'); + } + + if (error == null) { + error = new Error('"null" or "undefined" thrown'); + } + + process.send([ + PARENT_MESSAGE_ERROR, + error.constructor && error.constructor.name, + error.message, + error.stack, + // $FlowFixMe: this is safe to just inherit from Object. + typeof error === 'object' ? Object.assign({}, error) : error, + ]); +} + +function execMethod(method: string, args: $ReadOnlyArray): void { + // $FlowFixMe: This has to be a dynamic require. + const main = require(file); + let result; + + try { + if (method === 'default') { + result = (main.__esModule ? main['default'] : main).apply(global, args); + } else { + result = main[method].apply(main, args); + } + } catch (err) { + reportError(err); + return; + } + + if (result && typeof result.then === 'function') { + result.then(reportSuccess, reportError); + } else { + reportSuccess(result); + } +} diff --git a/packages/jest-worker/src/index.js b/packages/jest-worker/src/index.js new file mode 100644 index 000000000000..fce49cf96208 --- /dev/null +++ b/packages/jest-worker/src/index.js @@ -0,0 +1,190 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import mergeStream from 'merge-stream'; +import os from 'os'; +import path from 'path'; + +import type {FarmOptions} from './types'; +import type {Readable} from 'stream'; + +import {CHILD_MESSAGE_CALL, CHILD_MESSAGE_END} from './types'; +import Worker from './worker'; + +/* istanbul ignore next */ +const emptyMethod = () => {}; + +/** + * The Jest farm (publicly called "Worker") is a class that allows you to queue + * methods across multiple child processes, in order to parallelize work. This + * is done by providing an absolute path to a module that will be loaded on each + * of the child processes, and bridged to the main process. + * + * Bridged methods are specified by using the "exposedMethods" property of the + * options "object". This is an array of strings, where each of them corresponds + * to the exported name in the loaded module. + * + * You can also control the amount of workers by using the "numWorkers" property + * of the "options" object, and the settings passed to fork the process through + * the "forkOptions" property. The amount of workers defaults to the amount of + * CPUS minus one. + * + * Queueing calls can be done in two ways: + * - Standard method: calls will be redirected to the first available worker, + * so they will get executed as soon as they can. + * + * - Sticky method: if a "computeWorkerKey" method is provided within the + * config, the resulting string of this method will be used as a key. + * Everytime this key is returned, it is guaranteed that your job will be + * processed by the same worker. This is specially useful if your workers are + * caching results. + */ +export default class { + _stdout: Readable; + _stderr: Readable; + _ending: boolean; + _cacheKeys: {[string]: Worker}; + _options: FarmOptions; + _workers: Array; + + constructor(workerPath: string, options?: FarmOptions = {}) { + const numWorkers = options.numWorkers || os.cpus().length - 1; + const workers = new Array(numWorkers); + const stdout = mergeStream(); + const stderr = mergeStream(); + + if (!path.isAbsolute(workerPath)) { + workerPath = require.resolve(workerPath); + } + + // Build the options once for all workers to avoid allocating extra objects. + const workerOptions = { + forkOptions: options.forkOptions || {}, + workerPath, + }; + + for (let i = 0; i < numWorkers; i++) { + const worker = new Worker(workerOptions); + + stdout.add(worker.getStdout()); + stderr.add(worker.getStderr()); + + workers[i] = worker; + } + + let exposedMethods = options.exposedMethods; + + // If no methods list is given, try getting it by auto-requiring the module. + if (!exposedMethods) { + // $FlowFixMe: This has to be a dynamic require. + const child = require(workerPath); + + exposedMethods = Object.keys(child).filter( + name => typeof child[name] === 'function', + ); + + if (typeof child === 'function') { + exposedMethods.push('default'); + } + } + + exposedMethods.forEach(name => { + if (name.startsWith('_')) { + return; + } + + if (this.constructor.prototype.hasOwnProperty(name)) { + throw new TypeError('Cannot define a method called ' + name); + } + + // $FlowFixMe: dynamic extension of the class instance is expected. + this[name] = this._makeCall.bind(this, name); + }); + + this._stdout = stdout; + this._stderr = stderr; + this._ending = false; + this._cacheKeys = Object.create(null); + this._options = options; + this._workers = workers; + } + + getStdout(): Readable { + return this._stdout; + } + + getStderr(): Readable { + return this._stderr; + } + + end() { + if (this._ending) { + throw new Error('Farm is ended, no more calls can be done to it'); + } + + const workers = this._workers; + + // We do not cache the request object here. If so, it would only be only + // processed by one of the workers, and we want them all to close. + for (let i = 0; i < workers.length; i++) { + workers[i].send([CHILD_MESSAGE_END, false], emptyMethod); + } + + this._ending = true; + } + + // eslint-disable-next-line no-unclear-flowtypes + _makeCall(method: string, ...args: Array): Promise { + if (this._ending) { + throw new Error('Farm is ended, no more calls can be done to it'); + } + + return new Promise((resolve, reject) => { + const {computeWorkerKey} = this._options; + const workers = this._workers; + const cacheKeys = this._cacheKeys; + const request = [CHILD_MESSAGE_CALL, false, method, args]; + + let worker = null; + let hash = null; + + if (computeWorkerKey) { + hash = computeWorkerKey.apply(this, [method].concat(args)); + worker = hash == null ? null : cacheKeys[hash]; + } + + // Do not use a fat arrow since we need the "this" value, which points to + // the worker that executed the call. + function callback(error, result) { + if (hash != null) { + cacheKeys[hash] = this; + } + + if (error) { + reject(error); + } else { + resolve(result); + } + } + + // If a worker is pre-selected, use it... + if (worker) { + worker.send(request, callback); + return; + } + + // ... otherwise use all workers, so the first one available will pick it. + for (let i = 0; i < workers.length; i++) { + workers[i].send(request, callback); + } + }); + } +} diff --git a/packages/jest-worker/src/types.js b/packages/jest-worker/src/types.js new file mode 100644 index 000000000000..81305118d500 --- /dev/null +++ b/packages/jest-worker/src/types.js @@ -0,0 +1,99 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +// Because of the dynamic nature of a worker communication process, all messages +// coming from any of the other processes cannot be typed. Thus, many types +// include "any" as a flow type, which is (unfortunately) correct here. + +/* eslint-disable no-unclear-flowtypes */ + +export const CHILD_MESSAGE_INITIALIZE: 0 = 0; +export const CHILD_MESSAGE_CALL: 1 = 1; +export const CHILD_MESSAGE_END: 2 = 2; + +export const PARENT_MESSAGE_OK: 0 = 0; +export const PARENT_MESSAGE_ERROR: 1 = 1; + +// Option objects. + +export type ForkOptions = { + cwd?: string, + env?: Object, + execPath?: string, + execArgv?: Array, + silent?: boolean, + stdio?: Array, + uid?: number, + gid?: number, +}; + +export type FarmOptions = { + computeWorkerKey?: (string, ...Array) => ?string, + exposedMethods?: $ReadOnlyArray, + forkOptions?: ForkOptions, + numWorkers?: number, +}; + +export type WorkerOptions = {| + forkOptions?: ForkOptions, + workerPath: string, +|}; + +// Messages passed from the parent to the children. + +export type ChildMessageInitialize = [ + typeof CHILD_MESSAGE_INITIALIZE, // type + boolean, // processed + string, // file +]; + +export type ChildMessageCall = [ + typeof CHILD_MESSAGE_CALL, // type + boolean, // processed + string, // method + $ReadOnlyArray, // args +]; + +export type ChildMessageEnd = [ + typeof CHILD_MESSAGE_END, // type + boolean, // processed +]; + +export type ChildMessage = + | ChildMessageInitialize + | ChildMessageCall + | ChildMessageEnd; + +// Messages passed from the children to the parent. + +export type ParentMessageOk = [ + typeof PARENT_MESSAGE_OK, // type + any, // result +]; + +export type ParentMessageError = [ + typeof PARENT_MESSAGE_ERROR, // type + string, // constructor + string, // message + string, // stack + any, // extra +]; + +export type ParentMessage = ParentMessageOk | ParentMessageError; + +// Queue types. + +export type QueueCallback = (?Error, ?any) => void; + +export type QueueChildMessage = {| + request: ChildMessage, + callback: QueueCallback, +|}; diff --git a/packages/jest-worker/src/worker.js b/packages/jest-worker/src/worker.js new file mode 100644 index 000000000000..5a12fd573b61 --- /dev/null +++ b/packages/jest-worker/src/worker.js @@ -0,0 +1,171 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import childProcess from 'child_process'; + +import { + CHILD_MESSAGE_INITIALIZE, + PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_OK, +} from './types'; + +import type {ChildProcess} from 'child_process'; +import type {Readable} from 'stream'; + +import type { + ChildMessage, + QueueCallback, + QueueChildMessage, + WorkerOptions, +} from './types'; + +/** + * This class wraps the child process and provides a nice interface to + * communicate with. It takes care of: + * + * - Re-spawning the process if it dies. + * - Queues calls while the worker is busy. + * - Re-sends the requests if the worker blew up. + * + * The reason for queueing them here (since childProcess.send also has an + * internal queue) is because the worker could be doing asynchronous work, and + * this would lead to the child process to read its receiving buffer and start a + * second call. By queueing calls here, we don't send the next call to the + * children until we receive the result of the previous one. + * + * As soon as a request starts to be processed by a worker, its "processed" + * field is changed to "true", so that other workers which might encounter the + * same call skip it. + */ +export default class { + _busy: boolean; + _child: ChildProcess; + _options: WorkerOptions; + _queue: Array; + + constructor(options: WorkerOptions) { + this._options = options; + this._queue = []; + + this._initialize(); + } + + getStdout(): Readable { + return this._child.stdout; + } + + getStderr(): Readable { + return this._child.stderr; + } + + send(request: ChildMessage, callback: QueueCallback) { + this._queue.push({callback, request}); + this._process(); + } + + _initialize() { + const child = childProcess.fork( + require.resolve('./child'), + // $FlowFixMe: Flow does not work well with Object.assign. + Object.assign( + { + cwd: process.cwd(), + env: process.env, + silent: true, + }, + this._options.forkOptions, + ), + ); + + child.on('message', this._receive.bind(this)); + child.on('exit', this._exit.bind(this)); + + // $FlowFixMe: wrong "ChildProcess.send" signature. + child.send([CHILD_MESSAGE_INITIALIZE, false, this._options.workerPath]); + + this._child = child; + this._busy = false; + } + + _process() { + if (this._busy) { + return; + } + + const queue = this._queue; + let skip = 0; + + // Calls in the queue might have already been processed by another worker, + // so we have to skip them. + while (queue.length > skip && queue[skip].request[1]) { + skip++; + } + + // Remove all pieces at once. + queue.splice(0, skip); + + if (queue.length) { + const call = queue[0]; + + // Flag the call as processed, so that other workers know that they don't + // have to process it as well. + call.request[1] = true; + + this._busy = true; + // $FlowFixMe: wrong "ChildProcess.send" signature. + this._child.send(call.request); + } + } + + _receive(response: any /* Should be ParentMessage */) { + const callback = this._queue[0].callback; + + this._busy = false; + this._process(); + + switch (response[0]) { + case PARENT_MESSAGE_OK: + callback.call(this, null, response[1]); + break; + + case PARENT_MESSAGE_ERROR: + let error = response[4]; + + if (error != null && typeof error === 'object') { + const extra = error; + const NativeCtor = global[response[1]]; + const Ctor = typeof NativeCtor === 'function' ? NativeCtor : Error; + + error = new Ctor(response[2]); + // $FlowFixMe: adding custom properties to errors. + error.type = response[1]; + error.stack = response[3]; + + for (const key in extra) { + // $FlowFixMe: adding custom properties to errors. + error[key] = extra[key]; + } + } + + callback.call(this, error, null); + break; + + default: + throw new TypeError('Unexpected response from worker: ' + response[0]); + } + } + + _exit(exitCode: number) { + if (exitCode !== 0) { + this._initialize(); + } + } +} diff --git a/yarn.lock b/yarn.lock index d1eade80989c..5b40ce764094 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4129,6 +4129,12 @@ meow@^3.3.0, meow@^3.7.0: redent "^1.0.0" trim-newlines "^1.0.0" +merge-stream@^1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/merge-stream/-/merge-stream-1.0.1.tgz#4041202d508a342ba00174008df0c251b8c135e1" + dependencies: + readable-stream "^2.0.1" + merge@^1.1.3: version "1.2.0" resolved "https://registry.yarnpkg.com/merge/-/merge-1.2.0.tgz#7531e39d4949c281a66b8c5a6e0265e8b05894da" @@ -4958,7 +4964,7 @@ readable-stream@^1.0.26-4, readable-stream@~1.0.2, readable-stream@~1.0.26, read isarray "0.0.1" string_decoder "~0.10.x" -readable-stream@^2.0.2, readable-stream@^2.0.6, readable-stream@^2.1.4, readable-stream@^2.1.5, readable-stream@^2.2.2, readable-stream@^2.2.6: +readable-stream@^2.0.1, readable-stream@^2.0.2, readable-stream@^2.0.6, readable-stream@^2.1.4, readable-stream@^2.1.5, readable-stream@^2.2.2, readable-stream@^2.2.6: version "2.3.3" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.3.3.tgz#368f2512d79f9d46fdfc71349ae7878bbc1eb95c" dependencies: