Skip to content

Commit

Permalink
stream: always invoke callback before emitting error
Browse files Browse the repository at this point in the history
Ensure the callback is always invoked before emitting
the error in both sync and async case.

PR-URL: #29293
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and Trott committed Sep 30, 2019
1 parent 634a9a9 commit f663b31
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 20 deletions.
3 changes: 2 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,8 @@ The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` *may or may not* be called with the error as its
first argument. To reliably detect write errors, add a listener for the
`'error'` event.
`'error'` event. If `callback` is called with an error, it will be called
before the `'error'` event is emitted.

The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`.
Expand Down
37 changes: 21 additions & 16 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!(options && options.autoDestroy);

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -401,7 +406,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;

if (state.writing || state.corked) {
if (state.writing || state.corked || state.errored) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
Expand All @@ -420,7 +425,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}

return ret;
// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand All @@ -437,18 +444,11 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.sync = false;
}

function onwriteError(stream, state, sync, er, cb) {
function onwriteError(stream, state, er, cb) {
--state.pendingcb;

if (sync) {
// Defer the callback if we are being called synchronously
// to avoid piling up things on the stack
process.nextTick(cb, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
}
cb(er);
// This can emit error, but error must always follow cb.
errorOrDestroy(stream, er);
}

Expand All @@ -465,9 +465,14 @@ function onwrite(stream, er) {
state.length -= state.writelen;
state.writelen = 0;

if (er)
onwriteError(stream, state, sync, er, cb);
else {
if (er) {
state.errored = true;
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

Expand Down Expand Up @@ -622,7 +627,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
function needFinish(state) {
return (state.ending &&
state.length === 0 &&
!state.errorEmitted &&
!state.errored &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
Expand Down
13 changes: 12 additions & 1 deletion lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if (w && err) {
w.errored = true;
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
Expand All @@ -50,10 +54,12 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
if (emitClose) {
process.nextTick(emitCloseNT, this);
}
cb(err);
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
Expand Down Expand Up @@ -91,6 +97,7 @@ function undestroy() {

if (w) {
w.destroyed = false;
w.errored = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand All @@ -110,6 +117,10 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState;
const w = stream._writableState;

if (w & err) {
w.errored = true;
}

if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (needError(stream, err))
Expand Down
5 changes: 4 additions & 1 deletion test/parallel/test-http2-reset-flood.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
h2header.writeIntBE(1, 0, 3); // Length: 1
h2header.writeIntBE(i, 5, 4); // Stream ID
// 0x88 = :status: 200
conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
process.nextTick(writeRequests);
break;
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ const assert = require('assert');
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) {
this.destroy(new Error('asd'));
cb();
}
});

write.on('error', common.mustCall());
write.on('finish', common.mustNotCall());
write.end('asd');
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); }
Expand Down
58 changes: 58 additions & 0 deletions test/parallel/test-stream-writable-write-cb-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
'use strict';
const common = require('../common');
const { Writable } = require('stream');
const assert = require('assert');

// Ensure callback is always invoked before
// error is emitted. Regardless if error was
// sync or async.

{
let callbackCalled = false;
// Sync Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
cb(new Error());
})
});
writable.on('error', common.mustCall(() => {
assert.strictEqual(callbackCalled, true);
}));
writable.write('hi', common.mustCall(() => {
callbackCalled = true;
}));
}

{
let callbackCalled = false;
// Async Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
process.nextTick(cb, new Error());
})
});
writable.on('error', common.mustCall(() => {
assert.strictEqual(callbackCalled, true);
}));
writable.write('hi', common.mustCall(() => {
callbackCalled = true;
}));
}

{
// Sync Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
cb(new Error());
})
});

writable.on('error', common.mustCall());

let cnt = 0;
// Ensure we don't live lock on sync error
while (writable.write('a'))
cnt++;

assert.strictEqual(cnt, 0);
}
6 changes: 5 additions & 1 deletion test/parallel/test-wrap-js-stream-exceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ const socket = new JSStreamWrap(new Duplex({
})
}));

assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
socket.end('foo');
socket.on('error', common.expectsError({
type: Error,
message: 'write EPROTO'
}));

0 comments on commit f663b31

Please sign in to comment.