Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup: NodeStream
  • Loading branch information
ronag committed Jul 10, 2021
commit 57cfc0d6993e6477788fe8eeaae093cef9d3a8b1
4 changes: 2 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => {
}
};

function isStream(obj) {
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
Expand Down
12 changes: 6 additions & 6 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ const {
const {
isClosed,
isReadable,
isReadableStream,
isReadableNodeStream,
isReadableFinished,
isWritable,
isWritableStream,
isWritableNodeStream,
isWritableFinished,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');
Expand All @@ -49,9 +49,9 @@ function eos(stream, options, callback) {
callback = once(callback);

const readable = options.readable ||
(options.readable !== false && isReadableStream(stream));
(options.readable !== false && isReadableNodeStream(stream));
const writable = options.writable ||
(options.writable !== false && isWritableStream(stream));
(options.writable !== false && isWritableNodeStream(stream));

const wState = stream._writableState;
const rState = stream._readableState;
Expand All @@ -65,8 +65,8 @@ function eos(stream, options, callback) {
// this generic check.
let willEmitClose = (
_willEmitClose(stream) &&
isReadableStream(stream) === readable &&
isWritableStream(stream) === writable
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable
);
Comment thread
ronag marked this conversation as resolved.

let writableFinished = isWritableFinished(stream, false);
Expand Down
14 changes: 7 additions & 7 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadableStream,
isStream,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadableStream(val)) {
} else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
Expand All @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -271,8 +271,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadableStream(ret)) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand Down
34 changes: 17 additions & 17 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const {

const kDestroyed = Symbol('kDestroyed');

function isReadableStream(obj) {
function isReadableNodeStream(obj) {
return !!(
obj &&
typeof obj.pipe === 'function' &&
Expand All @@ -18,7 +18,7 @@ function isReadableStream(obj) {
);
}

function isWritableStream(obj) {
function isWritableNodeStream(obj) {
return !!(
obj &&
typeof obj.write === 'function' &&
Expand All @@ -27,8 +27,8 @@ function isWritableStream(obj) {
);
}

function isStream(obj) {
return isReadableStream(obj) || isWritableStream(obj);
function isNodeStream(obj) {
return isReadableNodeStream(obj) || isWritableNodeStream(obj);
}

function isIterable(obj, isAsync) {
Expand All @@ -40,7 +40,7 @@ function isIterable(obj, isAsync) {
}

function isDestroyed(stream) {
if (!isStream(stream)) return null;
if (!isNodeStream(stream)) return null;
const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;
Expand All @@ -49,7 +49,7 @@ function isDestroyed(stream) {

// Have been end():d.
function isWritableEnded(stream) {
if (!isWritableStream(stream)) return null;
if (!isWritableNodeStream(stream)) return null;
if (stream.writableEnded === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
Expand All @@ -59,7 +59,7 @@ function isWritableEnded(stream) {

// Have emitted 'finish'.
function isWritableFinished(stream, strict) {
if (!isWritableStream(stream)) return null;
if (!isWritableNodeStream(stream)) return null;
if (stream.writableFinished === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
Expand All @@ -72,7 +72,7 @@ function isWritableFinished(stream, strict) {

// Have been push(null):d.
function isReadableEnded(stream) {
if (!isReadableStream(stream)) return null;
if (!isReadableNodeStream(stream)) return null;
if (stream.readableEnded === true) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
Expand All @@ -82,7 +82,7 @@ function isReadableEnded(stream) {

// Have emitted 'end'.
function isReadableFinished(stream, strict) {
if (!isReadableStream(stream)) return null;
if (!isReadableNodeStream(stream)) return null;
const rState = stream._readableState;
if (rState?.errored) return false;
if (typeof rState?.endEmitted !== 'boolean') return null;
Expand All @@ -93,21 +93,21 @@ function isReadableFinished(stream, strict) {
}

function isReadable(stream) {
const r = isReadableStream(stream);
const r = isReadableNodeStream(stream);
if (r === null || typeof stream.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableStream(stream);
const r = isWritableNodeStream(stream);
if (r === null || typeof stream.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

function isFinished(stream, opts) {
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
return null;
}

Expand All @@ -127,7 +127,7 @@ function isFinished(stream, opts) {
}

function isClosed(stream) {
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
return null;
}

Expand Down Expand Up @@ -173,7 +173,7 @@ function isServerRequest(stream) {
}

function willEmitClose(stream) {
if (!isStream(stream)) return null;
if (!isNodeStream(stream)) return null;

const wState = stream._writableState;
const rState = stream._readableState;
Expand All @@ -194,12 +194,12 @@ module.exports = {
isFinished,
isIterable,
isReadable,
isReadableStream,
isReadableNodeStream,
isReadableEnded,
isReadableFinished,
isStream,
isNodeStream,
isWritable,
isWritableStream,
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
Comment thread
ronag marked this conversation as resolved.
isServerRequest,
Expand Down
4 changes: 2 additions & 2 deletions lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const {

const {
isIterable,
isStream,
isNodeStream,
} = require('internal/streams/utils');

const pl = require('internal/streams/pipeline');
Expand All @@ -26,7 +26,7 @@ function pipeline(...streams) {
let signal;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isStream(lastArg) && !isIterable(lastArg)) {
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
validateAbortSignal(signal, 'options.signal');
Expand Down