Skip to content

Commit

Permalink
stream: cleanup eos
Browse files Browse the repository at this point in the history
PR-URL: #40998
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and danielleadams committed Dec 13, 2021
1 parent e292271 commit 513305c
Showing 1 changed file with 34 additions and 17 deletions.
51 changes: 34 additions & 17 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,10 @@ function eos(stream, options, callback) {

callback = once(callback);

const readable = options.readable ||
(options.readable !== false && isReadableNodeStream(stream));
const writable = options.writable ||
(options.writable !== false && isWritableNodeStream(stream));
const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

if (isNodeStream(stream)) {
// Do nothing...
} else {
if (!isNodeStream(stream)) {
// TODO: Webstreams.
// TODO: Throw INVALID_ARG_TYPE.
}
Expand All @@ -65,7 +61,9 @@ function eos(stream, options, callback) {
const rState = stream._readableState;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
if (!stream.writable) {
onfinish();
}
};

// TODO (ronag): Improve soft detection to include core modules and
Expand All @@ -83,10 +81,17 @@ function eos(stream, options, callback) {
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
if (stream.destroyed) willEmitClose = false;
if (stream.destroyed) {
willEmitClose = false;
}

if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableFinished) callback.call(stream);
if (willEmitClose && (!stream.readable || readable)) {
return;
}

if (!readable || readableFinished) {
callback.call(stream);
}
};

let readableFinished = isReadableFinished(stream, false);
Expand All @@ -95,10 +100,17 @@ function eos(stream, options, callback) {
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
if (stream.destroyed) willEmitClose = false;
if (stream.destroyed) {
willEmitClose = false;
}

if (willEmitClose && (!stream.writable || writable)) return;
if (!writable || writableFinished) callback.call(stream);
if (willEmitClose && (!stream.writable || writable)) {
return;
}

if (!writable || writableFinished) {
callback.call(stream);
}
};

const onerror = (err) => {
Expand Down Expand Up @@ -139,8 +151,11 @@ function eos(stream, options, callback) {
if (!willEmitClose) {
stream.on('abort', onclose);
}
if (stream.req) onrequest();
else stream.on('request', onrequest);
if (stream.req) {
onrequest();
} else {
stream.on('request', onrequest);
}
} else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
Expand All @@ -153,7 +168,9 @@ function eos(stream, options, callback) {

stream.on('end', onend);
stream.on('finish', onfinish);
if (options.error !== false) stream.on('error', onerror);
if (options.error !== false) {
stream.on('error', onerror);
}
stream.on('close', onclose);

if (closed) {
Expand Down

0 comments on commit 513305c

Please sign in to comment.