From c913f59e7d17ba10661b97663331f377af814695 Mon Sep 17 00:00:00 2001 From: steveluscher Date: Fri, 22 Sep 2023 20:37:48 +0000 Subject: [PATCH] refactor(experimental): a transport that coalesces multiple subscriptions behind a single subscription --- packages/library/package.json | 3 +- .../rpc-request-deduplication-test.ts | 68 +--- .../rpc-subscription-coalescer-test.ts | 325 ++++++++++++++++++ .../library/src/rpc-request-deduplication.ts | 6 - .../library/src/rpc-subscription-coalescer.ts | 132 +++++++ pnpm-lock.yaml | 3 - 6 files changed, 459 insertions(+), 78 deletions(-) create mode 100644 packages/library/src/__tests__/rpc-subscription-coalescer-test.ts create mode 100644 packages/library/src/rpc-subscription-coalescer.ts diff --git a/packages/library/package.json b/packages/library/package.json index f149ddadf1e3..654740aab88c 100644 --- a/packages/library/package.json +++ b/packages/library/package.json @@ -71,8 +71,7 @@ "@solana/keys": "workspace:*", "@solana/rpc-core": "workspace:*", "@solana/rpc-transport": "workspace:*", - "@solana/transactions": "workspace:*", - "fast-stable-stringify": "^1.0.0" + "@solana/transactions": "workspace:*" }, "devDependencies": { "@solana/eslint-config-solana": "^1.0.2", diff --git a/packages/library/src/__tests__/rpc-request-deduplication-test.ts b/packages/library/src/__tests__/rpc-request-deduplication-test.ts index 34f62c495544..cf5b8fefede5 100644 --- a/packages/library/src/__tests__/rpc-request-deduplication-test.ts +++ b/packages/library/src/__tests__/rpc-request-deduplication-test.ts @@ -1,7 +1,4 @@ -import { - getSolanaRpcPayloadDeduplicationKey, - getSolanaRpcSubscriptionPayloadDeduplicationKey, -} from '../rpc-request-deduplication'; +import { getSolanaRpcPayloadDeduplicationKey } from '../rpc-request-deduplication'; describe('getSolanaRpcPayloadDeduplicationKey', () => { it('produces no key for undefined payloads', () => { @@ -55,66 +52,3 @@ describe('getSolanaRpcPayloadDeduplicationKey', () => { ); }); }); - -describe('getSolanaRpcSubscriptionPayloadDeduplicationKey', () => { - it('produces no key for undefined payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey(undefined)).toBeUndefined(); - }); - it('produces no key for null payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey(null)).toBeUndefined(); - }); - it('produces no key for array payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey([])).toBeUndefined(); - }); - it('produces no key for string payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey('o hai')).toBeUndefined(); - }); - it('produces no key for numeric payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey(123)).toBeUndefined(); - }); - it('produces no key for bigint payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey(123n)).toBeUndefined(); - }); - it('produces no key for object payloads that are not JSON-RPC payloads', () => { - expect(getSolanaRpcSubscriptionPayloadDeduplicationKey({})).toBeUndefined(); - }); - it("produces a key for a JSON-RPC payload whose method ends in 'Subscribe'", () => { - expect( - getSolanaRpcSubscriptionPayloadDeduplicationKey({ - id: 1, - jsonrpc: '2.0', - method: 'fooSubscribe', - params: 'foo', - }) - ).toMatchInlineSnapshot(`"["fooSubscribe","foo"]"`); - }); - it("produces no key for a JSON-RPC payload whose method does not end in 'Subscribe'", () => { - expect( - getSolanaRpcSubscriptionPayloadDeduplicationKey({ - id: 1, - jsonrpc: '2.0', - method: 'getFoo', - params: 'foo', - }) - ).toBeUndefined(); - }); - it('produces identical keys for two materially identical JSON-RPC payloads', () => { - expect( - getSolanaRpcSubscriptionPayloadDeduplicationKey({ - id: 1, - jsonrpc: '2.0', - method: 'fooSubscribe', - params: { a: 1, b: { c: [2, 3], d: 4 } }, - }) - ).toEqual( - /* eslint-disable sort-keys-fix/sort-keys-fix */ - getSolanaRpcSubscriptionPayloadDeduplicationKey({ - jsonrpc: '2.0', - method: 'fooSubscribe', - params: { b: { d: 4, c: [2, 3] }, a: 1 }, - id: 2, - }) - /* eslint-enable sort-keys-fix/sort-keys-fix */ - ); - }); -}); diff --git a/packages/library/src/__tests__/rpc-subscription-coalescer-test.ts b/packages/library/src/__tests__/rpc-subscription-coalescer-test.ts new file mode 100644 index 000000000000..3ae9ca714b1d --- /dev/null +++ b/packages/library/src/__tests__/rpc-subscription-coalescer-test.ts @@ -0,0 +1,325 @@ +import { RpcSubscriptions } from '@solana/rpc-transport/dist/types/json-rpc-types'; + +import { getRpcSubscriptionsWithSubscriptionCoalescing } from '../rpc-subscription-coalescer'; + +interface TestRpcSubscriptionNotifications { + nonFunctionProperty: string; + thingNotifications(...args: unknown[]): unknown; +} + +describe('getRpcSubscriptionsWithSubscriptionCoalescing', () => { + let asyncGenerator: jest.Mock>; + let createPendingSubscription: jest.Mock; + let getDeduplicationKey: jest.Mock; + let subscribe: jest.Mock; + let rpcSubscriptions: RpcSubscriptions; + beforeEach(() => { + jest.useFakeTimers(); + asyncGenerator = jest.fn().mockImplementation(async function* () { + yield await new Promise(() => { + /* never resolve */ + }); + }); + getDeduplicationKey = jest.fn(); + subscribe = jest.fn().mockResolvedValue({ + [Symbol.asyncIterator]: asyncGenerator, + }); + createPendingSubscription = jest.fn().mockReturnValue({ subscribe }); + rpcSubscriptions = getRpcSubscriptionsWithSubscriptionCoalescing({ + getDeduplicationKey, + rpcSubscriptions: { + nonFunctionProperty: 'foo', + thingNotifications: createPendingSubscription, + } as RpcSubscriptions, + }); + }); + describe('given invocations that produce the same deduplication key', () => { + beforeEach(() => { + getDeduplicationKey.mockReturnValue('deduplication-key'); + }); + it("creates a pending subscription once, with the first invocation's config", async () => { + expect.assertions(2); + await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(createPendingSubscription).toHaveBeenCalledTimes(1); + expect(createPendingSubscription).toHaveBeenCalledWith({ + payload: 'hello', + }); + }); + it('only calls subscribe once, in the same runloop', async () => { + expect.assertions(1); + await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(subscribe).toHaveBeenCalledTimes(1); + }); + it('only calls subscribe once, in different runloops', async () => { + expect.assertions(1); + await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + expect(subscribe).toHaveBeenCalledTimes(1); + }); + it('delivers different iterables to each subscription, in the same runloop', async () => { + expect.assertions(1); + const [iterableA, iterableB] = await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(iterableA).not.toBe(iterableB); + }); + it('delivers different iterables to each subscription, in different runloops', async () => { + expect.assertions(1); + const iterableA = await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iterableB = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + expect(iterableA).not.toBe(iterableB); + }); + it('publishes the same messages through both iterables', async () => { + expect.assertions(2); + asyncGenerator.mockImplementation(async function* () { + yield 'hello'; + }); + const iterableA = await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iterableB = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iteratorA = iterableA[Symbol.asyncIterator](); + const iteratorB = iterableB[Symbol.asyncIterator](); + const messagePromiseA = iteratorA.next(); + const messagePromiseB = iteratorB.next(); + await jest.runAllTimersAsync(); + await expect(messagePromiseA).resolves.toHaveProperty('value', 'hello'); + await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello'); + }); + it('aborting a subscription causes it to return', async () => { + expect.assertions(1); + asyncGenerator.mockImplementation(async function* () { + yield 'hello'; + }); + const abortController = new AbortController(); + const iterable = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: abortController.signal }); + const iterator = iterable[Symbol.asyncIterator](); + const messagePromise = iterator.next(); + abortController.abort(); + await expect(messagePromise).resolves.toHaveProperty('done', true); + }); + it('aborting one subscription does not abort the other', async () => { + expect.assertions(1); + asyncGenerator.mockImplementation(async function* () { + yield 'hello'; + }); + const abortControllerA = new AbortController(); + await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: abortControllerA.signal }); + const iterableB = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iteratorB = iterableB[Symbol.asyncIterator](); + const messagePromiseB = iteratorB.next(); + abortControllerA.abort(); + await jest.runAllTimersAsync(); + await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello'); + }); + }); + describe('given payloads that produce different deduplication keys', () => { + beforeEach(() => { + let deduplicationKey = 0; + getDeduplicationKey.mockImplementation(() => `${++deduplicationKey}`); + }); + it('creates a pending subscription for each', async () => { + expect.assertions(3); + await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(createPendingSubscription).toHaveBeenCalledTimes(2); + expect(createPendingSubscription).toHaveBeenNthCalledWith(1, { + payload: 'hello', + }); + expect(createPendingSubscription).toHaveBeenNthCalledWith(2, { + payload: 'world', + }); + }); + it('calls subscribe once for each subscription, in the same runloop', async () => { + expect.assertions(1); + await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(subscribe).toHaveBeenCalledTimes(2); + }); + it('calls subscribe once for each subscription, in different runloops', async () => { + expect.assertions(1); + await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + expect(subscribe).toHaveBeenCalledTimes(2); + }); + it('delivers different iterables to each subscription, in the same runloop', async () => { + expect.assertions(1); + const [iterableA, iterableB] = await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(iterableA).not.toBe(iterableB); + }); + it('delivers different iterables to each subscription, in different runloops', async () => { + expect.assertions(1); + const iterableA = await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iterableB = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + expect(iterableA).not.toBe(iterableB); + }); + it('publishes messages through the correct iterable', async () => { + expect.assertions(2); + subscribe.mockResolvedValueOnce({ + async *[Symbol.asyncIterator]() { + yield 'hello'; + }, + }); + const iterableA = await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + subscribe.mockResolvedValueOnce({ + async *[Symbol.asyncIterator]() { + yield 'world'; + }, + }); + const iterableB = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iteratorA = iterableA[Symbol.asyncIterator](); + const iteratorB = iterableB[Symbol.asyncIterator](); + const messagePromiseA = iteratorA.next(); + const messagePromiseB = iteratorB.next(); + await jest.runAllTimersAsync(); + await expect(messagePromiseA).resolves.toHaveProperty('value', 'hello'); + await expect(messagePromiseB).resolves.toHaveProperty('value', 'world'); + }); + it('aborting a subscription causes it to return', async () => { + expect.assertions(1); + asyncGenerator.mockImplementation(async function* () { + yield 'hello'; + }); + const abortController = new AbortController(); + const iterable = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: abortController.signal }); + const iterator = iterable[Symbol.asyncIterator](); + const messagePromise = iterator.next(); + abortController.abort(); + await expect(messagePromise).resolves.toHaveProperty('done', true); + }); + it('aborting one subscription does not abort the other', async () => { + expect.assertions(1); + asyncGenerator.mockImplementation(async function* () { + yield 'hello'; + }); + const abortControllerA = new AbortController(); + await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: abortControllerA.signal }); + const iterableB = await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + const iteratorB = iterableB[Symbol.asyncIterator](); + const messagePromiseB = iteratorB.next(); + abortControllerA.abort(); + await jest.runAllTimersAsync(); + await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello'); + }); + }); + describe('given payloads that produce no deduplcation key', () => { + beforeEach(() => { + getDeduplicationKey.mockReturnValue(undefined); + }); + it('creates a pending subscription for each', async () => { + expect.assertions(3); + await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(createPendingSubscription).toHaveBeenCalledTimes(2); + expect(createPendingSubscription).toHaveBeenNthCalledWith(1, { + payload: 'hello', + }); + expect(createPendingSubscription).toHaveBeenNthCalledWith(2, { + payload: 'world', + }); + }); + it('calls subscribe once for each subscription, in the same runloop', async () => { + expect.assertions(1); + await Promise.all([ + rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }), + rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }), + ]); + expect(subscribe).toHaveBeenCalledTimes(2); + }); + it('calls subscribe once for each subscription, in different runloops', async () => { + expect.assertions(1); + await rpcSubscriptions + .thingNotifications({ payload: 'hello' }) + .subscribe({ abortSignal: new AbortController().signal }); + await rpcSubscriptions + .thingNotifications({ payload: 'world' }) + .subscribe({ abortSignal: new AbortController().signal }); + expect(subscribe).toHaveBeenCalledTimes(2); + }); + }); + it('does not shim non-function properties on the RPC', () => { + expect(rpcSubscriptions.nonFunctionProperty).toBe('foo'); + }); +}); diff --git a/packages/library/src/rpc-request-deduplication.ts b/packages/library/src/rpc-request-deduplication.ts index 8618e9896668..e30c9140022a 100644 --- a/packages/library/src/rpc-request-deduplication.ts +++ b/packages/library/src/rpc-request-deduplication.ts @@ -18,9 +18,3 @@ function isJsonRpcPayload(payload: unknown): payload is Readonly<{ method: strin export function getSolanaRpcPayloadDeduplicationKey(payload: unknown): string | undefined { return isJsonRpcPayload(payload) ? fastStableStringify([payload.method, payload.params]) : undefined; } - -export function getSolanaRpcSubscriptionPayloadDeduplicationKey(payload: unknown): string | undefined { - return isJsonRpcPayload(payload) && payload.method.endsWith('Subscribe') - ? fastStableStringify([payload.method, payload.params]) - : undefined; -} diff --git a/packages/library/src/rpc-subscription-coalescer.ts b/packages/library/src/rpc-subscription-coalescer.ts new file mode 100644 index 000000000000..9ae02b655011 --- /dev/null +++ b/packages/library/src/rpc-subscription-coalescer.ts @@ -0,0 +1,132 @@ +import { PendingRpcSubscription, RpcSubscriptions } from '@solana/rpc-transport/dist/types/json-rpc-types'; + +import { getCachedAbortableIterableFactory } from './cached-abortable-iterable'; + +type CacheKey = string | undefined; +type Config = Readonly<{ + getDeduplicationKey: GetDeduplicationKeyFn; + rpcSubscriptions: RpcSubscriptions; +}>; +type GetDeduplicationKeyFn = (subscriptionMethod: string | symbol, payload: unknown) => CacheKey; + +const EXPLICIT_ABORT_TOKEN = Symbol( + __DEV__ + ? "This symbol is thrown from a subscription's iterator when the subscription is " + + 'explicitly aborted by the user' + : undefined +); + +function registerIterableCleanup(iterable: AsyncIterable, cleanupFn: CallableFunction) { + (async () => { + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of iterable); + } catch { + /* empty */ + } finally { + // Run the cleanup function. + cleanupFn(); + } + })(); +} + +export function getRpcSubscriptionsWithSubscriptionCoalescing({ + getDeduplicationKey, + rpcSubscriptions, +}: Config): RpcSubscriptions { + const cache = new Map>(); + return new Proxy(rpcSubscriptions, { + defineProperty() { + return false; + }, + deleteProperty() { + return false; + }, + get(target, p, receiver) { + const subscriptionMethod = Reflect.get(target, p, receiver); + if (typeof subscriptionMethod !== 'function') { + return subscriptionMethod; + } + return function (...rawParams: unknown[]) { + const deduplicationKey = getDeduplicationKey(p, rawParams); + if (deduplicationKey === undefined) { + return (subscriptionMethod as CallableFunction)(...rawParams); + } + if (cache.has(deduplicationKey)) { + return cache.get(deduplicationKey)!; + } + const iterableFactory = getCachedAbortableIterableFactory< + Parameters['subscribe']>, + AsyncIterable + >({ + getAbortSignalFromInputArgs: ({ abortSignal }) => abortSignal, + getCacheEntryMissingError(deduplicationKey) { + // TODO: Coded error. + return new Error( + `Found no cache entry for subscription with deduplication key \`${deduplicationKey?.toString()}\`` + ); + }, + getCacheKeyFromInputArgs: () => deduplicationKey, + async onCacheHit(_iterable, _config) { + /** + * This transport's goal is to prevent duplicate subscriptions from + * being made. If a cached iterable] is found, do not send the subscribe + * message again. + */ + }, + async onCreateIterable(abortSignal, config) { + const pendingSubscription = (subscriptionMethod as CallableFunction)( + ...rawParams + ) as PendingRpcSubscription; + const iterable = await pendingSubscription.subscribe({ + ...config, + abortSignal, + }); + registerIterableCleanup(iterable, () => { + cache.delete(deduplicationKey); + }); + return iterable; + }, + }); + const pendingSubscription: PendingRpcSubscription = { + async subscribe(...args) { + const iterable = await iterableFactory(...args); + const { abortSignal } = args[0]; + let abortPromise; + return { + ...iterable, + async *[Symbol.asyncIterator]() { + abortPromise ||= abortSignal.aborted + ? Promise.reject(EXPLICIT_ABORT_TOKEN) + : new Promise((_, reject) => { + abortSignal.addEventListener('abort', () => { + reject(EXPLICIT_ABORT_TOKEN); + }); + }); + try { + const iterator = iterable[Symbol.asyncIterator](); + while (true) { + const iteratorResult = await Promise.race([iterator.next(), abortPromise]); + if (iteratorResult.done) { + return; + } else { + yield iteratorResult.value; + } + } + } catch (e) { + if (e === EXPLICIT_ABORT_TOKEN) { + return; + } + cache.delete(deduplicationKey); + throw e; + } + }, + }; + }, + }; + cache.set(deduplicationKey, pendingSubscription); + return pendingSubscription; + }; + }, + }); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0f024c8ee3c5..b7457e33a7dd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -451,9 +451,6 @@ importers: '@solana/transactions': specifier: workspace:* version: link:../transactions - fast-stable-stringify: - specifier: ^1.0.0 - version: 1.0.0 devDependencies: '@solana/eslint-config-solana': specifier: ^1.0.2