Skip to content

Commit

Permalink
stream: writable buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 5, 2019
1 parent 1592d0a commit 9033d42
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 152 deletions.
168 changes: 51 additions & 117 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ Object.setPrototypeOf(Writable, Stream);

function nop() {}

function bufferedDispatch(err) {
const index = this.index;
for (let i = 0; i < index; i++) {
this[i].callback(err);
this[i] = null;
}

this.splice(0, index);
this.index -= index;
this.allBuffers = this.allBuffers || this.every((request) => request.isBuf);
}

function WritableState(options, stream, isDuplex) {
options = options || {};

Expand Down Expand Up @@ -134,8 +146,10 @@ function WritableState(options, stream, isDuplex) {
// The amount that is being written when _write is called.
this.writelen = 0;

this.bufferedRequest = null;
this.lastBufferedRequest = null;
this.buffered = [];
this.buffered.index = 0;
this.buffered.allBuffers = true;
this.buffered.dispatch = bufferedDispatch.bind(this.buffered);

// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
Expand All @@ -153,25 +167,10 @@ function WritableState(options, stream, isDuplex) {

// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!options.autoDestroy;

// Count buffered requests
this.bufferedRequestCount = 0;

// Allocate the first CorkedRequest, there is always
// one allocated and free to use, and we maintain at most two
const corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
this.corkedRequestsFree = corkReq;
}

WritableState.prototype.getBuffer = function getBuffer() {
var current = this.bufferedRequest;
const out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
return this.buffered.slice(this.buffered.index);
};

Object.defineProperty(WritableState.prototype, 'buffer', {
Expand Down Expand Up @@ -314,12 +313,7 @@ Writable.prototype.uncork = function() {

if (state.corked) {
state.corked--;

if (!state.writing &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest)
clearBuffer(this, state);
clearBuffer(this, state);
}
};

Expand Down Expand Up @@ -365,7 +359,7 @@ Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
// If we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
function writeOrBuffer(stream, state, isBuf, chunk, encoding, callback) {
if (!isBuf) {
var newChunk = decodeChunk(state, chunk, encoding);
if (chunk !== newChunk) {
Expand All @@ -384,22 +378,16 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
state.needDrain = true;

if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
const buffered = state.buffered;
buffered.push({
chunk,
encoding,
isBuf,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
callback,
isBuf
});
buffered.allBuffers = isBuf && buffered.allBuffers;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
doWrite(stream, state, false, len, chunk, encoding, callback);
}

return ret;
Expand All @@ -426,21 +414,13 @@ function onwriteError(stream, state, sync, er, cb) {
// Defer the callback if we are being called synchronously
// to avoid piling up things on the stack
process.nextTick(cb, er);
// This can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
// This can emit finish, but finish must
// always follow error
finishMaybe(stream, state);
}
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
}

function onwrite(stream, er) {
Expand All @@ -462,10 +442,7 @@ function onwrite(stream, er) {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
if (!finished) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -497,67 +474,34 @@ function onwriteDrain(stream, state) {

// If there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;

if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var l = state.bufferedRequestCount;
var buffer = new Array(l);
var holder = state.corkedRequestsFree;
holder.entry = entry;

var count = 0;
var allBuffers = true;
while (entry) {
buffer[count] = entry;
if (!entry.isBuf)
allBuffers = false;
entry = entry.next;
count += 1;
}
buffer.allBuffers = allBuffers;
if (state.writing || state.bufferProcessing || state.corked) {
return;
}

doWrite(stream, state, true, state.length, buffer, '', holder.finish);
const buffered = state.buffered;
const bufferedCount = buffered.length - buffered.index;

// doWrite is almost always async, defer these to save a bit of time
// as the hot path ends with doWrite
state.pendingcb++;
state.lastBufferedRequest = null;
if (holder.next) {
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {
var corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
state.corkedRequestsFree = corkReq;
}
state.bufferedRequestCount = 0;
if (!bufferedCount) {
return;
}

state.bufferProcessing = true;
if (stream._writev) {
buffered.index += bufferedCount;
doWrite(stream, state, true, state.length, buffered, '', buffered.dispatch);
} else {
// Slow case, write chunks one-by-one
while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;

doWrite(stream, state, false, len, chunk, encoding, cb);
entry = entry.next;
state.bufferedRequestCount--;
for (const { chunk, encoding, length } of buffered) {
const len = state.objectMode ? 1 : length;
buffered.index += 1;
doWrite(stream, state, false, len, chunk, encoding, buffered.dispatch);
// If we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
break;
}
}

if (entry === null)
state.lastBufferedRequest = null;
}

state.bufferedRequest = entry;
state.bufferProcessing = false;
}

Expand Down Expand Up @@ -606,9 +550,13 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
});

function needFinish(state) {
const buffered = state.buffered;
const bufferedCount = buffered.length - buffered.index;

return (state.ending &&
state.length === 0 &&
state.bufferedRequest === null &&
bufferedCount === 0 &&
!state.errorEmitted &&
!state.finished &&
!state.writing);
}
Expand Down Expand Up @@ -670,20 +618,6 @@ function endWritable(stream, state, cb) {
stream.writable = false;
}

function onCorkedFinish(corkReq, state, err) {
var entry = corkReq.entry;
corkReq.entry = null;
while (entry) {
var cb = entry.callback;
state.pendingcb--;
cb(err);
entry = entry.next;
}

// Reuse the free corkReq.
state.corkedRequestsFree.next = corkReq;
}

Object.defineProperty(Writable.prototype, 'destroyed', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
Expand Down
54 changes: 19 additions & 35 deletions test/parallel/test-stream-writable-write-writev-finish.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@ const stream = require('stream');
cb(new Error('write test error'));
};

let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));

writable.on('prefinish', common.mustCall());
writable.on('finish', common.mustNotCall());
writable.on('prefinish', common.mustNotCall());

writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'write test error');
firstError = true;
}));

writable.end('test');
Expand All @@ -36,16 +31,11 @@ const stream = require('stream');
setImmediate(cb, new Error('write test error'));
};

let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));

writable.on('prefinish', common.mustCall());
writable.on('finish', common.mustNotCall());
writable.on('prefinish', common.mustNotCall());

writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'write test error');
firstError = true;
}));

writable.end('test');
Expand All @@ -62,16 +52,11 @@ const stream = require('stream');
cb(new Error('writev test error'));
};

let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));

writable.on('prefinish', common.mustCall());
writable.on('finish', common.mustNotCall());
writable.on('prefinish', common.mustNotCall());

writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'writev test error');
firstError = true;
}));

writable.cork();
Expand All @@ -93,16 +78,11 @@ const stream = require('stream');
setImmediate(cb, new Error('writev test error'));
};

let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));

writable.on('prefinish', common.mustCall());
writable.on('finish', common.mustNotCall());
writable.on('prefinish', common.mustNotCall());

writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'writev test error');
firstError = true;
}));

writable.cork();
Expand All @@ -123,14 +103,9 @@ const stream = require('stream');
rs._read = () => {};

const ws = new stream.Writable();
let firstError = false;

ws.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));
ws.on('error', common.mustCall(function() {
firstError = true;
}));
ws.on('finish', common.mustNotCall());
ws.on('error', common.mustCall());

ws._write = (chunk, encoding, done) => {
setImmediate(done, new Error());
Expand Down Expand Up @@ -178,3 +153,12 @@ const stream = require('stream');
});
w.end();
}

{
const w = new stream.Writable();
w._write = (chunk, encoding, cb) => {
process.nextTick(cb);
};
w.on('finish', common.mustCall());
w.end();
}

0 comments on commit 9033d42

Please sign in to comment.