use whatwg streams #103

Merged
merged 6 commits into from
Jul 15, 2021
54 changes: 6 additions & 48 deletions README.md
@@ -22,10 +22,8 @@ npm install fetch-blob
- internal Buffer.from was replaced with TextEncoder/Decoder
- internal buffers was replaced with Uint8Arrays
- CommonJS was replaced with ESM
- The node stream returned by calling `blob.stream()` was replaced with a simple generator function that yields Uint8Arrays (breaking change)
(Read "Differences from other blobs" for more info.)

All of these changes have made it free of any core Node module dependencies, so it is possible to import it using http-import from a CDN without any bundling
- The node stream returned by calling `blob.stream()` was replaced with whatwg streams
- (Read "Differences from other blobs" for more info.)

</details>

@@ -36,48 +34,12 @@ npm install fetch-blob
- This blob version is more arbitrary: it can be constructed with blob parts that aren't instances of itself; a part only has to look and behave like a blob to be accepted as one.
- The benefit of this is that you can create other types of blobs that don't contain any internal data that has to be read in other ways, such as the `BlobDataItem` created in `from.js` that wraps a file path into a blob-like item and reads it lazily (Node.js plans to [implement this][fs-blobs] as well)
- The `blob.stream()` method is the most noticeable difference. It returns an AsyncGeneratorFunction that yields Uint8Arrays

The reasoning behind `Blob.prototype.stream()` is that the Node.js readable stream
isn't spec compatible with WHATWG streams, and we didn't want to import the whole WHATWG stream polyfill for Node
or browserify Node.js streams for the browsers, picking one flavor over the other. So we decided to opt out
of any stream and just implement the bare minimum of what both streams have in common, which is the asyncIterator
that yields Uint8Arrays; this is the most isomorphic approach with the use of `for-await-of` loops.
It would be redundant to convert anything to WHATWG streams and then convert it back to
Node streams since you work inside of Node.
It will probably stay like this until Node.js gets native support for WHATWG streams<sup>[1](https://github.com/nodejs/whatwg-stream)</sup> and WHATWG streams add the Node
equivalent of `Readable.from(iterable)`<sup>[2](https://github.com/whatwg/streams/issues/1018)</sup>

But for now, if you really need a Node stream, you can get one using this transformation
- The `blob.stream()` method is the most noticeable difference. It now returns a WHATWG stream. To keep it as a Node stream you would have to do:

```js
import {Readable} from 'stream'
const stream = Readable.from(blob.stream())
```
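For example (a sketch, not part of the package — `blobToBuffer` is a hypothetical helper), the converted Node stream can be collected into a single Buffer:

```javascript
import { Readable } from 'node:stream'

// Hypothetical helper: bridge blob.stream() back into a Node Readable
// and collect everything into one Buffer. Assumes the value returned by
// blob.stream() is async iterable, which holds on recent Node versions.
async function blobToBuffer (blob) {
  const chunks = []
  for await (const chunk of Readable.from(blob.stream())) {
    chunks.push(chunk)
  }
  return Buffer.concat(chunks)
}
```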
But if you don't need it to be a stream, you can just use its asyncIterator part, which is isomorphic.
```js
for await (const chunk of blob.stream()) {
console.log(chunk) // uInt8Array
}
```
If you need some feature detection to work around this differing behavior:
```js
if (Blob.prototype.stream?.constructor?.name === 'AsyncGeneratorFunction') {
  // not spec compatible, monkey patch it...
  // (alternatively you could extend Blob and use super.stream())
  const orig = Blob.prototype.stream
  Blob.prototype.stream = function () {
    const iterator = orig.call(this)
    return new ReadableStream({
      async pull (ctrl) {
        const next = await iterator.next()
        return next.done ? ctrl.close() : ctrl.enqueue(next.value)
      }
    })
  }
}
```
Possible future WHATWG version: `ReadableStream.from(iterator)`
It's also possible to delete this method altogether and instead use `.slice()` and `.arrayBuffer()`, since the blob has both public and private stream methods
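As a sketch of that alternative (the `readInChunks` helper below is hypothetical, not part of the package), chunked reading can be built on `.slice()` and `.arrayBuffer()` alone:

```javascript
// Hypothetical helper: iterate a blob in fixed-size chunks using only
// the public slice() and arrayBuffer() methods, no stream() needed.
async function * readInChunks (blob, chunkSize = 65536) {
  let position = 0
  while (position < blob.size) {
    const part = blob.slice(position, position + chunkSize)
    yield new Uint8Array(await part.arrayBuffer())
    position += chunkSize
  }
}
```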
</details>

## Usage
@@ -100,12 +62,8 @@ const blob = new Blob(['hello, world'])
await blob.text()
await blob.arrayBuffer()
for await (let chunk of blob.stream()) { ... }

// turn the async iterator into a node stream
stream.Readable.from(blob.stream())

// turn the async iterator into a whatwg stream (feature)
globalThis.ReadableStream.from(blob.stream())
blob.stream().getReader().read()
blob.stream().getReader({mode: 'byob'}).read(view)
```
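The reader-based calls above can be combined into a small drain loop (a sketch; `drain` is a hypothetical name, and a default reader is assumed rather than a BYOB one):

```javascript
// Hypothetical helper: drain a WHATWG ReadableStream with a default
// reader and return the Uint8Array chunks in order.
async function drain (stream) {
  const reader = stream.getReader()
  const chunks = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    chunks.push(value)
  }
  return chunks
}
```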

### Blob part backed up by filesystem
15 changes: 7 additions & 8 deletions from.js
@@ -1,10 +1,11 @@
import {statSync, createReadStream} from 'fs';
import {stat} from 'fs/promises';
import {statSync, createReadStream, promises as fs} from 'fs';
import {basename} from 'path';
import File from './file.js';
import Blob from './index.js';
import {MessageChannel} from 'worker_threads';

const {stat} = fs;

const DOMException = globalThis.DOMException || (() => {
const port = new MessageChannel().port1
const ab = new ArrayBuffer(0)
@@ -86,12 +87,10 @@ class BlobDataItem {
if (mtimeMs > this.lastModified) {
throw new DOMException('The requested file could not be read, typically due to permission problems that have occurred after a reference to a file was acquired.', 'NotReadableError');
}
if (this.size) {
yield * createReadStream(this.#path, {
start: this.#start,
end: this.#start + this.size - 1
});
}
yield * createReadStream(this.#path, {
start: this.#start,
end: this.#start + this.size - 1
});
}

get [Symbol.toStringTag]() {
78 changes: 63 additions & 15 deletions index.js
@@ -1,7 +1,43 @@

// TODO (jimmywarting): in the future use conditional loading with top level await (requires 14.x)
// Node has recently added whatwg streams into core; use that instead when it becomes available.

import * as stream from 'web-streams-polyfill/dist/ponyfill.es2018.js'

const ReadableStream = globalThis.ReadableStream || stream.ReadableStream
const ByteLengthQueuingStrategy = globalThis.ByteLengthQueuingStrategy || stream.ByteLengthQueuingStrategy

/** @typedef {import('buffer').Blob} NodeBlob */

// Fix buffer.Blob's missing stream implementation
import('buffer').then(m => {
  if (m.Blob && !m.Blob.prototype.stream) {
    m.Blob.prototype.stream = function () {
      let position = 0;
      const blob = this;
      const strategy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE });

      return new ReadableStream({
        type: 'bytes',
        async pull (ctrl) {
          const chunk = blob.slice(position, Math.min(blob.size, position + POOL_SIZE));
          const buffer = await chunk.arrayBuffer();
          position += buffer.byteLength;
          ctrl.enqueue(new Uint8Array(buffer));

          if (position === blob.size) {
            ctrl.close();
          }
        }
      }, strategy);
    };
  }
}, () => {});

// 64 KiB (the same size Chrome slices its blobs into Uint8Arrays)
const POOL_SIZE = 65536;

/** @param {(Blob | Uint8Array)[]} parts */
/** @param {(Blob | NodeBlob | Uint8Array)[]} parts */
async function * toIterator (parts, clone = true) {
for (let part of parts) {
if ('stream' in part) {
@@ -20,6 +56,7 @@ async function * toIterator (parts, clone = true) {
yield part;
}
} else {
/* c8 ignore start */
// For blobs that have arrayBuffer but no stream method (nodes buffer.Blob)
let position = 0;
while (position !== part.size) {
@@ -28,6 +65,7 @@ async function * toIterator (parts, clone = true) {
position += buffer.byteLength;
yield new Uint8Array(buffer);
}
/* c8 ignore end */
}
}
}
@@ -116,6 +154,11 @@ export default class Blob {
* @return {Promise<ArrayBuffer>}
*/
async arrayBuffer() {
// Easier way... Just an unnecessary overhead
// const view = new Uint8Array(this.size);
// await this.stream().getReader({mode: 'byob'}).read(view);
// return view.buffer;

const data = new Uint8Array(this.size);
let offset = 0;
for await (const chunk of toIterator(this.#parts, false)) {
@@ -126,14 +169,17 @@
return data.buffer;
}

/**
* The Blob stream() implements partial support of the whatwg stream
* by only being async iterable.
*
* @returns {AsyncGenerator<Uint8Array>}
*/
async * stream() {
yield * toIterator(this.#parts, true);
stream() {
  const it = toIterator(this.#parts, true);
  const strategy = new ByteLengthQueuingStrategy({ highWaterMark: POOL_SIZE });

  return new ReadableStream({
    type: 'bytes',
    async pull (ctrl) {
      const chunk = await it.next();
      chunk.done ? ctrl.close() : ctrl.enqueue(chunk.value);
    }
  }, strategy);
}

/**
@@ -157,6 +203,11 @@
let added = 0;

for (const part of parts) {
// don't add the overflow to new blobParts
if (added >= span) {
break;
}

const size = ArrayBuffer.isView(part) ? part.byteLength : part.size;
if (relativeStart && size <= relativeStart) {
// Skip the beginning and change the relative
@@ -174,11 +225,6 @@
}
blobParts.push(chunk);
relativeStart = 0; // All next sequential parts should start at 0

// don't add the overflow to new blobParts
if (added >= span) {
break;
}
}
}

@@ -195,7 +241,9 @@

static [Symbol.hasInstance](object) {
return (
typeof object?.constructor === 'function' &&
object &&
typeof object === 'object' &&
typeof object.constructor === 'function' &&
(
typeof object.stream === 'function' ||
typeof object.arrayBuffer === 'function'
5 changes: 4 additions & 1 deletion package.json
@@ -75,5 +75,8 @@
"type": "paypal",
"url": "https://paypal.me/jimmywarting"
}
]
],
"dependencies": {
"web-streams-polyfill": "^3.0.3"
}
}
12 changes: 9 additions & 3 deletions test.js
@@ -160,7 +160,7 @@ test('blob part backed up by filesystem', async t => {
test('Reading after modified should fail', async t => {
const blob = blobFromSync('./LICENSE');
await new Promise(resolve => {
setTimeout(resolve, 100);
setTimeout(resolve, 500);
});
fs.closeSync(fs.openSync('./LICENSE', 'a'));
const error = await t.throwsAsync(blob.text());
@@ -174,7 +174,7 @@ test('Reading after modified should fail', async t => {
// The lastModifiedDate is deprecated and removed from spec
t.false('lastModifiedDate' in file);
const mod = file.lastModified - Date.now();
t.true(mod <= 0 && mod >= -100); // Close to tolerance: 100ms
t.true(mod <= 0 && mod >= -500); // Close to tolerance: 500ms
});

test('Reading file after modified should fail', async t => {
@@ -241,7 +241,7 @@ test('Parts are immutable', async t => {
test('Blobs are immutable', async t => {
const buf = new Uint8Array([97]);
const blob = new Blob([buf]);
const chunk = await blob.stream().next();
const chunk = await blob.stream().getReader().read();
t.is(chunk.value[0], 97);
chunk.value[0] = 98;
t.is(await blob.text(), 'a');
@@ -344,3 +344,9 @@ test('new File() throws with too few args', t => {
message: 'Failed to construct \'File\': 2 arguments required, but only 0 present.'
});
});

test('can slice zero sized blobs', async t => {
const blob = new Blob();
const txt = await blob.slice(0, 0).text();
t.is(txt, '');
});