@@ -35,6 +35,7 @@ const {
3535 SymbolAsyncIterator,
3636 Symbol,
3737 TypeError,
38+ Uint8Array,
3839} = primordials ;
3940
4041module . exports = Readable ;
@@ -45,6 +46,8 @@ const { Stream, prependListener } = require('internal/streams/legacy');
4546const { Buffer } = require ( 'buffer' ) ;
4647
4748let Blob ;
49+ let ReadableStream ;
50+ let CountQueuingStrategy ;
4851
4952const {
5053 addAbortSignal,
@@ -75,6 +78,7 @@ const { validateObject } = require('internal/validators');
7578
7679const kPaused = Symbol ( 'kPaused' ) ;
7780const kConsume = Symbol ( 'kConsume' ) ;
81+ const kReading = Symbol ( 'kReading' ) ;
7882
7983const { StringDecoder } = require ( 'string_decoder' ) ;
8084const from = require ( 'internal/streams/from' ) ;
@@ -213,6 +217,7 @@ function Readable(options) {
213217 }
214218
215219 this [ kConsume ] = null ;
220+ this [ kReading ] = false ; // Is stream being consumed through Readable API?
216221
217222 Stream . call ( this , options ) ;
218223
@@ -238,6 +243,11 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) {
238243// similar to how Writable.write() returns true if you should
239244// write() some more.
240245Readable . prototype . push = function ( chunk , encoding ) {
246+ if ( this [ kConsume ] && chunk !== null && ! this [ kReading ] ) {
247+ encoding = encoding || this . _readableState . defaultEncoding ;
248+ return this [ kConsume ] . push ( chunk , encoding ) ;
249+ }
250+
241251 return readableAddChunk ( this , chunk , encoding , false ) ;
242252} ;
243253
@@ -307,10 +317,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
307317 maybeReadMore ( stream , state ) ;
308318 }
309319
320+ const consumed = this [ kConsume ] ? this [ kConsume ] . push ( chunk , encoding ) : true ;
321+
310322 // We can push more data if we are below the highWaterMark.
311323 // Also, if we have no data yet, we can stand some more bytes.
312324 // This is to work around cases where hwm=0, such as the repl.
313- return ! state . ended &&
325+ return consumed && ! state . ended &&
314326 ( state . length < state . highWaterMark || state . length === 0 ) ;
315327}
316328
@@ -402,6 +414,27 @@ function howMuchToRead(n, state) {
402414 return state . ended ? state . length : 0 ;
403415}
404416
417+
418+ function _read ( self , n ) {
419+ // Call internal read method
420+ try {
421+ const result = self . _read ( n ) ;
422+ if ( result != null ) {
423+ const then = result . then ;
424+ if ( typeof then === 'function' ) {
425+ then . call (
426+ result ,
427+ nop ,
428+ function ( err ) {
429+ errorOrDestroy ( self , err ) ;
430+ } ) ;
431+ }
432+ }
433+ } catch ( err ) {
434+ errorOrDestroy ( self , err ) ;
435+ }
436+ }
437+
405438// You can override either this method, or the async _read(n) below.
406439Readable . prototype . read = function ( n ) {
407440 debug ( 'read' , n ) ;
@@ -496,22 +529,7 @@ Readable.prototype.read = function(n) {
496529 state . needReadable = true ;
497530
498531 // Call internal read method
499- try {
500- const result = this . _read ( state . highWaterMark ) ;
501- if ( result != null ) {
502- const then = result . then ;
503- if ( typeof then === 'function' ) {
504- then . call (
505- result ,
506- nop ,
507- function ( err ) {
508- errorOrDestroy ( this , err ) ;
509- } ) ;
510- }
511- }
512- } catch ( err ) {
513- errorOrDestroy ( this , err ) ;
514- }
532+ _read ( this , state . highWaterMark ) ;
515533
516534 state . sync = false ;
517535 // If _read pushed data synchronously, then `reading` will be false,
@@ -906,6 +924,8 @@ Readable.prototype.on = function(ev, fn) {
906924 const state = this . _readableState ;
907925
908926 if ( ev === 'data' ) {
927+ this [ kReading ] = true ;
928+
909929 // Update readableListening so that resume() may be a no-op
910930 // a few lines down. This is needed to support once('readable').
911931 state . readableListening = this . listenerCount ( 'readable' ) > 0 ;
@@ -914,6 +934,8 @@ Readable.prototype.on = function(ev, fn) {
914934 if ( state . flowing !== false )
915935 this . resume ( ) ;
916936 } else if ( ev === 'readable' ) {
937+ this [ kReading ] = true ;
938+
917939 if ( ! state . endEmitted && ! state . readableListening ) {
918940 state . readableListening = state . needReadable = true ;
919941 state . flowing = false ;
@@ -1363,56 +1385,154 @@ function consume(self, type) {
13631385 }
13641386
13651387 if ( type === kWebStreamType ) {
1366- self [ kConsume ] = {
1388+ if ( ! ReadableStream ) {
1389+ ReadableStream = require ( 'internal/webstreams/readablestream' )
1390+ . ReadableStream ;
1391+ }
1392+
1393+ const objectMode = self . readableObjectMode ;
1394+ const highWaterMark = self . readableHighWaterMark ;
1395+ // When not running in objectMode explicitly, we just fall
1396+ // back to a minimal strategy that just specifies the highWaterMark
1397+ // and no size algorithm. Using a ByteLengthQueuingStrategy here
1398+ // is unnecessary.
1399+ let strategy ;
1400+ if ( objectMode ) {
1401+ if ( ! CountQueuingStrategy ) {
1402+ CountQueuingStrategy = require ( 'internal/webstreams/queuingstrategies' ) ;
1403+ }
1404+ strategy = new CountQueuingStrategy ( { highWaterMark } ) ;
1405+ } else {
1406+ strategy = { highWaterMark } ;
1407+ }
1408+
1409+ self . on ( 'error' , function ( err ) {
1410+ const { controller } = this [ kConsume ] ;
1411+
1412+ controller . error ( err ) ;
1413+ } ) ;
1414+
1415+ const consume = self [ kConsume ] = {
13671416 type,
1368- body : Readable . toWeb ( self )
1417+ objectMode,
1418+ controller : null ,
1419+ push ( chunk ) {
1420+ const { objectMode, controller } = this ;
1421+
1422+ if ( chunk === null ) {
1423+ controller . close ( ) ;
1424+ } else {
1425+ if ( ! objectMode ) {
1426+ if ( typeof chunk === 'string' ) {
1427+ chunk = new Uint8Array ( Buffer . from ( chunk ) ) ;
1428+ } else if ( Buffer . isBuffer ( chunk ) ) {
1429+ // Copy the Buffer to detach it from the pool.
1430+ chunk = new Uint8Array ( chunk ) ;
1431+ } else if ( Stream . _isUint8Array ( chunk ) ) {
1432+ // Do nothing...
1433+ } else if ( chunk != null ) {
1434+ throw new ERR_INVALID_ARG_TYPE (
1435+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
1436+ }
1437+ }
1438+
1439+ controller . enqueue ( chunk ) ;
1440+ }
1441+
1442+ return controller . desiredSize > 0 ;
1443+ } ,
1444+ stream : new ReadableStream ( {
1445+ start ( c ) {
1446+ consume . controller = c ;
1447+ } ,
1448+ pull ( ) {
1449+ const n = consume . controller . desiredSize ;
1450+
1451+ if ( self [ kReading ] ) {
1452+ self . read ( n ) ;
1453+ } else {
1454+ _read ( self , n ) ;
1455+ }
1456+ } ,
1457+ cancel ( reason ) {
1458+ self . destroy ( reason ) ;
1459+ } ,
1460+ } , strategy )
13691461 } ;
13701462
1371- return self [ kConsume ] . body ;
1463+ return consume . stream ;
13721464 }
13731465
13741466 return new Promise ( ( resolve , reject ) => {
13751467 self [ kConsume ] = {
13761468 type,
13771469 resolve,
13781470 reject,
1379- body : type === kTextType || type === kJSONType ? '' : [ ]
1380- } ;
1381- self
1382- . on ( 'error' , reject )
1383- . on ( 'data' , function ( val ) {
1384- const { type } = this [ kConsume ] ;
1471+ decoder : null ,
1472+ body : type === kTextType || type === kJSONType ? '' : [ ] ,
1473+ push ( chunk , encoding ) {
1474+ const { type, body, resolve, decoder } = this [ kConsume ] ;
1475+
1476+ if ( chunk === null ) {
1477+ try {
1478+ if ( type === kTextType ) {
1479+ resolve ( body + ( decoder ? decoder . end ( ) : '' ) ) ;
1480+ } else if ( type === kJSONType ) {
1481+ resolve ( JSONParse ( body + ( decoder ? decoder . end ( ) : '' ) ) ) ;
1482+ } else if ( type === kArrayBufferType ) {
1483+ resolve ( Buffer . concat ( body ) . buffer ) ;
1484+ } else if ( type === kBlobType ) {
1485+ if ( ! Blob ) {
1486+ Blob = require ( 'buffer' ) . Blob ;
1487+ }
1488+ resolve ( new Blob ( body ) ) ;
1489+ }
13851490
1386- // TODO (fix): Do we need type check and/or conversion?
1491+ this [ kConsume ] . body = null ;
1492+ } catch ( err ) {
1493+ self . destroy ( err ) ;
1494+ }
1495+ } else if ( type === kTextType || type === kJSONType ) {
1496+ if ( typeof chunk === 'string' ) {
1497+ if ( decoder ) {
1498+ chunk = decoder . write ( Buffer . from ( decoder ) ) ;
1499+ }
1500+ // TODO: Encoding check/transform?
1501+ } else if ( chunk instanceof Buffer ) {
1502+ if ( ! decoder ) {
1503+ this [ kConsume ] . decoder = new StringDecoder ( 'utf8' ) ;
1504+ }
1505+ encoding = decoder . write ( chunk ) ;
1506+ } else if ( Stream . _isUint8Array ( chunk ) ) {
1507+ encoding = decoder . write ( Stream . _uint8ArrayToBuffer ( chunk ) ) ;
1508+ } else {
1509+ throw new ERR_INVALID_ARG_TYPE (
1510+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
1511+ }
13871512
1388- if ( type === kTextType || type === kJSONType ) {
1389- this [ kConsume ] . body += val ;
1513+ this [ kConsume ] . body += chunk ;
13901514 } else {
1391- this [ kConsume ] . body . push ( val ) ;
1392- }
1393- } )
1394- . on ( 'end' , function ( ) {
1395- const { type, resolve, body } = this [ kConsume ] ;
1396-
1397- try {
1398- if ( type === kTextType ) {
1399- resolve ( body ) ;
1400- } else if ( type === kJSONType ) {
1401- resolve ( JSONParse ( body ) ) ;
1402- } else if ( type === kArrayBufferType ) {
1403- resolve ( Buffer . concat ( body ) . buffer ) ;
1404- } else if ( type === kBlobType ) {
1405- if ( ! Blob ) {
1406- Blob = require ( 'buffer' ) . Blob ;
1407- }
1408- resolve ( new Blob ( body ) ) ;
1515+ if ( typeof chunk === 'string' ) {
1516+ chunk = Buffer . from ( chunk ) ;
1517+ // TODO: Encoding check/transform?
1518+ } else if ( chunk instanceof Buffer ) {
1519+ // Do nothing...
1520+ } else if ( Stream . _isUint8Array ( chunk ) ) {
1521+ chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
1522+ } else {
1523+ throw new ERR_INVALID_ARG_TYPE (
1524+ 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
14091525 }
14101526
1411- this [ kConsume ] . body = null ;
1412- } catch ( err ) {
1413- self . destroy ( err ) ;
1527+ this [ kConsume ] . body . push ( chunk ) ;
14141528 }
1415- } )
1529+
1530+ return true ;
1531+ }
1532+ } ;
1533+
1534+ self
1535+ . on ( 'error' , reject )
14161536 . on ( 'close' , function ( ) {
14171537 const { body, reject } = this [ kConsume ] ;
14181538
@@ -1522,5 +1642,6 @@ Readable.fromWeb = function(readableStream, options) {
15221642} ;
15231643
15241644Readable . toWeb = function ( streamReadable ) {
1525- return lazyWebStreams ( ) . newReadableStreamFromStreamReadable ( streamReadable ) ;
1645+ return streamReadable [ kConsume ] !== undefined ? streamReadable . body :
1646+ lazyWebStreams ( ) . newReadableStreamFromStreamReadable ( streamReadable ) ;
15261647} ;
0 commit comments