diff --git a/doc/api/async_hooks.md b/doc/api/async_hooks.md index c2c0d7e1da65b0..4f3039826bccfa 100644 --- a/doc/api/async_hooks.md +++ b/doc/api/async_hooks.md @@ -682,6 +682,128 @@ never be called. * Returns: {number} The same `triggerAsyncId` that is passed to the `AsyncResource` constructor. + +### Using `AsyncResource` for a `Worker` thread pool + +The following example shows how to use the `AsyncResource` class to properly +provide async tracking for a [`Worker`][] pool. Other resource pools, such as +database connection pools, can follow a similar model. + +Assuming that the task is adding two numbers, using a file named +`task_processor.js` with the following content: + +```js +const { parentPort } = require('worker_threads'); +parentPort.on('message', (task) => { + parentPort.postMessage(task.a + task.b); +}); +``` + +a Worker pool around it could use the following structure: + +```js +const { AsyncResource } = require('async_hooks'); +const { EventEmitter } = require('events'); +const path = require('path'); +const { Worker } = require('worker_threads'); + +const kTaskInfo = Symbol('kTaskInfo'); +const kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); + +class WorkerPoolTaskInfo extends AsyncResource { + constructor(callback) { + super('WorkerPoolTaskInfo'); + this.callback = callback; + } + + done(err, result) { + this.runInAsyncScope(this.callback, null, err, result); + this.emitDestroy(); // `TaskInfo`s are used only once. + } +} + +class WorkerPool extends EventEmitter { + constructor(numThreads) { + super(); + this.numThreads = numThreads; + this.workers = []; + this.freeWorkers = []; + + for (let i = 0; i < numThreads; i++) + this.addNewWorker(); + } + + addNewWorker() { + const worker = new Worker(path.resolve(__dirname, 'task_processor.js')); + worker.on('message', (result) => { + // In case of success: Call the callback that was passed to `runTask`, + // remove the `TaskInfo` associated with the Worker, and mark it as free + // again. + worker[kTaskInfo].done(null, result); + worker[kTaskInfo] = null; + this.freeWorkers.push(worker); + this.emit(kWorkerFreedEvent); + }); + worker.on('error', (err) => { + // In case of an uncaught exception: Call the callback that was passed to + // `runTask` with the error. + if (worker[kTaskInfo]) + worker[kTaskInfo].done(err, null); + else + this.emit('error', err); + // Remove the worker from the list and start a new Worker to replace the + // current one. + this.workers.splice(this.workers.indexOf(worker), 1); + this.addNewWorker(); + }); + this.workers.push(worker); + this.freeWorkers.push(worker); + } + + runTask(task, callback) { + if (this.freeWorkers.length === 0) { + // No free threads, wait until a worker thread becomes free. + this.once(kWorkerFreedEvent, () => this.runTask(task, callback)); + return; + } + + const worker = this.freeWorkers.pop(); + worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); + worker.postMessage(task); + } + + close() { + for (const worker of this.workers) worker.terminate(); + } +} + +module.exports = WorkerPool; +``` + +Without the explicit tracking added by the `WorkerPoolTaskInfo` objects, +it would appear that the callbacks are associated with the individual `Worker` +objects. However, the creation of the `Worker`s is not associated with the +creation of the tasks and does not provide information about when tasks +were scheduled. + +This pool could be used as follows: + +```js +const WorkerPool = require('./worker_pool.js'); +const os = require('os'); + +const pool = new WorkerPool(os.cpus().length); + +let finished = 0; +for (let i = 0; i < 10; i++) { + pool.runTask({ a: 42, b: 100 }, (err, result) => { + console.log(i, err, result); + if (++finished === 10) + pool.close(); + }); +} +``` + [`after` callback]: #async_hooks_after_asyncid [`before` callback]: #async_hooks_before_asyncid [`destroy` callback]: #async_hooks_destroy_asyncid diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index 6812e6966dae66..0cd8b8717422a5 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -51,7 +51,9 @@ overhead of creating Workers would likely exceed their benefit. When implementing a worker pool, use the [`AsyncResource`][] API to inform diagnostic tools (e.g. in order to provide asynchronous stack traces) about the -correlation between tasks and their outcomes. +correlation between tasks and their outcomes. See +["Using `AsyncResource` for a `Worker` thread pool"][async-resource-worker-pool] +in the `async_hooks` documentation for an example implementation. ## `worker.isMainThread`