From 518ffc125680f0916635d2ed97c076dbff3bd05b Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 3 Jul 2019 12:38:05 +0200 Subject: [PATCH] zlib: do not coalesce multiple `.flush()` calls This is an approach to address the issue linked below. Previously, when `.write()` and `.flush()` calls to a zlib stream were interleaved synchronously (i.e. without waiting for these operations to finish), multiple flush calls would have been coalesced into a single flushing operation. This patch changes behaviour so that each `.flush()` all corresponds to one flushing operation on the underlying zlib resource, and the order of operations is as if the `.flush()` call were a `.write()` call. One test had to be removed because it specifically tested the previous behaviour. As a drive-by fix, this also makes sure that all flush callbacks are called. Previously, that was not the case. Fixes: https://github.com/nodejs/node/issues/28478 PR-URL: https://github.com/nodejs/node/pull/28520 Reviewed-By: Rich Trott Reviewed-By: Ruben Bridgewater Reviewed-By: Luigi Pinca --- lib/zlib.js | 35 +++++++----- .../test-zlib-flush-multiple-scheduled.js | 39 ------------- .../test-zlib-flush-write-sync-interleaved.js | 57 +++++++++++++++++++ test/parallel/test-zlib-write-after-flush.js | 1 - 4 files changed, 79 insertions(+), 53 deletions(-) delete mode 100644 test/parallel/test-zlib-flush-multiple-scheduled.js create mode 100644 test/parallel/test-zlib-flush-write-sync-interleaved.js diff --git a/lib/zlib.js b/lib/zlib.js index a8a1e09a7068e1..72144cdc01db16 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -49,6 +49,8 @@ const { } = require('buffer'); const { owner_symbol } = require('internal/async_hooks').symbols; +const kFlushFlag = Symbol('kFlushFlag'); + const constants = internalBinding('constants').zlib; const { // Zlib flush levels @@ -261,7 +263,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) { this._chunkSize = chunkSize; this._defaultFlushFlag = flush; this._finishFlushFlag = finishFlush; - this._nextFlush = -1; this._defaultFullFlushFlag = fullFlush; this.once('end', this.close); this._info = opts && opts.info; @@ -308,13 +309,16 @@ ZlibBase.prototype._flush = function(callback) { // If a flush is scheduled while another flush is still pending, a way to figure // out which one is the "stronger" flush is needed. +// This is currently only used to figure out which flush flag to use for the +// last chunk. // Roughly, the following holds: // Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < // Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH const flushiness = []; let i = 0; -for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH, - Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) { +const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH, + Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]; +for (const flushFlag of kFlushFlagList) { flushiness[flushFlag] = i++; } @@ -322,7 +326,18 @@ function maxFlush(a, b) { return flushiness[a] > flushiness[b] ? a : b; } -const flushBuffer = Buffer.alloc(0); +// Set up a list of 'special' buffers that can be written using .write() +// from the .flush() code as a way of introducing flushing operations into the +// write sequence. +const kFlushBuffers = []; +{ + const dummyArrayBuffer = new ArrayBuffer(); + for (const flushFlag of kFlushFlagList) { + kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer); + kFlushBuffers[flushFlag][kFlushFlag] = flushFlag; + } +} + ZlibBase.prototype.flush = function(kind, callback) { const ws = this._writableState; @@ -337,13 +352,8 @@ ZlibBase.prototype.flush = function(kind, callback) { } else if (ws.ending) { if (callback) this.once('end', callback); - } else if (this._nextFlush !== -1) { - // This means that there is a flush currently in the write queue. - // We currently coalesce this flush into the pending one. - this._nextFlush = maxFlush(this._nextFlush, kind); } else { - this._nextFlush = kind; - this.write(flushBuffer, '', callback); + this.write(kFlushBuffers[kind], '', callback); } }; @@ -361,9 +371,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) { var flushFlag = this._defaultFlushFlag; // We use a 'fake' zero-length chunk to carry information about flushes from // the public API to the actual stream implementation. - if (chunk === flushBuffer) { - flushFlag = this._nextFlush; - this._nextFlush = -1; + if (typeof chunk[kFlushFlag] === 'number') { + flushFlag = chunk[kFlushFlag]; } // For the last chunk, also apply `_finishFlushFlag`. diff --git a/test/parallel/test-zlib-flush-multiple-scheduled.js b/test/parallel/test-zlib-flush-multiple-scheduled.js deleted file mode 100644 index 0b752557e441bc..00000000000000 --- a/test/parallel/test-zlib-flush-multiple-scheduled.js +++ /dev/null @@ -1,39 +0,0 @@ -'use strict'; - -const common = require('../common'); -const assert = require('assert'); -const zlib = require('zlib'); - -const { - Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH -} = zlib.constants; - -async function getOutput(...sequenceOfFlushes) { - const zipper = zlib.createGzip({ highWaterMark: 16384 }); - - zipper.write('A'.repeat(17000)); - for (const flush of sequenceOfFlushes) { - zipper.flush(flush); - } - - const data = []; - - return new Promise((resolve) => { - zipper.on('data', common.mustCall((d) => { - data.push(d); - if (data.length === 2) resolve(Buffer.concat(data)); - }, 2)); - }); -} - -(async function() { - assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH), - await getOutput(Z_SYNC_FLUSH, Z_PARTIAL_FLUSH)); - assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH), - await getOutput(Z_PARTIAL_FLUSH, Z_SYNC_FLUSH)); - - assert.deepStrictEqual(await getOutput(Z_FINISH), - await getOutput(Z_FULL_FLUSH, Z_FINISH)); - assert.deepStrictEqual(await getOutput(Z_FINISH), - await getOutput(Z_SYNC_FLUSH, Z_FINISH)); -})(); diff --git a/test/parallel/test-zlib-flush-write-sync-interleaved.js b/test/parallel/test-zlib-flush-write-sync-interleaved.js new file mode 100644 index 00000000000000..9fed592a34bb1b --- /dev/null +++ b/test/parallel/test-zlib-flush-write-sync-interleaved.js @@ -0,0 +1,57 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib'); + +// Verify that .flush() behaves like .write() in terms of ordering, e.g. in +// a sequence like .write() + .flush() + .write() + .flush() each .flush() call +// only affects the data written before it. +// Refs: https://github.com/nodejs/node/issues/28478 + +const compress = createGzip(); +const decompress = createGunzip(); +decompress.setEncoding('utf8'); + +const events = []; +const compressedChunks = []; + +for (const chunk of ['abc', 'def', 'ghi']) { + compress.write(chunk, common.mustCall(() => events.push({ written: chunk }))); + compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => { + events.push('flushed'); + const chunk = compress.read(); + if (chunk !== null) + compressedChunks.push(chunk); + })); +} + +compress.end(common.mustCall(() => { + events.push('compress end'); + writeToDecompress(); +})); + +function writeToDecompress() { + // Write the compressed chunks to a decompressor, one by one, in order to + // verify that the flushes actually worked. + const chunk = compressedChunks.shift(); + if (chunk === undefined) return decompress.end(); + decompress.write(chunk, common.mustCall(() => { + events.push({ read: decompress.read() }); + writeToDecompress(); + })); +} + +process.on('exit', () => { + assert.deepStrictEqual(events, [ + { written: 'abc' }, + 'flushed', + { written: 'def' }, + 'flushed', + { written: 'ghi' }, + 'flushed', + 'compress end', + { read: 'abc' }, + { read: 'def' }, + { read: 'ghi' } + ]); +}); diff --git a/test/parallel/test-zlib-write-after-flush.js b/test/parallel/test-zlib-write-after-flush.js index 2fcae2a2139768..6edcae2e2f18bf 100644 --- a/test/parallel/test-zlib-write-after-flush.js +++ b/test/parallel/test-zlib-write-after-flush.js @@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [ gunz.on('data', (c) => output += c); gunz.on('end', common.mustCall(() => { assert.strictEqual(output, input); - assert.strictEqual(gzip._nextFlush, -1); })); // Make sure that flush/write doesn't trigger an assert failure