Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup
  • Loading branch information
ronag committed Aug 24, 2021
commit 898514e42367b6f9374ba64786508fd8db4c48db
21 changes: 2 additions & 19 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
isNodeStream,
isDestroyed,
isReadableFinished,
isReadableNodeStream,
isWritableNodeStream,
isWritableEnded,
isReadable,
isWritable,
} = require('internal/streams/utils');
const {
AbortError,
Expand All @@ -19,20 +16,6 @@ const {
},
} = require('internal/errors');

function isReadable(stream) {
const r = isReadableNodeStream(stream);
if (r === null || typeof stream.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableNodeStream(stream);
if (r === null || typeof stream.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

// This is needed for pre node 17.
class ComposeDuplex extends Duplex {
constructor(options) {
Expand Down
16 changes: 2 additions & 14 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ const {
isIterable,
isNodeStream,
isDestroyed,
isReadable,
isReadableNodeStream,
isWritable,
isWritableNodeStream,
isDuplexNodeStream,
isReadableFinished,
Expand Down Expand Up @@ -32,20 +34,6 @@ const {
FunctionPrototypeCall
} = primordials;

function isReadable(stream) {
const r = isReadableNodeStream(stream);
if (r === null || typeof stream?.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableNodeStream(stream);
if (r === null || typeof stream?.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
Expand Down
14 changes: 7 additions & 7 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadable,
isStream,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadable(val)) {
} else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
Expand All @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
Expand Down Expand Up @@ -269,8 +269,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand Down
117 changes: 102 additions & 15 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,6 @@ const {

const kIsDisturbed = Symbol('kIsDisturbed');

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function' &&
typeof obj.on === 'function');
}

function isWritable(obj) {
return !!(obj && typeof obj.write === 'function' &&
typeof obj.on === 'function');
}

function isStream(obj) {
return isReadable(obj) || isWritable(obj);
}

function isReadableNodeStream(obj) {
return !!(
obj &&
Expand Down Expand Up @@ -131,12 +117,110 @@ function isDisturbed(stream) {
));
}

function isReadable(stream) {
const r = isReadableNodeStream(stream);
if (r === null || typeof stream?.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableNodeStream(stream);
if (r === null || typeof stream?.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

function isFinished(stream, opts) {
if (!isNodeStream(stream)) {
return null;
}

if (isDestroyed(stream)) {
return true;
}

if (opts?.readable !== false && isReadable(stream)) {
return false;
}

if (opts?.writable !== false && isWritable(stream)) {
return false;
}

return true;
}

function isClosed(stream) {
if (!isNodeStream(stream)) {
return null;
}

const wState = stream._writableState;
const rState = stream._readableState;

if (
typeof wState?.closed === 'boolean' ||
typeof rState?.closed === 'boolean'
) {
return wState?.closed || rState?.closed;
}

if (typeof stream._closed === 'boolean' && isOutgoingMessage(stream)) {
return stream._closed;
}

return null;
}

function isOutgoingMessage(stream) {
return (
typeof stream._closed === 'boolean' &&
typeof stream._defaultKeepAlive === 'boolean' &&
typeof stream._removedConnection === 'boolean' &&
typeof stream._removedContLen === 'boolean'
);
}

function isServerResponse(stream) {
return (
typeof stream._sent100 === 'boolean' &&
isOutgoingMessage(stream)
);
}

function isServerRequest(stream) {
return (
typeof stream._consuming === 'boolean' &&
typeof stream._dumped === 'boolean' &&
stream.req?.upgradeOrConnect === undefined
);
}

function willEmitClose(stream) {
if (!isNodeStream(stream)) return null;

const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;

return (!state && isServerResponse(stream)) || !!(
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false
);
}


module.exports = {
isDisturbed,
kIsDisturbed,
isStream,
isStream: isNodeStream,
isClosed,
isDestroyed,
isDuplexNodeStream,
isFinished,
isIterable,
isReadable,
isReadableNodeStream,
Expand All @@ -147,4 +231,7 @@ module.exports = {
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
isServerRequest,
isServerResponse,
willEmitClose,
};