Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 23, 2021
1 parent ea09812 commit 9506bc2
Showing 1 changed file with 70 additions and 20 deletions.
90 changes: 70 additions & 20 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@ function isStream(obj) {
return isReadable(obj) || isWritable(obj);
}

function isDuplexNodeStream(obj) {
return !!(
obj &&
(typeof obj.pipe === 'function' && obj._readableState) &&
typeof obj.on === 'function' &&
typeof obj.write === 'function'
);
}

function isReadableNodeStream(obj) {
return !!(
obj &&
Expand All @@ -50,6 +41,15 @@ function isWritableNodeStream(obj) {
);
}

function isDuplexNodeStream(obj) {
return !!(
obj &&
(typeof obj.pipe === 'function' && obj._readableState) &&
typeof obj.on === 'function' &&
typeof obj.write === 'function'
);
}

function isNodeStream(obj) {
return (
obj &&
Expand All @@ -62,20 +62,65 @@ function isNodeStream(obj) {
);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
return typeof obj[SymbolAsyncIterator] === 'function' ||
typeof obj[SymbolIterator] === 'function';
}

function isDestroyed(stream) {
if (!isNodeStream(stream)) return null;
const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;
return !!(stream.destroyed || state?.destroyed);
return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
return typeof obj[SymbolAsyncIterator] === 'function' ||
typeof obj[SymbolIterator] === 'function';
// Have been end():d.
function isWritableEnded(stream) {
if (!isWritableNodeStream(stream)) return null;
if (stream.writableEnded === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
if (typeof wState?.ended !== 'boolean') return null;
return wState.ended;
}

// Have emitted 'finish'.
function isWritableFinished(stream, strict) {
if (!isWritableNodeStream(stream)) return null;
if (stream.writableFinished === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
if (typeof wState?.finished !== 'boolean') return null;
return !!(
wState.finished ||
(strict === false && wState.ended === true && wState.length === 0)
);
}

// Have been push(null):d.
function isReadableEnded(stream) {
if (!isReadableNodeStream(stream)) return null;
if (stream.readableEnded === true) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
if (typeof rState?.ended !== 'boolean') return null;
return rState.ended;
}

// Have emitted 'end'.
function isReadableFinished(stream, strict) {
if (!isReadableNodeStream(stream)) return null;
const rState = stream._readableState;
if (rState?.errored) return false;
if (typeof rState?.endEmitted !== 'boolean') return null;
return !!(
rState.endEmitted ||
(strict === false && rState.ended === true && rState.length === 0)
);
}

function isDisturbed(stream) {
Expand All @@ -89,12 +134,17 @@ function isDisturbed(stream) {
module.exports = {
isDisturbed,
kIsDisturbed,
isStream,
isDestroyed,
isReadableNodeStream,
isWritableNodeStream,
isDuplexNodeStream,
isNodeStream,
isIterable,
isReadable,
isStream,
isReadableNodeStream,
isReadableEnded,
isReadableFinished,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
};

0 comments on commit 9506bc2

Please sign in to comment.