diff --git a/.changeset/old-squids-deny.md b/.changeset/old-squids-deny.md new file mode 100644 index 00000000..4e5db860 --- /dev/null +++ b/.changeset/old-squids-deny.md @@ -0,0 +1,5 @@ +--- +"@apollo/client-react-streaming": minor +--- + +add `TeeToReadableStreamLink` and `ReadFromReadableStreamLink` diff --git a/packages/client-react-streaming/package-shape.json b/packages/client-react-streaming/package-shape.json index c4ec3d36..cd71fc14 100644 --- a/packages/client-react-streaming/package-shape.json +++ b/packages/client-react-streaming/package-shape.json @@ -7,6 +7,10 @@ "RemoveMultipartDirectivesLink", "SSRMultipartLink", "registerApolloClient", + "ReadFromReadableStreamLink", + "TeeToReadableStreamLink", + "readFromReadableStream", + "teeToReadableStream", "built_for_rsc" ], "browser": [ @@ -17,7 +21,12 @@ "RemoveMultipartDirectivesLink", "SSRMultipartLink", "WrapApolloProvider", + "skipDataTransport", "resetApolloSingletons", + "ReadFromReadableStreamLink", + "TeeToReadableStreamLink", + "readFromReadableStream", + "teeToReadableStream", "built_for_browser" ], "node": [ @@ -28,7 +37,12 @@ "RemoveMultipartDirectivesLink", "SSRMultipartLink", "WrapApolloProvider", + "skipDataTransport", "resetApolloSingletons", + "ReadFromReadableStreamLink", + "TeeToReadableStreamLink", + "readFromReadableStream", + "teeToReadableStream", "built_for_ssr" ], "edge-light,worker,browser": [ @@ -39,7 +53,12 @@ "RemoveMultipartDirectivesLink", "SSRMultipartLink", "WrapApolloProvider", + "skipDataTransport", "resetApolloSingletons", + "ReadFromReadableStreamLink", + "TeeToReadableStreamLink", + "readFromReadableStream", + "teeToReadableStream", "built_for_ssr" ] }, diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx b/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx index a2541628..40c26230 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx +++ b/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx @@ -298,6 +298,25 @@ export class ApolloClientClientBaseImpl extends ApolloClientBase { }; } +const skipDataTransportKey = Symbol.for("apollo.dataTransport.skip"); +interface InternalContext { + [skipDataTransportKey]?: boolean; +} + +/** + * Apply to a context to prevent this operation from being transported over the SSR data transport mechanism. + * @param readableStream + * @param context + * @returns + */ +export function skipDataTransport>( + context: T +): T & InternalContext { + return Object.assign(context, { + [skipDataTransportKey]: true, + }); +} + class ApolloClientSSRImpl extends ApolloClientClientBaseImpl { private forwardedQueries = new (getTrieConstructor(this))(); @@ -315,6 +334,9 @@ class ApolloClientSSRImpl extends ApolloClientClientBaseImpl { if ( options.fetchPolicy !== "cache-only" && options.fetchPolicy !== "standby" && + !(options.context as InternalContext | undefined)?.[ + skipDataTransportKey + ] && !this.forwardedQueries.peekArray(cacheKeyArr) ) { // don't transport the same query over twice diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/index.ts b/packages/client-react-streaming/src/DataTransportAbstraction/index.ts index 1c16db28..a75be06d 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/index.ts +++ b/packages/client-react-streaming/src/DataTransportAbstraction/index.ts @@ -1,5 +1,5 @@ export { InMemoryCache } from "./WrappedInMemoryCache.js"; -export { ApolloClient } from "./WrappedApolloClient.js"; +export { ApolloClient, skipDataTransport } from "./WrappedApolloClient.js"; export { resetApolloSingletons } from "./testHelpers.js"; diff --git a/packages/client-react-streaming/src/ReadableStreamLink.tsx b/packages/client-react-streaming/src/ReadableStreamLink.tsx new file mode 100644 index 00000000..3f6372f7 --- /dev/null +++ b/packages/client-react-streaming/src/ReadableStreamLink.tsx @@ -0,0 +1,137 @@ +import { ApolloLink, Observable } from "@apollo/client/index.js"; +import type { FetchResult } from "@apollo/client/index.js"; + +export type ReadableStreamLinkEvent = + | { type: "next"; value: FetchResult } + | { type: "completed" } + | { type: "error" }; + +interface InternalContext { + [teeToReadableStreamKey]?: ReadableStreamDefaultController; + [readFromReadableStreamKey]?: ReadableStream; +} + +const teeToReadableStreamKey = Symbol.for( + "apollo.tee.readableStreamController" +); +const readFromReadableStreamKey = Symbol.for("apollo.read.readableStream"); + +/** + * Apply to a context that will be passed to a link chain containing `TeeToReadableStreamLink`. + * @param controller + * @param context + * @returns + */ +export function teeToReadableStream>( + controller: ReadableStreamDefaultController, + context: T +): T & InternalContext { + return Object.assign(context, { + [teeToReadableStreamKey]: controller, + }); +} + +/** + * Apply to a context that will be passed to a link chain containing `ReadFromReadableStreamLink`. + * @param readableStream + * @param context + * @returns + */ +export function readFromReadableStream>( + readableStream: ReadableStream, + context: T +): T & InternalContext { + return Object.assign(context, { + [readFromReadableStreamKey]: readableStream, + }); +} + +/** + * A link that allows the request to be cloned into a readable stream, e.g. for + * transport of multipart responses from RSC or a server loader to the browser. + */ +export const TeeToReadableStreamLink = new ApolloLink((operation, forward) => { + const context = operation.getContext() as InternalContext; + + const controller = context[teeToReadableStreamKey]; + + if (controller) { + return new Observable((observer) => { + const subscription = forward(operation).subscribe({ + next(result) { + controller.enqueue({ type: "next", value: result }); + observer.next(result); + }, + error(error) { + controller.enqueue({ type: "error" }); + controller.close(); + observer.error(error); + }, + complete() { + controller.enqueue({ type: "completed" }); + controller.close(); + observer.complete(); + }, + }); + + return () => { + subscription.unsubscribe(); + }; + }); + } + + return forward(operation); +}); + +/** + * A link that allows the response to be read from a readable stream, e.g. for + * hydration of a multipart response from RSC or a server loader in the browser. + */ +export const ReadFromReadableStreamLink = new ApolloLink( + (operation, forward) => { + const context = operation.getContext() as InternalContext; + + const eventSteam = context[readFromReadableStreamKey]; + if (eventSteam) { + return new Observable((observer) => { + let aborted = false as boolean; + const reader = eventSteam.getReader(); + consumeReader(); + + return () => { + aborted = true; + reader.cancel(); + }; + + async function consumeReader() { + let event: + | ReadableStreamReadResult + | undefined = undefined; + while (!aborted && !event?.done) { + event = await reader.read(); + if (aborted) break; + if (event.value) { + switch (event.value.type) { + case "next": + observer.next(event.value.value); + break; + case "completed": + observer.complete(); + break; + case "error": + observer.error( + new Error( + "Error from event stream. Redacted for security concerns." + ) + ); + break; + } + } + } + } + }); + } + + return forward(operation); + } +); diff --git a/packages/client-react-streaming/src/index.shared.ts b/packages/client-react-streaming/src/index.shared.ts index ecee6a4c..ed6b16cc 100644 --- a/packages/client-react-streaming/src/index.shared.ts +++ b/packages/client-react-streaming/src/index.shared.ts @@ -6,3 +6,10 @@ export { InMemoryCache, } from "./DataTransportAbstraction/index.js"; export type { TransportedQueryRef } from "./transportedQueryRef.js"; +export { + ReadFromReadableStreamLink, + TeeToReadableStreamLink, + readFromReadableStream, + teeToReadableStream, + type ReadableStreamLinkEvent, +} from "./ReadableStreamLink.js"; diff --git a/packages/client-react-streaming/src/index.ts b/packages/client-react-streaming/src/index.ts index 22c3c327..3ebc9e42 100644 --- a/packages/client-react-streaming/src/index.ts +++ b/packages/client-react-streaming/src/index.ts @@ -6,4 +6,5 @@ export { QueryEvent, WrapApolloProvider, WrappedApolloProvider, + skipDataTransport, } from "./DataTransportAbstraction/index.js"; diff --git a/scripts/verify-package-shape.mjs b/scripts/verify-package-shape.mjs index 1dcccb88..68ce73ef 100644 --- a/scripts/verify-package-shape.mjs +++ b/scripts/verify-package-shape.mjs @@ -54,7 +54,7 @@ async function verifyESM(condition, entryPoint, pkg, shape) { child.stdout?.on("data", (data) => (result += data.toString())); child.stderr?.pipe(process.stderr); await new Promise((resolve) => child.on("exit", resolve)); - assert.deepStrictEqual(JSON.parse(result).sort(), shape); + assert.deepStrictEqual(JSON.parse(result).sort(), shape.sort()); } /**