Skip to content

Commit

Permalink
zlib: do not coalesce multiple .flush() calls
Browse files Browse the repository at this point in the history
This is an approach to address the issue linked below. Previously,
when `.write()` and `.flush()` calls to a zlib stream were interleaved
synchronously (i.e. without waiting for these operations to finish),
multiple flush calls would have been coalesced into a single flushing
operation.

This patch changes behaviour so that each `.flush()` all corresponds
to one flushing operation on the underlying zlib resource, and the
order of operations is as if the `.flush()` call were a `.write()`
call.

One test had to be removed because it specifically tested the previous
behaviour.

As a drive-by fix, this also makes sure that all flush callbacks are
called. Previously, that was not the case.

Fixes: #28478

PR-URL: #28520
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
addaleax committed Jul 14, 2019
1 parent efc3946 commit 518ffc1
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 53 deletions.
35 changes: 22 additions & 13 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const {
} = require('buffer');
const { owner_symbol } = require('internal/async_hooks').symbols;

const kFlushFlag = Symbol('kFlushFlag');

const constants = internalBinding('constants').zlib;
const {
// Zlib flush levels
Expand Down Expand Up @@ -261,7 +263,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
this._chunkSize = chunkSize;
this._defaultFlushFlag = flush;
this._finishFlushFlag = finishFlush;
this._nextFlush = -1;
this._defaultFullFlushFlag = fullFlush;
this.once('end', this.close);
this._info = opts && opts.info;
Expand Down Expand Up @@ -308,21 +309,35 @@ ZlibBase.prototype._flush = function(callback) {

// If a flush is scheduled while another flush is still pending, a way to figure
// out which one is the "stronger" flush is needed.
// This is currently only used to figure out which flush flag to use for the
// last chunk.
// Roughly, the following holds:
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
const flushiness = [];
let i = 0;
for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) {
const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH];
for (const flushFlag of kFlushFlagList) {
flushiness[flushFlag] = i++;
}

function maxFlush(a, b) {
return flushiness[a] > flushiness[b] ? a : b;
}

const flushBuffer = Buffer.alloc(0);
// Set up a list of 'special' buffers that can be written using .write()
// from the .flush() code as a way of introducing flushing operations into the
// write sequence.
const kFlushBuffers = [];
{
const dummyArrayBuffer = new ArrayBuffer();
for (const flushFlag of kFlushFlagList) {
kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer);
kFlushBuffers[flushFlag][kFlushFlag] = flushFlag;
}
}

ZlibBase.prototype.flush = function(kind, callback) {
const ws = this._writableState;

Expand All @@ -337,13 +352,8 @@ ZlibBase.prototype.flush = function(kind, callback) {
} else if (ws.ending) {
if (callback)
this.once('end', callback);
} else if (this._nextFlush !== -1) {
// This means that there is a flush currently in the write queue.
// We currently coalesce this flush into the pending one.
this._nextFlush = maxFlush(this._nextFlush, kind);
} else {
this._nextFlush = kind;
this.write(flushBuffer, '', callback);
this.write(kFlushBuffers[kind], '', callback);
}
};

Expand All @@ -361,9 +371,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) {
var flushFlag = this._defaultFlushFlag;
// We use a 'fake' zero-length chunk to carry information about flushes from
// the public API to the actual stream implementation.
if (chunk === flushBuffer) {
flushFlag = this._nextFlush;
this._nextFlush = -1;
if (typeof chunk[kFlushFlag] === 'number') {
flushFlag = chunk[kFlushFlag];
}

// For the last chunk, also apply `_finishFlushFlag`.
Expand Down
39 changes: 0 additions & 39 deletions test/parallel/test-zlib-flush-multiple-scheduled.js

This file was deleted.

57 changes: 57 additions & 0 deletions test/parallel/test-zlib-flush-write-sync-interleaved.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib');

// Verify that .flush() behaves like .write() in terms of ordering, e.g. in
// a sequence like .write() + .flush() + .write() + .flush() each .flush() call
// only affects the data written before it.
// Refs: https://github.com/nodejs/node/issues/28478

const compress = createGzip();
const decompress = createGunzip();
decompress.setEncoding('utf8');

const events = [];
const compressedChunks = [];

for (const chunk of ['abc', 'def', 'ghi']) {
compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
events.push('flushed');
const chunk = compress.read();
if (chunk !== null)
compressedChunks.push(chunk);
}));
}

compress.end(common.mustCall(() => {
events.push('compress end');
writeToDecompress();
}));

function writeToDecompress() {
// Write the compressed chunks to a decompressor, one by one, in order to
// verify that the flushes actually worked.
const chunk = compressedChunks.shift();
if (chunk === undefined) return decompress.end();
decompress.write(chunk, common.mustCall(() => {
events.push({ read: decompress.read() });
writeToDecompress();
}));
}

process.on('exit', () => {
assert.deepStrictEqual(events, [
{ written: 'abc' },
'flushed',
{ written: 'def' },
'flushed',
{ written: 'ghi' },
'flushed',
'compress end',
{ read: 'abc' },
{ read: 'def' },
{ read: 'ghi' }
]);
});
1 change: 0 additions & 1 deletion test/parallel/test-zlib-write-after-flush.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [
gunz.on('data', (c) => output += c);
gunz.on('end', common.mustCall(() => {
assert.strictEqual(output, input);
assert.strictEqual(gzip._nextFlush, -1);
}));

// Make sure that flush/write doesn't trigger an assert failure
Expand Down

0 comments on commit 518ffc1

Please sign in to comment.