-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
stream: flow for 'data' listeners upon removal of last 'readable' listener #21696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
stream: postpone setting flowing for on('readable')
Now state.flowing will be set only after all of the 'readable'
listeners are gone and if we have at least one 'data' listener.
* on('data') will not flow (flowing === null and not false) if there
are 'readable' listeners
* pipe() will work regardless of 'readable' listeners
* isPause reports only user .pause call (before setting 'data' listener
when there is already 'readable' listener also set flowing to false)
* resume always sets stream to flowing state- Loading branch information
commit c07abd7fe802bf2b8b68bcd79feddd99994de5ab
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| 'use strict'; | ||
|
|
||
| const common = require('../common'); | ||
|
|
||
| // This test ensures that if we have both 'readable' and 'data' | ||
| // listeners on Readable instance once all of the 'readable' listeners | ||
| // are gone and there are still 'data' listeners stream will *not* | ||
| // try to flow if it was explicitly paused. | ||
|
|
||
| const { Readable } = require('stream'); | ||
|
|
||
| const r = new Readable({ | ||
| read: () => {}, | ||
| }); | ||
|
|
||
| const data = ['foo', 'bar', 'baz']; | ||
|
|
||
| r.pause(); | ||
|
|
||
| r.on('data', common.mustNotCall()); | ||
| r.on('end', common.mustNotCall()); | ||
| r.once('readable', common.mustCall()); | ||
|
|
||
| for (const d of data) | ||
| r.push(d); | ||
| r.push(null); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| 'use strict'; | ||
|
|
||
| const common = require('../common'); | ||
|
|
||
| // This test ensures that if have 'readable' listener | ||
| // on Readable instance it will not disrupt the pipe. | ||
|
|
||
| const assert = require('assert'); | ||
| const { Readable, Writable } = require('stream'); | ||
|
|
||
| let receivedData = ''; | ||
| const w = new Writable({ | ||
| write: (chunk, env, callback) => { | ||
| receivedData += chunk; | ||
| callback(); | ||
| }, | ||
| }); | ||
|
|
||
| const data = ['foo', 'bar', 'baz']; | ||
| const r = new Readable({ | ||
| read: () => {}, | ||
| }); | ||
|
|
||
| r.on('readable', common.mustCall()); | ||
|
|
||
| r.pipe(w); | ||
| r.push(data[0]); | ||
| r.push(data[1]); | ||
| r.push(data[2]); | ||
| r.push(null); | ||
|
|
||
| w.on('finish', common.mustCall(() => { | ||
| assert.strictEqual(receivedData, data.join('')); | ||
| })); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| 'use strict'; | ||
|
|
||
| const common = require('../common'); | ||
|
|
||
| // This test ensures that if we remove last 'readable' listener | ||
| // on Readable instance that is piped it will not disrupt the pipe. | ||
|
|
||
| const assert = require('assert'); | ||
| const { Readable, Writable } = require('stream'); | ||
|
|
||
| let receivedData = ''; | ||
| const w = new Writable({ | ||
| write: (chunk, env, callback) => { | ||
| receivedData += chunk; | ||
| callback(); | ||
| }, | ||
| }); | ||
|
|
||
| const data = ['foo', 'bar', 'baz']; | ||
| const r = new Readable({ | ||
| read: () => {}, | ||
| }); | ||
|
|
||
| const listener = common.mustNotCall(); | ||
| r.on('readable', listener); | ||
|
|
||
| r.pipe(w); | ||
| r.push(data[0]); | ||
|
|
||
| r.removeListener('readable', listener); | ||
|
|
||
| process.nextTick(() => { | ||
| r.push(data[1]); | ||
| r.push(data[2]); | ||
| r.push(null); | ||
| }); | ||
|
|
||
| w.on('finish', common.mustCall(() => { | ||
| assert.strictEqual(receivedData, data.join('')); | ||
| })); |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if the stream was explicitly paused before? Something like:
Seems bad that this would auto resume that, if that is the case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lundibundi can you please add a unit test in that case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I've added a test and now spotted a problem. After @mcollina PR to make readable take precedence over data there is no way now to determine whether it was user who called pause or we via (this.resume() in on('data') handler) because we use cf5f986#diff-ba6a0df0f5212f5cba5ca5179e209a17R878 and now state.flowing is false if we add 'data' listener after 'readable' listener even though user didn't explicitly pause the stream.
I'm not sure what's the correct solution here. If I add
this.isPausedcheck in here it will only work if a user called it like thisand won't work for
which is extremely bizarre.
I'm thinking of not setting
flowingat all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that would work.