Skip to content

Commit

Permalink
Migrate from the Node to the Web ReadableStream (#8410)
Browse files Browse the repository at this point in the history
  • Loading branch information
dlarocque committed Aug 14, 2024
1 parent cfca9c6 commit 6b0ca77
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 47 deletions.
6 changes: 6 additions & 0 deletions .changeset/itchy-boxes-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'firebase': minor
'@firebase/storage': minor
---

Migrate from the Node to Web ReadableStream interface
2 changes: 1 addition & 1 deletion common/api-review/storage.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export function getMetadata(ref: StorageReference): Promise<FullMetadata>;
export function getStorage(app?: FirebaseApp, bucketUrl?: string): FirebaseStorage;

// @public
export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream;
export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream;

// @internal (undocumented)
export function _invalidArgument(message: string): StorageError;
Expand Down
4 changes: 2 additions & 2 deletions docs-devsite/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ This API is only available in Node.
<b>Signature:</b>

```typescript
export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream;
export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream;
```

#### Parameters
Expand All @@ -291,7 +291,7 @@ export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?:

<b>Returns:</b>

NodeJS.ReadableStream
ReadableStream

A stream with the object's data as bytes

Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/api.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ export function getBlob(
export function getStream(
ref: StorageReference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
throw new Error('getStream() is only supported by NodeJS builds');
}
2 changes: 1 addition & 1 deletion packages/storage/src/api.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export function getBlob(
export function getStream(
ref: StorageReference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
ref = getModularInstance(ref);
return getStreamInternal(ref as Reference, maxDownloadSizeBytes);
}
3 changes: 1 addition & 2 deletions packages/storage/src/implementation/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Network headers */
export type Headers = Record<string, string>;

Expand All @@ -23,7 +22,7 @@ export type ConnectionType =
| string
| ArrayBuffer
| Blob
| NodeJS.ReadableStream;
| ReadableStream<Uint8Array>;

/**
* A lightweight wrapper around XMLHttpRequest with a
Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/platform/browser/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export function newBlobConnection(): Connection<Blob> {
return new XhrBlobConnection();
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream> {
throw new Error('Streams are only supported on Node');
}

Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/platform/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export function newBlobConnection(): Connection<Blob> {
return nodeNewBlobConnection();
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream<Uint8Array>> {
// This file is only used in Node.js tests using ts-node.
return nodeNewStreamConnection();
}
20 changes: 11 additions & 9 deletions packages/storage/src/platform/node/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class FetchConnection<T extends ConnectionType>
async send(
url: string,
method: string,
body?: ArrayBufferView | Blob | string,
body?: NodeJS.ArrayBufferView | Blob | string,
headers?: Record<string, string>
): Promise<void> {
if (this.sent_) {
Expand All @@ -62,7 +62,7 @@ abstract class FetchConnection<T extends ConnectionType>
const response = await this.fetch_(url, {
method,
headers: headers || {},
body: body as ArrayBufferView | string
body: body as NodeJS.ArrayBufferView | string
});
this.headers_ = response.headers;
this.statusCode_ = response.status;
Expand Down Expand Up @@ -146,13 +146,15 @@ export function newBytesConnection(): Connection<ArrayBuffer> {
return new FetchBytesConnection();
}

export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream> {
private stream_: NodeJS.ReadableStream | null = null;
export class FetchStreamConnection extends FetchConnection<
ReadableStream<Uint8Array>
> {
private stream_: ReadableStream<Uint8Array> | null = null;

async send(
url: string,
method: string,
body?: ArrayBufferView | Blob | string,
body?: NodeJS.ArrayBufferView | Blob | string,
headers?: Record<string, string>
): Promise<void> {
if (this.sent_) {
Expand All @@ -164,12 +166,12 @@ export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream
const response = await this.fetch_(url, {
method,
headers: headers || {},
body: body as ArrayBufferView | string
body: body as NodeJS.ArrayBufferView | string
});
this.headers_ = response.headers;
this.statusCode_ = response.status;
this.errorCode_ = ErrorCode.NO_ERROR;
this.stream_ = response.body;
this.stream_ = response.body as ReadableStream<Uint8Array>;
} catch (e) {
this.errorText_ = (e as Error)?.message;
// emulate XHR which sets status to 0 when encountering a network error
Expand All @@ -178,15 +180,15 @@ export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream
}
}

getResponse(): NodeJS.ReadableStream {
getResponse(): ReadableStream {
if (!this.stream_) {
throw internalError('cannot .getResponse() before sending');
}
return this.stream_;
}
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream<Uint8Array>> {
return new FetchStreamConnection();
}

Expand Down
33 changes: 16 additions & 17 deletions packages/storage/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
* @fileoverview Defines the Firebase StorageReference class.
*/

import { PassThrough, Transform, TransformOptions } from 'stream';

import { FbsBlob } from './implementation/blob';
import { Location } from './implementation/location';
import { getMappings } from './implementation/metadata';
Expand Down Expand Up @@ -48,6 +46,7 @@ import {
newStreamConnection,
newTextConnection
} from './platform/connection';
import { RequestInfo } from './implementation/requestinfo';

/**
* Provides methods to interact with a bucket in the Firebase Storage service.
Expand Down Expand Up @@ -203,42 +202,42 @@ export function getBlobInternal(
export function getStreamInternal(
ref: Reference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
ref._throwIfRoot('getStream');
const requestInfo = getBytes(
const requestInfo: RequestInfo<ReadableStream, ReadableStream> = getBytes(
ref.storage,
ref._location,
maxDownloadSizeBytes
);

/** A transformer that passes through the first n bytes. */
const newMaxSizeTransform: (n: number) => TransformOptions = n => {
// Transforms the stream so that only `maxDownloadSizeBytes` bytes are piped to the result
const newMaxSizeTransform = (n: number): Transformer => {
let missingBytes = n;
return {
transform(chunk, encoding, callback) {
transform(chunk, controller: TransformStreamDefaultController) {
// GCS may not honor the Range header for small files
if (chunk.length < missingBytes) {
this.push(chunk);
controller.enqueue(chunk);
missingBytes -= chunk.length;
} else {
this.push(chunk.slice(0, missingBytes));
this.emit('end');
controller.enqueue(chunk.slice(0, missingBytes));
controller.terminate();
}
callback();
}
} as TransformOptions;
};
};

const result =
maxDownloadSizeBytes !== undefined
? new Transform(newMaxSizeTransform(maxDownloadSizeBytes))
: new PassThrough();
? new TransformStream(newMaxSizeTransform(maxDownloadSizeBytes))
: new TransformStream(); // The default transformer forwards all chunks to its readable side

ref.storage
.makeRequestWithTokens(requestInfo, newStreamConnection)
.then(stream => (stream as NodeJS.ReadableStream).pipe(result))
.catch(e => result.destroy(e));
return result;
.then(readableStream => readableStream.pipeThrough(result))
.catch(err => result.writable.abort(err));

return result.readable;
}

/**
Expand Down
37 changes: 25 additions & 12 deletions packages/storage/test/node/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,27 @@ import { FirebaseApp, deleteApp } from '@firebase/app';
import { getStream, ref, uploadBytes } from '../../src/index.node';
import * as types from '../../src/public-types';

async function readData(reader: NodeJS.ReadableStream): Promise<number[]> {
return new Promise<number[]>((resolve, reject) => {
const data: number[] = [];
reader.on('error', e => reject(e));
reader.on('data', chunk => data.push(...Array.from(chunk as Buffer)));
reader.on('end', () => resolve(data));
// See: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/getReader
async function readData(readableStream: ReadableStream): Promise<Uint8Array> {
return new Promise<Uint8Array>((resolve, reject) => {
const reader: ReadableStreamDefaultReader = readableStream.getReader();
const result: any[] = [];
reader
.read()
.then(function processBytes({ done, value }): any {
if (done) {
resolve(new Uint8Array(result));
return;
}

result.push(...value);
return reader.read().then(processBytes);
})
.catch(err => {
console.error(err);
reject(err);
return;
});
});
}

Expand All @@ -46,19 +61,17 @@ describe('Firebase Storage > getStream', () => {
it('can get stream', async () => {
const reference = ref(storage, 'public/exp-bytes');
await uploadBytes(reference, new Uint8Array([0, 1, 3, 128, 255]));
const stream = await getStream(reference);
const stream = getStream(reference);
const data = await readData(stream);
expect(new Uint8Array(data)).to.deep.equal(
new Uint8Array([0, 1, 3, 128, 255])
);
expect(data).to.deep.equal(new Uint8Array([0, 1, 3, 128, 255]));
});

it('can get first n bytes of stream', async () => {
const reference = ref(storage, 'public/exp-bytes');
await uploadBytes(reference, new Uint8Array([0, 1, 3]));
const stream = await getStream(reference, 2);
const stream = getStream(reference, 2);
const data = await readData(stream);
expect(new Uint8Array(data)).to.deep.equal(new Uint8Array([0, 1]));
expect(data).to.deep.equal(new Uint8Array([0, 1]));
});

it('getStream() throws for missing file', async () => {
Expand Down

0 comments on commit 6b0ca77

Please sign in to comment.