diff --git a/benchmark/cluster/echo.js b/benchmark/cluster/echo.js new file mode 100644 index 00000000000000..0733bdbd2077aa --- /dev/null +++ b/benchmark/cluster/echo.js @@ -0,0 +1,70 @@ +'use strict'; + +const cluster = require('cluster'); +if (cluster.isMaster) { + const common = require('../common.js'); + const bench = common.createBenchmark(main, { + workers: [1], + payload: ['string', 'object'], + sendsPerBroadcast: [1, 10], + n: [1e5] + }); + + function main(conf) { + var n = +conf.n; + var workers = +conf.workers; + var sends = +conf.sendsPerBroadcast; + var expectedPerBroadcast = sends * workers; + var payload; + var readies = 0; + var broadcasts = 0; + var msgCount = 0; + + switch (conf.payload) { + case 'string': + payload = 'hello world!'; + break; + case 'object': + payload = { action: 'pewpewpew', powerLevel: 9001 }; + break; + default: + throw new Error('Unsupported payload type'); + } + + for (var i = 0; i < workers; ++i) + cluster.fork().on('online', onOnline).on('message', onMessage); + + function onOnline(msg) { + if (++readies === workers) { + bench.start(); + broadcast(); + } + } + + function broadcast() { + var id; + if (broadcasts++ === n) { + bench.end(n); + for (id in cluster.workers) + cluster.workers[id].disconnect(); + return; + } + for (id in cluster.workers) { + const worker = cluster.workers[id]; + for (var i = 0; i < sends; ++i) + worker.send(payload); + } + } + + function onMessage(msg) { + if (++msgCount === expectedPerBroadcast) { + msgCount = 0; + broadcast(); + } + } + } +} else { + process.on('message', function(msg) { + process.send(msg); + }); +} diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index abd4b16d120dc3..25190277e2e076 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -456,16 +456,22 @@ function setupChannel(target, channel) { } chunks[0] = jsonBuffer + chunks[0]; + var nextTick = false; for (var i = 0; i < numCompleteChunks; i++) { var message = JSON.parse(chunks[i]); // There will be at most one NODE_HANDLE message in every chunk we // read because SCM_RIGHTS messages don't get coalesced. Make sure // that we deliver the handle with the right message however. - if (message && message.cmd === 'NODE_HANDLE') - handleMessage(target, message, recvHandle); - else - handleMessage(target, message, undefined); + if (isInternal(message)) { + if (message.cmd === 'NODE_HANDLE') + handleMessage(message, recvHandle, true, false); + else + handleMessage(message, undefined, true, false); + } else { + handleMessage(message, undefined, false, nextTick); + nextTick = true; + } } jsonBuffer = incompleteChunk; this.buffering = jsonBuffer.length !== 0; @@ -526,7 +532,7 @@ function setupChannel(target, channel) { // Convert handle object obj.got.call(this, message, handle, function(handle) { - handleMessage(target, message.msg, handle); + handleMessage(message.msg, handle, isInternal(message.msg), false); }); }); @@ -645,16 +651,15 @@ function setupChannel(target, channel) { obj.postSend(handle, options, target); } - req.oncomplete = function() { - if (this.async === true) + if (req.async) { + req.oncomplete = function() { control.unref(); - if (typeof callback === 'function') - callback(null); - }; - if (req.async === true) { + if (typeof callback === 'function') + callback(null); + }; control.ref(); - } else { - process.nextTick(function() { req.oncomplete(); }); + } else if (typeof callback === 'function') { + process.nextTick(callback, null); } } else { // Cleanup handle on error @@ -733,27 +738,32 @@ function setupChannel(target, channel) { process.nextTick(finish); }; + function emit(event, message, handle) { + target.emit(event, message, handle); + } + + function handleMessage(message, handle, internal, nextTick) { + if (!target.channel) + return; + + var eventName = (internal ? 'internalMessage' : 'message'); + if (nextTick) + process.nextTick(emit, eventName, message, handle); + else + target.emit(eventName, message, handle); + } + channel.readStart(); return control; } - const INTERNAL_PREFIX = 'NODE_'; -function handleMessage(target, message, handle) { - if (!target.channel) - return; - - var eventName = 'message'; - if (message !== null && - typeof message === 'object' && - typeof message.cmd === 'string' && - message.cmd.length > INTERNAL_PREFIX.length && - message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) { - eventName = 'internalMessage'; - } - process.nextTick(() => { - target.emit(eventName, message, handle); - }); +function isInternal(message) { + return (message !== null && + typeof message === 'object' && + typeof message.cmd === 'string' && + message.cmd.length > INTERNAL_PREFIX.length && + message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX); } function nop() { }