Skip to content

Commit

Permalink
add (TeeTo|ReadFrom)ReadableStreamLink links (#387)
Browse files Browse the repository at this point in the history
* add `(TeeTo|ReadFrom)ReadableStreamLink` links

* add `skipDataTransport` function

* also export type `ReadableStreamLinkEvent`

* forgot file

* feedback from code review

* changeset

* ensure tests run

* Revert "ensure tests run"

This reverts commit 31b9d01.
  • Loading branch information
phryneas authored Dec 18, 2024
1 parent 9a8c872 commit 20ce0c8
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/old-squids-deny.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@apollo/client-react-streaming": minor
---

add `TeeToReadableStreamLink` and `ReadFromReadableStreamLink`
19 changes: 19 additions & 0 deletions packages/client-react-streaming/package-shape.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
"RemoveMultipartDirectivesLink",
"SSRMultipartLink",
"registerApolloClient",
"ReadFromReadableStreamLink",
"TeeToReadableStreamLink",
"readFromReadableStream",
"teeToReadableStream",
"built_for_rsc"
],
"browser": [
Expand All @@ -17,7 +21,12 @@
"RemoveMultipartDirectivesLink",
"SSRMultipartLink",
"WrapApolloProvider",
"skipDataTransport",
"resetApolloSingletons",
"ReadFromReadableStreamLink",
"TeeToReadableStreamLink",
"readFromReadableStream",
"teeToReadableStream",
"built_for_browser"
],
"node": [
Expand All @@ -28,7 +37,12 @@
"RemoveMultipartDirectivesLink",
"SSRMultipartLink",
"WrapApolloProvider",
"skipDataTransport",
"resetApolloSingletons",
"ReadFromReadableStreamLink",
"TeeToReadableStreamLink",
"readFromReadableStream",
"teeToReadableStream",
"built_for_ssr"
],
"edge-light,worker,browser": [
Expand All @@ -39,7 +53,12 @@
"RemoveMultipartDirectivesLink",
"SSRMultipartLink",
"WrapApolloProvider",
"skipDataTransport",
"resetApolloSingletons",
"ReadFromReadableStreamLink",
"TeeToReadableStreamLink",
"readFromReadableStream",
"teeToReadableStream",
"built_for_ssr"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,25 @@ export class ApolloClientClientBaseImpl extends ApolloClientBase {
};
}

const skipDataTransportKey = Symbol.for("apollo.dataTransport.skip");
interface InternalContext {
[skipDataTransportKey]?: boolean;
}

/**
* Apply to a context to prevent this operation from being transported over the SSR data transport mechanism.
* @param readableStream
* @param context
* @returns
*/
export function skipDataTransport<T extends Record<string, any>>(
context: T
): T & InternalContext {
return Object.assign(context, {
[skipDataTransportKey]: true,
});
}

class ApolloClientSSRImpl extends ApolloClientClientBaseImpl {
private forwardedQueries = new (getTrieConstructor(this))();

Expand All @@ -315,6 +334,9 @@ class ApolloClientSSRImpl extends ApolloClientClientBaseImpl {
if (
options.fetchPolicy !== "cache-only" &&
options.fetchPolicy !== "standby" &&
!(options.context as InternalContext | undefined)?.[
skipDataTransportKey
] &&
!this.forwardedQueries.peekArray(cacheKeyArr)
) {
// don't transport the same query over twice
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export { InMemoryCache } from "./WrappedInMemoryCache.js";
export { ApolloClient } from "./WrappedApolloClient.js";
export { ApolloClient, skipDataTransport } from "./WrappedApolloClient.js";

export { resetApolloSingletons } from "./testHelpers.js";

Expand Down
137 changes: 137 additions & 0 deletions packages/client-react-streaming/src/ReadableStreamLink.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { ApolloLink, Observable } from "@apollo/client/index.js";
import type { FetchResult } from "@apollo/client/index.js";

export type ReadableStreamLinkEvent =
| { type: "next"; value: FetchResult }
| { type: "completed" }
| { type: "error" };

interface InternalContext {
[teeToReadableStreamKey]?: ReadableStreamDefaultController<ReadableStreamLinkEvent>;
[readFromReadableStreamKey]?: ReadableStream<ReadableStreamLinkEvent>;
}

const teeToReadableStreamKey = Symbol.for(
"apollo.tee.readableStreamController"
);
const readFromReadableStreamKey = Symbol.for("apollo.read.readableStream");

/**
* Apply to a context that will be passed to a link chain containing `TeeToReadableStreamLink`.
* @param controller
* @param context
* @returns
*/
export function teeToReadableStream<T extends Record<string, any>>(
controller: ReadableStreamDefaultController<ReadableStreamLinkEvent>,
context: T
): T & InternalContext {
return Object.assign(context, {
[teeToReadableStreamKey]: controller,
});
}

/**
* Apply to a context that will be passed to a link chain containing `ReadFromReadableStreamLink`.
* @param readableStream
* @param context
* @returns
*/
export function readFromReadableStream<T extends Record<string, any>>(
readableStream: ReadableStream<ReadableStreamLinkEvent>,
context: T
): T & InternalContext {
return Object.assign(context, {
[readFromReadableStreamKey]: readableStream,
});
}

/**
* A link that allows the request to be cloned into a readable stream, e.g. for
* transport of multipart responses from RSC or a server loader to the browser.
*/
export const TeeToReadableStreamLink = new ApolloLink((operation, forward) => {
const context = operation.getContext() as InternalContext;

const controller = context[teeToReadableStreamKey];

if (controller) {
return new Observable((observer) => {
const subscription = forward(operation).subscribe({
next(result) {
controller.enqueue({ type: "next", value: result });
observer.next(result);
},
error(error) {
controller.enqueue({ type: "error" });
controller.close();
observer.error(error);
},
complete() {
controller.enqueue({ type: "completed" });
controller.close();
observer.complete();
},
});

return () => {
subscription.unsubscribe();
};
});
}

return forward(operation);
});

/**
* A link that allows the response to be read from a readable stream, e.g. for
* hydration of a multipart response from RSC or a server loader in the browser.
*/
export const ReadFromReadableStreamLink = new ApolloLink(
(operation, forward) => {
const context = operation.getContext() as InternalContext;

const eventSteam = context[readFromReadableStreamKey];
if (eventSteam) {
return new Observable((observer) => {
let aborted = false as boolean;
const reader = eventSteam.getReader();
consumeReader();

return () => {
aborted = true;
reader.cancel();
};

async function consumeReader() {
let event:
| ReadableStreamReadResult<ReadableStreamLinkEvent>
| undefined = undefined;
while (!aborted && !event?.done) {
event = await reader.read();
if (aborted) break;
if (event.value) {
switch (event.value.type) {
case "next":
observer.next(event.value.value);
break;
case "completed":
observer.complete();
break;
case "error":
observer.error(
new Error(
"Error from event stream. Redacted for security concerns."
)
);
break;
}
}
}
}
});
}

return forward(operation);
}
);
7 changes: 7 additions & 0 deletions packages/client-react-streaming/src/index.shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ export {
InMemoryCache,
} from "./DataTransportAbstraction/index.js";
export type { TransportedQueryRef } from "./transportedQueryRef.js";
export {
ReadFromReadableStreamLink,
TeeToReadableStreamLink,
readFromReadableStream,
teeToReadableStream,
type ReadableStreamLinkEvent,
} from "./ReadableStreamLink.js";
1 change: 1 addition & 0 deletions packages/client-react-streaming/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export {
QueryEvent,
WrapApolloProvider,
WrappedApolloProvider,
skipDataTransport,
} from "./DataTransportAbstraction/index.js";
2 changes: 1 addition & 1 deletion scripts/verify-package-shape.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async function verifyESM(condition, entryPoint, pkg, shape) {
child.stdout?.on("data", (data) => (result += data.toString()));
child.stderr?.pipe(process.stderr);
await new Promise((resolve) => child.on("exit", resolve));
assert.deepStrictEqual(JSON.parse(result).sort(), shape);
assert.deepStrictEqual(JSON.parse(result).sort(), shape.sort());
}

/**
Expand Down

0 comments on commit 20ce0c8

Please sign in to comment.