Skip to content

Commit

Permalink
stream: multiple stream backports
Browse files Browse the repository at this point in the history
includes:

* stream: simpler and faster Readable async iterator
* stream: don't destroy on async iterator success
* stream: async iterator stop read if destroyed

PR-URL: nodejs#34887
Refs: nodejs#34035
Refs: nodejs#35122
Refs: nodejs#35640
Refs: nodejs#34680
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
ronag authored and MylesBorins committed Oct 15, 2020
1 parent b82fc40 commit 573410f
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 271 deletions.
84 changes: 79 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const {
NumberParseInt,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
Set,
SymbolAsyncIterator,
Symbol
Expand Down Expand Up @@ -60,11 +61,11 @@ const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
let from;

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
function nop() {}

const { errorOrDestroy } = destroyImpl;

Expand Down Expand Up @@ -1076,13 +1077,86 @@ Readable.prototype.wrap = function(stream) {
};

Readable.prototype[SymbolAsyncIterator] = function() {
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
let stream = this;

if (typeof stream.read !== 'function') {
// v1 stream
const src = stream;
stream = new Readable({
objectMode: true,
destroy(err, callback) {
destroyImpl.destroyer(src, err);
callback(err);
}
}).wrap(src);
}
return createReadableStreamAsyncIterator(this);

const iter = createAsyncIterator(stream);
iter.stream = stream;
return iter;
};

async function* createAsyncIterator(stream) {
let callback = nop;

function next(resolve) {
if (this === stream) {
callback();
callback = nop;
} else {
callback = resolve;
}
}

const state = stream._readableState;

let error = state.errored;
let errorEmitted = state.errorEmitted;
let endEmitted = state.endEmitted;
let closeEmitted = state.closeEmitted;

stream
.on('readable', next)
.on('error', function(err) {
error = err;
errorEmitted = true;
next.call(this);
})
.on('end', function() {
endEmitted = true;
next.call(this);
})
.on('close', function() {
closeEmitted = true;
next.call(this);
});

try {
while (true) {
const chunk = stream.destroyed ? null : stream.read();
if (chunk !== null) {
yield chunk;
} else if (errorEmitted) {
throw error;
} else if (endEmitted) {
break;
} else if (closeEmitted) {
break;
} else {
await new Promise(next);
}
}
} catch (err) {
destroyImpl.destroyer(stream, err);
throw err;
} finally {
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
}
}
}

// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
Expand Down
221 changes: 0 additions & 221 deletions lib/internal/streams/async_iterator.js

This file was deleted.

1 change: 1 addition & 0 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ function undestroy() {

if (w) {
w.closed = false;
w.closeEmitted = false;
w.destroyed = false;
w.errored = null;
w.ended = false;
Expand Down
9 changes: 4 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const {

let EE;
let PassThrough;
let createReadableStreamAsyncIterator;
let Readable;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);
Expand Down Expand Up @@ -113,11 +113,10 @@ function makeAsyncIterable(val) {
}

async function* fromReadable(val) {
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
if (!Readable) {
Readable = require('_stream_readable');
}
yield* createReadableStreamAsyncIterator(val);
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish) {
Expand Down
1 change: 0 additions & 1 deletion node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
Expand Down
Loading

0 comments on commit 573410f

Please sign in to comment.