Skip to content

Commit

Permalink
refactor(experimental): make abort signals mandatory in subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher committed Sep 29, 2023
1 parent d9d004b commit e3fbbcf
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ describe('slotNotifications', () => {

it('produces slot notifications', async () => {
expect.assertions(1);
const slotNotifications = await rpc.slotNotifications().subscribe();
const slotNotifications = await rpc
.slotNotifications()
.subscribe({ abortSignal: new AbortController().signal });
const iterator = slotNotifications[Symbol.asyncIterator]();
await expect(iterator.next()).resolves.toHaveProperty('value', {
parent: expect.any(BigInt),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { SolanaRpcSubscriptions } from '../index';

async () => {
const rpcSubcriptions = null as unknown as RpcSubscriptions<SolanaRpcSubscriptions>;
const slotNotifications = await rpcSubcriptions.slotNotifications().subscribe();
const slotNotifications = await rpcSubcriptions
.slotNotifications()
.subscribe({ abortSignal: new AbortController().signal });

slotNotifications satisfies AsyncIterable<
Readonly<{
Expand Down
36 changes: 24 additions & 12 deletions packages/rpc-transport/src/__tests__/json-rpc-subscription-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
});
});
it('sends a subscription request to the transport', () => {
rpc.thingNotifications(123).subscribe();
rpc.thingNotifications(123).subscribe({ abortSignal: new AbortController().signal });
expect(createWebSocketConnection).toHaveBeenCalledWith(
expect.objectContaining({
payload: {
Expand All @@ -53,7 +53,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
yield { id: 0, result: 42 /* subscription id */ };
return;
});
const thingNotifications = await rpc.thingNotifications(123).subscribe();
const thingNotifications = await rpc
.thingNotifications(123)
.subscribe({ abortSignal: new AbortController().signal });
const iterator = thingNotifications[Symbol.asyncIterator]();
const thingNotificationPromise = iterator.next();
await expect(thingNotificationPromise).resolves.toMatchObject({
Expand All @@ -67,7 +69,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
yield { id: 0, result: 42 /* subscription id */ };
throw new Error('o no');
});
const thingNotifications = await rpc.thingNotifications(123).subscribe();
const thingNotifications = await rpc
.thingNotifications(123)
.subscribe({ abortSignal: new AbortController().signal });
const iterator = thingNotifications[Symbol.asyncIterator]();
const thingNotificationPromise = iterator.next();
await expect(thingNotificationPromise).rejects.toThrow('o no');
Expand Down Expand Up @@ -194,7 +198,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
yield { params: { result: 123, subscription: 41 } };
yield { params: { result: 456, subscription: 42 } };
});
const thingNotifications = await rpc.thingNotifications().subscribe();
const thingNotifications = await rpc
.thingNotifications()
.subscribe({ abortSignal: new AbortController().signal });
const iterator = thingNotifications[Symbol.asyncIterator]();
await expect(iterator.next()).resolves.toHaveProperty('value', 456);
});
Expand All @@ -205,13 +211,15 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
iterable.mockImplementation(async function* () {
yield { id: 0, result: subscriptionId /* subscription id */ };
});
const thingNotificationsPromise = rpc.thingNotifications().subscribe();
const thingNotificationsPromise = rpc
.thingNotifications()
.subscribe({ abortSignal: new AbortController().signal });
await expect(thingNotificationsPromise).rejects.toThrow('Failed to obtain a subscription id');
}
);
it("fatals when called with a method that does not end in 'Notifications'", () => {
expect(() => {
rpc.nonConformingNotif().subscribe();
rpc.nonConformingNotif().subscribe({ abortSignal: new AbortController().signal });
}).toThrow();
});
it('fatals when called with an already aborted signal', async () => {
Expand All @@ -226,7 +234,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
iterable.mockImplementation(async function* () {
yield { id: 0, result: undefined /* subscription id */ };
});
const subscribePromise = rpc.thingNotifications().subscribe();
const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal });
await expect(subscribePromise).rejects.toThrow(/Failed to obtain a subscription id from the server/);
});
it('fatals when the server responds with an error', async () => {
Expand All @@ -237,15 +245,15 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
id: 0,
};
});
const subscribePromise = rpc.thingNotifications().subscribe();
const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal });
await expect(subscribePromise).rejects.toThrow(SolanaJsonRpcError);
await expect(subscribePromise).rejects.toThrow(/o no/);
await expect(subscribePromise).rejects.toMatchObject({ code: 123, data: 'abc' });
});
it('throws errors when the connection fails to construct', async () => {
expect.assertions(1);
createWebSocketConnection.mockRejectedValue(new Error('o no'));
const subscribePromise = rpc.thingNotifications().subscribe();
const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal });
await expect(subscribePromise).rejects.toThrow(/o no/);
});
describe('when calling a method having a concrete implementation', () => {
Expand All @@ -265,7 +273,7 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
});
});
it('converts the returned subscription to a JSON-RPC 2.0 message and sends it to the transport', () => {
rpc.nonConformingNotif(123).subscribe();
rpc.nonConformingNotif(123).subscribe({ abortSignal: new AbortController().signal });
expect(createWebSocketConnection).toHaveBeenCalledWith(
expect.objectContaining({
payload: {
Expand Down Expand Up @@ -316,7 +324,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
yield { id: 0, result: 42 /* subscription id */ };
yield { params: { result: 123, subscription: 42 } };
});
const thingNotifications = await rpc.thingNotifications().subscribe();
const thingNotifications = await rpc
.thingNotifications()
.subscribe({ abortSignal: new AbortController().signal });
await thingNotifications[Symbol.asyncIterator]().next();
expect(responseProcessor).toHaveBeenCalledWith(123);
});
Expand All @@ -326,7 +336,9 @@ describe('JSON-RPC 2.0 Subscriptions', () => {
yield { id: 0, result: 42 /* subscription id */ };
yield { params: { result: 123, subscription: 42 } };
});
const thingNotifications = await rpc.thingNotifications().subscribe();
const thingNotifications = await rpc
.thingNotifications()
.subscribe({ abortSignal: new AbortController().signal });
await expect(thingNotifications[Symbol.asyncIterator]().next()).resolves.toHaveProperty(
'value',
'123 processed response'
Expand Down
8 changes: 4 additions & 4 deletions packages/rpc-transport/src/json-rpc-subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ function createPendingRpcSubscription<TRpcSubscriptionMethods, TNotification>(
{ params, subscribeMethodName, unsubscribeMethodName, responseProcessor }: RpcSubscription<TNotification>
): PendingRpcSubscription<TNotification> {
return {
async subscribe(options?: SubscribeOptions): Promise<AsyncIterable<TNotification>> {
options?.abortSignal?.throwIfAborted();
async subscribe({ abortSignal }: SubscribeOptions): Promise<AsyncIterable<TNotification>> {
abortSignal.throwIfAborted();
let subscriptionId: number | undefined;
function handleCleanup() {
if (subscriptionId !== undefined) {
Expand All @@ -49,7 +49,7 @@ function createPendingRpcSubscription<TRpcSubscriptionMethods, TNotification>(
connectionAbortController.abort();
}
}
options?.abortSignal?.addEventListener('abort', handleCleanup);
abortSignal.addEventListener('abort', handleCleanup);
/**
* STEP 1: Send the subscribe message.
*/
Expand All @@ -60,7 +60,7 @@ function createPendingRpcSubscription<TRpcSubscriptionMethods, TNotification>(
signal: connectionAbortController.signal,
});
function handleConnectionCleanup() {
options?.abortSignal?.removeEventListener('abort', handleCleanup);
abortSignal.removeEventListener('abort', handleCleanup);
}
registerIterableCleanup(connection, handleConnectionCleanup);
/**
Expand Down
4 changes: 2 additions & 2 deletions packages/rpc-transport/src/json-rpc-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ export type PendingRpcRequest<TResponse> = {
send(options?: SendOptions): Promise<TResponse>;
};
export type PendingRpcSubscription<TNotification> = {
subscribe(options?: SubscribeOptions): Promise<AsyncIterable<TNotification>>;
subscribe(options: SubscribeOptions): Promise<AsyncIterable<TNotification>>;
};
export type SendOptions = Readonly<{
abortSignal?: AbortSignal;
}>;
export type SubscribeOptions = Readonly<{
abortSignal?: AbortSignal;
abortSignal: AbortSignal;
}>;

/**
Expand Down

0 comments on commit e3fbbcf

Please sign in to comment.