child_process: serialize advanced IPC messages natively#63933
Conversation
|
Review requested:
|
1984b05 to
f5b85a2
Compare
mcollina
left a comment
There was a problem hiding this comment.
why canβt we use V8 serdes?
There was a problem hiding this comment.
There is no observable behavior change and the IPC wire format is unchanged.
There are observable changes tho:
repro.js
// repro.js
// Run: node repro.js
'use strict';
const { fork } = require('node:child_process');
const fs = require('node:fs');
const v8 = require('node:v8');
const { MessageChannel } = require('node:worker_threads');
if (process.argv[2] === 'inspect') {
process.on('message', (value) => {
process.send({
isBuffer: Buffer.isBuffer(value),
constructorName: value?.constructor?.name,
keys: Object.keys(value),
visible: value?.visible,
bytes: ArrayBuffer.isView(value) ?
Buffer.from(value.buffer, value.byteOffset, value.byteLength).toString('hex') :
undefined,
});
});
} else if (process.argv[2] === 'bad-tag') {
class BadTagSerializer extends v8.DefaultSerializer {
_writeHostObject(value) {
this.writeUint32(2); // old child_process codec only accepts 0 or 1
return super._writeHostObject(value);
}
}
const ser = new BadTagSerializer();
ser.writeHeader();
ser.writeValue(Buffer.from('x'));
const payload = ser.releaseBuffer();
const framed = Buffer.allocUnsafe(4 + payload.length);
framed.writeUInt32BE(payload.length, 0);
payload.copy(framed, 4);
fs.writeSync(process.channel.fd, framed);
} else {
main();
}
function inspect(value) {
return new Promise((resolve) => {
const child = fork(__filename, ['inspect'], {
serialization: 'advanced',
stdio: ['ignore', 'ignore', 'inherit', 'ipc'],
});
child.once('message', (message) => {
child.disconnect();
resolve({ ok: true, message });
});
child.once('exit', (code, signal) => {
resolve({ ok: false, code, signal });
});
try {
child.send(value);
} catch (err) {
child.disconnect();
resolve({ ok: false, threw: err.message });
}
});
}
function sendBadTag() {
return new Promise((resolve) => {
const child = fork(__filename, ['bad-tag'], {
serialization: 'advanced',
stdio: ['ignore', 'ignore', 'inherit', 'ipc'],
});
function onUncaughtException(err) {
process.removeListener('uncaughtException', onUncaughtException);
resolve({ rejected: true, error: err.code || err.message });
}
process.once('uncaughtException', onUncaughtException);
child.once('message', (value) => {
process.removeListener('uncaughtException', onUncaughtException);
resolve({
accepted: true,
isBuffer: Buffer.isBuffer(value),
value: Buffer.isBuffer(value) ? value.toString() : value,
});
});
});
}
function check(name, passed, details) {
console.log(`${passed ? 'PASS' : 'FAIL'}: ${name}`);
console.log(details);
console.log();
if (!passed) process.exitCode = 1;
}
async function main() {
console.log('Expected on release/current Node: all PASS');
console.log('Regression on PR build: one or more FAIL\n');
const { port1, port2 } = new MessageChannel();
port1.visible = 1;
Object.defineProperty(Object.prototype, 'visible', {
configurable: true,
set() { throw new Error('setter called'); },
});
const setter = await inspect(port1);
delete Object.prototype.visible;
port1.close();
port2.close();
check(
'host-object spread must not invoke inherited setters',
setter.ok && setter.message.visible === 1 && setter.message.keys[0] === 'visible',
setter.threw ? `regression: child.send() threw "${setter.threw}"` :
`result: ${JSON.stringify(setter)}`,
);
const buf = Buffer.from('abc');
buf.constructor = Uint8Array;
const bufResult = await inspect(buf);
check(
'Buffer with constructor = Uint8Array keeps old classification',
bufResult.ok && bufResult.message.isBuffer === false,
`expected Buffer.isBuffer(received) === false\nresult: ${JSON.stringify(bufResult)}`,
);
const uint8 = new Uint8Array([1, 2, 3]);
uint8.constructor = Buffer;
const uint8Result = await inspect(uint8);
check(
'Uint8Array with constructor = Buffer keeps old classification',
uint8Result.ok && uint8Result.message.isBuffer === true,
`expected Buffer.isBuffer(received) === true\nresult: ${JSON.stringify(uint8Result)}`,
);
const badTag = await sendBadTag();
check(
'invalid child_process host-object tag is rejected',
badTag.rejected === true,
badTag.rejected ? `rejected with: ${badTag.error}` :
`regression: malformed tag accepted: ${JSON.stringify(badTag)}`,
);
}The `advanced` IPC serialization codec was implemented in JavaScript (ChildProcessSerializer / ChildProcessDeserializer in lib/internal/child_process/serialization.js). It allocated a wrapper serializer/deserializer per message and crossed the JS/C++ boundary several times for every message (writeHeader, writeValue, releaseBuffer, readHeader, readValue and friends). Move the codec into a native `ipc_serdes` binding that drives the V8 ValueSerializer/ValueDeserializer with a C++ delegate. The wire format is preserved byte-for-byte: a big-endian uint32 length prefix followed by the V8 payload, with ArrayBufferViews tagged as host objects so that Node Buffers round-trip as Buffers rather than plain Uint8Arrays. The JSON codec is left unchanged. A cctest (test/cctest/test_node_ipc_serdes.cc) exercises the binding directly, covering round-trips of primitives, objects, typed arrays and Buffers (including the Buffer-vs-Uint8Array distinction) and asserting the big-endian length-prefix framing. Round-trip throughput (benchmark/child_process/child-process-ipc-roundtrip): payload before after change 64 B ~300k/s ~800k/s +166% 1 KiB ~272k/s ~616k/s +126% 16 KiB ~91k/s ~120k/s +32% 64 KiB ~30k/s ~35k/s +16% The gain is largest for small messages, where per-message JavaScript overhead dominated, and tapers for large messages, where the actual serialization (already native) dominates. Signed-off-by: Yagiz Nizipli <yagiz@nizipli.com>
f5b85a2 to
ef73d8f
Compare
|
@panva thanks for the thorough repro β you're right, those were real regressions, and they're fixed in ef73d8f:
I added |
|
@mcollina it does use V8 serdes β the binding wraps |
The
advancedchild_process IPC serialization codec was implemented inJavaScript (
ChildProcessSerializer/ChildProcessDeserializerinlib/internal/child_process/serialization.js). It allocated a wrapperserializer/deserializer per message and crossed the JS/C++ boundary several
times for every message (
writeHeader,writeValue,releaseBuffer,readHeader,readValue, β¦).This moves the codec into a native
ipc_serdesbinding that drives the V8ValueSerializer/ValueDeserializerwith a C++ delegate. The wire format ispreserved byte-for-byte: a big-endian uint32 length prefix followed by the
V8 payload, with
ArrayBufferViews tagged as host objects so that NodeBuffers round-trip asBuffers rather than plainUint8Arrays. Thejsoncodec is intentionally left unchanged (its hot path,
JSON.stringify/parse,is already native).
Performance
Measured A/B on identical built arm64 release binaries (only this change
differs),
benchmark/child_process/child-process-ipc-roundtrip.js,round-trips/sec, average of 3 runs:
jsonmode is unchanged within noise (~543k β ~554k at 1 KiB).The gain is largest for small messages, where the fixed per-message JavaScript
overhead (per-message serializer/deserializer allocation and the JS/C++
boundary crossings) dominated. It tapers for large messages, where the actual
serialization β already native in both versions β dominates. These are
codec/IPC-throughput numbers from a saturated round-trip; a real
fork()workload also pays for pipe I/O, the event loop and the user
messagehandler,so application-level gains will be smaller.
Verification
test/parallel/test-child-process-*andtest/parallel/test-cluster-*(190+ tests) pass, including
advanced-serialization,-largebuffer,-splitted-length-fieldandfork-advanced-header-serialization.test/cctest/test_node_ipc_serdes.cccovers serialize/deserializeround-trips for primitives, objects, typed arrays and Buffers (including the
Buffer-vs-Uint8Array distinction) and asserts the length-prefix framing; the
full cctest suite passes.
cpplint,git-clang-format,eslintandtsc --strict(typings) are clean.There is no observable behavior change and the IPC wire format is unchanged.