diff --git a/lib/cluster.js b/lib/cluster.js index bbb61889400bd4..c4c2c0d9ac4c93 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -217,7 +217,7 @@ function masterInit() { // Keyed on address:port:etc. When a worker dies, we walk over the handles // and remove() the worker from each one. remove() may do a linear scan // itself so we might end up with an O(n*m) operation. Ergo, FIXME. - var handles = {}; + const handles = require('internal/cluster').handles; var initialized = false; cluster.setupMaster = function(options) { @@ -308,6 +308,26 @@ function masterInit() { var ids = 0; + function removeWorker(worker) { + assert(worker); + + delete cluster.workers[worker.id]; + + if (Object.keys(cluster.workers).length === 0) { + assert(Object.keys(handles).length === 0, 'Resource leak detected.'); + intercom.emit('disconnect'); + } + } + + function removeHandlesForWorker(worker) { + assert(worker); + + for (var key in handles) { + var handle = handles[key]; + if (handle.remove(worker)) delete handles[key]; + } + } + cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; @@ -319,26 +339,6 @@ function masterInit() { worker.on('message', this.emit.bind(this, 'message')); - function removeWorker(worker) { - assert(worker); - - delete cluster.workers[worker.id]; - - if (Object.keys(cluster.workers).length === 0) { - assert(Object.keys(handles).length === 0, 'Resource leak detected.'); - intercom.emit('disconnect'); - } - } - - function removeHandlesForWorker(worker) { - assert(worker); - - for (var key in handles) { - var handle = handles[key]; - if (handle.remove(worker)) delete handles[key]; - } - } - worker.process.once('exit', function(exitCode, signalCode) { /* * Remove the worker from the workers list only @@ -404,6 +404,8 @@ function masterInit() { Worker.prototype.disconnect = function() { this.suicide = true; send(this, { act: 'disconnect' }); + removeHandlesForWorker(this); + removeWorker(this); }; Worker.prototype.destroy = function(signo) { @@ -490,11 +492,12 @@ function masterInit() { cluster.emit('listening', worker, info); } - // Server in worker is closing, remove from list. + // Server in worker is closing, remove from list. The handle may have been + // removed by a prior call to removeHandlesForWorker() so guard against that. function close(worker, message) { var key = message.key; var handle = handles[key]; - if (handle.remove(worker)) delete handles[key]; + if (handle && handle.remove(worker)) delete handles[key]; } function send(worker, message, handle, cb) { diff --git a/lib/internal/cluster.js b/lib/internal/cluster.js new file mode 100644 index 00000000000000..8380ea7482c670 --- /dev/null +++ b/lib/internal/cluster.js @@ -0,0 +1,4 @@ +'use strict'; + +// Used in tests. +exports.handles = {}; diff --git a/node.gyp b/node.gyp index 386074be796d8a..a587edc926945e 100644 --- a/node.gyp +++ b/node.gyp @@ -69,6 +69,7 @@ 'lib/vm.js', 'lib/zlib.js', 'lib/internal/child_process.js', + 'lib/internal/cluster.js', 'lib/internal/freelist.js', 'lib/internal/module.js', 'lib/internal/socket_list.js', diff --git a/test/parallel/test-cluster-disconnect-handles.js b/test/parallel/test-cluster-disconnect-handles.js new file mode 100644 index 00000000000000..5fa98844537eb8 --- /dev/null +++ b/test/parallel/test-cluster-disconnect-handles.js @@ -0,0 +1,65 @@ +/* eslint-disable no-debugger */ +// Flags: --expose_internals +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const cluster = require('cluster'); +const net = require('net'); + +const Protocol = require('_debugger').Protocol; + +if (common.isWindows) { + console.log('1..0 # Skipped: SCHED_RR not reliable on Windows'); + return; +} + +cluster.schedulingPolicy = cluster.SCHED_RR; + +// Worker sends back a "I'm here" message, then immediately suspends +// inside the debugger. The master connects to the debug agent first, +// connects to the TCP server second, then disconnects the worker and +// unsuspends it again. The ultimate goal of this tortured exercise +// is to make sure the connection is still sitting in the master's +// pending handle queue. +if (cluster.isMaster) { + const handles = require('internal/cluster').handles; + // FIXME(bnoordhuis) lib/cluster.js scans the execArgv arguments for + // debugger flags and renumbers any port numbers it sees starting + // from the default port 5858. Add a '.' that circumvents the + // scanner but is ignored by atoi(3). Heinous hack. + cluster.setupMaster({ execArgv: [`--debug=${common.PORT}.`] }); + const worker = cluster.fork(); + worker.on('message', common.mustCall(message => { + assert.strictEqual(Array.isArray(message), true); + assert.strictEqual(message[0], 'listening'); + const address = message[1]; + const host = address.address; + const debugClient = net.connect({ host, port: common.PORT }); + const protocol = new Protocol(); + debugClient.setEncoding('utf8'); + debugClient.on('data', data => protocol.execute(data)); + debugClient.once('connect', common.mustCall(() => { + protocol.onResponse = common.mustCall(res => { + protocol.onResponse = () => {}; + const conn = net.connect({ host, port: address.port }); + conn.once('connect', common.mustCall(() => { + conn.destroy(); + assert.notDeepStrictEqual(handles, {}); + worker.disconnect(); + assert.deepStrictEqual(handles, {}); + const req = protocol.serialize({ command: 'continue' }); + debugClient.write(req); + })); + }); + })); + })); + process.on('exit', () => assert.deepStrictEqual(handles, {})); +} else { + const server = net.createServer(socket => socket.pipe(socket)); + server.listen(() => { + process.send(['listening', server.address()]); + debugger; + }); + process.on('disconnect', process.exit); +}