Skip to content

Commit

Permalink
stream: simplify pipeline
Browse files Browse the repository at this point in the history
PR-URL: nodejs#31316
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
Backport-PR-URL: nodejs#32174
  • Loading branch information
ronag committed Mar 10, 2020
1 parent 4640ea2 commit 0abd4f0
Showing 1 changed file with 3 additions and 23 deletions.
26 changes: 3 additions & 23 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,39 +97,19 @@ function makeAsyncIterable(val) {
return val;
} else if (isReadable(val)) {
// Legacy streams are not Iterable.
return _fromReadable(val);
return fromReadable(val);
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}
}

async function* _fromReadable(val) {
async function* fromReadable(val) {
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

try {
if (typeof val.read !== 'function') {
// createReadableStreamAsyncIterator does not support
// v1 streams. Convert it into a v2 stream.

if (!PassThrough) {
PassThrough = require('_stream_passthrough');
}

const pt = new PassThrough();
val
.on('error', (err) => pt.destroy(err))
.pipe(pt);
yield* createReadableStreamAsyncIterator(pt);
} else {
yield* createReadableStreamAsyncIterator(val);
}
} finally {
destroyStream(val);
}
yield* createReadableStreamAsyncIterator(val);
}

async function pump(iterable, writable, finish) {
Expand Down

0 comments on commit 0abd4f0

Please sign in to comment.