diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 88580893cdc015..669a86bb23faef 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -317,6 +317,7 @@ class Worker extends EventEmitter { [kOnExit](code) { debug(`[${threadId}] hears end event for Worker ${this.threadId}`); MessagePortPrototype.drain.call(this[kPublicPort]); + MessagePortPrototype.drain.call(this[kPort]); this[kDispose](); this.emit('exit', code); this.removeAllListeners(); diff --git a/test/parallel/test-worker-message-port-drain.js b/test/parallel/test-worker-message-port-drain.js new file mode 100644 index 00000000000000..4fcb3da25e3f14 --- /dev/null +++ b/test/parallel/test-worker-message-port-drain.js @@ -0,0 +1,41 @@ +// Flags: --experimental-worker +'use strict'; +require('../common'); + +// This test ensures that the messages from the internal +// message port are drained before the call to 'kDispose', +// and so all the stdio messages from the worker are processed +// in the parent and are pushed to their target streams. + +const assert = require('assert'); +const { + Worker, + isMainThread, + parentPort, + threadId, +} = require('worker_threads'); + +if (isMainThread) { + const workerIdsToOutput = new Map(); + + for (let i = 0; i < 2; i++) { + const worker = new Worker(__filename, { stdout: true }); + const workerOutput = []; + workerIdsToOutput.set(worker.threadId, workerOutput); + worker.on('message', console.log); + worker.stdout.on('data', (chunk) => { + workerOutput.push(chunk.toString().trim()); + }); + } + + process.on('exit', () => { + for (const [threadId, workerOutput] of workerIdsToOutput) { + assert.ok(workerOutput.includes(`1 threadId: ${threadId}`)); + assert.ok(workerOutput.includes(`2 threadId: ${threadId}`)); + } + }); +} else { + console.log(`1 threadId: ${threadId}`); + console.log(`2 threadId: ${threadId}`); + parentPort.postMessage(Array(100).fill(1)); +}