Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flight] Split Streaming from Relay Implemenation #18260

Merged
merged 5 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/react-client/flight.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
* @flow
*/

export * from './src/ReactFlightClient';
export * from './src/ReactFlightClientStream';
141 changes: 28 additions & 113 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,17 @@
* @flow
*/

import type {Source, StringDecoder} from './ReactFlightClientHostConfig';

import {
supportsBinaryStreams,
createStringDecoder,
readPartialStringChunk,
readFinalStringChunk,
} from './ReactFlightClientHostConfig';

export type ReactModelRoot<T> = {|
model: T,
|};

type JSONValue =
export type JSONValue =
| number
| null
| boolean
| string
| {[key: string]: JSONValue, ...};
| {[key: string]: JSONValue}
| Array<JSONValue>;

const PENDING = 0;
const RESOLVED = 1;
Expand All @@ -48,39 +40,23 @@ type ErroredChunk = {|
|};
type Chunk = PendingChunk | ResolvedChunk | ErroredChunk;

type OpaqueResponseWithoutDecoder = {
source: Source,
export type Response = {
partialRow: string,
modelRoot: ReactModelRoot<any>,
chunks: Map<number, Chunk>,
fromJSON: (key: string, value: JSONValue) => any,
...
};

type OpaqueResponse = OpaqueResponseWithoutDecoder & {
stringDecoder: StringDecoder,
...
};

export function createResponse(source: Source): OpaqueResponse {
export function createResponse(): Response {
let modelRoot: ReactModelRoot<any> = ({}: any);
let rootChunk: Chunk = createPendingChunk();
definePendingProperty(modelRoot, 'model', rootChunk);
let chunks: Map<number, Chunk> = new Map();
chunks.set(0, rootChunk);

let response: OpaqueResponse = (({
source,
let response = {
partialRow: '',
modelRoot,
chunks: chunks,
fromJSON: function(key, value) {
return parseFromJSON(response, this, key, value);
},
}: OpaqueResponseWithoutDecoder): any);
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
};
return response;
}

Expand Down Expand Up @@ -138,10 +114,7 @@ function resolveChunk(chunk: Chunk, value: mixed): void {

// Report that any missing chunks in the model is now going to throw this
// error upon read. Also notify any pending promises.
export function reportGlobalError(
response: OpaqueResponse,
error: Error,
): void {
export function reportGlobalError(response: Response, error: Error): void {
response.chunks.forEach(chunk => {
// If this chunk was already resolved or errored, it won't
// trigger an error but if it wasn't then we need to
Expand All @@ -168,8 +141,8 @@ function definePendingProperty(
});
}

function parseFromJSON(
response: OpaqueResponse,
export function parseModelFromJSON(
response: Response,
targetObj: Object,
key: string,
value: JSONValue,
Expand All @@ -195,12 +168,11 @@ function parseFromJSON(
return value;
}

function resolveJSONRow(
response: OpaqueResponse,
export function resolveModelChunk<T>(
response: Response,
id: number,
json: string,
model: T,
): void {
let model = JSON.parse(json, response.fromJSON);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
Expand All @@ -210,88 +182,31 @@ function resolveJSONRow(
}
}

function processFullRow(response: OpaqueResponse, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
resolveJSONRow(response, id, json);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
let error = new Error(errorInfo.message);
error.stack = errorInfo.stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
return;
}
default: {
// Assume this is the root model.
resolveJSONRow(response, 0, row);
return;
}
}
}

export function processStringChunk(
response: OpaqueResponse,
chunk: string,
offset: number,
): void {
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}

export function processBinaryChunk(
response: OpaqueResponse,
chunk: Uint8Array,
export function resolveErrorChunk(
response: Response,
id: number,
message: string,
stack: string,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
let error = new Error(message);
error.stack = stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

export function complete(response: OpaqueResponse): void {
export function close(response: Response): void {
// In case there are any remaining unresolved chunks, they won't
// be resolved now. So we need to issue an error to those.
// Ideally we should be able to early bail out if we kept a
// ref count of pending chunks.
reportGlobalError(response, new Error('Connection closed.'));
}

export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> {
export function getModelRoot<T>(response: Response): ReactModelRoot<T> {
return response.modelRoot;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
* @flow
*/

export type Source = Promise<Response> | ReadableStream | XMLHttpRequest;

export type StringDecoder = TextDecoder;

export const supportsBinaryStreams = true;
Expand Down
116 changes: 116 additions & 0 deletions packages/react-client/src/ReactFlightClientStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

import type {Response as ResponseBase, JSONValue} from './ReactFlightClient';

import type {StringDecoder} from './ReactFlightClientHostConfig';

import {
createResponse as createResponseImpl,
resolveModelChunk,
resolveErrorChunk,
parseModelFromJSON,
} from './ReactFlightClient';

import {
supportsBinaryStreams,
createStringDecoder,
readPartialStringChunk,
readFinalStringChunk,
} from './ReactFlightClientHostConfig';

export type ReactModelRoot<T> = {|
model: T,
|};

type Response = ResponseBase & {
fromJSON: (key: string, value: JSONValue) => any,
stringDecoder: StringDecoder,
};

export function createResponse(): Response {
let response: Response = (createResponseImpl(): any);
response.fromJSON = function(key: string, value: JSONValue) {
return parseModelFromJSON(response, this, key, value);
};
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
return response;
}

function processFullRow(response: Response, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let model = JSON.parse(json, response.fromJSON);
resolveModelChunk(response, id, model);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
resolveErrorChunk(response, id, errorInfo.message, errorInfo.stack);
return;
}
default: {
// Assume this is the root model.
let model = JSON.parse(row, response.fromJSON);
resolveModelChunk(response, 0, model);
return;
}
}
}

export function processStringChunk(
response: Response,
chunk: string,
offset: number,
): void {
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}

export function processBinaryChunk(
response: Response,
chunk: Uint8Array,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

export {reportGlobalError, close, getModelRoot} from './ReactFlightClient';
Loading