Skip to content

Commit

Permalink
fix: Unsubscribe from WebSocket stream when iterator ended
Browse files Browse the repository at this point in the history
  • Loading branch information
neet committed Oct 15, 2024
1 parent f0796fc commit d475032
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 31 deletions.
8 changes: 5 additions & 3 deletions src/adapters/action/dispatcher-ws.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MastoUnexpectedError } from "../errors";
import { MastoUnexpectedError, MastoWebSocketError } from "../errors";
import { createLogger } from "../logger";
import { SerializerNativeImpl } from "../serializers";
import {
Expand Down Expand Up @@ -29,7 +29,7 @@ describe("DispatcherWs", () => {
}).toThrow(MastoUnexpectedError);
});

it("can be disposed", () => {
it("can be disposed", async () => {
const connector = new WebSocketConnectorImpl({
constructorParameters: ["wss://example.com"],
});
Expand All @@ -41,6 +41,8 @@ describe("DispatcherWs", () => {
);

dispatcher[Symbol.dispose]();
expect(connector.canAcquire()).toBe(false);
await expect(() => connector.acquire()).rejects.toThrow(
MastoWebSocketError,
);
});
});
7 changes: 3 additions & 4 deletions src/adapters/ws/web-socket-connector.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ describe("WebSocketConnector", () => {
it("returns existing connection if it exists", async () => {
const port = await getPort();
const server = new WebSocketServer({ port });
const connector = new WebSocketConnectorImpl({
using connector = new WebSocketConnectorImpl({
constructorParameters: [`ws://localhost:${port}`],
});

Expand All @@ -18,21 +18,20 @@ describe("WebSocketConnector", () => {
expect(ws1).toBe(ws2);

server.close();
connector.close();
});

it("rejects if WebSocket closes", async () => {
const connector = new WebSocketConnectorImpl({
constructorParameters: [`ws://localhost:0`],
});
const promise = connector.acquire();
connector.close();
connector[Symbol.dispose]();

await expect(promise).rejects.toBeInstanceOf(MastoWebSocketError);
});

it("rejects if it reaches max attempts", async () => {
const connector = new WebSocketConnectorImpl({
using connector = new WebSocketConnectorImpl({
constructorParameters: [`ws://localhost:0`],
maxAttempts: 1,
});
Expand Down
14 changes: 10 additions & 4 deletions src/adapters/ws/web-socket-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
});
}

canAcquire(): boolean {
return !this.closed;
}

async acquire(): Promise<WebSocket> {
if (this.closed) {
throw new MastoWebSocketError("WebSocket closed");

Check warning on line 38 in src/adapters/ws/web-socket-connector.ts

View check run for this annotation

Codecov / codecov/patch

src/adapters/ws/web-socket-connector.ts#L38

Added line #L38 was not covered by tests
}

this.init();

if (this.ws != undefined) {
Expand All @@ -49,6 +49,12 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
return await promiseWithResolvers.promise;
}

async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocket> {
while (!this.closed) {
yield await this.acquire();
}
}

close(): void {
this.closed = true;
this.ws?.close();
Expand Down
40 changes: 22 additions & 18 deletions src/adapters/ws/web-socket-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,35 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
async *values(): AsyncIterableIterator<mastodon.streaming.Event> {
this.logger?.log("info", "Subscribing to stream", this.stream);

while (this.connector.canAcquire()) {
this.connection = await this.connector.acquire();
try {
for await (const connection of this.connector) {
this.connection = connection;

const data = this.serializer.serialize("json", {
type: "subscribe",
stream: this.stream,
...this.params,
});
const data = this.serializer.serialize("json", {
type: "subscribe",
stream: this.stream,
...this.params,
});

this.logger?.log("debug", "↑ WEBSOCKET", data);
this.connection.send(data);
this.counter.increment(this.stream, this.params);
this.logger?.log("debug", "↑ WEBSOCKET", data);
this.connection.send(data);
this.counter.increment(this.stream, this.params);

const messages = toAsyncIterable(this.connection);
const messages = toAsyncIterable(this.connection);

for await (const message of messages) {
const event = await this.parseMessage(message.data as string);
for await (const message of messages) {
const event = await this.parseMessage(message.data as string);

if (!this.test(event)) {
continue;
}
if (!this.test(event)) {
continue;
}

this.logger?.log("debug", "↓ WEBSOCKET", event);
yield event;
this.logger?.log("debug", "↓ WEBSOCKET", event);
yield event;
}
}
} finally {
this.unsubscribe();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/ws.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { type WebSocket } from "isomorphic-ws";

export interface WebSocketConnector {
// eslint-disable-next-line prettier/prettier
export interface WebSocketConnector extends AsyncIterable<WebSocket> {
acquire(): Promise<WebSocket>;
close(): void;
canAcquire(): boolean;
}

export interface WebSocketSubscriptionCounter {
Expand Down

0 comments on commit d475032

Please sign in to comment.