Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSE #2238

Merged
merged 47 commits into from
Dec 9, 2024
Merged

SSE #2238

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a6a2c7d
Add SSE content type.
RobinTail Dec 4, 2024
2ba2136
Early draft: working SSE protocol.
RobinTail Dec 4, 2024
f2d89af
Add res.end to the end.
RobinTail Dec 5, 2024
927ae9d
Moving the emitter into a middleware.
RobinTail Dec 5, 2024
7d182c5
Making a single entity that produces a factory so far.
RobinTail Dec 5, 2024
acd6e10
Ref: the entity returns endpoint.
RobinTail Dec 5, 2024
e6f4bd8
Minor: consistent naming.
RobinTail Dec 6, 2024
b438dab
Emitter constraints on the middleware.
RobinTail Dec 6, 2024
599c74e
Allowing input customization.
RobinTail Dec 6, 2024
20621ec
Extracting formatter, error resonding by format.
RobinTail Dec 6, 2024
4fd6268
System test for SSE.
RobinTail Dec 6, 2024
9261004
rm html file.
RobinTail Dec 6, 2024
a71b6e0
Minor: test naming.
RobinTail Dec 6, 2024
06b6a8d
Fixing and testing schema for negative scenario.
RobinTail Dec 6, 2024
5a30ead
Postpone sending headers until first emit() called, enable responding…
RobinTail Dec 6, 2024
6a819b9
Ensure stream headers in 10s.
RobinTail Dec 6, 2024
ab5bea8
rm redundant status set in rh.
RobinTail Dec 6, 2024
4ce9f6c
Flipping rh implementation for readability.
RobinTail Dec 6, 2024
0948fd6
REF: no fake event for errors, just plain text.
RobinTail Dec 6, 2024
98c8fbe
Testing makeEventSchema().
RobinTail Dec 6, 2024
a083851
Testing formatEvent().
RobinTail Dec 6, 2024
abc4450
Ref: using writeHead() in ensureStream() and testing it (there is an …
RobinTail Dec 7, 2024
678035d
Test for makeMiddleware().
RobinTail Dec 7, 2024
75592dc
Test for makeResultHandler.
RobinTail Dec 7, 2024
e74df7c
Tested unstable_createEventStream().
RobinTail Dec 7, 2024
0328061
Feat: exposing the feature.
RobinTail Dec 7, 2024
7b72999
Using discriminated union for positive schema.
RobinTail Dec 7, 2024
13cc4b0
Readme: listing the feature.
RobinTail Dec 7, 2024
279e96a
Changelog: 21.4.0.
RobinTail Dec 7, 2024
64e4661
jsdoc for Emitter.
RobinTail Dec 7, 2024
a4f973b
Update CHANGELOG.md
RobinTail Dec 7, 2024
c4c4665
MInor: md comment.
RobinTail Dec 7, 2024
73e9688
md: no logging.
RobinTail Dec 7, 2024
015661a
Merge branch 'master' into try-sse
RobinTail Dec 8, 2024
45ee90d
Merge branch 'master' into try-sse
RobinTail Dec 9, 2024
bf6064c
Utilizing buildVoid.
RobinTail Dec 9, 2024
9f356b2
FEAT: EventStreamFactory.
RobinTail Dec 9, 2024
1eeae14
Readme: adjusting implementation.
RobinTail Dec 9, 2024
0661de8
Minor: no instability.
RobinTail Dec 9, 2024
3fe9861
Tagging the new endpoint.
RobinTail Dec 9, 2024
244d9d5
Consumption example.
RobinTail Dec 9, 2024
b59b3e8
Readme: minor.
RobinTail Dec 9, 2024
6eb9c34
rm export.
RobinTail Dec 9, 2024
30e35ef
Add vitest serializer for ZodDiscriminatedUnion.
RobinTail Dec 9, 2024
9efd6ca
Merge branch 'master' into try-sse
RobinTail Dec 9, 2024
d808bd0
Fix: making Parsers partial in relation to ContentType.
RobinTail Dec 9, 2024
cddddce
FIX: avoid sending headers twice if error happened after stream started.
RobinTail Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,40 @@

## Version 21

### v21.5.0

- Feat: Introducing [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events):
- Basic implementation of the event streams feature is now available using `EventStreamFactory` class;
- The new factory is similar to `EndpointsFactory` including the middlewares support;
- Client application can subscribe to the event stream using `EventSource` class instance;
- `Documentation` and `Integration` do not have yet a special depiction of such endpoints;
- This feature is a lightweight alternative to [Zod Sockets](https://github.com/RobinTail/zod-sockets).

```ts
import { z } from "zod";
import { EventStreamFactory } from "express-zod-api";
import { setTimeout } from "node:timers/promises";

const subscriptionEndpoint = EventStreamFactory({
events: { time: z.number().int().positive() },
}).buildVoid({
input: z.object({}), // optional input schema
handler: async ({ options: { emit, isClosed } }) => {
while (!isClosed()) {
emit("time", Date.now());
await setTimeout(1000);
}
},
});
```

```js
const source = new EventSource("https://example.com/api/v1/time");
source.addEventListener("time", (event) => {
const data = JSON.parse(event.data); // number
});
```

### v21.4.0

- Return type of public methods `getTags()` and `getScopes()` of `Endpoint` corrected to `ReadyonlyArray<string>`;
Expand Down
37 changes: 31 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1195,13 +1195,38 @@ createConfig({

## Subscriptions

If you want the user of a client application to be able to subscribe to subsequent updates initiated by the server, the
capabilities of this framework and the HTTP protocol itself would not be enough in this case. I have developed an
additional websocket operating framework, [Zod Sockets](https://github.com/RobinTail/zod-sockets), which has similar
principles and capabilities. Check out an example of the synergy between two frameworks on handling subscription events
in order to emit (broadcast) the `time` event every second with a current time in its payload:
If you want the user of a client application to be able to subscribe to subsequent updates initiated by the server,
consider [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) feature.
Client application can subscribe to the event stream using `EventSource` class instance. The following example
demonstrates the implementation emitting the `time` event each second.

https://github.com/RobinTail/zod-sockets#subscriptions
```typescript
import { z } from "zod";
import { EventStreamFactory } from "express-zod-api";
import { setTimeout } from "node:timers/promises";

const subscriptionEndpoint = EventStreamFactory({
events: { time: z.number().int().positive() },
}).buildVoid({
input: z.object({}), // optional input schema
handler: async ({ options: { emit, isClosed } }) => {
while (!isClosed()) {
emit("time", Date.now());
await setTimeout(1000);
}
},
});
```

```js
const source = new EventSource("https://example.com/api/v1/time");
source.addEventListener("time", (event) => {
const data = JSON.parse(event.data); // number
});
```

If you need more capabilities, such as bidirectional event sending, I have developed an additional websocket operating
framework, [Zod Sockets](https://github.com/RobinTail/zod-sockets), which has similar principles and capabilities.

# Integration and Documentation

Expand Down
1 change: 1 addition & 0 deletions example/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const config = createConfig({
tags: {
users: "Everything about the users",
files: "Everything about the files processing",
subscriptions: "Everything about the subscriptions",
},
});

Expand Down
23 changes: 23 additions & 0 deletions example/endpoints/time-subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { z } from "zod";
import { setTimeout } from "node:timers/promises";
import { eventsFactory } from "../factories";

export const subscriptionEndpoint = eventsFactory.buildVoid({
tag: "subscriptions",
input: z.object({
trigger: z.string().optional(),
}),
handler: async ({
input: { trigger },
options: { emit, isClosed },
logger,
}) => {
if (trigger === "failure") throw new Error("Intentional failure");
while (!isClosed()) {
logger.debug("emitting");
emit("time", Date.now());
await setTimeout(1000);
}
logger.debug("closed");
},
});
25 changes: 24 additions & 1 deletion example/example.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ type PostV1AvatarRawResponse =
| PostV1AvatarRawPositiveResponse
| PostV1AvatarRawNegativeResponse;

type GetV1EventsTimeInput = {
trigger?: string | undefined;
};

type GetV1EventsTimePositiveResponse = {
data: number;
event: "time";
id?: string | undefined;
retry?: number | undefined;
};

type GetV1EventsTimeNegativeResponse = string;

type GetV1EventsTimeResponse =
| GetV1EventsTimePositiveResponse
| GetV1EventsTimeNegativeResponse;

export type Path =
| "/v1/user/retrieve"
| "/v1/user/:id/remove"
Expand All @@ -186,7 +203,8 @@ export type Path =
| "/v1/avatar/send"
| "/v1/avatar/stream"
| "/v1/avatar/upload"
| "/v1/avatar/raw";
| "/v1/avatar/raw"
| "/v1/events/time";

export type Method = "get" | "post" | "put" | "delete" | "patch";

Expand All @@ -200,6 +218,7 @@ export interface Input {
"get /v1/avatar/stream": GetV1AvatarStreamInput;
"post /v1/avatar/upload": PostV1AvatarUploadInput;
"post /v1/avatar/raw": PostV1AvatarRawInput;
"get /v1/events/time": GetV1EventsTimeInput;
}

export interface PositiveResponse {
Expand All @@ -212,6 +231,7 @@ export interface PositiveResponse {
"get /v1/avatar/stream": GetV1AvatarStreamPositiveResponse;
"post /v1/avatar/upload": PostV1AvatarUploadPositiveResponse;
"post /v1/avatar/raw": PostV1AvatarRawPositiveResponse;
"get /v1/events/time": GetV1EventsTimePositiveResponse;
}

export interface NegativeResponse {
Expand All @@ -224,6 +244,7 @@ export interface NegativeResponse {
"get /v1/avatar/stream": GetV1AvatarStreamNegativeResponse;
"post /v1/avatar/upload": PostV1AvatarUploadNegativeResponse;
"post /v1/avatar/raw": PostV1AvatarRawNegativeResponse;
"get /v1/events/time": GetV1EventsTimeNegativeResponse;
}

export interface Response {
Expand All @@ -236,6 +257,7 @@ export interface Response {
"get /v1/avatar/stream": GetV1AvatarStreamResponse;
"post /v1/avatar/upload": PostV1AvatarUploadResponse;
"post /v1/avatar/raw": PostV1AvatarRawResponse;
"get /v1/events/time": GetV1EventsTimeResponse;
}

export type MethodPath = keyof Input;
Expand All @@ -259,6 +281,7 @@ export const endpointTags = {
"get /v1/avatar/stream": ["users", "files"],
"post /v1/avatar/upload": ["files"],
"post /v1/avatar/raw": ["files"],
"get /v1/events/time": ["subscriptions"],
};

export type Implementation = (
Expand Down
46 changes: 46 additions & 0 deletions example/example.documentation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,50 @@ paths:
status: error
error:
message: Sample error message
/v1/events/time:
get:
operationId: GetV1EventsTime
tags:
- subscriptions
parameters:
- name: trigger
in: query
required: false
description: GET /v1/events/time Parameter
schema:
type: string
responses:
"200":
description: GET /v1/events/time Positive response
content:
text/event-stream:
schema:
type: object
properties:
data:
type: integer
format: int64
exclusiveMinimum: 0
maximum: 9007199254740991
event:
type: string
const: time
id:
type: string
retry:
type: integer
format: int64
exclusiveMinimum: 0
maximum: 9007199254740991
required:
- data
- event
"400":
description: GET /v1/events/time Negative response
content:
text/plain:
schema:
type: string
components:
schemas:
Schema1:
Expand Down Expand Up @@ -603,5 +647,7 @@ tags:
description: Everything about the users
- name: files
description: Everything about the files processing
- name: subscriptions
description: Everything about the subscriptions
servers:
- url: https://example.com
6 changes: 6 additions & 0 deletions example/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
defaultResultHandler,
ez,
ensureHttpError,
EventStreamFactory,
} from "../src";
import { config } from "./config";
import { authMiddleware } from "./middlewares";
Expand Down Expand Up @@ -110,3 +111,8 @@ export const noContentFactory = new EndpointsFactory({
},
}),
});

export const eventsFactory = new EventStreamFactory({
config,
events: { time: z.number().int().positive() },
});
4 changes: 4 additions & 0 deletions example/routing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { rawAcceptingEndpoint } from "./endpoints/accept-raw";
import { createUserEndpoint } from "./endpoints/create-user";
import { deleteUserEndpoint } from "./endpoints/delete-user";
import { listUsersEndpoint } from "./endpoints/list-users";
import { subscriptionEndpoint } from "./endpoints/time-subscription";
import { uploadAvatarEndpoint } from "./endpoints/upload-avatar";
import { retrieveUserEndpoint } from "./endpoints/retrieve-user";
import { sendAvatarEndpoint } from "./endpoints/send-avatar";
Expand Down Expand Up @@ -35,6 +36,9 @@ export const routing: Routing = {
// raw body acceptance example
raw: rawAcceptingEndpoint,
},
events: {
time: subscriptionEndpoint,
},
},
// path /public serves static files from /example/assets
public: new ServeStatic(join("example", "assets"), {
Expand Down
1 change: 1 addition & 0 deletions src/content-type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export const contentTypes = {
json: "application/json",
upload: "multipart/form-data",
raw: "application/octet-stream",
sse: "text/event-stream",
};

export type ContentType = keyof typeof contentTypes;
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export {
} from "./errors";
export { testEndpoint, testMiddleware } from "./testing";
export { Integration } from "./integration";
export { EventStreamFactory } from "./sse";

export { ez } from "./proprietary-schemas";

Expand Down
2 changes: 1 addition & 1 deletion src/routing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface Routing {
[SEGMENT: string]: Routing | DependsOnMethod | AbstractEndpoint | ServeStatic;
}

export type Parsers = Record<ContentType, RequestHandler[]>;
export type Parsers = Partial<Record<ContentType, RequestHandler[]>>;

export const initRouting = ({
app,
Expand Down
Loading
Loading