From 8263fd15869751ff0ae090f40eefd8fe34a5ac5c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 22 Nov 2019 19:13:00 +0100 Subject: [PATCH 1/6] stream: invoke buffered write callbacks on error Buffered write callbacks were only invoked upon error of `autoDestroy` is invoked. --- lib/_stream_writable.js | 18 ++++++++++----- test/parallel/test-stream-writable-destroy.js | 22 +++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index a9f7164dc73b13..eef27c271de7b9 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -467,6 +467,7 @@ function onwriteError(stream, state, er, cb) { --state.pendingcb; cb(er); + errorBuffer(state, new ERR_STREAM_DESTROYED('write')); // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); } @@ -541,6 +542,16 @@ function afterWrite(stream, state, count, cb) { finishMaybe(stream, state); } +// If there's something in the buffer waiting, then invoke callbacks. +function errorBuffer (state, err) { + for (let entry = state.bufferedRequest; entry; entry = entry.next) { + process.nextTick(entry.callback, err); + } + state.bufferedRequest = null; + state.lastBufferedRequest = null; + state.bufferedRequestCount = 0; +} + // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; @@ -817,12 +828,7 @@ const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { - for (let entry = state.bufferedRequest; entry; entry = entry.next) { - process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write')); - } - state.bufferedRequest = null; - state.lastBufferedRequest = null; - state.bufferedRequestCount = 0; + errorBuffer(state, new ERR_STREAM_DESTROYED('write')); } destroy.call(this, err, cb); return this; diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 30e4503c05773a..f50363d45fbc7c 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -344,3 +344,25 @@ const assert = require('assert'); })); write.destroy(new Error('asd')); } + +{ + // Call buffered write callback with error + + const write = new Writable({ + write(chunk, enc, cb) { + process.nextTick(cb, new Error('asd')); + }, + autoDestroy: false + }); + write.cork(); + write.write('asd', common.mustCall(err => { + assert.strictEqual(err.message, 'asd'); + })); + write.write('asd', common.mustCall(err => { + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + write.on('error', common.mustCall(err => { + assert.strictEqual(err.message, 'asd'); + })) + write.uncork(); +} From 614ba71a5d4b8488d0db4963926d0b5f949720e2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 22 Nov 2019 19:54:08 +0100 Subject: [PATCH 2/6] fixup: comment + linting --- lib/_stream_writable.js | 6 +++++- test/parallel/test-stream-writable-destroy.js | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index eef27c271de7b9..fcdba220e7420b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -467,6 +467,10 @@ function onwriteError(stream, state, er, cb) { --state.pendingcb; cb(er); + // Ensure callbacks are invoked even when autoDestroy is + // not enabled. Passing `er` here doesn't make sense since + // it's related to one specific write, not to the buffered + // writes. errorBuffer(state, new ERR_STREAM_DESTROYED('write')); // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); @@ -543,7 +547,7 @@ function afterWrite(stream, state, count, cb) { } // If there's something in the buffer waiting, then invoke callbacks. -function errorBuffer (state, err) { +function errorBuffer(state, err) { for (let entry = state.bufferedRequest; entry; entry = entry.next) { process.nextTick(entry.callback, err); } diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index f50363d45fbc7c..353a6b6b9e2dc6 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -355,14 +355,14 @@ const assert = require('assert'); autoDestroy: false }); write.cork(); - write.write('asd', common.mustCall(err => { + write.write('asd', common.mustCall((err) => { assert.strictEqual(err.message, 'asd'); })); - write.write('asd', common.mustCall(err => { + write.write('asd', common.mustCall((err) => { assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); })); - write.on('error', common.mustCall(err => { + write.on('error', common.mustCall((err) => { assert.strictEqual(err.message, 'asd'); - })) + })); write.uncork(); } From c4fd5021294e6f057cfa0c82fb0edb4804026055 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 22 Nov 2019 19:55:51 +0100 Subject: [PATCH 3/6] fixup: don't use nextTick if not needed --- lib/_stream_writable.js | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index fcdba220e7420b..e13f332ad64a92 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -471,7 +471,7 @@ function onwriteError(stream, state, er, cb) { // not enabled. Passing `er` here doesn't make sense since // it's related to one specific write, not to the buffered // writes. - errorBuffer(state, new ERR_STREAM_DESTROYED('write')); + errorBuffer(state, new ERR_STREAM_DESTROYED('write'), false); // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); } @@ -547,9 +547,13 @@ function afterWrite(stream, state, count, cb) { } // If there's something in the buffer waiting, then invoke callbacks. -function errorBuffer(state, err) { +function errorBuffer(state, err, sync) { for (let entry = state.bufferedRequest; entry; entry = entry.next) { - process.nextTick(entry.callback, err); + if (sync) { + process.nextTick(entry.callback, err); + } else { + entry.callback(err); + } } state.bufferedRequest = null; state.lastBufferedRequest = null; @@ -832,7 +836,7 @@ const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { - errorBuffer(state, new ERR_STREAM_DESTROYED('write')); + errorBuffer(state, new ERR_STREAM_DESTROYED('write'), true); } destroy.call(this, err, cb); return this; From 764e41a8a506b9783d423582068de215d78bf37d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 23 Nov 2019 12:02:55 +0100 Subject: [PATCH 4/6] fixup: state.length --- lib/_stream_writable.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index e13f332ad64a92..fc205fc6cbe45b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -555,6 +555,7 @@ function errorBuffer(state, err, sync) { entry.callback(err); } } + state.length = 0; state.bufferedRequest = null; state.lastBufferedRequest = null; state.bufferedRequestCount = 0; From cf95bbd38ea156d6a3472b26e63a0f8302410bb9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 23 Nov 2019 14:58:48 +0100 Subject: [PATCH 5/6] fixup: state.length --- lib/_stream_writable.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index fc205fc6cbe45b..9b6c944bb39fa9 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -549,13 +549,14 @@ function afterWrite(stream, state, count, cb) { // If there's something in the buffer waiting, then invoke callbacks. function errorBuffer(state, err, sync) { for (let entry = state.bufferedRequest; entry; entry = entry.next) { + const len = state.objectMode ? 1 : entry.chunk.length; + state.length -= len; if (sync) { process.nextTick(entry.callback, err); } else { entry.callback(err); } } - state.length = 0; state.bufferedRequest = null; state.lastBufferedRequest = null; state.bufferedRequestCount = 0; From 771a0559d94bec28d0b7fc775cfb0ed294fa0a8f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 23 Nov 2019 15:09:46 +0100 Subject: [PATCH 6/6] stream: error queued callbacks after active write has completed --- lib/_stream_writable.js | 20 +++++++++++------- test/parallel/test-stream-writable-destroy.js | 21 +++++++++++++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 9b6c944bb39fa9..32f027bcf18bbd 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -471,7 +471,7 @@ function onwriteError(stream, state, er, cb) { // not enabled. Passing `er` here doesn't make sense since // it's related to one specific write, not to the buffered // writes. - errorBuffer(state, new ERR_STREAM_DESTROYED('write'), false); + errorBuffer(state, new ERR_STREAM_DESTROYED('write')); // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); } @@ -543,19 +543,23 @@ function afterWrite(stream, state, count, cb) { cb(); } + if (state.destroyed) { + errorBuffer(state, new ERR_STREAM_DESTROYED('write')); + } + finishMaybe(stream, state); } // If there's something in the buffer waiting, then invoke callbacks. -function errorBuffer(state, err, sync) { +function errorBuffer(state, err) { + if (state.writing || !state.bufferedRequest) { + return; + } + for (let entry = state.bufferedRequest; entry; entry = entry.next) { const len = state.objectMode ? 1 : entry.chunk.length; state.length -= len; - if (sync) { - process.nextTick(entry.callback, err); - } else { - entry.callback(err); - } + entry.callback(err); } state.bufferedRequest = null; state.lastBufferedRequest = null; @@ -838,7 +842,7 @@ const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { - errorBuffer(state, new ERR_STREAM_DESTROYED('write'), true); + process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); } destroy.call(this, err, cb); return this; diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 353a6b6b9e2dc6..408616c8a27f91 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -366,3 +366,24 @@ const assert = require('assert'); })); write.uncork(); } + +{ + // Ensure callback order. + + let state = 0; + const write = new Writable({ + write(chunk, enc, cb) { + // `setImmediate()` is used on purpose to ensure the callback is called + // after `process.nextTick()` callbacks. + setImmediate(cb); + } + }); + write.write('asd', common.mustCall(() => { + assert.strictEqual(state++, 0); + })); + write.write('asd', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + assert.strictEqual(state++, 1); + })); + write.destroy(); +}