Skip to content

Commit

Permalink
Split FlightClient into a basic part and a stream part
Browse files Browse the repository at this point in the history
Same split as the server.
  • Loading branch information
sebmarkbage committed Mar 10, 2020
1 parent 2162d25 commit a64c5e1
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 133 deletions.
2 changes: 1 addition & 1 deletion packages/react-client/flight.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
* @flow
*/

export * from './src/ReactFlightClient';
export * from './src/ReactFlightClientStream';
141 changes: 28 additions & 113 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,17 @@
* @flow
*/

import type {Source, StringDecoder} from './ReactFlightClientHostConfig';

import {
supportsBinaryStreams,
createStringDecoder,
readPartialStringChunk,
readFinalStringChunk,
} from './ReactFlightClientHostConfig';

export type ReactModelRoot<T> = {|
model: T,
|};

type JSONValue =
export type JSONValue =
| number
| null
| boolean
| string
| {[key: string]: JSONValue, ...};
| {[key: string]: JSONValue}
| Array<JSONValue>;

const PENDING = 0;
const RESOLVED = 1;
Expand All @@ -48,39 +40,23 @@ type ErroredChunk = {|
|};
type Chunk = PendingChunk | ResolvedChunk | ErroredChunk;

type OpaqueResponseWithoutDecoder = {
source: Source,
export type Response = {
partialRow: string,
modelRoot: ReactModelRoot<any>,
chunks: Map<number, Chunk>,
fromJSON: (key: string, value: JSONValue) => any,
...
};

type OpaqueResponse = OpaqueResponseWithoutDecoder & {
stringDecoder: StringDecoder,
...
};

export function createResponse(source: Source): OpaqueResponse {
export function createResponse(): Response {
let modelRoot: ReactModelRoot<any> = ({}: any);
let rootChunk: Chunk = createPendingChunk();
definePendingProperty(modelRoot, 'model', rootChunk);
let chunks: Map<number, Chunk> = new Map();
chunks.set(0, rootChunk);

let response: OpaqueResponse = (({
source,
let response = {
partialRow: '',
modelRoot,
chunks: chunks,
fromJSON: function(key, value) {
return parseFromJSON(response, this, key, value);
},
}: OpaqueResponseWithoutDecoder): any);
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
};
return response;
}

Expand Down Expand Up @@ -138,10 +114,7 @@ function resolveChunk(chunk: Chunk, value: mixed): void {

// Report that any missing chunks in the model is now going to throw this
// error upon read. Also notify any pending promises.
export function reportGlobalError(
response: OpaqueResponse,
error: Error,
): void {
export function reportGlobalError(response: Response, error: Error): void {
response.chunks.forEach(chunk => {
// If this chunk was already resolved or errored, it won't
// trigger an error but if it wasn't then we need to
Expand All @@ -168,8 +141,8 @@ function definePendingProperty(
});
}

function parseFromJSON(
response: OpaqueResponse,
export function parseModelFromJSON(
response: Response,
targetObj: Object,
key: string,
value: JSONValue,
Expand All @@ -195,12 +168,11 @@ function parseFromJSON(
return value;
}

function resolveJSONRow(
response: OpaqueResponse,
export function resolveModelChunk<T>(
response: Response,
id: number,
json: string,
model: T,
): void {
let model = JSON.parse(json, response.fromJSON);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
Expand All @@ -210,88 +182,31 @@ function resolveJSONRow(
}
}

function processFullRow(response: OpaqueResponse, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
resolveJSONRow(response, id, json);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
let error = new Error(errorInfo.message);
error.stack = errorInfo.stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
return;
}
default: {
// Assume this is the root model.
resolveJSONRow(response, 0, row);
return;
}
}
}

export function processStringChunk(
response: OpaqueResponse,
chunk: string,
offset: number,
): void {
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}

export function processBinaryChunk(
response: OpaqueResponse,
chunk: Uint8Array,
export function resolveErrorChunk(
response: Response,
id: number,
message: string,
stack: string,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
let error = new Error(message);
error.stack = stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

export function complete(response: OpaqueResponse): void {
export function close(response: Response): void {
// In case there are any remaining unresolved chunks, they won't
// be resolved now. So we need to issue an error to those.
// Ideally we should be able to early bail out if we kept a
// ref count of pending chunks.
reportGlobalError(response, new Error('Connection closed.'));
}

export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> {
export function getModelRoot<T>(response: Response): ReactModelRoot<T> {
return response.modelRoot;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
* @flow
*/

export type Source = Promise<Response> | ReadableStream | XMLHttpRequest;

export type StringDecoder = TextDecoder;

export const supportsBinaryStreams = true;
Expand Down
116 changes: 116 additions & 0 deletions packages/react-client/src/ReactFlightClientStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

import type {Response as ResponseBase, JSONValue} from './ReactFlightClient';

import type {StringDecoder} from './ReactFlightClientHostConfig';

import {
createResponse as createResponseImpl,
resolveModelChunk,
resolveErrorChunk,
parseModelFromJSON,
} from './ReactFlightClient';

import {
supportsBinaryStreams,
createStringDecoder,
readPartialStringChunk,
readFinalStringChunk,
} from './ReactFlightClientHostConfig';

export type ReactModelRoot<T> = {|
model: T,
|};

type Response = ResponseBase & {
fromJSON: (key: string, value: JSONValue) => any,
stringDecoder: StringDecoder,
};

export function createResponse(): Response {
let response: Response = (createResponseImpl(): any);
response.fromJSON = function(key: string, value: JSONValue) {
return parseModelFromJSON(response, this, key, value);
};
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
return response;
}

function processFullRow(response: Response, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let model = JSON.parse(json, response.fromJSON);
resolveModelChunk(response, id, model);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
resolveErrorChunk(response, id, errorInfo.message, errorInfo.stack);
return;
}
default: {
// Assume this is the root model.
let model = JSON.parse(row, response.fromJSON);
resolveModelChunk(response, 0, model);
return;
}
}
}

export function processStringChunk(
response: Response,
chunk: string,
offset: number,
): void {
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}

export function processBinaryChunk(
response: Response,
chunk: Uint8Array,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

export {reportGlobalError, close, getModelRoot} from './ReactFlightClient';
Loading

0 comments on commit a64c5e1

Please sign in to comment.