@@ -10,7 +10,6 @@ import { join } from 'vs/base/common/path';
1010import { tmpdir } from 'os' ;
1111import { generateUuid } from 'vs/base/common/uuid' ;
1212import { IDisposable } from 'vs/base/common/lifecycle' ;
13- import { TimeoutTimer } from 'vs/base/common/async' ;
1413
1514export function generateRandomPipeName ( ) : string {
1615 const randomSuffix = generateUuid ( ) ;
@@ -22,6 +21,80 @@ export function generateRandomPipeName(): string {
2221 }
2322}
2423
24+ class ChunkStream {
25+
26+ private _chunks : Buffer [ ] ;
27+ private _totalLength : number ;
28+
29+ public get byteLength ( ) {
30+ return this . _totalLength ;
31+ }
32+
33+ constructor ( ) {
34+ this . _chunks = [ ] ;
35+ this . _totalLength = 0 ;
36+ }
37+
38+ public acceptChunk ( buff : Buffer ) {
39+ this . _chunks . push ( buff ) ;
40+ this . _totalLength += buff . byteLength ;
41+ }
42+
43+ public readUInt32BE ( ) : number {
44+ let tmp = this . read ( 4 ) ;
45+ return tmp . readUInt32BE ( 0 ) ;
46+ }
47+
48+ public read ( byteCount : number ) : Buffer {
49+ if ( byteCount === 0 ) {
50+ return Buffer . allocUnsafe ( 0 ) ;
51+ }
52+
53+ if ( byteCount > this . _totalLength ) {
54+ throw new Error ( `Cannot read so many bytes!` ) ;
55+ }
56+
57+ if ( this . _chunks [ 0 ] . byteLength === byteCount ) {
58+ // super fast path, precisely first chunk must be returned
59+ const result = this . _chunks . shift ( ) ! ;
60+ this . _totalLength -= byteCount ;
61+ return result ;
62+ }
63+
64+ if ( this . _chunks [ 0 ] . byteLength > byteCount ) {
65+ // fast path, the reading is entirely within the first chunk
66+ const result = this . _chunks [ 0 ] . slice ( 0 , byteCount ) ;
67+ this . _chunks [ 0 ] = this . _chunks [ 0 ] . slice ( byteCount ) ;
68+ this . _totalLength -= byteCount ;
69+ return result ;
70+ }
71+
72+ let result = Buffer . allocUnsafe ( byteCount ) ;
73+ let resultOffset = 0 ;
74+ while ( byteCount > 0 ) {
75+ const chunk = this . _chunks [ 0 ] ;
76+ if ( chunk . byteLength > byteCount ) {
77+ // this chunk will survive
78+ this . _chunks [ 0 ] = chunk . slice ( byteCount ) ;
79+
80+ chunk . copy ( result , resultOffset , 0 , byteCount ) ;
81+ resultOffset += byteCount ;
82+ this . _totalLength -= byteCount ;
83+ byteCount -= byteCount ;
84+ } else {
85+ // this chunk will be entirely read
86+ this . _chunks . shift ( ) ;
87+
88+ chunk . copy ( result , resultOffset , 0 , chunk . byteLength ) ;
89+ resultOffset += chunk . byteLength ;
90+ this . _totalLength -= chunk . byteLength ;
91+ byteCount -= chunk . byteLength ;
92+ }
93+ }
94+ return result ;
95+ }
96+ }
97+
2598/**
2699 * A message has the following format:
27100 *
@@ -35,9 +108,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
35108 private static readonly _headerLen = 4 ;
36109
37110 private _isDisposed : boolean ;
38- private _chunks : Buffer [ ] ;
111+ private _incomingData : ChunkStream ;
39112
40- private _firstChunkTimer : TimeoutTimer ;
41113 private _socketDataListener : ( data : Buffer ) => void ;
42114 private _socketEndListener : ( ) => void ;
43115 private _socketCloseListener : ( ) => void ;
@@ -48,11 +120,9 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
48120 private _onClose = new Emitter < void > ( ) ;
49121 readonly onClose : Event < void > = this . _onClose . event ;
50122
51- constructor ( private _socket : Socket , firstDataChunk ?: Buffer ) {
123+ constructor ( private _socket : Socket ) {
52124 this . _isDisposed = false ;
53- this . _chunks = [ ] ;
54-
55- let totalLength = 0 ;
125+ this . _incomingData = new ChunkStream ( ) ;
56126
57127 const state = {
58128 readHead : true ,
@@ -61,24 +131,15 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
61131
62132 const acceptChunk = ( data : Buffer ) => {
63133
64- this . _chunks . push ( data ) ;
65- totalLength += data . length ;
134+ this . _incomingData . acceptChunk ( data ) ;
66135
67- while ( totalLength > 0 ) {
136+ while ( this . _incomingData . byteLength > 0 ) {
68137
69138 if ( state . readHead ) {
70- // expecting header -> read 5bytes for header
71- // information: `bodyIsJson` and `bodyLen`
72- if ( totalLength >= Protocol . _headerLen ) {
73- const all = Buffer . concat ( this . _chunks ) ;
74-
75- state . bodyLen = all . readUInt32BE ( 0 ) ;
139+ // expecting header -> read header
140+ if ( this . _incomingData . byteLength >= Protocol . _headerLen ) {
141+ state . bodyLen = this . _incomingData . readUInt32BE ( ) ;
76142 state . readHead = false ;
77-
78- const rest = all . slice ( Protocol . _headerLen ) ;
79- totalLength = rest . length ;
80- this . _chunks = [ rest ] ;
81-
82143 } else {
83144 break ;
84145 }
@@ -87,15 +148,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
87148 if ( ! state . readHead ) {
88149 // expecting body -> read bodyLen-bytes for
89150 // the actual message or wait for more data
90- if ( totalLength >= state . bodyLen ) {
91-
92- const all = Buffer . concat ( this . _chunks ) ;
93- const buffer = all . slice ( 0 , state . bodyLen ) ;
94-
95- // ensure the getBuffer returns a valid value if invoked from the event listeners
96- const rest = all . slice ( state . bodyLen ) ;
97- totalLength = rest . length ;
98- this . _chunks = [ rest ] ;
151+ if ( this . _incomingData . byteLength >= state . bodyLen ) {
152+ const buffer = this . _incomingData . read ( state . bodyLen ) ;
99153
100154 state . bodyLen = - 1 ;
101155 state . readHead = true ;
@@ -113,28 +167,12 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
113167 }
114168 } ;
115169
116- const acceptFirstDataChunk = ( ) => {
117- if ( firstDataChunk && firstDataChunk . length > 0 ) {
118- let tmp = firstDataChunk ;
119- firstDataChunk = undefined ;
120- acceptChunk ( tmp ) ;
121- }
122- } ;
123-
124- // Make sure to always handle the firstDataChunk if no more `data` event comes in
125- this . _firstChunkTimer = new TimeoutTimer ( ) ;
126- this . _firstChunkTimer . setIfNotSet ( ( ) => {
127- acceptFirstDataChunk ( ) ;
128- } , 0 ) ;
129-
130170 this . _socketDataListener = ( data : Buffer ) => {
131- acceptFirstDataChunk ( ) ;
132171 acceptChunk ( data ) ;
133172 } ;
134173 _socket . on ( 'data' , this . _socketDataListener ) ;
135174
136175 this . _socketEndListener = ( ) => {
137- acceptFirstDataChunk ( ) ;
138176 } ;
139177 _socket . on ( 'end' , this . _socketEndListener ) ;
140178
@@ -146,7 +184,6 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
146184
147185 dispose ( ) : void {
148186 this . _isDisposed = true ;
149- this . _firstChunkTimer . dispose ( ) ;
150187 this . _socket . removeListener ( 'data' , this . _socketDataListener ) ;
151188 this . _socket . removeListener ( 'end' , this . _socketEndListener ) ;
152189 this . _socket . removeListener ( 'close' , this . _socketCloseListener ) ;
@@ -156,8 +193,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
156193 this . _socket . end ( ) ;
157194 }
158195
159- getBuffer ( ) : Buffer {
160- return Buffer . concat ( this . _chunks ) ;
196+ readEntireBuffer ( ) : Buffer {
197+ return this . _incomingData . read ( this . _incomingData . byteLength ) ;
161198 }
162199
163200 send ( buffer : Buffer ) : void {
0 commit comments