From ecc7b9e4f17a9ec877fa7e52d9a4a29140b7307d Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sun, 26 Jun 2016 05:47:38 +0200 Subject: [PATCH 1/2] stream: support decoding buffers for Writables Support decoding the input of writable streams to a specific decoding before passing it to `_write()`. By default, all data written to a writable stream is encoded into Buffers. This change enables the reverse situation, i.e. when it is desired by the stream implementer to process all input as strings, whether it was passed to `write()` as a string or not. This makes sense for multi-byte character encodings where the buffers that are written using `write()` may contain partial characters, so calling `chunk.toString()` is not universally applicable. Fixes: https://github.com/nodejs/node/issues/7315 --- doc/api/stream.md | 3 + lib/_stream_writable.js | 68 ++++++++-- .../test-stream-writable-decode-buffers.js | 121 ++++++++++++++++++ 3 files changed, 181 insertions(+), 11 deletions(-) create mode 100644 test/parallel/test-stream-writable-decode-buffers.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 291cda735aee3d..b6b8d7bcdea13c 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1184,6 +1184,9 @@ constructor and implement the `writable._write()` method. The * `decodeStrings` {Boolean} Whether or not to decode strings into Buffers before passing them to [`stream._write()`][stream-_write]. Defaults to `true` + * `decodeBuffers` {Boolean} Whether or not to decode Buffers into strings + using the default encoding before passing them to + [`stream._write()`][stream-_write]. * `objectMode` {Boolean} Whether or not the [`stream.write(anyObj)`][stream-write] is a valid operation. When set, it becomes possible to write JavaScript values other than string or diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index ba56225d974fe9..deb6e0853dd028 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -11,6 +11,7 @@ const util = require('util'); const internalUtil = require('internal/util'); const Stream = require('stream'); const Buffer = require('buffer').Buffer; +var StringDecoder; util.inherits(Writable, Stream); @@ -46,6 +47,8 @@ function WritableState(options, stream) { // drain event flag. this.needDrain = false; // at the start of calling end() + this.flushing = false; + // at the start of calling endWritable() this.ending = false; // when end() has been called, and returned this.ended = false; @@ -55,6 +58,8 @@ function WritableState(options, stream) { // should we decode strings into buffers before passing to _write? // this is here so that some node-core streams can optimize string // handling at a lower level. + // This is confusingly named. For character encodings (like utf8), setting + // decodeStrings to true will *encode* strings as Buffers. var noDecode = options.decodeStrings === false; this.decodeStrings = !noDecode; @@ -63,6 +68,21 @@ function WritableState(options, stream) { // Everything else in the universe uses 'utf8', though. this.defaultEncoding = options.defaultEncoding || 'utf8'; + // A StringDecoder instance that is used for decoding incoming Buffers + // if that is desired by the stream implementer, as indicated by the + // `decodeBuffers` option. + this.stringDecoder = null; + if (options.decodeBuffers) { + // Check whether the caller explicitly requested inconsistent options. + if (options.decodeStrings === true) { + throw new Error('decodeBuffers and decodeStrings cannot both be true'); + } + + if (!StringDecoder) + StringDecoder = require('string_decoder').StringDecoder; + this.stringDecoder = new StringDecoder(this.defaultEncoding); + } + // not an actual buffer we keep track of, but a measurement // of how much we're waiting to get pushed to some underlying // socket or file. @@ -276,11 +296,27 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { }; function decodeChunk(state, chunk, encoding) { - if (!state.objectMode && - state.decodeStrings !== false && - typeof chunk === 'string') { - chunk = Buffer.from(chunk, encoding); + if (state.objectMode) + return chunk; + + var sd = state.stringDecoder; + if (typeof chunk === 'string') { + if (sd !== null && encoding === sd.encoding && sd.lastNeed === 0) + return chunk; // No re-encoding encessary. + + if (state.decodeStrings !== false || sd !== null) + chunk = Buffer.from(chunk, encoding); } + + if (sd !== null) { + // chunk is always a Buffer now. + if (state.flushing) { + chunk = sd.end(chunk); + } else { + chunk = sd.write(chunk); + } + } + return chunk; } @@ -288,11 +324,15 @@ function decodeChunk(state, chunk, encoding) { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { - if (!isBuf) { + var sd = state.stringDecoder; + if (!isBuf || sd) { chunk = decodeChunk(state, chunk, encoding); if (chunk instanceof Buffer) encoding = 'buffer'; + else if (sd) + encoding = sd.encoding; } + var len = state.objectMode ? 1 : chunk.length; state.length += len; @@ -459,20 +499,26 @@ Writable.prototype._write = function(chunk, encoding, cb) { Writable.prototype._writev = null; -Writable.prototype.end = function(chunk, encoding, cb) { +const empty = Buffer.alloc(0); + +Writable.prototype.end = function(chunk, enc, cb) { var state = this._writableState; if (typeof chunk === 'function') { cb = chunk; chunk = null; - encoding = null; - } else if (typeof encoding === 'function') { - cb = encoding; - encoding = null; + enc = null; + } else if (typeof enc === 'function') { + cb = enc; + enc = null; } + state.flushing = true; + if (chunk !== null && chunk !== undefined) - this.write(chunk, encoding); + this.write(chunk, enc); + else if (state.stringDecoder && state.stringDecoder.lastNeed > 0) + this.write(empty); // .end() fully uncorks if (state.corked) { diff --git a/test/parallel/test-stream-writable-decode-buffers.js b/test/parallel/test-stream-writable-decode-buffers.js new file mode 100644 index 00000000000000..f785896f4a1741 --- /dev/null +++ b/test/parallel/test-stream-writable-decode-buffers.js @@ -0,0 +1,121 @@ +'use strict'; +require('../common'); +const assert = require('assert'); + +const stream = require('stream'); + +class ChunkStoringWritable extends stream.Writable { + constructor(options) { + super(options); + + this.chunks = []; + } + + _write(data, encoding, callback) { + this.chunks.push({ data, encoding }); + callback(); + } +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('e4bd', 'hex')); + w.write(Buffer.from('a0e5', 'hex')); + w.write(Buffer.from('a5bd', 'hex')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['', '你', '好']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('你', 'utf8')); + w.write(Buffer.from('好', 'utf8')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你', '好']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('80', 'hex')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['\ufffd']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + w.write(Buffer.from('c3', 'hex')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['', '\ufffd']); +} + +{ + const w = new ChunkStoringWritable({ + decodeBuffers: true, + defaultEncoding: 'utf16le' + }); + + w.write(Buffer.from('你好', 'utf16le')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你好']); +} + +{ + const w = new ChunkStoringWritable({ + decodeBuffers: true, + defaultEncoding: 'base64' + }); + + w.write(Buffer.from('你好', 'utf16le')); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['YE99', 'WQ==']); +} + +{ + const w = new ChunkStoringWritable({ + decodeBuffers: true, + defaultEncoding: 'utf16le' + }); + + w.write('你好', 'utf16le'); + w.end(); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['你好']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + + w.write(Buffer.from([0x44, 0xc3])); // Ends on incomplete UTF8. + + // This write should *not* be passed through directly as there's + // input pending. + w.write('a'); + + w.end(Buffer.from('bc7373656c', 'hex')); + + assert.deepStrictEqual(w.chunks.map((c) => c.data), ['D', '�a', '�ssel']); +} + +{ + const w = new ChunkStoringWritable({ decodeBuffers: true }); + + w.write('a'); + w.setDefaultEncoding('ucs2'); + w.end('换'); + + assert.deepStrictEqual(w.chunks, [ + { data: 'a', encoding: 'utf8' }, + { data: 'bc', encoding: 'utf8' } + ]); +} + +assert.throws(() => new stream.Writable({ + decodeBuffers: true, + decodeStrings: true +}), /decodeBuffers and decodeStrings cannot both be true/); From c4ea4e6163600e0b11960b5e5603ce4e07057af4 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 27 Jun 2016 03:53:04 +0200 Subject: [PATCH 2/2] benchmark: add writable stream encoding benchmark --- benchmark/streams/writable-simple.js | 35 ++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 benchmark/streams/writable-simple.js diff --git a/benchmark/streams/writable-simple.js b/benchmark/streams/writable-simple.js new file mode 100644 index 00000000000000..b6b4642f945492 --- /dev/null +++ b/benchmark/streams/writable-simple.js @@ -0,0 +1,35 @@ +'use strict'; + +const common = require('../common'); +const Writable = require('stream').Writable; + +const bench = common.createBenchmark(main, { + n: [50000], + inputType: ['buffer', 'string'], + inputEncoding: ['utf8', 'ucs2'], + decodeAs: ['', 'utf8', 'ucs2'] +}); + +const inputStr = `袎逜 釂鱞鸄 碄碆碃 蒰裧頖 鋑, 瞂 覮轀 蔝蓶蓨 踥踕踛 鬐鶤 鄜 忁曨曣 +翀胲胵, 鬵鵛嚪 釢髟偛 碞碠粻 漀 涾烰 跬 窱縓 墥墡嬇 禒箈箑, 餤駰 瀁瀎瀊 躆轖轕 蒛, 銙 簎艜薤 +樆樦潏 魡鳱 櫱瀯灂 鷜鷙鷵 禒箈箑 綧 駓駗, 鋡 嗛嗕塨 嶭嶴憝 爂犤繵 罫蓱 摮 灉礭蘠 蠬襱覾 脬舑莕 +躐鑏, 襆贂 漀 刲匊呥 肒芅邥 泏狔狑, 瀗犡礝 浘涀缹 輲輹 綧`; + +function main(conf) { + const n = +conf.n; + const s = new Writable({ + decodeBuffers: !!conf.decodeAs, + defaultEncoding: conf.decodeAs || undefined, + write(chunk, encoding, cb) { cb(); } + }); + + const inputEnc = conf.inputType === 'buffer' ? undefined : conf.inputEncoding; + const input = conf.inputType === 'buffer' ? + Buffer.from(inputStr, conf.inputEncoding) : inputStr; + + bench.start(); + for (var k = 0; k < n; ++k) { + s.write(input, inputEnc); + } + bench.end(n); +}