From bc976cef014f540b24ba92c8e1b889c9dff51bf6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 11 Aug 2019 15:29:30 +0200 Subject: [PATCH 1/9] fs: allow overring fs for streams --- doc/api/fs.md | 24 +++++++++- lib/internal/fs/streams.js | 56 +++++++++++++++++++++--- test/parallel/test-fs-read-stream.js | 13 +++++- test/parallel/test-fs-write-stream-fs.js | 38 ++++++++++++++++ 4 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 test/parallel/test-fs-write-stream-fs.js diff --git a/doc/api/fs.md b/doc/api/fs.md index 643efaae03395d..a051e4c6358e12 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -1668,6 +1668,10 @@ changes: - version: v2.3.0 pr-url: https://github.com/nodejs/node/pull/1845 description: The passed `options` object can be a string now. + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/REPLACEME + description: The `fs` options allow overriding the used `fs` + implementation. --> * `path` {string|Buffer|URL} @@ -1682,7 +1686,8 @@ changes: * `start` {integer} * `end` {integer} **Default:** `Infinity` * `highWaterMark` {integer} **Default:** `64 * 1024` -* Returns: {fs.ReadStream} + * `fs` {Object|null} **Default:** `null` +* Returns: {fs.ReadStream} See [Readable Streams][]. Unlike the 16 kb default `highWaterMark` for a readable stream, the stream returned by this method has a default `highWaterMark` of 64 kb. @@ -1709,6 +1714,10 @@ By default, the stream will not emit a `'close'` event after it has been destroyed. This is the opposite of the default for other `Readable` streams. Set the `emitClose` option to `true` to change this behavior. +By providing the `fs` option it is possible to override the corresponding `fs` +implementations for `open`, `read` and `close`. When providing the `fs` option, +you must override `open`, `close` and `read`. + ```js const fs = require('fs'); // Create a stream from some character device. @@ -1762,6 +1771,10 @@ changes: - version: v2.3.0 pr-url: https://github.com/nodejs/node/pull/1845 description: The passed `options` object can be a string now. + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/REPLACEME + description: The `fs` options allow overriding the used `fs` + implementation. --> * `path` {string|Buffer|URL} @@ -1774,7 +1787,8 @@ changes: * `autoClose` {boolean} **Default:** `true` * `emitClose` {boolean} **Default:** `false` * `start` {integer} -* Returns: {fs.WriteStream} + * `fs` {Object|null} **Default:** `null` +* Returns: {fs.WriteStream} See [Writable Stream][]. `options` may also include a `start` option to allow writing data at some position past the beginning of the file, allowed values are in the @@ -1793,6 +1807,12 @@ By default, the stream will not emit a `'close'` event after it has been destroyed. This is the opposite of the default for other `Writable` streams. Set the `emitClose` option to `true` to change this behavior. +By providing the `fs` option it is possible to override the corresponding `fs` +implementations for `open`, `write`, `writev` and `close`. Overriding `write()` +without `writev()` can reduce performance as some optimizations (`_writev()`) +will be disabled. When providing the `fs` option, you must override `open`, +`close` and at least one of `write` and `writev`. + Like [`ReadStream`][], if `fd` is specified, [`WriteStream`][] will ignore the `path` argument and will use the specified file descriptor. This means that no `'open'` event will be emitted. `fd` should be blocking; non-blocking `fd`s diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 880828e0d7ae57..f2e5c0ec9c44f3 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -28,6 +28,7 @@ const kIoDone = Symbol('kIoDone'); const kIsPerformingIO = Symbol('kIsPerformingIO'); const kMinPoolSpace = 128; +const kFs = Symbol('kFs'); let pool; // It can happen that we expect to read a large chunk of data, and reserve @@ -76,6 +77,20 @@ function ReadStream(path, options) { options.emitClose = false; } + this[kFs] = options.fs || fs; + + if (typeof this[kFs].open !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', this[kFs].open); + } + + if (typeof this[kFs].read !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function', this[kFs].read); + } + + if (typeof this[kFs].close !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', this[kFs].close); + } + Readable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -136,7 +151,7 @@ function _openReadFs(stream) { return; } - fs.open(stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs](stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); @@ -186,7 +201,7 @@ ReadStream.prototype._read = function(n) { // the actual read. this[kIsPerformingIO] = true; - fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + this[kFs].read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) return this.emit(kIoDone, er); @@ -245,7 +260,7 @@ ReadStream.prototype._destroy = function(err, cb) { }; function closeFsStream(stream, cb, err) { - fs.close(stream.fd, (er) => { + stream[kFs].close(stream.fd, (er) => { er = er || err; cb(er); stream.closed = true; @@ -279,6 +294,35 @@ function WriteStream(path, options) { options.emitClose = false; } + this[kFs] = options.fs || fs; + if (typeof this[kFs].open !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', this[kFs].open); + } + + if (!this[kFs].write && !this[kFs].writev) { + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', this[kFs].write); + } + + if (this[kFs].write && typeof this[kFs].write !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', this[kFs].write); + } + + if (this[kFs].writev && typeof this[kFs].writev !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function', this[kFs].writev); + } + + if (typeof this[kFs].close !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', this[kFs].close); + } + + // It's enough to override either, in which case only one will be used. + if (!this[kFs].write) { + this._write = null; + } + if (!this[kFs].writev) { + this._writev = null; + } + Writable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -335,7 +379,7 @@ function _openWriteFs(stream) { return; } - fs.open(stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); @@ -361,7 +405,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); this[kIsPerformingIO] = true; - fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { @@ -405,7 +449,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - fs.writev(this.fd, chunks, this.pos, (er, bytes) => { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { diff --git a/test/parallel/test-fs-read-stream.js b/test/parallel/test-fs-read-stream.js index e33c6dec4ee264..05c86f724680e6 100644 --- a/test/parallel/test-fs-read-stream.js +++ b/test/parallel/test-fs-read-stream.js @@ -31,11 +31,11 @@ const fixtures = require('../common/fixtures'); const fn = fixtures.path('elipses.txt'); const rangeFile = fixtures.path('x.txt'); -{ +function test1(options) { let paused = false; let bytesRead = 0; - const file = fs.createReadStream(fn); + const file = fs.createReadStream(fn, options); const fileSize = fs.statSync(fn).size; assert.strictEqual(file.bytesRead, 0); @@ -88,6 +88,15 @@ const rangeFile = fixtures.path('x.txt'); }); } +test1({}); +test1({ + fs: { + open: common.mustCall(fs.open), + read: common.mustCallAtLeast(fs.read, 1), + close: common.mustCall(fs.close), + } +}); + { const file = fs.createReadStream(fn, { encoding: 'utf8' }); file.length = 0; diff --git a/test/parallel/test-fs-write-stream-fs.js b/test/parallel/test-fs-write-stream-fs.js new file mode 100644 index 00000000000000..2e15f57c764838 --- /dev/null +++ b/test/parallel/test-fs-write-stream-fs.js @@ -0,0 +1,38 @@ +'use strict'; +const common = require('../common'); +const path = require('path'); +const fs = require('fs'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +{ + const file = path.join(tmpdir.path, 'write-end-test0.txt'); + const stream = fs.createWriteStream(file, { + fs: { + open: common.mustCall(fs.open), + write: common.mustCallAtLeast(fs.write, 1), + close: common.mustCall(fs.close), + } + }); + stream.end('asd'); + stream.on('close', common.mustCall()); +} + + +{ + const file = path.join(tmpdir.path, 'write-end-test1.txt'); + const stream = fs.createWriteStream(file, { + fs: { + open: common.mustCall(fs.open), + write: fs.write, + writev: common.mustCallAtLeast(fs.writev, 1), + close: common.mustCall(fs.close), + } + }); + stream.write('asd'); + stream.write('asd'); + stream.write('asd'); + stream.end(); + stream.on('close', common.mustCall()); +} From 5123e5853fdcb7098f68ed5d9b882bf032cf7cbb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 15 Oct 2019 08:39:39 +0200 Subject: [PATCH 2/9] fixup: bad rebase --- lib/internal/fs/streams.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index f2e5c0ec9c44f3..621d3749cc6784 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -151,7 +151,7 @@ function _openReadFs(stream) { return; } - stream[kFs](stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); From 3d39786453f234026df38beee663d2f065e85fcf Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 15 Oct 2019 08:47:52 +0200 Subject: [PATCH 3/9] fixup: linting --- lib/internal/fs/streams.js | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 621d3749cc6784..d1617ec2abc305 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -11,8 +11,13 @@ const { } = primordials; const { +<<<<<<< HEAD ERR_OUT_OF_RANGE, ERR_STREAM_DESTROYED +======= + ERR_INVALID_ARG_TYPE, + ERR_OUT_OF_RANGE +>>>>>>> fixup: linting } = require('internal/errors').codes; const internalUtil = require('internal/util'); const { validateNumber } = require('internal/validators'); @@ -80,15 +85,18 @@ function ReadStream(path, options) { this[kFs] = options.fs || fs; if (typeof this[kFs].open !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', this[kFs].open); + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', + this[kFs].open); } if (typeof this[kFs].read !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function', this[kFs].read); + throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function', + this[kFs].read); } if (typeof this[kFs].close !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', this[kFs].close); + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', + this[kFs].close); } Readable.call(this, options); @@ -294,25 +302,30 @@ function WriteStream(path, options) { options.emitClose = false; } - this[kFs] = options.fs || fs; + this[kFs] = options.fs || fs; if (typeof this[kFs].open !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', this[kFs].open); + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', + this[kFs].open); } if (!this[kFs].write && !this[kFs].writev) { - throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', this[kFs].write); + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', + this[kFs].write); } if (this[kFs].write && typeof this[kFs].write !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', this[kFs].write); + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', + this[kFs].write); } if (this[kFs].writev && typeof this[kFs].writev !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function', this[kFs].writev); + throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function', + this[kFs].writev); } if (typeof this[kFs].close !== 'function') { - throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', this[kFs].close); + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', + this[kFs].close); } // It's enough to override either, in which case only one will be used. From e0cc161d0c7c8cea94cc7f464d5d81eb17691dbe Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 15 Nov 2019 21:37:23 +0100 Subject: [PATCH 4/9] fixup: missing doc refs --- doc/api/fs.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/api/fs.md b/doc/api/fs.md index a051e4c6358e12..a41dc596152089 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -5534,6 +5534,7 @@ the file contents. [`Number.MAX_SAFE_INTEGER`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER [`ReadDirectoryChangesW`]: https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-readdirectorychangesw [`ReadStream`]: #fs_class_fs_readstream +[Readable Stream]: #stream_class_stream_readable [`URL`]: url.html#url_the_whatwg_url_api [`UV_THREADPOOL_SIZE`]: cli.html#cli_uv_threadpool_size_size [`WriteStream`]: #fs_class_fs_writestream @@ -5591,3 +5592,4 @@ the file contents. [chcp]: https://ss64.com/nt/chcp.html [inode]: https://en.wikipedia.org/wiki/Inode [support of file system `flags`]: #fs_file_system_flags +[Writable Stream]: #stream_class_stream_writable From b9f518bfd6739e568bc98f4f7d68288b588a1205 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 24 Nov 2019 15:25:28 +0100 Subject: [PATCH 5/9] Update doc/api/fs.md Co-Authored-By: Corey Farrell --- doc/api/fs.md | 2 +- lib/internal/fs/streams.js | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/doc/api/fs.md b/doc/api/fs.md index a41dc596152089..54f29dc327947c 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -1687,7 +1687,7 @@ changes: * `end` {integer} **Default:** `Infinity` * `highWaterMark` {integer} **Default:** `64 * 1024` * `fs` {Object|null} **Default:** `null` -* Returns: {fs.ReadStream} See [Readable Streams][]. +* Returns: {fs.ReadStream} See [Readable Stream][]. Unlike the 16 kb default `highWaterMark` for a readable stream, the stream returned by this method has a default `highWaterMark` of 64 kb. diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index d1617ec2abc305..8d797ff0b2368e 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -11,13 +11,9 @@ const { } = primordials; const { -<<<<<<< HEAD + ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, ERR_STREAM_DESTROYED -======= - ERR_INVALID_ARG_TYPE, - ERR_OUT_OF_RANGE ->>>>>>> fixup: linting } = require('internal/errors').codes; const internalUtil = require('internal/util'); const { validateNumber } = require('internal/validators'); From cdca4b761abf4a3b5cc57b1d6d401a0c8558a47f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 11 Dec 2019 20:50:59 +0100 Subject: [PATCH 6/9] WIP --- lib/internal/streams/destroyer.js | 27 +++++++++++++++++++++++++++ lib/stream.js | 2 ++ 2 files changed, 29 insertions(+) create mode 100644 lib/internal/streams/destroyer.js diff --git a/lib/internal/streams/destroyer.js b/lib/internal/streams/destroyer.js new file mode 100644 index 00000000000000..910ac4b6ac981e --- /dev/null +++ b/lib/internal/streams/destroyer.js @@ -0,0 +1,27 @@ +'use strict'; + +const { once } = require('util'); + +module.exports = function destroy (stream, err, cb) { + cb = once(cb); + if (stream.destroy) { + stream.destroy(err, (er) => { + process.nextTick(cb, er); + }); + } else if (stream.abort) { + stream.abort(); + } else if (stream.end) { + stream.end(); + } else { + process.nextTick(cb, new Error('invalid stream')); + return; + } + // Register event listener for streams without destroy(err, cb). + stream + .once('error', cb) + .once('aborted', () => cb(new Error('aborted'))) + .once('close', () => { + // Wait one tick in case 'error' is emitted after 'close'. + process.nextTick(cb); + }); +} diff --git a/lib/stream.js b/lib/stream.js index 725038ba9c0d1c..ad27e2943e9251 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -23,6 +23,7 @@ const pipeline = require('internal/streams/pipeline'); const eos = require('internal/streams/end-of-stream'); +const destroyer = require('internal/streams/destroyer'); const internalBuffer = require('internal/buffer'); // Note: export Stream before Readable/Writable/Duplex/... @@ -37,6 +38,7 @@ Stream.PassThrough = require('_stream_passthrough'); Stream.pipeline = pipeline; Stream.finished = eos; +Stream.destroy = destroyer; // Backwards-compat with node 0.4.x Stream.Stream = Stream; From f3f609af72f5ec16c1f390fe54427a3034ccc0c3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Dec 2019 00:27:24 +0100 Subject: [PATCH 7/9] fixup --- lib/internal/fs/streams.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 8d797ff0b2368e..867f477ef13469 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -205,7 +205,8 @@ ReadStream.prototype._read = function(n) { // the actual read. this[kIsPerformingIO] = true; - this[kFs].read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + this[kFs].read( + this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) return this.emit(kIoDone, er); From 835a02a61ad99f34867ca1d2769240af322a5aa7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Dec 2019 08:47:08 +0100 Subject: [PATCH 8/9] fixup --- lib/internal/streams/destroyer.js | 27 --------------------------- lib/stream.js | 2 -- 2 files changed, 29 deletions(-) delete mode 100644 lib/internal/streams/destroyer.js diff --git a/lib/internal/streams/destroyer.js b/lib/internal/streams/destroyer.js deleted file mode 100644 index 910ac4b6ac981e..00000000000000 --- a/lib/internal/streams/destroyer.js +++ /dev/null @@ -1,27 +0,0 @@ -'use strict'; - -const { once } = require('util'); - -module.exports = function destroy (stream, err, cb) { - cb = once(cb); - if (stream.destroy) { - stream.destroy(err, (er) => { - process.nextTick(cb, er); - }); - } else if (stream.abort) { - stream.abort(); - } else if (stream.end) { - stream.end(); - } else { - process.nextTick(cb, new Error('invalid stream')); - return; - } - // Register event listener for streams without destroy(err, cb). - stream - .once('error', cb) - .once('aborted', () => cb(new Error('aborted'))) - .once('close', () => { - // Wait one tick in case 'error' is emitted after 'close'. - process.nextTick(cb); - }); -} diff --git a/lib/stream.js b/lib/stream.js index ad27e2943e9251..725038ba9c0d1c 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -23,7 +23,6 @@ const pipeline = require('internal/streams/pipeline'); const eos = require('internal/streams/end-of-stream'); -const destroyer = require('internal/streams/destroyer'); const internalBuffer = require('internal/buffer'); // Note: export Stream before Readable/Writable/Duplex/... @@ -38,7 +37,6 @@ Stream.PassThrough = require('_stream_passthrough'); Stream.pipeline = pipeline; Stream.finished = eos; -Stream.destroy = destroyer; // Backwards-compat with node 0.4.x Stream.Stream = Stream; From be601863ca952e7539920be429fa8c6f95fb13cc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Dec 2019 19:18:24 +0100 Subject: [PATCH 9/9] fixup: linting --- lib/internal/fs/streams.js | 60 +++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 867f477ef13469..f18af488c30a85 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -207,41 +207,41 @@ ReadStream.prototype._read = function(n) { this[kIsPerformingIO] = true; this[kFs].read( this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { - this[kIsPerformingIO] = false; - // Tell ._destroy() that it's safe to close the fd now. - if (this.destroyed) return this.emit(kIoDone, er); + this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) return this.emit(kIoDone, er); - if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); - } else { - let b = null; - // Now that we know how much data we have actually read, re-wind the - // 'used' field if we can, and otherwise allow the remainder of our - // reservation to be used as a new pool later. - if (start + toRead === thisPool.used && thisPool === pool) { - const newUsed = thisPool.used + bytesRead - toRead; - thisPool.used = roundUpToMultipleOf8(newUsed); + if (er) { + if (this.autoClose) { + this.destroy(); + } + this.emit('error', er); } else { - // Round down to the next lowest multiple of 8 to ensure the new pool - // fragment start and end positions are aligned to an 8 byte boundary. - const alignedEnd = (start + toRead) & ~7; - const alignedStart = roundUpToMultipleOf8(start + bytesRead); - if (alignedEnd - alignedStart >= kMinPoolSpace) { - poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); + let b = null; + // Now that we know how much data we have actually read, re-wind the + // 'used' field if we can, and otherwise allow the remainder of our + // reservation to be used as a new pool later. + if (start + toRead === thisPool.used && thisPool === pool) { + const newUsed = thisPool.used + bytesRead - toRead; + thisPool.used = roundUpToMultipleOf8(newUsed); + } else { + // Round down to the next lowest multiple of 8 to ensure the new pool + // fragment start and end positions are aligned to an 8 byte boundary. + const alignedEnd = (start + toRead) & ~7; + const alignedStart = roundUpToMultipleOf8(start + bytesRead); + if (alignedEnd - alignedStart >= kMinPoolSpace) { + poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); + } } - } - if (bytesRead > 0) { - this.bytesRead += bytesRead; - b = thisPool.slice(start, start + bytesRead); - } + if (bytesRead > 0) { + this.bytesRead += bytesRead; + b = thisPool.slice(start, start + bytesRead); + } - this.push(b); - } - }); + this.push(b); + } + }); // Move the pool positions, and internal position for reading. if (this.pos !== undefined)