Skip to content

Commit

Permalink
stream: improve Transform performance
Browse files Browse the repository at this point in the history
PR-URL: #13322
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
mscdex authored and addaleax committed Jul 11, 2017
1 parent b22a04b commit 6512fd7
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 37 deletions.
23 changes: 23 additions & 0 deletions benchmark/streams/transform-creation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';
var common = require('../common.js');
var Transform = require('stream').Transform;
var inherits = require('util').inherits;

var bench = common.createBenchmark(main, {
n: [1e6]
});

function MyTransform() {
Transform.call(this);
}
inherits(MyTransform, Transform);
MyTransform.prototype._transform = function() {};

function main(conf) {
var n = +conf.n;

bench.start();
for (var i = 0; i < n; ++i)
new MyTransform();
bench.end(n);
}
67 changes: 30 additions & 37 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,41 +70,29 @@ const util = require('util');
util.inherits(Transform, Duplex);


function TransformState(stream) {
this.afterTransform = function(er, data) {
return afterTransform(stream, er, data);
};

this.needTransform = false;
this.transforming = false;
this.writecb = null;
this.writechunk = null;
this.writeencoding = null;
}

function afterTransform(stream, er, data) {
var ts = stream._transformState;
function afterTransform(er, data) {
var ts = this._transformState;
ts.transforming = false;

var cb = ts.writecb;

if (!cb) {
return stream.emit('error',
new Error('write callback called multiple times'));
return this.emit('error',
new Error('write callback called multiple times'));
}

ts.writechunk = null;
ts.writecb = null;

if (data !== null && data !== undefined)
stream.push(data);
if (data != null) // single equals check for both `null` and `undefined`
this.push(data);

cb(er);

var rs = stream._readableState;
var rs = this._readableState;
rs.reading = false;
if (rs.needReadable || rs.length < rs.highWaterMark) {
stream._read(rs.highWaterMark);
this._read(rs.highWaterMark);
}
}

Expand All @@ -115,9 +103,14 @@ function Transform(options) {

Duplex.call(this, options);

this._transformState = new TransformState(this);

var stream = this;
this._transformState = {
afterTransform: afterTransform.bind(this),
needTransform: false,
transforming: false,
writecb: null,
writechunk: null,
writeencoding: null
};

// start out asking for a readable event once data is transformed.
this._readableState.needReadable = true;
Expand All @@ -136,14 +129,17 @@ function Transform(options) {
}

// When the writable side finishes, then flush out anything remaining.
this.once('prefinish', function() {
if (typeof this._flush === 'function')
this._flush(function(er, data) {
done(stream, er, data);
});
else
done(stream);
});
this.on('prefinish', prefinish);
}

function prefinish() {
if (typeof this._flush === 'function') {
this._flush((er, data) => {
done(this, er, data);
});
} else {
done(this, null, null);
}
}

Transform.prototype.push = function(chunk, encoding) {
Expand Down Expand Up @@ -208,18 +204,15 @@ function done(stream, er, data) {
if (er)
return stream.emit('error', er);

if (data !== null && data !== undefined)
if (data != null) // single equals check for both `null` and `undefined`
stream.push(data);

// if there's nothing in the write buffer, then that means
// that nothing more will ever be provided
var ws = stream._writableState;
var ts = stream._transformState;

if (ws.length)
if (stream._writableState.length)
throw new Error('Calling transform done when ws.length != 0');

if (ts.transforming)
if (stream._transformState.transforming)
throw new Error('Calling transform done when still transforming');

return stream.push(null);
Expand Down

0 comments on commit 6512fd7

Please sign in to comment.