From d475032c981d8ede3de7cc14a0f5468c49b7206b Mon Sep 17 00:00:00 2001 From: Ryo Igarashi Date: Tue, 15 Oct 2024 12:38:50 +0900 Subject: [PATCH] fix: Unsubscribe from WebSocket stream when iterator ended --- src/adapters/action/dispatcher-ws.spec.ts | 8 ++-- src/adapters/ws/web-socket-connector.spec.ts | 7 ++-- src/adapters/ws/web-socket-connector.ts | 14 +++++-- src/adapters/ws/web-socket-subscription.ts | 40 +++++++++++--------- src/interfaces/ws.ts | 4 +- 5 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/adapters/action/dispatcher-ws.spec.ts b/src/adapters/action/dispatcher-ws.spec.ts index 864261be..2b0a545b 100644 --- a/src/adapters/action/dispatcher-ws.spec.ts +++ b/src/adapters/action/dispatcher-ws.spec.ts @@ -1,4 +1,4 @@ -import { MastoUnexpectedError } from "../errors"; +import { MastoUnexpectedError, MastoWebSocketError } from "../errors"; import { createLogger } from "../logger"; import { SerializerNativeImpl } from "../serializers"; import { @@ -29,7 +29,7 @@ describe("DispatcherWs", () => { }).toThrow(MastoUnexpectedError); }); - it("can be disposed", () => { + it("can be disposed", async () => { const connector = new WebSocketConnectorImpl({ constructorParameters: ["wss://example.com"], }); @@ -41,6 +41,8 @@ describe("DispatcherWs", () => { ); dispatcher[Symbol.dispose](); - expect(connector.canAcquire()).toBe(false); + await expect(() => connector.acquire()).rejects.toThrow( + MastoWebSocketError, + ); }); }); diff --git a/src/adapters/ws/web-socket-connector.spec.ts b/src/adapters/ws/web-socket-connector.spec.ts index 7d261f40..f6f15e98 100644 --- a/src/adapters/ws/web-socket-connector.spec.ts +++ b/src/adapters/ws/web-socket-connector.spec.ts @@ -8,7 +8,7 @@ describe("WebSocketConnector", () => { it("returns existing connection if it exists", async () => { const port = await getPort(); const server = new WebSocketServer({ port }); - const connector = new WebSocketConnectorImpl({ + using connector = new WebSocketConnectorImpl({ constructorParameters: [`ws://localhost:${port}`], }); @@ -18,7 +18,6 @@ describe("WebSocketConnector", () => { expect(ws1).toBe(ws2); server.close(); - connector.close(); }); it("rejects if WebSocket closes", async () => { @@ -26,13 +25,13 @@ describe("WebSocketConnector", () => { constructorParameters: [`ws://localhost:0`], }); const promise = connector.acquire(); - connector.close(); + connector[Symbol.dispose](); await expect(promise).rejects.toBeInstanceOf(MastoWebSocketError); }); it("rejects if it reaches max attempts", async () => { - const connector = new WebSocketConnectorImpl({ + using connector = new WebSocketConnectorImpl({ constructorParameters: [`ws://localhost:0`], maxAttempts: 1, }); diff --git a/src/adapters/ws/web-socket-connector.ts b/src/adapters/ws/web-socket-connector.ts index b7337364..819ee851 100644 --- a/src/adapters/ws/web-socket-connector.ts +++ b/src/adapters/ws/web-socket-connector.ts @@ -33,11 +33,11 @@ export class WebSocketConnectorImpl implements WebSocketConnector { }); } - canAcquire(): boolean { - return !this.closed; - } - async acquire(): Promise { + if (this.closed) { + throw new MastoWebSocketError("WebSocket closed"); + } + this.init(); if (this.ws != undefined) { @@ -49,6 +49,12 @@ export class WebSocketConnectorImpl implements WebSocketConnector { return await promiseWithResolvers.promise; } + async *[Symbol.asyncIterator](): AsyncIterableIterator { + while (!this.closed) { + yield await this.acquire(); + } + } + close(): void { this.closed = true; this.ws?.close(); diff --git a/src/adapters/ws/web-socket-subscription.ts b/src/adapters/ws/web-socket-subscription.ts index f025f464..c7957509 100644 --- a/src/adapters/ws/web-socket-subscription.ts +++ b/src/adapters/ws/web-socket-subscription.ts @@ -25,31 +25,35 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription { async *values(): AsyncIterableIterator { this.logger?.log("info", "Subscribing to stream", this.stream); - while (this.connector.canAcquire()) { - this.connection = await this.connector.acquire(); + try { + for await (const connection of this.connector) { + this.connection = connection; - const data = this.serializer.serialize("json", { - type: "subscribe", - stream: this.stream, - ...this.params, - }); + const data = this.serializer.serialize("json", { + type: "subscribe", + stream: this.stream, + ...this.params, + }); - this.logger?.log("debug", "↑ WEBSOCKET", data); - this.connection.send(data); - this.counter.increment(this.stream, this.params); + this.logger?.log("debug", "↑ WEBSOCKET", data); + this.connection.send(data); + this.counter.increment(this.stream, this.params); - const messages = toAsyncIterable(this.connection); + const messages = toAsyncIterable(this.connection); - for await (const message of messages) { - const event = await this.parseMessage(message.data as string); + for await (const message of messages) { + const event = await this.parseMessage(message.data as string); - if (!this.test(event)) { - continue; - } + if (!this.test(event)) { + continue; + } - this.logger?.log("debug", "↓ WEBSOCKET", event); - yield event; + this.logger?.log("debug", "↓ WEBSOCKET", event); + yield event; + } } + } finally { + this.unsubscribe(); } } diff --git a/src/interfaces/ws.ts b/src/interfaces/ws.ts index 1b091497..79d664c8 100644 --- a/src/interfaces/ws.ts +++ b/src/interfaces/ws.ts @@ -1,9 +1,9 @@ import { type WebSocket } from "isomorphic-ws"; -export interface WebSocketConnector { +// eslint-disable-next-line prettier/prettier +export interface WebSocketConnector extends AsyncIterable { acquire(): Promise; close(): void; - canAcquire(): boolean; } export interface WebSocketSubscriptionCounter {