Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
617ee32
src: fix memory leak in ExternString
skomski Aug 16, 1970
56d9584
child_process: add callback parameter to .send()
bnoordhuis Aug 30, 2015
abbc8db
test: mark eval_messages as flaky
orangemocha Sep 2, 2015
6ce8f5f
doc: reorder collaborators by their usernames
jbergstroem Jul 25, 2015
47e5cf7
doc: update url doc to account for escaping
Fishrock123 Aug 28, 2015
8ca9ea2
test: refactor to eliminate flaky test
Trott Aug 29, 2015
107cbd6
child_process: check execFile and fork args
jasnell Sep 2, 2015
4a1b519
build: add --enable-asan with builtin leakcheck
skomski Aug 14, 2015
b513a33
events,lib: don't require EE#listenerCount()
Fishrock123 Sep 2, 2015
1134188
deps: upgrade V8 to 4.5.103.24
ofrobots Aug 23, 2015
a7392ff
src: apply debug force load fixups from 41e63fb
ofrobots Aug 23, 2015
709ed15
contextify: ignore getters during initialization
indutny Jul 7, 2015
06f38de
test: fix test-repl-tab-complete.js for V8 4.5
ofrobots Aug 23, 2015
39aa573
src: enable v8 deprecation warnings and fix them
bnoordhuis Jul 1, 2015
d08bb97
src: replace usage of v8::Handle with v8::Local
targos Jul 18, 2015
564e214
src: enable vector ics on arm again
ofrobots Aug 23, 2015
074315f
src: re-enable fast math on arm
targos Aug 28, 2015
64beab0
deps: upgrade V8 to 4.5.103.30
ofrobots Sep 1, 2015
fc66eed
test: fix use of `common` before required
rvagg Sep 4, 2015
eefe14c
buffer: SlowBuffer only accept valid numeric values
targos Sep 1, 2015
a338eb3
doc,test: enable recursive file watching in Windows
thefourtheye Sep 2, 2015
80bcab9
src: fix buffer overflow for long exception lines
skomski Sep 3, 2015
3a731da
src: use standard conform snprintf on windows
skomski Sep 3, 2015
a6cb3a5
doc: update environment vars in manpage and --help
silverwind Sep 4, 2015
880410d
doc: add TSC meeting minutes 2015-09-02
rvagg Sep 3, 2015
63a628d
build: fix .pkg creation tooling
rvagg Sep 4, 2015
d0c682a
deps: backport 75e43a6 from v8 upstream (again)
Aug 11, 2015
ffee40c
http: add STATUS_CODES to support Azure, CloudFlare
JungMinu Sep 5, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
child_process: add callback parameter to .send()
Add an optional callback parameter to `ChildProcess.prototype.send()`
that is invoked when the message has been sent.

Juggle the control channel's reference count so that in-flight messages
keep the event loop (and therefore the process) alive until they have
been sent.

`ChildProcess.prototype.send()` and `process.send()` used to operate
synchronously but became asynchronous in commit libuv/libuv@393c1c5
("unix: set non-block mode in uv_{pipe,tcp,udp}_open"), which landed
in io.js in commit 07bd05b ("deps: update libuv to 1.2.1").

Fixes: #760
PR-URL: #2620
Reviewed-By: trevnorris - Trevor Norris <trev.norris@gmail.com>
Reviewed-By: jasnell - James M Snell <jasnell@gmail.com>
  • Loading branch information
bnoordhuis authored and trevnorris committed Sep 2, 2015
commit 56d9584a0ead78874ca9d4de2e55b41c4056e502
23 changes: 14 additions & 9 deletions doc/api/child_process.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,15 @@ to a process.

See `kill(2)`

### child.send(message[, sendHandle])
### child.send(message[, sendHandle][, callback])

* `message` {Object}
* `sendHandle` {Handle object}
* `callback` {Function}
* Return: Boolean

When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
`child.send(message[, sendHandle][, callback])` and messages are received by
a `'message'` event on the child.

For example:
Expand All @@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this:
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.

Please note that the `send()` method on both the parent and child are
synchronous - sending large chunks of data is not advised (pipes can be used
instead, see
[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).

There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
containing a `NODE_` prefix in its `cmd` property will not be emitted in
the `message` event, since they are internal messages used by Node.js core.
Expand All @@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.

Emits an `'error'` event if the message cannot be sent, for example because
the child process has already exited.
The `callback` option is a function that is invoked after the message is
sent but before the target may have received it. It is called with a single
argument: `null` on success, or an `Error` object on failure.

`child.send()` emits an `'error'` event if no callback was given and the message
cannot be sent, for example because the child process has already exited.

Returns `true` under normal circumstances or `false` when the backlog of
unsent messages exceeds a threshold that makes it unwise to send more.
Use the callback mechanism to implement flow control.

#### Example: sending server object

Expand Down
4 changes: 3 additions & 1 deletion doc/api/cluster.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value.
// kill worker
worker.kill();

### worker.send(message[, sendHandle])
### worker.send(message[, sendHandle][, callback])

* `message` {Object}
* `sendHandle` {Handle object}
* `callback` {Function}
* Return: Boolean

Send a message to a worker or master, optionally with a handle.

Expand Down
10 changes: 3 additions & 7 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@ exports._forkChild = function(fd) {
var p = new Pipe(true);
p.open(fd);
p.unref();
setupChannel(process, p);

var refs = 0;
const control = setupChannel(process, p);
process.on('newListener', function(name) {
if (name !== 'message' && name !== 'disconnect') return;
if (++refs === 1) p.ref();
if (name === 'message' || name === 'disconnect') control.ref();
});
process.on('removeListener', function(name) {
if (name !== 'message' && name !== 'disconnect') return;
if (--refs === 0) p.unref();
if (name === 'message' || name === 'disconnect') control.unref();
});
};

Expand Down
91 changes: 72 additions & 19 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,25 @@ function setupChannel(target, channel) {
target._channel = channel;
target._handleQueue = null;

const control = new class extends EventEmitter {
constructor() {
super();
this.channel = channel;
this.refs = 0;
}
ref() {
if (++this.refs === 1) {
this.channel.ref();
}
}
unref() {
if (--this.refs === 0) {
this.channel.unref();
this.emit('unref');
}
}
};

var decoder = new StringDecoder('utf8');
var jsonBuffer = '';
channel.buffering = false;
Expand Down Expand Up @@ -446,7 +465,7 @@ function setupChannel(target, channel) {
target._handleQueue = null;

queue.forEach(function(args) {
target._send(args.message, args.handle, false);
target._send(args.message, args.handle, false, args.callback);
});

// Process a pending disconnect (if any).
Expand Down Expand Up @@ -478,14 +497,24 @@ function setupChannel(target, channel) {
});
});

target.send = function(message, handle) {
if (!this.connected)
this.emit('error', new Error('channel closed'));
else
this._send(message, handle, false);
target.send = function(message, handle, callback) {
if (typeof handle === 'function') {
callback = handle;
handle = undefined;
}
if (this.connected) {
this._send(message, handle, false, callback);
return;
}
const ex = new Error('channel closed');
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
}
};

target._send = function(message, handle, swallowErrors) {
target._send = function(message, handle, swallowErrors, callback) {
assert(this.connected || this._channel);

if (message === undefined)
Expand Down Expand Up @@ -516,7 +545,11 @@ function setupChannel(target, channel) {

// Queue-up message and handle if we haven't received ACK yet.
if (this._handleQueue) {
this._handleQueue.push({ message: message.msg, handle: handle });
this._handleQueue.push({
callback: callback,
handle: handle,
message: message.msg,
});
return;
}

Expand All @@ -538,24 +571,43 @@ function setupChannel(target, channel) {
} else if (this._handleQueue &&
!(message && message.cmd === 'NODE_HANDLE_ACK')) {
// Queue request anyway to avoid out-of-order messages.
this._handleQueue.push({ message: message, handle: null });
this._handleQueue.push({
callback: callback,
handle: null,
message: message,
});
return;
}

var req = new WriteWrap();
req.oncomplete = nop;
req.async = false;

var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);

if (err) {
if (!swallowErrors)
this.emit('error', errnoException(err, 'write'));
} else if (handle && !this._handleQueue) {
this._handleQueue = [];
}

if (obj && obj.postSend) {
req.oncomplete = obj.postSend.bind(null, handle);
if (err === 0) {
if (handle && !this._handleQueue)
this._handleQueue = [];
req.oncomplete = function() {
if (this.async === true)
control.unref();
if (obj && obj.postSend)
obj.postSend(handle);
if (typeof callback === 'function')
callback(null);
};
if (req.async === true) {
control.ref();
} else {
process.nextTick(function() { req.oncomplete(); });
}
} else if (!swallowErrors) {
const ex = errnoException(err, 'write');
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
}
}

/* If the master is > 2 read() calls behind, please stop sending. */
Expand Down Expand Up @@ -616,6 +668,7 @@ function setupChannel(target, channel) {
};

channel.readStart();
return control;
}


Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-child-process-send-cb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fork = require('child_process').fork;

if (process.argv[2] === 'child') {
process.send('ok', common.mustCall(function(err) {
assert.strictEqual(err, null);
}));
} else {
const child = fork(process.argv[1], ['child']);
child.on('message', common.mustCall(function(message) {
assert.strictEqual(message, 'ok');
}));
child.on('exit', common.mustCall(function(exitCode, signalCode) {
assert.strictEqual(exitCode, 0);
assert.strictEqual(signalCode, null);
}));
}