Skip to content

Commit

Permalink
stream: add CompressionStream and DecompressionStream
Browse files Browse the repository at this point in the history
Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: #39348
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell committed Jul 14, 2021
1 parent 25e2f17 commit 09ad64d
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 0 deletions.
51 changes: 51 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -1217,5 +1217,56 @@ added: REPLACEME
* Type: {WritableStream}
### Class: `CompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new CompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `compressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `compressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
### Class: `DecompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new DecompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `decompressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `deccompressionStream.writable`

This comment has been minimized.

Copy link
@TrySound

TrySound Jan 24, 2022

deCcompressionStream

<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
164 changes: 164 additions & 0 deletions lib/internal/webstreams/compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
'use strict';

const {
ObjectDefineProperties,
Symbol,
} = primordials;

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
},
} = require('internal/errors');

const {
newReadableWritablePairFromDuplex,
} = require('internal/webstreams/adapters');

const {
customInspect,
kEnumerableProperty,
} = require('internal/webstreams/util');

const {
customInspectSymbol: kInspect,
} = require('internal/util');

let zlib;
function lazyZlib() {
zlib ??= require('zlib');
return zlib;
}

const kHandle = Symbol('kHandle');
const kTransform = Symbol('kTransform');
const kType = Symbol('kType');

/**
* @typedef {import('./readablestream').ReadableStream} ReadableStream
* @typedef {import('./writablestream').WritableStream} WritableStream
*/

function isCompressionStream(value) {
return typeof value?.[kHandle] === 'object' &&
value?.[kType] === 'CompressionStream';
}

function isDecompressionStream(value) {
return typeof value?.[kHandle] === 'object' &&
value?.[kType] === 'DecompressionStream';

This comment has been minimized.

Copy link
@jimmywarting

jimmywarting Jan 24, 2022

Not only dose the Compression and Decompression stream lacks a Symbol.toStringTag...

But i do think the way to check if it is of type x would be to do:
value?.[Symbol.toStringTag] === 'DecompressionStream';

}

class CompressionStream {
/**
* @param {'deflate'|'gzip'} format
*/
constructor(format) {
this[kType] = 'CompressionStream';
switch (format) {
case 'deflate':
this[kHandle] = lazyZlib().createDeflate();
break;
case 'gzip':
this[kHandle] = lazyZlib().createGzip();
break;
default:
throw new ERR_INVALID_ARG_VALUE('format', format);
}
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
}

/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
return this[kTransform].readable;
}

/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
return this[kTransform].writable;
}

[kInspect](depth, options) {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
customInspect(depth, options, 'CompressionStream', {
readable: this[kTransform].readable,
writable: this[kTransform].writable,
});
}
}

class DecompressionStream {
/**
* @param {'deflate'|'gzip'} format
*/
constructor(format) {
this[kType] = 'DecompressionStream';
switch (format) {
case 'deflate':
this[kHandle] = lazyZlib().createInflate();
break;
case 'gzip':
this[kHandle] = lazyZlib().createGunzip();
break;
default:
throw new ERR_INVALID_ARG_VALUE('format', format);
}
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
}

/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
return this[kTransform].readable;
}

/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
return this[kTransform].writable;
}

[kInspect](depth, options) {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
customInspect(depth, options, 'DecompressionStream', {
readable: this[kTransform].readable,
writable: this[kTransform].writable,
});
}
}

ObjectDefineProperties(CompressionStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
});

ObjectDefineProperties(DecompressionStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
});

module.exports = {
CompressionStream,
DecompressionStream,
};
7 changes: 7 additions & 0 deletions lib/stream/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const {
TextDecoderStream,
} = require('internal/webstreams/encoding');

const {
CompressionStream,
DecompressionStream,
} = require('internal/webstreams/compression');

module.exports = {
ReadableStream,
ReadableStreamDefaultReader,
Expand All @@ -52,4 +57,6 @@ module.exports = {
CountQueuingStrategy,
TextEncoderStream,
TextDecoderStream,
CompressionStream,
DecompressionStream,
};
59 changes: 59 additions & 0 deletions test/parallel/test-whatwg-webstreams-compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Flags: --no-warnings
'use strict';

const common = require('../common');

const {
CompressionStream,
DecompressionStream,
} = require('stream/web');

const assert = require('assert');
const dec = new TextDecoder();

async function test(format) {
const gzip = new CompressionStream(format);
const gunzip = new DecompressionStream(format);

gzip.readable.pipeTo(gunzip.writable).then(common.mustCall());

const reader = gunzip.readable.getReader();
const writer = gzip.writable.getWriter();

await Promise.all([
reader.read().then(({ value, done }) => {
assert.strictEqual(dec.decode(value), 'hello');
}),
reader.read().then(({ done }) => assert(done)),
writer.write('hello'),
writer.close(),
]);
}

Promise.all(['gzip', 'deflate'].map((i) => test(i))).then(common.mustCall());

[1, 'hello', false, {}].forEach((i) => {
assert.throws(() => new CompressionStream(i), {
code: 'ERR_INVALID_ARG_VALUE',
});
assert.throws(() => new DecompressionStream(i), {
code: 'ERR_INVALID_ARG_VALUE',
});
});

assert.throws(
() => Reflect.get(CompressionStream.prototype, 'readable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(CompressionStream.prototype, 'writable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(DecompressionStream.prototype, 'readable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(DecompressionStream.prototype, 'writable', {}), {
code: 'ERR_INVALID_THIS',
});

0 comments on commit 09ad64d

Please sign in to comment.