diff --git a/lib/zlib.js b/lib/zlib.js index 80ffb962b38c6c..35d998df26af3f 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -32,11 +32,11 @@ const kRangeErrorMessage = 'Cannot create final Buffer. It would be larger ' + const constants = process.binding('constants').zlib; const { - Z_NO_FLUSH, Z_BLOCK, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH, Z_MIN_CHUNK, - Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_MIN_LEVEL, Z_MAX_LEVEL, Z_MIN_MEMLEVEL, - Z_MAX_MEMLEVEL, Z_DEFAULT_CHUNK, Z_DEFAULT_COMPRESSION, Z_DEFAULT_STRATEGY, - Z_DEFAULT_WINDOWBITS, Z_DEFAULT_MEMLEVEL, Z_FIXED, DEFLATE, DEFLATERAW, - INFLATE, INFLATERAW, GZIP, GUNZIP, UNZIP + Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH, + Z_MIN_CHUNK, Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_MIN_LEVEL, Z_MAX_LEVEL, + Z_MIN_MEMLEVEL, Z_MAX_MEMLEVEL, Z_DEFAULT_CHUNK, Z_DEFAULT_COMPRESSION, + Z_DEFAULT_STRATEGY, Z_DEFAULT_WINDOWBITS, Z_DEFAULT_MEMLEVEL, Z_FIXED, + DEFLATE, DEFLATERAW, INFLATE, INFLATERAW, GZIP, GUNZIP, UNZIP } = constants; const { inherits } = require('util'); @@ -254,6 +254,7 @@ function Zlib(opts, mode) { this._strategy = strategy; this._chunkSize = chunkSize; this._flushFlag = flush; + this._scheduledFlushFlag = Z_NO_FLUSH; this._origFlushFlag = flush; this._finishFlushFlag = finishFlush; this._info = opts && opts.info; @@ -299,6 +300,22 @@ Zlib.prototype._flush = function _flush(callback) { this._transform(Buffer.alloc(0), '', callback); }; +// If a flush is scheduled while another flush is still pending, a way to figure +// out which one is the "stronger" flush is needed. +// Roughly, the following holds: +// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < +// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH +const flushiness = []; +let i = 0; +for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH, + Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) { + flushiness[flushFlag] = i++; +} + +function maxFlush(a, b) { + return flushiness[a] > flushiness[b] ? a : b; +} + Zlib.prototype.flush = function flush(kind, callback) { var ws = this._writableState; @@ -314,13 +331,20 @@ Zlib.prototype.flush = function flush(kind, callback) { if (callback) this.once('end', callback); } else if (ws.needDrain) { - if (callback) { - const drainHandler = () => this.flush(kind, callback); + const alreadyHadFlushScheduled = this._scheduledFlushFlag !== Z_NO_FLUSH; + this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag); + + // If a callback was passed, always register a new `drain` + flush handler, + // mostly because that’s simpler and flush callbacks piling up is a rare + // thing anyway. + if (!alreadyHadFlushScheduled || callback) { + const drainHandler = () => this.flush(this._scheduledFlushFlag, callback); this.once('drain', drainHandler); } } else { this._flushFlag = kind; this.write(Buffer.alloc(0), '', callback); + this._scheduledFlushFlag = Z_NO_FLUSH; } }; diff --git a/test/parallel/test-zlib-flush-drain-longblock.js b/test/parallel/test-zlib-flush-drain-longblock.js new file mode 100644 index 00000000000000..94d1d9d04d9369 --- /dev/null +++ b/test/parallel/test-zlib-flush-drain-longblock.js @@ -0,0 +1,27 @@ +'use strict'; + +// Regression test for https://github.com/nodejs/node/issues/14523. +// Checks that flushes interact properly with writableState.needDrain, +// even if no flush callback was passed. + +const common = require('../common'); +const assert = require('assert'); +const zlib = require('zlib'); + +const zipper = zlib.createGzip({ highWaterMark: 16384 }); +const unzipper = zlib.createGunzip(); +zipper.pipe(unzipper); + +zipper.write('A'.repeat(17000)); +zipper.flush(); + +let received = 0; +unzipper.on('data', common.mustCall((d) => { + received += d.length; +}, 2)); + +// Properly `.end()`ing the streams would interfere with checking that +// `.flush()` works. +process.on('exit', () => { + assert.strictEqual(received, 17000); +}); diff --git a/test/parallel/test-zlib-flush-multiple-scheduled.js b/test/parallel/test-zlib-flush-multiple-scheduled.js new file mode 100644 index 00000000000000..19548672389fde --- /dev/null +++ b/test/parallel/test-zlib-flush-multiple-scheduled.js @@ -0,0 +1,41 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const zlib = require('zlib'); + +const { + Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH +} = zlib.constants; + +common.crashOnUnhandledRejection(); + +async function getOutput(...sequenceOfFlushes) { + const zipper = zlib.createGzip({ highWaterMark: 16384 }); + + zipper.write('A'.repeat(17000)); + for (const flush of sequenceOfFlushes) { + zipper.flush(flush); + } + + const data = []; + + return new Promise((resolve) => { + zipper.on('data', common.mustCall((d) => { + data.push(d); + if (data.length === 2) resolve(Buffer.concat(data)); + }, 2)); + }); +} + +(async function() { + assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH), + await getOutput(Z_SYNC_FLUSH, Z_PARTIAL_FLUSH)); + assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH), + await getOutput(Z_PARTIAL_FLUSH, Z_SYNC_FLUSH)); + + assert.deepStrictEqual(await getOutput(Z_FINISH), + await getOutput(Z_FULL_FLUSH, Z_FINISH)); + assert.deepStrictEqual(await getOutput(Z_FINISH), + await getOutput(Z_SYNC_FLUSH, Z_FINISH)); +})();