Skip to content

Commit

Permalink
Extracting formatter, error resonding by format.
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTail committed Dec 6, 2024
1 parent 599c74e commit 20621ec
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 22 deletions.
2 changes: 1 addition & 1 deletion coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
57 changes: 36 additions & 21 deletions src/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,30 @@ interface Emitter<E extends EventsMap> extends FlatObject {
emit: <K extends keyof E>(event: K, data: z.input<E[K]>) => void;
}

const makeEventSchema = (event: string, data: z.ZodTypeAny) =>
z.object({
data,
event: z.literal(event),
id: z.string().optional(),
retry: z.number().int().positive().optional(),
});

const formatEvent = <E extends EventsMap>(
events: E,
event: keyof E,
data: unknown,
) =>
makeEventSchema(String(event), events[event])
.transform((props) =>
[
`event: ${props.event}`,
`data: ${JSON.stringify(props.data)}`,
"",
"", // empty line: events separator
].join("\n"),
)
.parse({ event, data });

const makeMiddleware = <E extends EventsMap>(events: E) =>
new Middleware({
handler: async ({ response }): Promise<Emitter<E>> => {
Expand All @@ -30,26 +54,16 @@ const makeMiddleware = <E extends EventsMap>(events: E) =>
return {
isClosed: () => response.writableEnded || response.closed,
emit: (event, data) => {
response.write(
`event: ${String(event)}\ndata: ${JSON.stringify(events[event].parse(data))}\n\n`,
"utf-8",
);
response.write(formatEvent(events, event, data), "utf-8");
response.flush();
},
};
},
});

const makeEventSchema = (event: string, data: z.ZodTypeAny) =>
z.object({
data,
event: z.literal(event),
id: z.string().optional(),
retry: z.number().int().positive().optional(),
});

const makeResultHandler = <E extends EventsMap>(events: E) =>
new ResultHandler({
const makeResultHandler = <E extends EventsMap>(events: E) => {
const negativeSchema = makeEventSchema("error", z.string());
return new ResultHandler({
positive: {
mimeType: contentTypes.sse,
schema: Object.entries(events)
Expand All @@ -58,21 +72,22 @@ const makeResultHandler = <E extends EventsMap>(events: E) =>
)
.reduce((agg, schema) => agg.or(schema)),
},
negative: {
mimeType: contentTypes.sse,
schema: makeEventSchema("error", z.string()),
},
negative: { mimeType: contentTypes.sse, schema: negativeSchema },
handler: async ({ response, error, logger, request, input }) => {
if (error) {
const httpError = ensureHttpError(error);
logServerError(httpError, logger, request, input);
return void response
.status(httpError.statusCode)
.end(getPublicErrorMessage(httpError));
const output = formatEvent(
{ error: negativeSchema },
"error",
getPublicErrorMessage(httpError),
);
return void response.status(httpError.statusCode).end(output);
}
response.status(200).end();
},
});
};

export const unstable_createEventStream = <
E extends EventsMap,
Expand Down

0 comments on commit 20621ec

Please sign in to comment.