Skip to content

Commit

Permalink
Merge pull request #1219 from neet/unsubscribe-generator-finally
Browse files Browse the repository at this point in the history
  • Loading branch information
neet authored Oct 15, 2024
2 parents f0796fc + b7732ed commit a96ac58
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 27 deletions.
8 changes: 5 additions & 3 deletions src/adapters/action/dispatcher-ws.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MastoUnexpectedError } from "../errors";
import { MastoUnexpectedError, MastoWebSocketError } from "../errors";
import { createLogger } from "../logger";
import { SerializerNativeImpl } from "../serializers";
import {
Expand Down Expand Up @@ -29,7 +29,7 @@ describe("DispatcherWs", () => {
}).toThrow(MastoUnexpectedError);
});

it("can be disposed", () => {
it("can be disposed", async () => {
const connector = new WebSocketConnectorImpl({
constructorParameters: ["wss://example.com"],
});
Expand All @@ -41,6 +41,8 @@ describe("DispatcherWs", () => {
);

dispatcher[Symbol.dispose]();
expect(connector.canAcquire()).toBe(false);
await expect(() => connector.acquire()).rejects.toThrow(
MastoWebSocketError,
);
});
});
14 changes: 10 additions & 4 deletions src/adapters/ws/web-socket-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
});
}

canAcquire(): boolean {
return !this.closed;
}

async acquire(): Promise<WebSocket> {
if (this.closed) {
throw new MastoWebSocketError("WebSocket closed");
}

this.init();

if (this.ws != undefined) {
Expand All @@ -49,6 +49,12 @@ export class WebSocketConnectorImpl implements WebSocketConnector {
return await promiseWithResolvers.promise;
}

async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocket> {
while (!this.closed) {
yield await this.acquire();
}
}

close(): void {
this.closed = true;
this.ws?.close();
Expand Down
40 changes: 22 additions & 18 deletions src/adapters/ws/web-socket-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,35 @@ export class WebSocketSubscription implements mastodon.streaming.Subscription {
async *values(): AsyncIterableIterator<mastodon.streaming.Event> {
this.logger?.log("info", "Subscribing to stream", this.stream);

while (this.connector.canAcquire()) {
this.connection = await this.connector.acquire();
try {
for await (const connection of this.connector) {
this.connection = connection;

const data = this.serializer.serialize("json", {
type: "subscribe",
stream: this.stream,
...this.params,
});
const data = this.serializer.serialize("json", {
type: "subscribe",
stream: this.stream,
...this.params,
});

this.logger?.log("debug", "↑ WEBSOCKET", data);
this.connection.send(data);
this.counter.increment(this.stream, this.params);
this.logger?.log("debug", "↑ WEBSOCKET", data);
this.connection.send(data);
this.counter.increment(this.stream, this.params);

const messages = toAsyncIterable(this.connection);
const messages = toAsyncIterable(this.connection);

for await (const message of messages) {
const event = await this.parseMessage(message.data as string);
for await (const message of messages) {
const event = await this.parseMessage(message.data as string);

if (!this.test(event)) {
continue;
}
if (!this.test(event)) {
continue;
}

this.logger?.log("debug", "↓ WEBSOCKET", event);
yield event;
this.logger?.log("debug", "↓ WEBSOCKET", event);
yield event;
}
}
} finally {
this.unsubscribe();
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/interfaces/ws.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { type WebSocket } from "isomorphic-ws";

export interface WebSocketConnector {
export interface WebSocketConnector extends AsyncIterable<WebSocket> {
acquire(): Promise<WebSocket>;
close(): void;
canAcquire(): boolean;
}

export interface WebSocketSubscriptionCounter {
Expand Down

0 comments on commit a96ac58

Please sign in to comment.