Skip to content

Commit

Permalink
stream: always invoke end callback
Browse files Browse the repository at this point in the history
Ensure that the callback passed into end() is always invoke in
order to avoid bug such as deadlock the user.

PR-URL: #29747
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
  • Loading branch information
ronag authored and addaleax committed Nov 19, 2019
1 parent 535e957 commit 9d09969
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 4 deletions.
34 changes: 30 additions & 4 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}

// Ignore unnecessary end() calls.
if (!state.ending)
if (!state.ending) {
endWritable(this, state, cb);
else if (typeof cb === 'function') {
} else if (typeof cb === 'function') {
if (!state.finished) {
this.once('finish', cb);
onFinished(this, state, cb);
} else {
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
}
Expand Down Expand Up @@ -695,7 +695,7 @@ function endWritable(stream, state, cb) {
if (state.finished)
process.nextTick(cb);
else
stream.once('finish', cb);
onFinished(stream, state, cb);
}
state.ended = true;
stream.writable = false;
Expand All @@ -715,6 +715,32 @@ function onCorkedFinish(corkReq, state, err) {
state.corkedRequestsFree.next = corkReq;
}

function onFinished(stream, state, cb) {
if (state.destroyed && state.errorEmitted) {
// TODO(ronag): Backwards compat. Should be moved to end() without
// errorEmitted check and with errorOrDestroy.
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
return;
}

function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb(err);
if (stream.listenerCount('error') === 0) {
stream.emit('error', err);
}
}
function onfinish() {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb();
}
stream.on('finish', onfinish);
stream.prependListener('error', onerror);
}

Object.defineProperty(Writable.prototype, 'destroyed', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
52 changes: 52 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,55 @@ const assert = require('assert');
}));
write.uncork();
}

{
// Call end(cb) after error & destroy

const write = new Writable({
write(chunk, enc, cb) { cb(new Error('asd')); }
});
write.on('error', common.mustCall(() => {
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
ticked = true;
}));
write.write('asd');
}

{
// Call end(cb) after finish & destroy

const write = new Writable({
write(chunk, enc, cb) { cb(); }
});
write.on('finish', common.mustCall(() => {
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, false);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
}));
write.end();
}

{
// Call end(cb) after error & destroy and don't trigger
// unhandled exception.

const write = new Writable({
write(chunk, enc, cb) { process.nextTick(cb); }
});
write.once('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
write.end('asd', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
write.destroy(new Error('asd'));
}
48 changes: 48 additions & 0 deletions test/parallel/test-stream-writable-end-cb-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

{
// Invoke end callback on failure.
const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
process.nextTick(cb, new Error('kaboom'));
};

writable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
}

{
// Don't invoke end callback twice
const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
process.nextTick(cb);
};

let called = false;
writable.end('asd', common.mustCall((err) => {
called = true;
assert.strictEqual(err, undefined);
}));

writable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
writable.on('finish', common.mustCall(() => {
assert.strictEqual(called, true);
writable.emit('error', new Error('kaboom'));
}));
}
23 changes: 23 additions & 0 deletions test/parallel/test-stream-writable-end-cb-uncaugth.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
cb();
};
writable._final = (cb) => {
cb(new Error('kaboom'));
};

writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

0 comments on commit 9d09969

Please sign in to comment.