File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ 'use strict' ;
2+ const common = require ( '../common' ) ;
3+ const { Readable, Writable, PassThrough } = require ( 'stream' ) ;
4+
5+ {
6+ let ticks = 17 ;
7+
8+ const rs = new Readable ( {
9+ objectMode : true ,
10+ read : ( ) => {
11+ if ( ticks -- > 0 )
12+ return process . nextTick ( ( ) => rs . push ( { } ) ) ;
13+ rs . push ( { } ) ;
14+ rs . push ( null ) ;
15+ }
16+ } ) ;
17+
18+ const ws = new Writable ( {
19+ highWaterMark : 0 ,
20+ objectMode : true ,
21+ write : ( data , end , cb ) => setImmediate ( cb )
22+ } ) ;
23+
24+ rs . on ( 'end' , common . mustCall ( ) ) ;
25+ ws . on ( 'finish' , common . mustCall ( ) ) ;
26+ rs . pipe ( ws ) ;
27+ }
28+
29+ {
30+ let missing = 8 ;
31+
32+ const rs = new Readable ( {
33+ objectMode : true ,
34+ read : ( ) => {
35+ if ( missing -- ) rs . push ( { } ) ;
36+ else rs . push ( null ) ;
37+ }
38+ } ) ;
39+
40+ const pt = rs
41+ . pipe ( new PassThrough ( { objectMode : true , highWaterMark : 2 } ) )
42+ . pipe ( new PassThrough ( { objectMode : true , highWaterMark : 2 } ) ) ;
43+
44+ pt . on ( 'end' , function ( ) {
45+ wrapper . push ( null ) ;
46+ } ) ;
47+
48+ const wrapper = new Readable ( {
49+ objectMode : true ,
50+ read : ( ) => {
51+ process . nextTick ( function ( ) {
52+ let data = pt . read ( ) ;
53+ if ( data === null ) {
54+ pt . once ( 'readable' , function ( ) {
55+ data = pt . read ( ) ;
56+ if ( data !== null ) wrapper . push ( data ) ;
57+ } ) ;
58+ } else {
59+ wrapper . push ( data ) ;
60+ }
61+ } ) ;
62+ }
63+ } ) ;
64+
65+ wrapper . resume ( ) ;
66+ wrapper . on ( 'end' , common . mustCall ( ) ) ;
67+ }
You can’t perform that action at this time.
0 commit comments