Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stream: postpone setting flowing for on('readable')
Now state.flowing will be set only after all of the 'readable'
listeners are gone and if we have at least one 'data' listener.

* on('data') will not flow (flowing === null and not false) if there
  are 'readable' listeners
* pipe() will work regardless of 'readable' listeners
* isPause reports only user .pause call (before setting 'data' listener
  when there is already 'readable' listener also set flowing to false)
* resume always sets stream to flowing state
  • Loading branch information
lundibundi committed Aug 9, 2018
commit c07abd7fe802bf2b8b68bcd79feddd99994de5ab
17 changes: 8 additions & 9 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,8 @@ Readable.prototype.on = function(ev, fn) {
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;

// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false)
// Try start flowing on next tick for data if stream isn't explicitly paused
if (!state.readableListening && state.flowing !== false)
this.resume();
} else if (ev === 'readable') {
if (!state.endEmitted && !state.readableListening) {
Expand Down Expand Up @@ -855,13 +855,14 @@ Readable.prototype.removeAllListeners = function(ev) {
function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

// try to start flowing for 'data' listeners
// (if pipesCount is not 0 then we have already 'flowed')
if (!state.readableListening &&
self.listenerCount('data') > 0 &&
state.pipesCount === 0) {
if (self.listenerCount('data') > 0 &&
!self.isPaused() &&
state.pipesCount === 0 &&
!state.readableListening)
self.resume();
Copy link
Copy Markdown
Member

@mafintosh mafintosh Aug 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the stream was explicitly paused before? Something like:

stream.pause()
stream.on('data', ...)
stream.once('readable', ...)

Seems bad that this would auto resume that, if that is the case

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lundibundi can you please add a unit test in that case?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've added a test and now spotted a problem. After @mcollina PR to make readable take precedence over data there is no way now to determine whether it was user who called pause or we via (this.resume() in on('data') handler) because we use cf5f986#diff-ba6a0df0f5212f5cba5ca5179e209a17R878 and now state.flowing is false if we add 'data' listener after 'readable' listener even though user didn't explicitly pause the stream.

I'm not sure what's the correct solution here. If I add this.isPaused check in here it will only work if a user called it like this

r.on('data', (chunk) => receivedData += chunk);
r.once('readable', common.mustCall());

and won't work for

r.once('readable', common.mustCall());
r.on('data', (chunk) => receivedData += chunk);

which is extremely bizarre.

I'm thinking of not setting flowing at all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of not setting flowing at all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).

I think that would work.

}
}

function nReadingNextTick(self) {
Expand All @@ -875,9 +876,7 @@ Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
// we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
state.flowing = true;
resume(this, state);
}
return this;
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-stream-readable-and-data-pause.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

const common = require('../common');

// This test ensures that if we have both 'readable' and 'data'
// listeners on Readable instance once all of the 'readable' listeners
// are gone and there are still 'data' listeners stream will *not*
// try to flow if it was explicitly paused.

const { Readable } = require('stream');

const r = new Readable({
read: () => {},
});

const data = ['foo', 'bar', 'baz'];

r.pause();

r.on('data', common.mustNotCall());
r.on('end', common.mustNotCall());
r.once('readable', common.mustCall());

for (const d of data)
r.push(d);
r.push(null);
7 changes: 3 additions & 4 deletions test/parallel/test-stream-readable-and-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const common = require('../common');

// This test ensures that if we have both 'readable' and 'data'
// listeners on Readable instance once the 'readable' listeners
// listeners on Readable instance once all of the 'readable' listeners
// are gone and there are still 'data' listeners stream will try
// to flow to satisfy the 'data' listeners.

Expand All @@ -23,7 +23,6 @@ r.once('end', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));

r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
for (const d of data)
r.push(d);
r.push(null);
34 changes: 34 additions & 0 deletions test/parallel/test-stream-readable-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const common = require('../common');

// This test ensures that if have 'readable' listener
// on Readable instance it will not disrupt the pipe.

const assert = require('assert');
const { Readable, Writable } = require('stream');

let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

r.on('readable', common.mustCall());

r.pipe(w);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));
40 changes: 40 additions & 0 deletions test/parallel/test-stream-readable-remove-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const common = require('../common');

// This test ensures that if we remove last 'readable' listener
// on Readable instance that is piped it will not disrupt the pipe.

const assert = require('assert');
const { Readable, Writable } = require('stream');

let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

const listener = common.mustNotCall();
r.on('readable', listener);

r.pipe(w);
r.push(data[0]);

r.removeListener('readable', listener);

process.nextTick(() => {
r.push(data[1]);
r.push(data[2]);
r.push(null);
});

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));