fix: compressor didn't chunkify big payload #3

Merged: 8 commits, Apr 26, 2022
Changes from 7 commits
56 changes: 42 additions & 14 deletions lib/compress-stream.js
@@ -1,3 +1,11 @@
/**
* As per the snappy framing format for streams, the size of any uncompressed chunk can be
* no longer than 65536 bytes.
*
* From: https://github.com/google/snappy/blob/main/framing_format.txt#L90:L92
*/
const UNCOMPRESSED_CHUNK_SIZE = 65536;

var Transform = require('stream').Transform
, util = require('util')

@@ -50,20 +58,40 @@ CompressStream.prototype._uncompressed = function (chunk) {
)
}

CompressStream.prototype._transform = function (chunk, enc, callback) {
/**
* Some compression benchmarks :
*
* i) Chunking in transform with snappy.compressSync (the new implementation)
* ii) Chunking from outside with compressStream.write (using original snappy.compress)
* iii) No chunking (Original)
*
 * | Size | in transform | compressStream.write | original (no chunking) |
* |-------------------|--------------|----------------------|-----------------------|
* | 10kb (1 chunk) | 0.0229 ms | 0.0385 ms | 0.0388 ms |
* | 100kb (2 chunks) | 0.0562 ms | 0.1051 ms | 0.0844 ms |
* | 1000kb (16 chunks)| 0.382 ms | 0.7971 ms | 0.1998 ms |
*
*/

CompressStream.prototype._transform = function(chunk, enc, callback) {
var self = this
Member commented:

I guess we don't need self anymore now
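(For illustration only, a minimal sketch of what dropping self could look like once the loop is fully synchronous; this is not part of the diff:)

// Sketch: with snappy.compressSync the whole loop runs synchronously,
// so `this` can be used directly instead of capturing `self`.
CompressStream.prototype._transform = function (chunk, enc, callback) {
  try {
    for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
      const bytesChunk = chunk.slice(startFrom, startFrom + UNCOMPRESSED_CHUNK_SIZE)
      const compressed = snappy.compressSync(bytesChunk)

      if (compressed.length < bytesChunk.length)
        this._compressed(bytesChunk, compressed)
      else
        this._uncompressed(bytesChunk)
    }
    callback()
  } catch (err) {
    callback(err)
  }
}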


snappy.compress(chunk, function (err, compressed) {
if (err)
return callback(err)

if (compressed.length < chunk.length)
self._compressed(chunk, compressed)
else
self._uncompressed(chunk)

callback()
})
new Promise(() => {
try {
for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE);
const bytesChunk = chunk.slice(startFrom, endAt);
const compressed = snappy.compressSync(bytesChunk)

Why switch to the sync version? If we want to do that we should do it in another PR and study in depth the implications.

g11tech (Author) commented on Apr 15, 2022:

@dapplion the async version, where I chunk the bytes from outside (i.e. calling compressStream.write(chunk)) and which uses the original flow, is about 2X slower compared to this approach, where the chunking happens inside the transform (i.e. the compressSync-based solution comes out ahead):

[image: benchmark comparison] (orig-snappy is the original with data chunked at the compressStream.write level)

If I use the async version inside, with an await new Promise((resolve) => ... wrapper to make sure the async-returned data is written in the proper serial order, it is 5-10% slower because of the overhead. If I don't care about the serial order in which the chunks are written (which leads to an incorrect stream), then the async chunking version inside the transform is 5% better.

I think for now this is our best bet 🙂
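
(For reference, a rough sketch of approach (ii) from the table above, chunking from outside at the compressStream.write level, which these numbers compare against; the exact writer loop used in the benchmark is an assumption:)

// Sketch of approach (ii): split the payload at the caller and feed each
// slice to the original (unchunked) compress stream via write().
const createCompressStream = require('../').createCompressStream
const UNCOMPRESSED_CHUNK_SIZE = 65536
const compressStream = createCompressStream()

function writeChunked (payload) {
  for (let startFrom = 0; startFrom < payload.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
    compressStream.write(payload.slice(startFrom, startFrom + UNCOMPRESSED_CHUNK_SIZE))
  }
  compressStream.end()
}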

Okay sounds good to me. Can you commit a benchmark tho? To inform us of the cost of compressing and uncompressing objects of 10Kb, 100Kb, 1000Kb.
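
(For illustration, a minimal sketch of what such a benchmark could look like, assuming compressible fixed-fill buffers and a simple hrtime timing loop; the benchmark actually committed may differ:)

// Minimal benchmark sketch: time a single write of 10Kb, 100Kb and 1000Kb
// payloads through the compress stream (chunking now happens inside write).
const createCompressStream = require('../').createCompressStream

for (const kb of [10, 100, 1000]) {
  const payload = Buffer.alloc(kb * 1024, 'abcdefgh') // compressible test data
  const compressStream = createCompressStream()
  compressStream.resume() // discard the output; only the timing matters here

  const start = process.hrtime.bigint()
  compressStream.write(payload)
  compressStream.end()
  const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6
  console.log(kb + 'kb: ' + elapsedMs.toFixed(4) + ' ms')
}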

if (compressed.length < bytesChunk.length)
self._compressed(bytesChunk, compressed)
else
self._uncompressed(bytesChunk)

}
callback();
} catch (err) {
return callback(err);
}
}).catch(e => console.log(e))
}

module.exports = CompressStream
module.exports = CompressStream
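
With this change a caller can hand the stream an arbitrarily large buffer and the transform splits it into 65536-byte frames internally; a rough usage sketch (the require path and output file are placeholders):

// Usage sketch: one write larger than 65536 bytes is now split into multiple
// snappy frames inside _transform, so callers no longer need to chunk manually.
const fs = require('fs')
const createCompressStream = require('../').createCompressStream

const compressStream = createCompressStream()
compressStream.pipe(fs.createWriteStream('payload.snappy'))

const bigPayload = Buffer.alloc(10 * 65536, 'x') // ten full-size frames
compressStream.write(bigPayload)
compressStream.end()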
31 changes: 30 additions & 1 deletion test/compress-test.js
@@ -1,10 +1,16 @@
var spawn = require('child_process').spawn

, createCompressStream = require('../').createCompressStream
, test = require('tap').test
, largerInput = require('fs').readFileSync(__filename)
, largerInputString = largerInput.toString()

const UNCOMPRESSED_CHUNK_SIZE = 65536
let superLargeInput = largerInput;
for (let i = largerInput.length; i <= UNCOMPRESSED_CHUNK_SIZE; i += largerInput.length) {
superLargeInput = Buffer.concat([superLargeInput, largerInput]);
}
const superLargeInputString = superLargeInput.toString();

test('compress small string', function (t) {
var child = spawn('python', [ '-m', 'snappy', '-d' ])
, compressStream = createCompressStream()
@@ -48,3 +54,26 @@ test('compress large string', function (t) {
compressStream.write(largerInputString)
compressStream.end()
})


test('compress very very large string', function (t) {
var child = spawn('python', [ '-m', 'snappy', '-d' ])
, compressStream = createCompressStream()
, data = ''

child.stdout.on('data', function (chunk) {
data = data + chunk.toString()
})

child.stdout.on('end', function () {
t.equal(data, superLargeInputString)
t.end()
})

child.stderr.pipe(process.stderr)

compressStream.pipe(child.stdin)

compressStream.write(superLargeInputString)
compressStream.end()
})
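
The tests above round-trip through python-snappy; a pure-Node round-trip check could look like the sketch below, assuming the package also exposes createUncompressStream (an assumption about this repo's public API):

// Round-trip sketch (assumes createUncompressStream is exported alongside
// createCompressStream; verify against this repo's index before relying on it).
var createCompressStream = require('../').createCompressStream
  , createUncompressStream = require('../').createUncompressStream
  , test = require('tap').test

test('round-trip a payload larger than one frame', function (t) {
  var compressStream = createCompressStream()
    , uncompressStream = createUncompressStream()
    , input = Buffer.alloc(3 * 65536 + 123, 'abc')
    , out = ''

  uncompressStream.on('data', function (chunk) {
    out = out + chunk.toString()
  })

  uncompressStream.on('end', function () {
    t.equal(out, input.toString())
    t.end()
  })

  compressStream.pipe(uncompressStream)

  compressStream.write(input)
  compressStream.end()
})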