From 9033d4277ce67b0eff61d494cba205dcdcf8d427 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 5 Aug 2019 15:43:02 +0200 Subject: [PATCH] stream: writable buffering --- lib/_stream_writable.js | 168 ++++++------------ ...est-stream-writable-write-writev-finish.js | 54 ++---- 2 files changed, 70 insertions(+), 152 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index bf9abeeed81d45..dec076797ee43a 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -53,6 +53,18 @@ Object.setPrototypeOf(Writable, Stream); function nop() {} +function bufferedDispatch(err) { + const index = this.index; + for (let i = 0; i < index; i++) { + this[i].callback(err); + this[i] = null; + } + + this.splice(0, index); + this.index -= index; + this.allBuffers = this.allBuffers || this.every((request) => request.isBuf); +} + function WritableState(options, stream, isDuplex) { options = options || {}; @@ -134,8 +146,10 @@ function WritableState(options, stream, isDuplex) { // The amount that is being written when _write is called. this.writelen = 0; - this.bufferedRequest = null; - this.lastBufferedRequest = null; + this.buffered = []; + this.buffered.index = 0; + this.buffered.allBuffers = true; + this.buffered.dispatch = bufferedDispatch.bind(this.buffered); // Number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted @@ -153,25 +167,10 @@ function WritableState(options, stream, isDuplex) { // Should .destroy() be called after 'finish' (and potentially 'end') this.autoDestroy = !!options.autoDestroy; - - // Count buffered requests - this.bufferedRequestCount = 0; - - // Allocate the first CorkedRequest, there is always - // one allocated and free to use, and we maintain at most two - const corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this); - this.corkedRequestsFree = corkReq; } WritableState.prototype.getBuffer = function getBuffer() { - var current = this.bufferedRequest; - const out = []; - while (current) { - out.push(current); - current = current.next; - } - return out; + return this.buffered.slice(this.buffered.index); }; Object.defineProperty(WritableState.prototype, 'buffer', { @@ -314,12 +313,7 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - - if (!state.writing && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) - clearBuffer(this, state); + clearBuffer(this, state); } }; @@ -365,7 +359,7 @@ Object.defineProperty(Writable.prototype, 'writableHighWaterMark', { // If we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. -function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { +function writeOrBuffer(stream, state, isBuf, chunk, encoding, callback) { if (!isBuf) { var newChunk = decodeChunk(state, chunk, encoding); if (chunk !== newChunk) { @@ -384,22 +378,16 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { state.needDrain = true; if (state.writing || state.corked) { - var last = state.lastBufferedRequest; - state.lastBufferedRequest = { + const buffered = state.buffered; + buffered.push({ chunk, encoding, - isBuf, - callback: cb, - next: null - }; - if (last) { - last.next = state.lastBufferedRequest; - } else { - state.bufferedRequest = state.lastBufferedRequest; - } - state.bufferedRequestCount += 1; + callback, + isBuf + }); + buffered.allBuffers = isBuf && buffered.allBuffers; } else { - doWrite(stream, state, false, len, chunk, encoding, cb); + doWrite(stream, state, false, len, chunk, encoding, callback); } return ret; @@ -426,21 +414,13 @@ function onwriteError(stream, state, sync, er, cb) { // Defer the callback if we are being called synchronously // to avoid piling up things on the stack process.nextTick(cb, er); - // This can emit finish, and it will always happen - // after error - process.nextTick(finishMaybe, stream, state); - stream._writableState.errorEmitted = true; - errorOrDestroy(stream, er); } else { // The caller expect this to happen before if // it is async cb(er); - stream._writableState.errorEmitted = true; - errorOrDestroy(stream, er); - // This can emit finish, but finish must - // always follow error - finishMaybe(stream, state); } + stream._writableState.errorEmitted = true; + errorOrDestroy(stream, er); } function onwrite(stream, er) { @@ -462,10 +442,7 @@ function onwrite(stream, er) { // Check if we're actually ready to finish, but don't emit yet var finished = needFinish(state) || stream.destroyed; - if (!finished && - !state.corked && - !state.bufferProcessing && - state.bufferedRequest) { + if (!finished) { clearBuffer(stream, state); } @@ -497,67 +474,34 @@ function onwriteDrain(stream, state) { // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { - state.bufferProcessing = true; - var entry = state.bufferedRequest; - - if (stream._writev && entry && entry.next) { - // Fast case, write everything using _writev() - var l = state.bufferedRequestCount; - var buffer = new Array(l); - var holder = state.corkedRequestsFree; - holder.entry = entry; - - var count = 0; - var allBuffers = true; - while (entry) { - buffer[count] = entry; - if (!entry.isBuf) - allBuffers = false; - entry = entry.next; - count += 1; - } - buffer.allBuffers = allBuffers; + if (state.writing || state.bufferProcessing || state.corked) { + return; + } - doWrite(stream, state, true, state.length, buffer, '', holder.finish); + const buffered = state.buffered; + const bufferedCount = buffered.length - buffered.index; - // doWrite is almost always async, defer these to save a bit of time - // as the hot path ends with doWrite - state.pendingcb++; - state.lastBufferedRequest = null; - if (holder.next) { - state.corkedRequestsFree = holder.next; - holder.next = null; - } else { - var corkReq = { next: null, entry: null, finish: undefined }; - corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state); - state.corkedRequestsFree = corkReq; - } - state.bufferedRequestCount = 0; + if (!bufferedCount) { + return; + } + + state.bufferProcessing = true; + if (stream._writev) { + buffered.index += bufferedCount; + doWrite(stream, state, true, state.length, buffered, '', buffered.dispatch); } else { // Slow case, write chunks one-by-one - while (entry) { - var chunk = entry.chunk; - var encoding = entry.encoding; - var cb = entry.callback; - var len = state.objectMode ? 1 : chunk.length; - - doWrite(stream, state, false, len, chunk, encoding, cb); - entry = entry.next; - state.bufferedRequestCount--; + for (const { chunk, encoding, length } of buffered) { + const len = state.objectMode ? 1 : length; + buffered.index += 1; + doWrite(stream, state, false, len, chunk, encoding, buffered.dispatch); // If we didn't call the onwrite immediately, then // it means that we need to wait until it does. - // also, that means that the chunk and cb are currently - // being processed, so move the buffer counter past them. if (state.writing) { break; } } - - if (entry === null) - state.lastBufferedRequest = null; } - - state.bufferedRequest = entry; state.bufferProcessing = false; } @@ -606,9 +550,13 @@ Object.defineProperty(Writable.prototype, 'writableLength', { }); function needFinish(state) { + const buffered = state.buffered; + const bufferedCount = buffered.length - buffered.index; + return (state.ending && state.length === 0 && - state.bufferedRequest === null && + bufferedCount === 0 && + !state.errorEmitted && !state.finished && !state.writing); } @@ -670,20 +618,6 @@ function endWritable(stream, state, cb) { stream.writable = false; } -function onCorkedFinish(corkReq, state, err) { - var entry = corkReq.entry; - corkReq.entry = null; - while (entry) { - var cb = entry.callback; - state.pendingcb--; - cb(err); - entry = entry.next; - } - - // Reuse the free corkReq. - state.corkedRequestsFree.next = corkReq; -} - Object.defineProperty(Writable.prototype, 'destroyed', { // Making it explicit this property is not enumerable // because otherwise some prototype manipulation in diff --git a/test/parallel/test-stream-writable-write-writev-finish.js b/test/parallel/test-stream-writable-write-writev-finish.js index 4399f1ca503b37..cbd760780e3667 100644 --- a/test/parallel/test-stream-writable-write-writev-finish.js +++ b/test/parallel/test-stream-writable-write-writev-finish.js @@ -14,16 +14,11 @@ const stream = require('stream'); cb(new Error('write test error')); }; - let firstError = false; - writable.on('finish', common.mustCall(function() { - assert.strictEqual(firstError, true); - })); - - writable.on('prefinish', common.mustCall()); + writable.on('finish', common.mustNotCall()); + writable.on('prefinish', common.mustNotCall()); writable.on('error', common.mustCall((er) => { assert.strictEqual(er.message, 'write test error'); - firstError = true; })); writable.end('test'); @@ -36,16 +31,11 @@ const stream = require('stream'); setImmediate(cb, new Error('write test error')); }; - let firstError = false; - writable.on('finish', common.mustCall(function() { - assert.strictEqual(firstError, true); - })); - - writable.on('prefinish', common.mustCall()); + writable.on('finish', common.mustNotCall()); + writable.on('prefinish', common.mustNotCall()); writable.on('error', common.mustCall((er) => { assert.strictEqual(er.message, 'write test error'); - firstError = true; })); writable.end('test'); @@ -62,16 +52,11 @@ const stream = require('stream'); cb(new Error('writev test error')); }; - let firstError = false; - writable.on('finish', common.mustCall(function() { - assert.strictEqual(firstError, true); - })); - - writable.on('prefinish', common.mustCall()); + writable.on('finish', common.mustNotCall()); + writable.on('prefinish', common.mustNotCall()); writable.on('error', common.mustCall((er) => { assert.strictEqual(er.message, 'writev test error'); - firstError = true; })); writable.cork(); @@ -93,16 +78,11 @@ const stream = require('stream'); setImmediate(cb, new Error('writev test error')); }; - let firstError = false; - writable.on('finish', common.mustCall(function() { - assert.strictEqual(firstError, true); - })); - - writable.on('prefinish', common.mustCall()); + writable.on('finish', common.mustNotCall()); + writable.on('prefinish', common.mustNotCall()); writable.on('error', common.mustCall((er) => { assert.strictEqual(er.message, 'writev test error'); - firstError = true; })); writable.cork(); @@ -123,14 +103,9 @@ const stream = require('stream'); rs._read = () => {}; const ws = new stream.Writable(); - let firstError = false; - ws.on('finish', common.mustCall(function() { - assert.strictEqual(firstError, true); - })); - ws.on('error', common.mustCall(function() { - firstError = true; - })); + ws.on('finish', common.mustNotCall()); + ws.on('error', common.mustCall()); ws._write = (chunk, encoding, done) => { setImmediate(done, new Error()); @@ -178,3 +153,12 @@ const stream = require('stream'); }); w.end(); } + +{ + const w = new stream.Writable(); + w._write = (chunk, encoding, cb) => { + process.nextTick(cb); + }; + w.on('finish', common.mustCall()); + w.end(); +}