Skip to content

Commit

Permalink
streams: add cork option to pipe
Browse files Browse the repository at this point in the history
Adds an option to .pipe to cork it before each write and
then uncork it next tick, based on discussion at
nodejs/readable-stream#145
  • Loading branch information
calvinmetcalf committed Jun 19, 2015
1 parent c5353d7 commit 25b8da1
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 1 deletion.
4 changes: 3 additions & 1 deletion doc/api/stream.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ readable.isPaused() // === false
* `destination` {[Writable][] Stream} The destination for writing data
* `options` {Object} Pipe options
* `end` {Boolean} End the writer when the reader ends. Default = `true`
* `cork` {Boolean} Before each write, cork the stream and then uncork it on the
next tick in order to consolidate writes. Default = `false`

This method pulls all the data out of a readable stream, and writes it
to the supplied destination, automatically managing the flow so that
Expand Down Expand Up @@ -1315,7 +1317,7 @@ for examples and testing, but there are occasionally use cases where
it can come in handy as a building block for novel sorts of streams.


## Simplified Constructor API
## Simplified Constructor API

<!--type=misc-->

Expand Down
20 changes: 20 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stdout &&
dest !== process.stderr;

var autoCork = pipeOpts && pipeOpts.cork && (typeof dest.cork === 'function');
var corked = false;

var endFn = doEnd ? onend : cleanup;
if (state.endEmitted)
process.nextTick(endFn);
Expand Down Expand Up @@ -515,9 +518,26 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
ondrain();
}

function maybeCork() {
if (!autoCork || corked) {
debug('already corked');
return;
}
debug('corking');
corked = true;
dest.cork();
process.nextTick(uncork);
}
function uncork() {
debug('uncorking');
corked = false;
dest.uncork();
}
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
maybeCork();
debug('corks=%d', dest._writableState.corked);
var ret = dest.write(chunk);
if (false === ret) {
debug('false write response, pause',
Expand Down
176 changes: 176 additions & 0 deletions test/parallel/test-stream-pipe-cork.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
'use strict';
var common = require('../common');
var assert = require('assert');

var stream = require('stream');
// tiny node-tap lookalike.
var tests = [];
var count = 0;

function test(name, fn) {
count++;
tests.push([name, fn]);
}

function run() {
var next = tests.shift();
if (!next)
return console.error('ok');

var name = next[0];
var fn = next[1];
console.log('# %s', name);
fn({
same: assert.deepEqual,
equal: assert.equal,
end: function() {
count--;
run();
}
});
}

// ensure all tests have run
process.on('exit', function() {
assert.equal(count, 0);
});

process.nextTick(run);
test('all sync', function (t) {
var counter = 0;
var expectCount = 0;
function cnt(msg) {
expectCount++;
var expect = expectCount;
var called = false;
return function(er) {
if (er)
throw er;
called = true;
counter++;
t.equal(counter, expect);
};
}
var p = new stream.PassThrough();
var w = new stream.Writable();
w._write = function(chunk, e, cb) {
assert(false, 'Should not call _write');
};
p.pipe(w, {
cork: true
});
var expectChunks =
[
{ encoding: 'buffer',
chunk: [104, 101, 108, 108, 111, 44, 32] },
{ encoding: 'buffer',
chunk: [119, 111, 114, 108, 100] },
{ encoding: 'buffer',
chunk: [33] },
{ encoding: 'buffer',
chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] },
{ encoding: 'buffer',
chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]}
];

var actualChunks;
w._writev = function(chunks, cb) {
actualChunks = chunks.map(function(chunk) {
return {
encoding: chunk.encoding,
chunk: Buffer.isBuffer(chunk.chunk) ?
Array.prototype.slice.call(chunk.chunk) : chunk.chunk
};
});
cb();
};
p.write('hello, ', 'ascii', cnt('hello'));
p.write('world', 'utf8', cnt('world'));

p.write(new Buffer('!'), 'buffer', cnt('!'));

p.write('\nand then...', 'binary', cnt('and then'));
p.write('facebea7deadbeefdecafbad', 'hex', cnt('hex'));

p.end(cnt('end'));


w.on('finish', function() {
// make sure finish comes after all the write cb
cnt('finish')();
t.same(expectChunks, actualChunks);
t.end();
});
});
test('2 groups', function (t) {
var counter = 0;
var expectCount = 0;
function cnt(msg) {
expectCount++;
var expect = expectCount;
var called = false;
return function(er) {
if (er)
throw er;
called = true;
counter++;
t.equal(counter, expect);
};
}
var p = new stream.PassThrough();
var w = new stream.Writable();
w._write = function(chunk, e, cb) {
assert(false, 'Should not call _write');
};
p.pipe(w, {
cork: true
});
var expectChunks = [
[
{ encoding: 'buffer',
chunk: [104, 101, 108, 108, 111, 44, 32] },
{ encoding: 'buffer',
chunk: [119, 111, 114, 108, 100] }
],[
{ encoding: 'buffer',
chunk: [33] },
{ encoding: 'buffer',
chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] },
{ encoding: 'buffer',
chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]}
]];

var actualChunks = [];
var called = 0;
w._writev = function(chunks, cb) {
actualChunks.push(chunks.map(function(chunk) {
return {
encoding: chunk.encoding,
chunk: Buffer.isBuffer(chunk.chunk) ?
Array.prototype.slice.call(chunk.chunk) : chunk.chunk
};
}));
cb();
};
p.write('hello, ', 'ascii', cnt('hello'));
p.write('world', 'utf8', cnt('world'));
process.nextTick(function () {
p.write(new Buffer('!'), 'buffer', cnt('!'));

p.write('\nand then...', 'binary', cnt('and then'));
p.write('facebea7deadbeefdecafbad', 'hex', cnt('hex'));

p.end(cnt('end'));
});

w.on('finish', function() {
// make sure finish comes after all the write cb
cnt('finish')();
console.log('expected');
console.log(expectChunks);
console.log('actual');
console.log(actualChunks);
t.same(expectChunks, actualChunks);
t.end();
});
});

0 comments on commit 25b8da1

Please sign in to comment.