From f820ce6e501d542df6dad82f8fd5a833c2a6077c Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 1 Feb 2020 13:48:16 +0100 Subject: [PATCH] doc: add AsyncResource + Worker pool example Use Worker thread pools as an example of how `AsyncResource` can be used to track async state across callbacks. PR-URL: https://github.com/nodejs/node/pull/31601 Reviewed-By: Gireesh Punathil Reviewed-By: James M Snell Reviewed-By: Chengzhong Wu Reviewed-By: Rich Trott Reviewed-By: Anto Aravinth --- doc/api/async_hooks.md | 122 ++++++++++++++++++++++++++++++++++++++ doc/api/worker_threads.md | 5 +- 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/doc/api/async_hooks.md b/doc/api/async_hooks.md index edf92b08d572fd..66534ca85029e6 100644 --- a/doc/api/async_hooks.md +++ b/doc/api/async_hooks.md @@ -681,6 +681,128 @@ never be called. * Returns: {number} The same `triggerAsyncId` that is passed to the `AsyncResource` constructor. + +### Using `AsyncResource` for a `Worker` thread pool + +The following example shows how to use the `AsyncResource` class to properly +provide async tracking for a [`Worker`][] pool. Other resource pools, such as +database connection pools, can follow a similar model. + +Assuming that the task is adding two numbers, using a file named +`task_processor.js` with the following content: + +```js +const { parentPort } = require('worker_threads'); +parentPort.on('message', (task) => { + parentPort.postMessage(task.a + task.b); +}); +``` + +a Worker pool around it could use the following structure: + +```js +const { AsyncResource } = require('async_hooks'); +const { EventEmitter } = require('events'); +const path = require('path'); +const { Worker } = require('worker_threads'); + +const kTaskInfo = Symbol('kTaskInfo'); +const kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); + +class WorkerPoolTaskInfo extends AsyncResource { + constructor(callback) { + super('WorkerPoolTaskInfo'); + this.callback = callback; + } + + done(err, result) { + this.runInAsyncScope(this.callback, null, err, result); + this.emitDestroy(); // `TaskInfo`s are used only once. + } +} + +class WorkerPool extends EventEmitter { + constructor(numThreads) { + super(); + this.numThreads = numThreads; + this.workers = []; + this.freeWorkers = []; + + for (let i = 0; i < numThreads; i++) + this.addNewWorker(); + } + + addNewWorker() { + const worker = new Worker(path.resolve(__dirname, 'task_processor.js')); + worker.on('message', (result) => { + // In case of success: Call the callback that was passed to `runTask`, + // remove the `TaskInfo` associated with the Worker, and mark it as free + // again. + worker[kTaskInfo].done(null, result); + worker[kTaskInfo] = null; + this.freeWorkers.push(worker); + this.emit(kWorkerFreedEvent); + }); + worker.on('error', (err) => { + // In case of an uncaught exception: Call the callback that was passed to + // `runTask` with the error. + if (worker[kTaskInfo]) + worker[kTaskInfo].done(err, null); + else + this.emit('error', err); + // Remove the worker from the list and start a new Worker to replace the + // current one. + this.workers.splice(this.workers.indexOf(worker), 1); + this.addNewWorker(); + }); + this.workers.push(worker); + this.freeWorkers.push(worker); + } + + runTask(task, callback) { + if (this.freeWorkers.length === 0) { + // No free threads, wait until a worker thread becomes free. + this.once(kWorkerFreedEvent, () => this.runTask(task, callback)); + return; + } + + const worker = this.freeWorkers.pop(); + worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); + worker.postMessage(task); + } + + close() { + for (const worker of this.workers) worker.terminate(); + } +} + +module.exports = WorkerPool; +``` + +Without the explicit tracking added by the `WorkerPoolTaskInfo` objects, +it would appear that the callbacks are associated with the individual `Worker` +objects. However, the creation of the `Worker`s is not associated with the +creation of the tasks and does not provide information about when tasks +were scheduled. + +This pool could be used as follows: + +```js +const WorkerPool = require('./worker_pool.js'); +const os = require('os'); + +const pool = new WorkerPool(os.cpus().length); + +let finished = 0; +for (let i = 0; i < 10; i++) { + pool.runTask({ a: 42, b: 100 }, (err, result) => { + console.log(i, err, result); + if (++finished === 10) + pool.close(); + }); +} +``` + [`after` callback]: #async_hooks_after_asyncid [`before` callback]: #async_hooks_before_asyncid [`destroy` callback]: #async_hooks_destroy_asyncid diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index e867e161a89987..f9fb8cf4d62d9b 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -51,7 +51,9 @@ overhead of creating Workers would likely exceed their benefit. When implementing a worker pool, use the [`AsyncResource`][] API to inform diagnostic tools (e.g. in order to provide asynchronous stack traces) about the -correlation between tasks and their outcomes. +correlation between tasks and their outcomes. See +["Using `AsyncResource` for a `Worker` thread pool"][async-resource-worker-pool] +in the `async_hooks` documentation for an example implementation. Worker threads inherit non-process-specific options by default. Refer to [`Worker constructor options`][] to know how to customize worker thread options, @@ -759,6 +761,7 @@ active handle in the event system. If the worker is already `unref()`ed calling [`worker.terminate()`]: #worker_threads_worker_terminate [`worker.threadId`]: #worker_threads_worker_threadid_1 [Addons worker support]: addons.html#addons_worker_support +[async-resource-worker-pool]: async_hooks.html#async-resource-worker-pool [HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm [Signals events]: process.html#process_signal_events [Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API