Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 17 additions & 53 deletions lib/internal/child_process/serialization.js
Original file line number Diff line number Diff line change
@@ -1,78 +1,42 @@
'use strict';

const {
ArrayPrototypePush,
JSONParse,
JSONStringify,
StringPrototypeSplit,
Symbol,
TypedArrayPrototypeSubarray,
} = primordials;
const { Buffer } = require('buffer');
const { StringDecoder } = require('string_decoder');
const { serialize, deserialize } = internalBinding('ipc_serdes');
const { serialize, IPCChannelFramer } = internalBinding('ipc_serdes');
const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');

const kMessageBuffer = Symbol('kMessageBuffer');
const kMessageBufferSize = Symbol('kMessageBufferSize');
const kFramer = Symbol('kFramer');
const kJSONBuffer = Symbol('kJSONBuffer');
const kStringDecoder = Symbol('kStringDecoder');

// Messages are parsed in either of the following formats:
// - Newline-delimited JSON, or
// - V8-serialized buffers, prefixed with their length as a big endian uint32
// (aka 'advanced')
// (aka 'advanced'), or
// - newline-delimited JSON.
//
// The 'advanced' read path is framed natively: IPCChannelFramer accumulates
// partial reads in C++ and hands back an array of complete, deserialized
// messages, so JavaScript never reframes the byte stream. The 'json' read path
// stays in JavaScript: its StringDecoder + split('\n') pipeline already avoids
// copies via V8 rope concatenation and O(1) substrings, and is measurably
// faster than reframing the bytes in C++.
const advanced = {
initMessageChannel(channel) {
channel[kMessageBuffer] = [];
channel[kMessageBufferSize] = 0;
channel[kFramer] = new IPCChannelFramer();
channel.buffering = false;
},

*parseChannelMessages(channel, readData) {
if (readData.length === 0) return;

if (channel[kMessageBufferSize] && channel[kMessageBuffer][0].length < 4) {
// Message length split into two buffers, so let's concatenate it.
channel[kMessageBuffer][0] = Buffer.concat([channel[kMessageBuffer][0], readData]);
} else {
ArrayPrototypePush(channel[kMessageBuffer], readData);
}
channel[kMessageBufferSize] += readData.length;

// Index 0 should always be present because we just pushed data into it.
let messageBufferHead = channel[kMessageBuffer][0];
while (messageBufferHead.length >= 4) {
// We call `readUInt32BE` manually here, because this is faster than first converting
// it to a buffer and using `readUInt32BE` on that.
const fullMessageSize = ((
messageBufferHead[0] << 24 |
messageBufferHead[1] << 16 |
messageBufferHead[2] << 8 |
messageBufferHead[3]
) >>> 0) + 4;

if (channel[kMessageBufferSize] < fullMessageSize) break;

const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
channel[kMessageBuffer][0] :
Buffer.concat(
channel[kMessageBuffer],
channel[kMessageBufferSize],
);

const serializedMessage =
TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize);

messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
channel[kMessageBufferSize] = messageBufferHead.length;
channel[kMessageBuffer] =
channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];

yield deserialize(serializedMessage);
}

channel.buffering = channel[kMessageBufferSize] > 0;
parseChannelMessages(channel, readData) {
if (readData.length === 0) return [];
const messages = channel[kFramer].read(readData);
channel.buffering = channel[kFramer].buffering();
return messages;
},

writeChannelMessage(channel, req, message, handle) {
Expand Down
208 changes: 193 additions & 15 deletions src/node_ipc_serdes.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "base_object-inl.h"
#include "env-inl.h"
#include "memory_tracker-inl.h"
#include "node_buffer.h"
#include "node_errors.h"
#include "node_external_reference.h"
Expand All @@ -7,6 +9,7 @@

#include <cstring>
#include <utility>
#include <vector>

// Native implementation of the `advanced` child_process IPC serialization
// codec previously implemented in lib/internal/child_process/serialization.js
Expand All @@ -19,6 +22,7 @@

namespace node {

using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferView;
using v8::BackingStore;
Expand All @@ -31,12 +35,14 @@ using v8::Float16Array;
using v8::Float32Array;
using v8::Float64Array;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Int16Array;
using v8::Int32Array;
using v8::Int8Array;
using v8::Isolate;
using v8::Just;
using v8::Local;
using v8::LocalVector;
using v8::Maybe;
using v8::MaybeLocal;
using v8::Name;
Expand Down Expand Up @@ -280,14 +286,17 @@ class IPCDeserializerDelegate : public ValueDeserializer::Delegate {
if (!deserializer_->ReadRawBytes(byte_length, &data)) return {};

const size_t bytes_per_element = BytesPerElement(type_index);
const size_t offset_in_ab = static_cast<const uint8_t*>(data) -
static_cast<const uint8_t*>(ab_->Data());

// Reuse the backing ArrayBuffer when the data is suitably aligned,
// otherwise copy into a fresh aligned buffer. Mirrors _readHostObject()
// in lib/v8.js.
if (offset_in_ab % bytes_per_element == 0) {
return MakeView(env_, type_index, ab_, offset_in_ab, byte_length);
// in lib/v8.js. A frame reassembled across reads has no standalone
// ArrayBuffer to borrow from (`ab_` is empty), so always copy.
if (!ab_.IsEmpty()) {
const size_t offset_in_ab = static_cast<const uint8_t*>(data) -
static_cast<const uint8_t*>(ab_->Data());
if (offset_in_ab % bytes_per_element == 0) {
return MakeView(env_, type_index, ab_, offset_in_ab, byte_length);
}
}
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(isolate, byte_length);
Expand Down Expand Up @@ -339,42 +348,211 @@ static void Serialize(const FunctionCallbackInfo<Value>& args) {
}
}

static void Deserialize(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
// Deserializes a single `advanced` payload: the V8 wire header followed by the
// value, without the 4-byte big-endian length prefix. When `ab` is non-empty
// it is the ArrayBuffer backing `data`, which the delegate may borrow for
// zero-copy ArrayBufferView host objects; when it is empty (a frame reassembled
// across reads) host objects are copied into fresh buffers.
static MaybeLocal<Value> DeserializeAdvancedPayload(Environment* env,
Local<ArrayBuffer> ab,
const uint8_t* data,
size_t length) {
Isolate* isolate = env->isolate();
Local<Context> context = env->context();

IPCDeserializerDelegate delegate(env, ab);
ValueDeserializer deserializer(isolate, data, length, &delegate);
delegate.set_deserializer(&deserializer);

bool read_header;
if (!deserializer.ReadHeader(context).To(&read_header)) return {};
return deserializer.ReadValue(context);
}

static inline uint32_t ReadUInt32BE(const char* p) {
const uint8_t* b = reinterpret_cast<const uint8_t*>(p);
return (static_cast<uint32_t>(b[0]) << 24) |
(static_cast<uint32_t>(b[1]) << 16) |
(static_cast<uint32_t>(b[2]) << 8) | static_cast<uint32_t>(b[3]);
}

static void Deserialize(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

CHECK(args[0]->IsArrayBufferView());
Local<ArrayBufferView> view = args[0].As<ArrayBufferView>();
Local<ArrayBuffer> ab = view->Buffer();
const size_t byte_offset = view->ByteOffset();
const size_t byte_length = view->ByteLength();
const uint8_t* data = static_cast<const uint8_t*>(ab->Data()) + byte_offset;

IPCDeserializerDelegate delegate(env, ab);
ValueDeserializer deserializer(isolate, data, byte_length, &delegate);
delegate.set_deserializer(&deserializer);

bool read_header;
if (!deserializer.ReadHeader(context).To(&read_header)) return;

Local<Value> value;
if (deserializer.ReadValue(context).ToLocal(&value)) {
if (DeserializeAdvancedPayload(env, ab, data, byte_length).ToLocal(&value)) {
args.GetReturnValue().Set(value);
}
}

// Per-channel native framer for the `advanced` codec. It turns the raw IPC
// byte stream into complete, deserialized messages, owning the cross-read
// accumulation buffer that used to live in serialization.js (kMessageBuffer),
// so JavaScript receives whole messages and never reframes partial reads or
// concatenates partial frames itself.
//
// The `json` codec is intentionally not framed here: its StringDecoder +
// split('\n') pipeline in serialization.js already avoids copies (V8 rope
// concatenation and O(1) substrings) and is faster than reassembling the bytes
// in C++.
class IPCChannelFramer : public BaseObject {
public:
IPCChannelFramer(Environment* env, Local<Object> wrap)
: BaseObject(env, wrap) {
MakeWeak();
}

static void New(const FunctionCallbackInfo<Value>& args);
static void Read(const FunctionCallbackInfo<Value>& args);
static void Buffering(const FunctionCallbackInfo<Value>& args);

void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("buffered", buffered_);
}
SET_MEMORY_INFO_NAME(IPCChannelFramer)
SET_SELF_SIZE(IPCChannelFramer)

private:
// Append complete deserialized messages found in `data` to `out`, buffering
// any trailing partial frame for the next read. Return false (with a pending
// exception) if deserialization fails.
bool ReadAdvanced(Local<ArrayBuffer> ab,
const uint8_t* data,
size_t length,
LocalVector<Value>* out);

std::vector<char> buffered_;
};

void IPCChannelFramer::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args.IsConstructCall()) {
return THROW_ERR_CONSTRUCT_CALL_REQUIRED(
env,
"Class constructor IPCChannelFramer cannot be invoked without 'new'");
}
new IPCChannelFramer(env, args.This());
}

bool IPCChannelFramer::ReadAdvanced(Local<ArrayBuffer> ab,
const uint8_t* data,
size_t length,
LocalVector<Value>* out) {
Environment* env = this->env();
size_t pos = 0;

// Phase 1: finish a frame whose bytes started in a previous read. It is
// reassembled in `buffered_`, so it has no borrowable ArrayBuffer and its
// host objects are copied.
if (!buffered_.empty()) {
while (buffered_.size() < 4 && pos < length) {
buffered_.push_back(static_cast<char>(data[pos++]));
}
if (buffered_.size() < 4) return true; // still buffering the length header

const size_t frame_total =
static_cast<size_t>(ReadUInt32BE(buffered_.data())) + 4;
while (buffered_.size() < frame_total && pos < length) {
buffered_.push_back(static_cast<char>(data[pos++]));
}
if (buffered_.size() < frame_total) return true; // still buffering payload

const uint8_t* payload =
reinterpret_cast<const uint8_t*>(buffered_.data()) + 4;
Local<Value> message;
if (!DeserializeAdvancedPayload(
env, Local<ArrayBuffer>(), payload, frame_total - 4)
.ToLocal(&message)) {
return false;
}
out->push_back(message);
buffered_.clear();
}

// Phase 2: messages contained entirely within this chunk are deserialized in
// place, letting host objects borrow the chunk's ArrayBuffer (zero copy).
while (length - pos >= 4) {
const size_t frame_total = static_cast<size_t>(ReadUInt32BE(
reinterpret_cast<const char*>(data + pos))) +
4;
if (length - pos < frame_total) break; // incomplete trailing frame

Local<Value> message;
if (!DeserializeAdvancedPayload(env, ab, data + pos + 4, frame_total - 4)
.ToLocal(&message)) {
return false;
}
out->push_back(message);
pos += frame_total;
}

if (pos < length) {
buffered_.assign(reinterpret_cast<const char*>(data + pos),
reinterpret_cast<const char*>(data + length));
}
return true;
}

void IPCChannelFramer::Read(const FunctionCallbackInfo<Value>& args) {
IPCChannelFramer* framer;
ASSIGN_OR_RETURN_UNWRAP(&framer, args.This());
Isolate* isolate = framer->env()->isolate();

CHECK(args[0]->IsUint8Array());
Local<Uint8Array> chunk = args[0].As<Uint8Array>();
Local<ArrayBuffer> ab = chunk->Buffer();
const size_t offset = chunk->ByteOffset();
const size_t length = chunk->ByteLength();
const uint8_t* data = static_cast<const uint8_t*>(ab->Data()) + offset;

LocalVector<Value> messages(isolate);
if (!framer->ReadAdvanced(ab, data, length, &messages)) {
return; // a pending exception propagates to JavaScript
}

args.GetReturnValue().Set(
Array::New(isolate, messages.data(), messages.size()));
}

void IPCChannelFramer::Buffering(const FunctionCallbackInfo<Value>& args) {
IPCChannelFramer* framer;
ASSIGN_OR_RETURN_UNWRAP(&framer, args.This());
args.GetReturnValue().Set(!framer->buffered_.empty());
}

static void Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
Isolate* isolate = env->isolate();

SetMethod(context, target, "serialize", Serialize);
SetMethod(context, target, "deserialize", Deserialize);

Local<FunctionTemplate> framer =
NewFunctionTemplate(isolate, IPCChannelFramer::New);
framer->InstanceTemplate()->SetInternalFieldCount(
IPCChannelFramer::kInternalFieldCount);
SetProtoMethod(isolate, framer, "read", IPCChannelFramer::Read);
SetProtoMethod(isolate, framer, "buffering", IPCChannelFramer::Buffering);
framer->ReadOnlyPrototype();
SetConstructorFunction(context, target, "IPCChannelFramer", framer);
}

static void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Serialize);
registry->Register(Deserialize);
registry->Register(IPCChannelFramer::New);
registry->Register(IPCChannelFramer::Read);
registry->Register(IPCChannelFramer::Buffering);
}

} // namespace ipc_serdes
Expand Down
Loading
Loading