Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix end-of-stream for HTTP/2 #24926

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,24 @@ function eos(stream, opts, callback) {

callback = once(callback);

const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writable = false;
writableEnded = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream._readableState && stream._readableState.endEmitted;
const onend = () => {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

Expand All @@ -60,11 +62,16 @@ function eos(stream, opts, callback) {
};

const onclose = () => {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
};

Expand All @@ -77,7 +84,7 @@ function eos(stream, opts, callback) {
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !ws) { // legacy streams
} else if (writable && !stream._writableState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}
Expand Down
2 changes: 0 additions & 2 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ test-net-connect-options-port: PASS,FLAKY
test-http2-pipe: PASS,FLAKY
test-worker-syntax-error: PASS,FLAKY
test-worker-syntax-error-file: PASS,FLAKY
# https://github.com/nodejs/node/issues/24456
test-stream-pipeline-http2: PASS,FLAKY

[$system==linux]

Expand Down
39 changes: 39 additions & 0 deletions test/parallel/test-stream-pipeline-queued-end-in-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Duplex, pipeline } = require('stream');

// Test that the callback for pipeline() is called even when the ._destroy()
// method of the stream places an .end() request to itself that does not
// get processed before the destruction of the stream (i.e. the 'close' event).
// Refs: https://github.com/nodejs/node/issues/24456

const readable = new Readable({
read: common.mustCall(() => {})
});

const duplex = new Duplex({
write(chunk, enc, cb) {
// Simulate messages queueing up.
},
read() {},
destroy(err, cb) {
// Call end() from inside the destroy() method, like HTTP/2 streams
// do at the time of writing.
this.end();
cb(err);
}
});

duplex.on('finished', common.mustNotCall());

pipeline(readable, duplex, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));

// Write one chunk of data, and destroy the stream later.
// That should trigger the pipeline destruction.
readable.push('foo');
setImmediate(() => {
readable.destroy();
});