Skip to content

Commit caa10c9

Browse files
committed
Simplify the Protocol implementation
1 parent c4bed46 commit caa10c9

2 files changed

Lines changed: 86 additions & 84 deletions

File tree

src/vs/base/parts/ipc/node/ipc.net.ts

Lines changed: 86 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import { join } from 'vs/base/common/path';
1010
import { tmpdir } from 'os';
1111
import { generateUuid } from 'vs/base/common/uuid';
1212
import { IDisposable } from 'vs/base/common/lifecycle';
13-
import { TimeoutTimer } from 'vs/base/common/async';
1413

1514
export function generateRandomPipeName(): string {
1615
const randomSuffix = generateUuid();
@@ -22,6 +21,80 @@ export function generateRandomPipeName(): string {
2221
}
2322
}
2423

24+
class ChunkStream {
25+
26+
private _chunks: Buffer[];
27+
private _totalLength: number;
28+
29+
public get byteLength() {
30+
return this._totalLength;
31+
}
32+
33+
constructor() {
34+
this._chunks = [];
35+
this._totalLength = 0;
36+
}
37+
38+
public acceptChunk(buff: Buffer) {
39+
this._chunks.push(buff);
40+
this._totalLength += buff.byteLength;
41+
}
42+
43+
public readUInt32BE(): number {
44+
let tmp = this.read(4);
45+
return tmp.readUInt32BE(0);
46+
}
47+
48+
public read(byteCount: number): Buffer {
49+
if (byteCount === 0) {
50+
return Buffer.allocUnsafe(0);
51+
}
52+
53+
if (byteCount > this._totalLength) {
54+
throw new Error(`Cannot read so many bytes!`);
55+
}
56+
57+
if (this._chunks[0].byteLength === byteCount) {
58+
// super fast path, precisely first chunk must be returned
59+
const result = this._chunks.shift()!;
60+
this._totalLength -= byteCount;
61+
return result;
62+
}
63+
64+
if (this._chunks[0].byteLength > byteCount) {
65+
// fast path, the reading is entirely within the first chunk
66+
const result = this._chunks[0].slice(0, byteCount);
67+
this._chunks[0] = this._chunks[0].slice(byteCount);
68+
this._totalLength -= byteCount;
69+
return result;
70+
}
71+
72+
let result = Buffer.allocUnsafe(byteCount);
73+
let resultOffset = 0;
74+
while (byteCount > 0) {
75+
const chunk = this._chunks[0];
76+
if (chunk.byteLength > byteCount) {
77+
// this chunk will survive
78+
this._chunks[0] = chunk.slice(byteCount);
79+
80+
chunk.copy(result, resultOffset, 0, byteCount);
81+
resultOffset += byteCount;
82+
this._totalLength -= byteCount;
83+
byteCount -= byteCount;
84+
} else {
85+
// this chunk will be entirely read
86+
this._chunks.shift();
87+
88+
chunk.copy(result, resultOffset, 0, chunk.byteLength);
89+
resultOffset += chunk.byteLength;
90+
this._totalLength -= chunk.byteLength;
91+
byteCount -= chunk.byteLength;
92+
}
93+
}
94+
return result;
95+
}
96+
}
97+
2598
/**
2699
* A message has the following format:
27100
*
@@ -35,9 +108,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
35108
private static readonly _headerLen = 4;
36109

37110
private _isDisposed: boolean;
38-
private _chunks: Buffer[];
111+
private _incomingData: ChunkStream;
39112

40-
private _firstChunkTimer: TimeoutTimer;
41113
private _socketDataListener: (data: Buffer) => void;
42114
private _socketEndListener: () => void;
43115
private _socketCloseListener: () => void;
@@ -48,11 +120,9 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
48120
private _onClose = new Emitter<void>();
49121
readonly onClose: Event<void> = this._onClose.event;
50122

51-
constructor(private _socket: Socket, firstDataChunk?: Buffer) {
123+
constructor(private _socket: Socket) {
52124
this._isDisposed = false;
53-
this._chunks = [];
54-
55-
let totalLength = 0;
125+
this._incomingData = new ChunkStream();
56126

57127
const state = {
58128
readHead: true,
@@ -61,24 +131,15 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
61131

62132
const acceptChunk = (data: Buffer) => {
63133

64-
this._chunks.push(data);
65-
totalLength += data.length;
134+
this._incomingData.acceptChunk(data);
66135

67-
while (totalLength > 0) {
136+
while (this._incomingData.byteLength > 0) {
68137

69138
if (state.readHead) {
70-
// expecting header -> read 5bytes for header
71-
// information: `bodyIsJson` and `bodyLen`
72-
if (totalLength >= Protocol._headerLen) {
73-
const all = Buffer.concat(this._chunks);
74-
75-
state.bodyLen = all.readUInt32BE(0);
139+
// expecting header -> read header
140+
if (this._incomingData.byteLength >= Protocol._headerLen) {
141+
state.bodyLen = this._incomingData.readUInt32BE();
76142
state.readHead = false;
77-
78-
const rest = all.slice(Protocol._headerLen);
79-
totalLength = rest.length;
80-
this._chunks = [rest];
81-
82143
} else {
83144
break;
84145
}
@@ -87,15 +148,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
87148
if (!state.readHead) {
88149
// expecting body -> read bodyLen-bytes for
89150
// the actual message or wait for more data
90-
if (totalLength >= state.bodyLen) {
91-
92-
const all = Buffer.concat(this._chunks);
93-
const buffer = all.slice(0, state.bodyLen);
94-
95-
// ensure the getBuffer returns a valid value if invoked from the event listeners
96-
const rest = all.slice(state.bodyLen);
97-
totalLength = rest.length;
98-
this._chunks = [rest];
151+
if (this._incomingData.byteLength >= state.bodyLen) {
152+
const buffer = this._incomingData.read(state.bodyLen);
99153

100154
state.bodyLen = -1;
101155
state.readHead = true;
@@ -113,28 +167,12 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
113167
}
114168
};
115169

116-
const acceptFirstDataChunk = () => {
117-
if (firstDataChunk && firstDataChunk.length > 0) {
118-
let tmp = firstDataChunk;
119-
firstDataChunk = undefined;
120-
acceptChunk(tmp);
121-
}
122-
};
123-
124-
// Make sure to always handle the firstDataChunk if no more `data` event comes in
125-
this._firstChunkTimer = new TimeoutTimer();
126-
this._firstChunkTimer.setIfNotSet(() => {
127-
acceptFirstDataChunk();
128-
}, 0);
129-
130170
this._socketDataListener = (data: Buffer) => {
131-
acceptFirstDataChunk();
132171
acceptChunk(data);
133172
};
134173
_socket.on('data', this._socketDataListener);
135174

136175
this._socketEndListener = () => {
137-
acceptFirstDataChunk();
138176
};
139177
_socket.on('end', this._socketEndListener);
140178

@@ -146,7 +184,6 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
146184

147185
dispose(): void {
148186
this._isDisposed = true;
149-
this._firstChunkTimer.dispose();
150187
this._socket.removeListener('data', this._socketDataListener);
151188
this._socket.removeListener('end', this._socketEndListener);
152189
this._socket.removeListener('close', this._socketCloseListener);
@@ -156,8 +193,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
156193
this._socket.end();
157194
}
158195

159-
getBuffer(): Buffer {
160-
return Buffer.concat(this._chunks);
196+
readEntireBuffer(): Buffer {
197+
return this._incomingData.read(this._incomingData.byteLength);
161198
}
162199

163200
send(buffer: Buffer): void {

src/vs/base/parts/ipc/test/node/ipc.net.test.ts

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -85,39 +85,4 @@ suite('IPC, Socket Protocol', () => {
8585
});
8686
});
8787
});
88-
89-
test('can devolve to a socket and evolve again without losing data', () => {
90-
let resolve: (v: void) => void;
91-
let result = new Promise<void>((_resolve, _reject) => {
92-
resolve = _resolve;
93-
});
94-
const sender = new Protocol(stream);
95-
const receiver1 = new Protocol(stream);
96-
97-
assert.equal(stream.listenerCount('data'), 2);
98-
assert.equal(stream.listenerCount('end'), 2);
99-
100-
receiver1.onMessage((msg) => {
101-
assert.equal(JSON.parse(msg.toString()).value, 1);
102-
103-
let buffer = receiver1.getBuffer();
104-
receiver1.dispose();
105-
106-
assert.equal(stream.listenerCount('data'), 1);
107-
assert.equal(stream.listenerCount('end'), 1);
108-
109-
const receiver2 = new Protocol(stream, buffer);
110-
receiver2.onMessage((msg) => {
111-
assert.equal(JSON.parse(msg.toString()).value, 2);
112-
resolve(undefined);
113-
});
114-
});
115-
116-
const msg1 = { value: 1 };
117-
const msg2 = { value: 2 };
118-
sender.send(Buffer.from(JSON.stringify(msg1)));
119-
sender.send(Buffer.from(JSON.stringify(msg2)));
120-
121-
return result;
122-
});
12388
});

0 commit comments

Comments
 (0)