-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add (TeeTo|ReadFrom)ReadableStreamLink links #387
Changes from all commits
f96ec74
6890bd9
e77d7d7
f3a27e6
400846a
f3611e2
8c9ab28
31b9d01
0cfc00a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@apollo/client-react-streaming": minor | ||
--- | ||
|
||
add `TeeToReadableStreamLink` and `ReadFromReadableStreamLink` |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,137 @@ | ||||||||||||||||||||||||||
import { ApolloLink, Observable } from "@apollo/client/index.js"; | ||||||||||||||||||||||||||
import type { FetchResult } from "@apollo/client/index.js"; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
export type ReadableStreamLinkEvent = | ||||||||||||||||||||||||||
| { type: "next"; value: FetchResult } | ||||||||||||||||||||||||||
| { type: "completed" } | ||||||||||||||||||||||||||
| { type: "error" }; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
interface InternalContext { | ||||||||||||||||||||||||||
[teeToReadableStreamKey]?: ReadableStreamDefaultController<ReadableStreamLinkEvent>; | ||||||||||||||||||||||||||
[readFromReadableStreamKey]?: ReadableStream<ReadableStreamLinkEvent>; | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
const teeToReadableStreamKey = Symbol.for( | ||||||||||||||||||||||||||
"apollo.tee.readableStreamController" | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
const readFromReadableStreamKey = Symbol.for("apollo.read.readableStream"); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||
* Apply to a context that will be passed to a link chain containing `TeeToReadableStreamLink`. | ||||||||||||||||||||||||||
* @param controller | ||||||||||||||||||||||||||
* @param context | ||||||||||||||||||||||||||
* @returns | ||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||
export function teeToReadableStream<T extends Record<string, any>>( | ||||||||||||||||||||||||||
controller: ReadableStreamDefaultController<ReadableStreamLinkEvent>, | ||||||||||||||||||||||||||
context: T | ||||||||||||||||||||||||||
): T & InternalContext { | ||||||||||||||||||||||||||
return Object.assign(context, { | ||||||||||||||||||||||||||
[teeToReadableStreamKey]: controller, | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||
* Apply to a context that will be passed to a link chain containing `ReadFromReadableStreamLink`. | ||||||||||||||||||||||||||
* @param readableStream | ||||||||||||||||||||||||||
* @param context | ||||||||||||||||||||||||||
* @returns | ||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||
export function readFromReadableStream<T extends Record<string, any>>( | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question as the link. Should these be marked as |
||||||||||||||||||||||||||
readableStream: ReadableStream<ReadableStreamLinkEvent>, | ||||||||||||||||||||||||||
context: T | ||||||||||||||||||||||||||
): T & InternalContext { | ||||||||||||||||||||||||||
return Object.assign(context, { | ||||||||||||||||||||||||||
[readFromReadableStreamKey]: readableStream, | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||
* A link that allows the request to be cloned into a readable stream, e.g. for | ||||||||||||||||||||||||||
* transport of multipart responses from RSC or a server loader to the browser. | ||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||
export const TeeToReadableStreamLink = new ApolloLink((operation, forward) => { | ||||||||||||||||||||||||||
jerelmiller marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+49
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at #389, it looks like these are used internally and not actually set by the user. If that is the case, should we mark these links as
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd say that everything in this package could be considered "internal" or as a building block for "build-your-own-framework". These links will e.g. be used outside of the ApolloClient implementations of this package in the React-Router implementation. |
||||||||||||||||||||||||||
const context = operation.getContext() as InternalContext; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
const controller = context[teeToReadableStreamKey]; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
if (controller) { | ||||||||||||||||||||||||||
return new Observable((observer) => { | ||||||||||||||||||||||||||
const subscription = forward(operation).subscribe({ | ||||||||||||||||||||||||||
next(result) { | ||||||||||||||||||||||||||
controller.enqueue({ type: "next", value: result }); | ||||||||||||||||||||||||||
observer.next(result); | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
error(error) { | ||||||||||||||||||||||||||
controller.enqueue({ type: "error" }); | ||||||||||||||||||||||||||
controller.close(); | ||||||||||||||||||||||||||
observer.error(error); | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
complete() { | ||||||||||||||||||||||||||
controller.enqueue({ type: "completed" }); | ||||||||||||||||||||||||||
controller.close(); | ||||||||||||||||||||||||||
observer.complete(); | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return () => { | ||||||||||||||||||||||||||
subscription.unsubscribe(); | ||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return forward(operation); | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||
* A link that allows the response to be read from a readable stream, e.g. for | ||||||||||||||||||||||||||
* hydration of a multipart response from RSC or a server loader in the browser. | ||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||
export const ReadFromReadableStreamLink = new ApolloLink( | ||||||||||||||||||||||||||
(operation, forward) => { | ||||||||||||||||||||||||||
const context = operation.getContext() as InternalContext; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
const eventSteam = context[readFromReadableStreamKey]; | ||||||||||||||||||||||||||
if (eventSteam) { | ||||||||||||||||||||||||||
return new Observable((observer) => { | ||||||||||||||||||||||||||
let aborted = false as boolean; | ||||||||||||||||||||||||||
const reader = eventSteam.getReader(); | ||||||||||||||||||||||||||
consumeReader(); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return () => { | ||||||||||||||||||||||||||
aborted = true; | ||||||||||||||||||||||||||
reader.cancel(); | ||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
async function consumeReader() { | ||||||||||||||||||||||||||
let event: | ||||||||||||||||||||||||||
| ReadableStreamReadResult<ReadableStreamLinkEvent> | ||||||||||||||||||||||||||
| undefined = undefined; | ||||||||||||||||||||||||||
while (!aborted && !event?.done) { | ||||||||||||||||||||||||||
event = await reader.read(); | ||||||||||||||||||||||||||
if (aborted) break; | ||||||||||||||||||||||||||
if (event.value) { | ||||||||||||||||||||||||||
switch (event.value.type) { | ||||||||||||||||||||||||||
case "next": | ||||||||||||||||||||||||||
observer.next(event.value.value); | ||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||
case "completed": | ||||||||||||||||||||||||||
observer.complete(); | ||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||
case "error": | ||||||||||||||||||||||||||
observer.error( | ||||||||||||||||||||||||||
new Error( | ||||||||||||||||||||||||||
"Error from event stream. Redacted for security concerns." | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return forward(operation); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there anything else we'd consider putting on
apollo.tee.*
? If not, I'd simplify this to a single namespace (same goes for the read key)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are common enough that I see them colliding with something else in the future - it's so close to a web standard type that I would combine it with the method calling it: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController