From e3fbbcf8c2c7983c2e921d60edacde00e772463d Mon Sep 17 00:00:00 2001 From: steveluscher Date: Fri, 29 Sep 2023 00:54:43 +0000 Subject: [PATCH] refactor(experimental): make abort signals mandatory in subscriptions --- .../__tests__/slot-notifications-test.ts | 4 ++- .../slot-notifications-type-test.ts | 4 ++- .../__tests__/json-rpc-subscription-test.ts | 36 ++++++++++++------- .../src/json-rpc-subscription.ts | 8 ++--- packages/rpc-transport/src/json-rpc-types.ts | 4 +-- 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/packages/rpc-core/src/rpc-subscriptions/__tests__/slot-notifications-test.ts b/packages/rpc-core/src/rpc-subscriptions/__tests__/slot-notifications-test.ts index 9280849d3245..e93ff3a40246 100644 --- a/packages/rpc-core/src/rpc-subscriptions/__tests__/slot-notifications-test.ts +++ b/packages/rpc-core/src/rpc-subscriptions/__tests__/slot-notifications-test.ts @@ -20,7 +20,9 @@ describe('slotNotifications', () => { it('produces slot notifications', async () => { expect.assertions(1); - const slotNotifications = await rpc.slotNotifications().subscribe(); + const slotNotifications = await rpc + .slotNotifications() + .subscribe({ abortSignal: new AbortController().signal }); const iterator = slotNotifications[Symbol.asyncIterator](); await expect(iterator.next()).resolves.toHaveProperty('value', { parent: expect.any(BigInt), diff --git a/packages/rpc-core/src/rpc-subscriptions/__typetests__/slot-notifications-type-test.ts b/packages/rpc-core/src/rpc-subscriptions/__typetests__/slot-notifications-type-test.ts index 6261f5ed8da7..adf21363f28b 100644 --- a/packages/rpc-core/src/rpc-subscriptions/__typetests__/slot-notifications-type-test.ts +++ b/packages/rpc-core/src/rpc-subscriptions/__typetests__/slot-notifications-type-test.ts @@ -7,7 +7,9 @@ import { SolanaRpcSubscriptions } from '../index'; async () => { const rpcSubcriptions = null as unknown as RpcSubscriptions; - const slotNotifications = await rpcSubcriptions.slotNotifications().subscribe(); + const slotNotifications = await rpcSubcriptions + .slotNotifications() + .subscribe({ abortSignal: new AbortController().signal }); slotNotifications satisfies AsyncIterable< Readonly<{ diff --git a/packages/rpc-transport/src/__tests__/json-rpc-subscription-test.ts b/packages/rpc-transport/src/__tests__/json-rpc-subscription-test.ts index deba708961ff..626c10ee5541 100644 --- a/packages/rpc-transport/src/__tests__/json-rpc-subscription-test.ts +++ b/packages/rpc-transport/src/__tests__/json-rpc-subscription-test.ts @@ -37,7 +37,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => { }); }); it('sends a subscription request to the transport', () => { - rpc.thingNotifications(123).subscribe(); + rpc.thingNotifications(123).subscribe({ abortSignal: new AbortController().signal }); expect(createWebSocketConnection).toHaveBeenCalledWith( expect.objectContaining({ payload: { @@ -53,7 +53,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => { yield { id: 0, result: 42 /* subscription id */ }; return; }); - const thingNotifications = await rpc.thingNotifications(123).subscribe(); + const thingNotifications = await rpc + .thingNotifications(123) + .subscribe({ abortSignal: new AbortController().signal }); const iterator = thingNotifications[Symbol.asyncIterator](); const thingNotificationPromise = iterator.next(); await expect(thingNotificationPromise).resolves.toMatchObject({ @@ -67,7 +69,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => { yield { id: 0, result: 42 /* subscription id */ }; throw new Error('o no'); }); - const thingNotifications = await rpc.thingNotifications(123).subscribe(); + const thingNotifications = await rpc + .thingNotifications(123) + .subscribe({ abortSignal: new AbortController().signal }); const iterator = thingNotifications[Symbol.asyncIterator](); const thingNotificationPromise = iterator.next(); await expect(thingNotificationPromise).rejects.toThrow('o no'); @@ -194,7 +198,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => { yield { params: { result: 123, subscription: 41 } }; yield { params: { result: 456, subscription: 42 } }; }); - const thingNotifications = await rpc.thingNotifications().subscribe(); + const thingNotifications = await rpc + .thingNotifications() + .subscribe({ abortSignal: new AbortController().signal }); const iterator = thingNotifications[Symbol.asyncIterator](); await expect(iterator.next()).resolves.toHaveProperty('value', 456); }); @@ -205,13 +211,15 @@ describe('JSON-RPC 2.0 Subscriptions', () => { iterable.mockImplementation(async function* () { yield { id: 0, result: subscriptionId /* subscription id */ }; }); - const thingNotificationsPromise = rpc.thingNotifications().subscribe(); + const thingNotificationsPromise = rpc + .thingNotifications() + .subscribe({ abortSignal: new AbortController().signal }); await expect(thingNotificationsPromise).rejects.toThrow('Failed to obtain a subscription id'); } ); it("fatals when called with a method that does not end in 'Notifications'", () => { expect(() => { - rpc.nonConformingNotif().subscribe(); + rpc.nonConformingNotif().subscribe({ abortSignal: new AbortController().signal }); }).toThrow(); }); it('fatals when called with an already aborted signal', async () => { @@ -226,7 +234,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => { iterable.mockImplementation(async function* () { yield { id: 0, result: undefined /* subscription id */ }; }); - const subscribePromise = rpc.thingNotifications().subscribe(); + const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal }); await expect(subscribePromise).rejects.toThrow(/Failed to obtain a subscription id from the server/); }); it('fatals when the server responds with an error', async () => { @@ -237,7 +245,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => { id: 0, }; }); - const subscribePromise = rpc.thingNotifications().subscribe(); + const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal }); await expect(subscribePromise).rejects.toThrow(SolanaJsonRpcError); await expect(subscribePromise).rejects.toThrow(/o no/); await expect(subscribePromise).rejects.toMatchObject({ code: 123, data: 'abc' }); @@ -245,7 +253,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => { it('throws errors when the connection fails to construct', async () => { expect.assertions(1); createWebSocketConnection.mockRejectedValue(new Error('o no')); - const subscribePromise = rpc.thingNotifications().subscribe(); + const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal }); await expect(subscribePromise).rejects.toThrow(/o no/); }); describe('when calling a method having a concrete implementation', () => { @@ -265,7 +273,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => { }); }); it('converts the returned subscription to a JSON-RPC 2.0 message and sends it to the transport', () => { - rpc.nonConformingNotif(123).subscribe(); + rpc.nonConformingNotif(123).subscribe({ abortSignal: new AbortController().signal }); expect(createWebSocketConnection).toHaveBeenCalledWith( expect.objectContaining({ payload: { @@ -316,7 +324,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => { yield { id: 0, result: 42 /* subscription id */ }; yield { params: { result: 123, subscription: 42 } }; }); - const thingNotifications = await rpc.thingNotifications().subscribe(); + const thingNotifications = await rpc + .thingNotifications() + .subscribe({ abortSignal: new AbortController().signal }); await thingNotifications[Symbol.asyncIterator]().next(); expect(responseProcessor).toHaveBeenCalledWith(123); }); @@ -326,7 +336,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => { yield { id: 0, result: 42 /* subscription id */ }; yield { params: { result: 123, subscription: 42 } }; }); - const thingNotifications = await rpc.thingNotifications().subscribe(); + const thingNotifications = await rpc + .thingNotifications() + .subscribe({ abortSignal: new AbortController().signal }); await expect(thingNotifications[Symbol.asyncIterator]().next()).resolves.toHaveProperty( 'value', '123 processed response' diff --git a/packages/rpc-transport/src/json-rpc-subscription.ts b/packages/rpc-transport/src/json-rpc-subscription.ts index 40f61b02892c..efbb44edd476 100644 --- a/packages/rpc-transport/src/json-rpc-subscription.ts +++ b/packages/rpc-transport/src/json-rpc-subscription.ts @@ -36,8 +36,8 @@ function createPendingRpcSubscription( { params, subscribeMethodName, unsubscribeMethodName, responseProcessor }: RpcSubscription ): PendingRpcSubscription { return { - async subscribe(options?: SubscribeOptions): Promise> { - options?.abortSignal?.throwIfAborted(); + async subscribe({ abortSignal }: SubscribeOptions): Promise> { + abortSignal.throwIfAborted(); let subscriptionId: number | undefined; function handleCleanup() { if (subscriptionId !== undefined) { @@ -49,7 +49,7 @@ function createPendingRpcSubscription( connectionAbortController.abort(); } } - options?.abortSignal?.addEventListener('abort', handleCleanup); + abortSignal.addEventListener('abort', handleCleanup); /** * STEP 1: Send the subscribe message. */ @@ -60,7 +60,7 @@ function createPendingRpcSubscription( signal: connectionAbortController.signal, }); function handleConnectionCleanup() { - options?.abortSignal?.removeEventListener('abort', handleCleanup); + abortSignal.removeEventListener('abort', handleCleanup); } registerIterableCleanup(connection, handleConnectionCleanup); /** diff --git a/packages/rpc-transport/src/json-rpc-types.ts b/packages/rpc-transport/src/json-rpc-types.ts index bd1e93ff2515..4c971b370b27 100644 --- a/packages/rpc-transport/src/json-rpc-types.ts +++ b/packages/rpc-transport/src/json-rpc-types.ts @@ -42,13 +42,13 @@ export type PendingRpcRequest = { send(options?: SendOptions): Promise; }; export type PendingRpcSubscription = { - subscribe(options?: SubscribeOptions): Promise>; + subscribe(options: SubscribeOptions): Promise>; }; export type SendOptions = Readonly<{ abortSignal?: AbortSignal; }>; export type SubscribeOptions = Readonly<{ - abortSignal?: AbortSignal; + abortSignal: AbortSignal; }>; /**