Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

child_process: add callback parameter to .send() #2620

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions doc/api/child_process.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,15 @@ to a process.

See `kill(2)`

### child.send(message[, sendHandle])
### child.send(message[, sendHandle][, callback])

* `message` {Object}
* `sendHandle` {Handle object}
* `callback` {Function}
* Return: Boolean

When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
`child.send(message[, sendHandle][, callback])` and messages are received by
a `'message'` event on the child.

For example:
Expand All @@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this:
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.

Please note that the `send()` method on both the parent and child are
synchronous - sending large chunks of data is not advised (pipes can be used
instead, see
[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).

There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
containing a `NODE_` prefix in its `cmd` property will not be emitted in
the `message` event, since they are internal messages used by Node.js core.
Expand All @@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.

Emits an `'error'` event if the message cannot be sent, for example because
the child process has already exited.
The `callback` option is a function that is invoked after the message is
sent but before the target may have received it. It is called with a single
argument: `null` on success, or an `Error` object on failure.

`child.send()` emits an `'error'` event if no callback was given and the message
cannot be sent, for example because the child process has already exited.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This impacts the cluster API and process API, which surface .send().

I assume process will now have an error event which can be emitted if send fails, and node will crash if it isn't listened on?

For users of cluster, if they fail to listen on worker.process.on('error', ..., the master will crash? Or will you re-emit from the process object onto the worker object?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no change in functionality - what's described here is the the current behavior, it just wasn't documented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought send() after disconnect resulted in an error being thrown, not an error emitted, currently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It emits an error event, but a synchronous one. So yes, if you don't have an error listener, it's effectively throwing an exception. That behavior is (for better or worse) maintained in this patch.


Returns `true` under normal circumstances or `false` when the backlog of
unsent messages exceeds a threshold that makes it unwise to send more.
Use the callback mechanism to implement flow control.

#### Example: sending server object

Expand Down
4 changes: 3 additions & 1 deletion doc/api/cluster.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value.
// kill worker
worker.kill();

### worker.send(message[, sendHandle])
### worker.send(message[, sendHandle][, callback])

* `message` {Object}
* `sendHandle` {Handle object}
* `callback` {Function}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, if there is no callback, send errors will be emitted on worker, or worker.process?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's backwards compatible.

* Return: Boolean

Send a message to a worker or master, optionally with a handle.

Expand Down
10 changes: 3 additions & 7 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@ exports._forkChild = function(fd) {
var p = new Pipe(true);
p.open(fd);
p.unref();
setupChannel(process, p);

var refs = 0;
const control = setupChannel(process, p);
process.on('newListener', function(name) {
if (name !== 'message' && name !== 'disconnect') return;
if (++refs === 1) p.ref();
if (name === 'message' || name === 'disconnect') control.ref();
});
process.on('removeListener', function(name) {
if (name !== 'message' && name !== 'disconnect') return;
if (--refs === 0) p.unref();
if (name === 'message' || name === 'disconnect') control.unref();
});
};

Expand Down
91 changes: 72 additions & 19 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,25 @@ function setupChannel(target, channel) {
target._channel = channel;
target._handleQueue = null;

const control = new class extends EventEmitter {
constructor() {
super();
this.channel = channel;
this.refs = 0;
}
ref() {
if (++this.refs === 1) {
this.channel.ref();
}
}
unref() {
if (--this.refs === 0) {
this.channel.unref();
this.emit('unref');
}
}
};

var decoder = new StringDecoder('utf8');
var jsonBuffer = '';
channel.buffering = false;
Expand Down Expand Up @@ -446,7 +465,7 @@ function setupChannel(target, channel) {
target._handleQueue = null;

queue.forEach(function(args) {
target._send(args.message, args.handle, false);
target._send(args.message, args.handle, false, args.callback);
});

// Process a pending disconnect (if any).
Expand Down Expand Up @@ -478,14 +497,24 @@ function setupChannel(target, channel) {
});
});

target.send = function(message, handle) {
if (!this.connected)
this.emit('error', new Error('channel closed'));
else
this._send(message, handle, false);
target.send = function(message, handle, callback) {
if (typeof handle === 'function') {
callback = handle;
handle = undefined;
}
if (this.connected) {
this._send(message, handle, false, callback);
return;
}
const ex = new Error('channel closed');
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
}
};

target._send = function(message, handle, swallowErrors) {
target._send = function(message, handle, swallowErrors, callback) {
assert(this.connected || this._channel);

if (message === undefined)
Expand Down Expand Up @@ -516,7 +545,11 @@ function setupChannel(target, channel) {

// Queue-up message and handle if we haven't received ACK yet.
if (this._handleQueue) {
this._handleQueue.push({ message: message.msg, handle: handle });
this._handleQueue.push({
callback: callback,
handle: handle,
message: message.msg,
});
return;
}

Expand All @@ -538,24 +571,43 @@ function setupChannel(target, channel) {
} else if (this._handleQueue &&
!(message && message.cmd === 'NODE_HANDLE_ACK')) {
// Queue request anyway to avoid out-of-order messages.
this._handleQueue.push({ message: message, handle: null });
this._handleQueue.push({
callback: callback,
handle: null,
message: message,
});
return;
}

var req = new WriteWrap();
req.oncomplete = nop;
req.async = false;

var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);

if (err) {
if (!swallowErrors)
this.emit('error', errnoException(err, 'write'));
} else if (handle && !this._handleQueue) {
this._handleQueue = [];
}

if (obj && obj.postSend) {
req.oncomplete = obj.postSend.bind(null, handle);
if (err === 0) {
if (handle && !this._handleQueue)
this._handleQueue = [];
req.oncomplete = function() {
if (this.async === true)
control.unref();
if (obj && obj.postSend)
obj.postSend(handle);
if (typeof callback === 'function')
callback(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm following the logic correctly, it's possible to call child.send() and have the callback called synchronously. If that is the case, I'd recommend changing this to process.nextTick(callback, null);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait. nm. Damn 2 space indentation and inner function declaration got me again. I missed that this is called from req.oncomplete().

};
if (req.async === true) {
control.ref();
} else {
process.nextTick(function() { req.oncomplete(); });
}
} else if (!swallowErrors) {
const ex = errnoException(err, 'write');
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
}
}

/* If the master is > 2 read() calls behind, please stop sending. */
Expand Down Expand Up @@ -616,6 +668,7 @@ function setupChannel(target, channel) {
};

channel.readStart();
return control;
}


Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-child-process-send-cb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fork = require('child_process').fork;

if (process.argv[2] === 'child') {
process.send('ok', common.mustCall(function(err) {
assert.strictEqual(err, null);
}));
} else {
const child = fork(process.argv[1], ['child']);
child.on('message', common.mustCall(function(message) {
assert.strictEqual(message, 'ok');
}));
child.on('exit', common.mustCall(function(exitCode, signalCode) {
assert.strictEqual(exitCode, 0);
assert.strictEqual(signalCode, null);
}));
}