diff --git a/lib/compress-stream.js b/lib/compress-stream.js
index cf4f72a..f542e89 100644
--- a/lib/compress-stream.js
+++ b/lib/compress-stream.js
@@ -1,3 +1,11 @@
+/**
+ * As per the snappy framing format for streams, the size of any uncompressed chunk can be
+ * no larger than 65536 bytes.
+ *
+ * From: https://github.com/google/snappy/blob/main/framing_format.txt#L90:L92
+ */
+const UNCOMPRESSED_CHUNK_SIZE = 65536;
+
 var Transform = require('stream').Transform
   , util = require('util')
 
@@ -50,20 +58,39 @@ CompressStream.prototype._uncompressed = function (chunk) {
   )
 }
 
-CompressStream.prototype._transform = function (chunk, enc, callback) {
-  var self = this
-
-  snappy.compress(chunk, function (err, compressed) {
-    if (err)
-      return callback(err)
-
-    if (compressed.length < chunk.length)
-      self._compressed(chunk, compressed)
-    else
-      self._uncompressed(chunk)
-
-    callback()
-  })
+/**
+ * Some compression benchmarks:
+ *
+ * i) Chunking in _transform with snappy.compressSync (the new implementation)
+ * ii) Chunking from outside with compressStream.write (using the original snappy.compress)
+ * iii) No chunking (the original implementation)
+ *
+ * | Size                | in _transform | compressStream.write | original (no chunking) |
+ * |---------------------|---------------|----------------------|------------------------|
+ * | 10 KB (1 chunk)     | 0.0229 ms     | 0.0385 ms            | 0.0388 ms              |
+ * | 100 KB (2 chunks)   | 0.0562 ms     | 0.1051 ms            | 0.0844 ms              |
+ * | 1000 KB (16 chunks) | 0.382 ms      | 0.7971 ms            | 0.1998 ms              |
+ *
+ */
+
+CompressStream.prototype._transform = function (chunk, enc, callback) {
+  new Promise(() => {
+    try {
+      // Split the incoming chunk at the 65536-byte boundary the framing format mandates.
+      for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
+        const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE);
+        const bytesChunk = chunk.slice(startFrom, endAt);
+        const compressed = snappy.compressSync(bytesChunk)
+        if (compressed.length < bytesChunk.length)
+          this._compressed(bytesChunk, compressed)
+        else
+          this._uncompressed(bytesChunk)
+      }
+      callback();
+    } catch (err) {
+      return callback(err);
+    }
+  }).catch(e => console.log(e))
 }
 
-module.exports = CompressStream
\ No newline at end of file
+module.exports = CompressStream
diff --git a/test/compress-test.js b/test/compress-test.js
index 8e0b233..d520ee2 100644
--- a/test/compress-test.js
+++ b/test/compress-test.js
@@ -1,10 +1,16 @@
 var spawn = require('child_process').spawn
-
   , createCompressStream = require('../').createCompressStream
   , test = require('tap').test
   , largerInput = require('fs').readFileSync(__filename)
   , largerInputString = largerInput.toString()
 
+const UNCOMPRESSED_CHUNK_SIZE = 65536
+let superLargeInput = largerInput;
+for (let i = largerInput.length; i <= UNCOMPRESSED_CHUNK_SIZE; i += largerInput.length) {
+  superLargeInput = Buffer.concat([superLargeInput, largerInput]);
+}
+const superLargeInputString = superLargeInput.toString();
+
 test('compress small string', function (t) {
   var child = spawn('python', [ '-m', 'snappy', '-d' ])
     , compressStream = createCompressStream()
@@ -48,3 +54,26 @@ test('compress large string', function (t) {
   compressStream.write(largerInputString)
   compressStream.end()
 })
+
+
+test('compress very very large string', function (t) {
+  var child = spawn('python', [ '-m', 'snappy', '-d' ])
+    , compressStream = createCompressStream()
+    , data = ''
+
+  child.stdout.on('data', function (chunk) {
+    data = data + chunk.toString()
+  })
+
+  child.stdout.on('end', function () {
+    t.equal(data, superLargeInputString)
+    t.end()
+  })
+
+  child.stderr.pipe(process.stderr)
+
+  compressStream.pipe(child.stdin)
+
+  compressStream.write(superLargeInputString)
+  compressStream.end()
+})
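
For context, a minimal usage sketch of the chunked compressor, separate from the diff above: a single write() larger than one 64 KiB frame drives the new loop in _transform through several iterations. The require path 'snappy-stream' is an assumption for illustration; within this repo the tests load the package root via require('../') as shown above.

// Usage sketch (assumption: the package is installed as 'snappy-stream';
// inside this repo the tests use require('../') instead).
var createCompressStream = require('snappy-stream').createCompressStream

var compressStream = createCompressStream()

// 3 * 65536 + 123 bytes, so _transform splits this single write into four
// uncompressed chunks and frames each one separately.
var input = Buffer.alloc(3 * 65536 + 123, 'a')

var compressedBytes = 0
compressStream.on('data', function (chunk) {
  compressedBytes += chunk.length
})
compressStream.on('end', function () {
  console.log('framed', input.length, 'bytes into', compressedBytes, 'compressed bytes')
})

compressStream.write(input)
compressStream.end()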