From d2f14e7745a900b54a4e83d4d327a63f30876a8e Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 16 May 2024 19:51:11 +0300 Subject: [PATCH 1/9] Notify for changes in a shape subscription --- clients/typescript/src/notifiers/event.ts | 55 ++++++++++++ clients/typescript/src/notifiers/index.ts | 26 ++++++ clients/typescript/src/satellite/process.ts | 46 +++++++++- .../typescript/src/satellite/shapes/index.ts | 87 +------------------ .../src/satellite/shapes/shapeManager.ts | 25 ++++-- clients/typescript/test/satellite/process.ts | 9 +- 6 files changed, 150 insertions(+), 98 deletions(-) diff --git a/clients/typescript/src/notifiers/event.ts b/clients/typescript/src/notifiers/event.ts index 7dd408aee1..60cc004c30 100644 --- a/clients/typescript/src/notifiers/event.ts +++ b/clients/typescript/src/notifiers/event.ts @@ -19,14 +19,18 @@ import { Notifier, PotentialChangeCallback, PotentialChangeNotification, + ShapeSubscriptionSyncStatusChangeCallback, + ShapeSubscriptionSyncStatusChangeNotification, UnsubscribeFunction, } from './index' +import { SyncStatus } from '../client/model/shapes' export const EVENT_NAMES = { authChange: 'auth:changed', actualDataChange: 'data:actually:changed', potentialDataChange: 'data:potentially:changed', connectivityStateChange: 'network:connectivity:changed', + shapeSubscriptionStatusChange: 'shape:status:changed', } // Initialise global emitter to be shared between all @@ -201,6 +205,41 @@ export class EventNotifier implements Notifier { } } + shapeSubscriptionSyncStatusChanged( + dbName: string, + key: string, + status: SyncStatus + ): void { + if (!this._hasDbName(dbName)) { + return + } + + this._emitShapeSubscriptionSyncStatusChange(dbName, key, status) + } + + subscribeToShapeSubscriptionSyncStatusChanges( + callback: ShapeSubscriptionSyncStatusChangeCallback + ): UnsubscribeFunction { + const thisHasDbName = this._hasDbName.bind(this) + + const wrappedCallback = ( + notification: ShapeSubscriptionSyncStatusChangeNotification + ) => { + if (thisHasDbName(notification.dbName)) { + callback(notification) + } + } + + this._subscribe(EVENT_NAMES.shapeSubscriptionStatusChange, wrappedCallback) + + return () => { + this._unsubscribe( + EVENT_NAMES.shapeSubscriptionStatusChange, + wrappedCallback + ) + } + } + _getDbNames(): DbName[] { const idx = this.attachedDbIndex @@ -261,6 +300,22 @@ export class EventNotifier implements Notifier { return notification } + _emitShapeSubscriptionSyncStatusChange( + dbName: DbName, + key: string, + status: SyncStatus + ): ShapeSubscriptionSyncStatusChangeNotification { + const notification = { + dbName: dbName, + key: key, + status: status, + } + + this._emit(EVENT_NAMES.shapeSubscriptionStatusChange, notification) + + return notification + } + _emit(eventName: string, notification: Notification) { this.events.emit(eventName, notification) } diff --git a/clients/typescript/src/notifiers/index.ts b/clients/typescript/src/notifiers/index.ts index 0f6a6bfefa..b681e324cc 100644 --- a/clients/typescript/src/notifiers/index.ts +++ b/clients/typescript/src/notifiers/index.ts @@ -1,4 +1,5 @@ import { AuthState } from '../auth/index' +import { SyncStatus } from '../client/model/shapes' import { QualifiedTablename } from '../util/tablename' import { ConnectivityState, @@ -39,11 +40,18 @@ export interface ConnectivityStateChangeNotification { connectivityState: ConnectivityState } +export interface ShapeSubscriptionSyncStatusChangeNotification { + dbName: DbName + key: string + status: SyncStatus +} + export type Notification = | AuthStateNotification | ChangeNotification | PotentialChangeNotification | ConnectivityStateChangeNotification + | ShapeSubscriptionSyncStatusChangeNotification export type AuthStateCallback = (notification: AuthStateNotification) => void export type ChangeCallback = (notification: ChangeNotification) => void @@ -54,11 +62,16 @@ export type ConnectivityStateChangeCallback = ( notification: ConnectivityStateChangeNotification ) => void +export type ShapeSubscriptionSyncStatusChangeCallback = ( + notification: ShapeSubscriptionSyncStatusChangeNotification +) => void + export type NotificationCallback = | AuthStateCallback | ChangeCallback | PotentialChangeCallback | ConnectivityStateChangeCallback + | ShapeSubscriptionSyncStatusChangeCallback export type UnsubscribeFunction = () => void @@ -129,4 +142,17 @@ export interface Notifier { subscribeToConnectivityStateChanges( callback: ConnectivityStateChangeCallback ): UnsubscribeFunction + + // Notification for shape subscription sync status changes. + // Each subscription is associated with a key a notification will fire + // on every status change + shapeSubscriptionSyncStatusChanged( + dbName: DbName, + key: string, + status: SyncStatus + ): void + + subscribeToShapeSubscriptionSyncStatusChanges( + callback: ShapeSubscriptionSyncStatusChangeCallback + ): UnsubscribeFunction } diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index e8404329a5..40e0a30824 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -446,6 +446,14 @@ export class SatelliteProcess implements Satellite { if (error) throw error + // notify subscribers of change + this.notifier.shapeSubscriptionSyncStatusChanged( + this.dbName, + request.key, + this.syncStatus(request.key) + ) + + // persist subscription metadata await this._setMeta('subscriptions', this.subscriptionManager.serialize()) return { @@ -470,7 +478,7 @@ export class SatelliteProcess implements Satellite { ) } else { return this.unsubscribeIds( - this.subscriptionManager.getServerID(target.shapes) + this.subscriptionManager.getServerIDsForShapes(target.shapes) ) } } @@ -482,6 +490,19 @@ export class SatelliteProcess implements Satellite { // If the server didn't send an error, we persist the fact the subscription was deleted. this.subscriptionManager.unsubscribeMade(subscriptionIds) + + // notify subscribers of change + subscriptionIds.forEach((subId) => { + const key = this.subscriptionManager.getKeyForServerID(subId) + if (!key) return + this.notifier.shapeSubscriptionSyncStatusChanged( + this.dbName, + key, + this.syncStatus(key) + ) + }) + + // persist subscription metadata await this.adapter.run( this._setMetaStatement( 'subscriptions', @@ -501,6 +522,18 @@ export class SatelliteProcess implements Satellite { [], subsData.subscriptionId ) + + // notify subscribers of change + const key = this.subscriptionManager.getKeyForServerID( + subsData.subscriptionId + ) + if (key) + this.notifier.shapeSubscriptionSyncStatusChanged( + this.dbName, + key, + this.syncStatus(key) + ) + const toBeUnsubbed = afterApply() if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed) } @@ -1551,6 +1584,17 @@ export class SatelliteProcess implements Satellite { ) this.subscriptionManager.goneBatchDelivered(subscriptionIds) + // notify subscribers of change + subscriptionIds.forEach((subId) => { + const key = this.subscriptionManager.getKeyForServerID(subId) + if (!key) return + this.notifier.shapeSubscriptionSyncStatusChanged( + this.dbName, + key, + this.syncStatus(key) + ) + }) + this._notifyChanges(fakeOplogEntries, 'remote') } diff --git a/clients/typescript/src/satellite/shapes/index.ts b/clients/typescript/src/satellite/shapes/index.ts index 917c8e8573..bb998fa469 100644 --- a/clients/typescript/src/satellite/shapes/index.ts +++ b/clients/typescript/src/satellite/shapes/index.ts @@ -1,93 +1,8 @@ import uniqWith from 'lodash.uniqwith' -import { - Shape, - ShapeDefinition, - ShapeRequest, - SubscriptionData, - SubscriptionId, -} from './types' +import { Shape } from './types' import { QualifiedTablename } from '../../util' -/** - * Manages the state of satellite shape subscriptions - */ -export interface SubscriptionsManager { - /** - * Stores the identifier for a subscription - * request that was accepted by the server - * - * @param subId the identifier of the subscription - * @param shapeRequests the shapes definitions of the request - */ - subscriptionRequested(subId: string, shapeRequests: ShapeRequest[]): void - - /** - * Cancel the subscription with the given subscription id - * - * @param subId the identifier of the subscription - */ - subscriptionCancelled(subId: string): void - - /** - * Registers that a subsciption that was in-flight is now - * delivered. - * @param data the data for the subscription - */ - subscriptionDelivered(data: SubscriptionData): void - - /** - * Returns the shape definitions for subscriptions avalailable locally - * @param subId the identifier of the subscription - */ - shapesForActiveSubscription(subId: string): ShapeDefinition[] | undefined - - /** - * @returns An array of fulfilled subscriptions that are active. - */ - getFulfilledSubscriptions(): SubscriptionId[] - - /** - * Check if a subscription with exactly the same shape requests has already been issued - * @param shapes Shapes for a potential request - */ - getDuplicatingSubscription( - shapes: Shape[] - ): null | { inFlight: string } | { fulfilled: string } - - /** - * Deletes the subscription(s) from the manager. - * @param subId the identifier of the subscription or an array of subscription identifiers - */ - unsubscribe(subId: SubscriptionId[]): void - - /** - * Delete the subscriptions from the manager and call a GC function - */ - unsubscribeAndGC(subIds: SubscriptionId[]): Promise - - /** - * Deletes all subscriptions from the manager. Useful to - * reset the state of the manager. Calls the configured GC. - * Returns the subscription identifiers of all subscriptions - * that were deleted. - */ - unsubscribeAllAndGC(): Promise - - /** - * Converts the state of the manager to a string format that - * can be used to persist it - */ - serialize(): string - - /** - * loads the subscription manager state from a text representation - */ - setState(serialized: string): void - - hash(shapes: Shape[]): string -} - /** List all tables covered by a given shape */ export function getAllTablesForShape( shape: Shape, diff --git a/clients/typescript/src/satellite/shapes/shapeManager.ts b/clients/typescript/src/satellite/shapes/shapeManager.ts index 60bb31b686..145c9c8a42 100644 --- a/clients/typescript/src/satellite/shapes/shapeManager.ts +++ b/clients/typescript/src/satellite/shapes/shapeManager.ts @@ -191,11 +191,7 @@ export class ShapeManager { syncFailed: () => void promise: Promise } { - // TODO: This sorts the shapes objects for hashing to make sure that order of includes - // does not affect the hash. This has the unfortunate consequence of sorting the FK spec, - // but the chance of a table having two multi-column FKs over same columns BUT in a - // different order feels much lower than people using includes in an arbitrary order. - const shapeHash = hash(shapes, { unorderedArrays: true }) + const shapeHash = this.hashShapes(shapes) const keyOrHash = key ?? shapeHash /* Since multiple requests may have the same key, we'll need to differentiate them * based on both hash and key. We use `:` to join them because hash is base64 that @@ -367,12 +363,27 @@ export class ShapeManager { .filter(onlyDefined) } - public getServerID(shapes: Shape[]): string[] { - const shapeHash = hash(shapes, { unorderedArrays: true }) + public getServerIDsForShapes(shapes: Shape[]): string[] { + const shapeHash = this.hashShapes(shapes) const fullKey = makeFullKey(shapeHash, shapeHash) const serverId = this.knownSubscriptions[fullKey]?.serverId return serverId ? [serverId] : [] } + + public getKeyForServerID(serverId: string): string | undefined { + const fullKey = this.serverIds.get(serverId) + if (fullKey === undefined) return + const [_hash, key] = splitFullKey(fullKey) + return key + } + + public hashShapes(shapes: Shape[]): string { + // TODO: This sorts the shapes objects for hashing to make sure that order of includes + // does not affect the hash. This has the unfortunate consequence of sorting the FK spec, + // but the chance of a table having two multi-column FKs over same columns BUT in a + // different order feels much lower than people using includes in an arbitrary order. + return hash(shapes, { unorderedArrays: true }) + } } function onlyDefined(x: T | undefined): x is T { diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index 85d03b0c0f..743df65ed7 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -1683,10 +1683,11 @@ export const processTests = (test: TestFn) => { const { synced } = await satellite.subscribe([shapeDef]) await synced - // first notification is 'connected' - t.is(notifier.notifications.length, 2) - t.is(notifier.notifications[1].changes.length, 1) - t.deepEqual(notifier.notifications[1].changes[0], { + // first notification is 'connected', second is establishing shape, + // third one is initial sync, last one is shape established + t.is(notifier.notifications.length, 4) + t.is(notifier.notifications[2].changes.length, 1) + t.deepEqual(notifier.notifications[2].changes[0], { qualifiedTablename: qualifiedTableName, recordChanges: [ { From ddf8eb1777fa390ae94cab86a8c828eade161c9b Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 16 May 2024 19:55:43 +0300 Subject: [PATCH 2/9] Add notifier tests for shape sub --- .../typescript/test/notifiers/event.test.ts | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/clients/typescript/test/notifiers/event.test.ts b/clients/typescript/test/notifiers/event.test.ts index 19cdfb3739..600fe66118 100644 --- a/clients/typescript/test/notifiers/event.test.ts +++ b/clients/typescript/test/notifiers/event.test.ts @@ -1,5 +1,8 @@ import test from 'ava' -import { ConnectivityStateChangeNotification } from '../../src/notifiers' +import { + ConnectivityStateChangeNotification, + ShapeSubscriptionSyncStatusChangeNotification, +} from '../../src/notifiers' import { EventNotifier } from '../../src/notifiers/event' import { QualifiedTablename } from '../../src/util/tablename' @@ -152,3 +155,51 @@ test('empty changes should not emit', async (t) => { source.actuallyChanged('foo.db', [], 'local') t.is(notifications.length, 0) }) + +test('subscribe to shape subscription status changes', async (t) => { + const eventEmitter = new EventEmitter() + const source = new EventNotifier('test.db', eventEmitter) + const target = new EventNotifier('test.db', eventEmitter) + + const notifications: ShapeSubscriptionSyncStatusChangeNotification[] = [] + + target.subscribeToShapeSubscriptionSyncStatusChanges((x) => { + notifications.push(x) + }) + + source.shapeSubscriptionSyncStatusChanged('test.db', 'foo', { + status: 'establishing', + serverId: 'foo', + progress: 'receiving_data', + }) + + t.is(notifications.length, 1) +}) + +test('subscribe to shape subscription status changes is scoped by db name', async (t) => { + const eventEmitter = new EventEmitter() + const source = new EventNotifier('test.db', eventEmitter) + const target = new EventNotifier('test.db', eventEmitter) + + const notifications: ShapeSubscriptionSyncStatusChangeNotification[] = [] + + target.subscribeToShapeSubscriptionSyncStatusChanges((x) => { + notifications.push(x) + }) + + source.shapeSubscriptionSyncStatusChanged('test.db', 'foo', { + status: 'establishing', + serverId: 'foo', + progress: 'receiving_data', + }) + + t.is(notifications.length, 1) + + source.shapeSubscriptionSyncStatusChanged('non-existing.db', 'foo', { + status: 'establishing', + serverId: 'foo', + progress: 'receiving_data', + }) + + t.is(notifications.length, 1) +}) From acb19700b1c500ea8c5f9c29d18822c1ded9013f Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 16 May 2024 20:37:53 +0300 Subject: [PATCH 3/9] Add satellite test for shape notifications --- clients/typescript/src/satellite/process.ts | 27 ++++-- clients/typescript/test/satellite/process.ts | 97 ++++++++++++++++++++ 2 files changed, 115 insertions(+), 9 deletions(-) diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 40e0a30824..4d5b58b1fa 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -523,16 +523,20 @@ export class SatelliteProcess implements Satellite { subsData.subscriptionId ) - // notify subscribers of change + // notify subscribers of change (if finished delivering data) const key = this.subscriptionManager.getKeyForServerID( subsData.subscriptionId ) - if (key) - this.notifier.shapeSubscriptionSyncStatusChanged( - this.dbName, - key, - this.syncStatus(key) - ) + if (key) { + const syncStatus = this.syncStatus(key) + if (syncStatus?.status === 'active') { + this.notifier.shapeSubscriptionSyncStatusChanged( + this.dbName, + key, + syncStatus + ) + } + } const toBeUnsubbed = afterApply() if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed) @@ -1582,11 +1586,16 @@ export class SatelliteProcess implements Satellite { ...stmts, ...this._enableTriggers(affectedTables) ) + + // retrieve sub keys before they get removed + const subKeys = subscriptionIds.map((x) => + this.subscriptionManager.getKeyForServerID(x) + ) + this.subscriptionManager.goneBatchDelivered(subscriptionIds) // notify subscribers of change - subscriptionIds.forEach((subId) => { - const key = this.subscriptionManager.getKeyForServerID(subId) + subKeys.forEach((key) => { if (!key) return this.notifier.shapeSubscriptionSyncStatusChanged( this.dbName, diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index 743df65ed7..5890b593af 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -2661,4 +2661,101 @@ export const processTests = (test: TestFn) => { t.pass() }) + + test('notifies for shape lifecycle', async (t) => { + const { client, satellite, notifier, token } = t.context + const { runMigrations, authState } = t.context + await runMigrations() + + const shapeSubKey = 'foo' + const shapeNotifications = () => + notifier.notifications.filter((n) => n.key !== undefined) + + // relations must be present at subscription delivery + client.setRelations(relations) + client.setRelationData('parent', parentRecord) + client.setRelationData('child', childRecord) + + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise + + satellite!.relations = relations + const { synced: syncedFirst } = await satellite.subscribe( + [{ tablename: 'parent' }], + shapeSubKey + ) + + // first one is establishing + + t.is(shapeNotifications().length, 1) + const firstNotification = shapeNotifications()[0] + t.is(firstNotification.key, shapeSubKey) + t.is(firstNotification.status.status, 'establishing') + t.is(firstNotification.status.progress, 'receiving_data') + const firstServerId = firstNotification.status.serverId + t.true(typeof firstServerId === 'string') + + await syncedFirst + + // second one is active + t.is(shapeNotifications().length, 2) + const secondNotification = shapeNotifications()[1] + t.is(secondNotification.key, shapeSubKey) + t.is(secondNotification.status.status, 'active') + t.is(secondNotification.status.serverId, firstServerId) + + // change existing sub to different shape + const { synced: syncedSecond } = await satellite.subscribe( + [{ tablename: 'child' }], + shapeSubKey + ) + + // third one is a "mutation" one, receiving new data + t.is(shapeNotifications().length, 3) + const thirdNotifictiaon = shapeNotifications()[2] + t.is(thirdNotifictiaon.key, shapeSubKey) + t.is(thirdNotifictiaon.status.status, 'establishing') + t.is(thirdNotifictiaon.status.progress, 'receiving_data') + t.is(thirdNotifictiaon.status.oldServerId, firstServerId) + const secondServerId = thirdNotifictiaon.status.serverId + t.true(typeof secondServerId === 'string') + + await sleepAsync(0) + + // fourth one is another "mutation" one, removing old data + t.true(shapeNotifications().length >= 4) + const fourthNotifictiaon = shapeNotifications()[3] + t.is(fourthNotifictiaon.key, shapeSubKey) + t.is(fourthNotifictiaon.status.status, 'establishing') + t.is(fourthNotifictiaon.status.progress, 'removing_data') + t.is(fourthNotifictiaon.status.serverId, secondServerId) + + await syncedSecond + + // fifth one should eventually get back to active + t.is(shapeNotifications().length, 5) + const fifthNotification = shapeNotifications()[4] + + t.is(fifthNotification.key, shapeSubKey) + t.is(fifthNotification.status.status, 'active') + t.is(fifthNotification.status.serverId, secondServerId) + + // cancel subscription + await satellite.unsubscribe([shapeSubKey]) + + // sixth one first notifies of cancellation + t.is(shapeNotifications().length, 6) + const sixthNotification = shapeNotifications()[5] + t.is(sixthNotification.key, shapeSubKey) + t.is(sixthNotification.status.status, 'cancelling') + t.is(sixthNotification.status.serverId, secondServerId) + + await sleepAsync(0) + + // last one should indicate that it is gone + t.is(shapeNotifications().length, 7) + const seventhNotification = shapeNotifications()[6] + t.is(seventhNotification.key, shapeSubKey) + t.is(seventhNotification.status, undefined) + }) } From ff6fe67ea1fbc1770ed1b1b6c3c1beb2ea27841d Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 16 May 2024 20:42:38 +0300 Subject: [PATCH 4/9] Fix tests for postgres --- clients/typescript/test/satellite/process.ts | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index 5890b593af..f927609a92 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -2720,40 +2720,35 @@ export const processTests = (test: TestFn) => { const secondServerId = thirdNotifictiaon.status.serverId t.true(typeof secondServerId === 'string') - await sleepAsync(0) + await syncedSecond // fourth one is another "mutation" one, removing old data - t.true(shapeNotifications().length >= 4) + t.is(shapeNotifications().length, 5) + const fourthNotifictiaon = shapeNotifications()[3] t.is(fourthNotifictiaon.key, shapeSubKey) t.is(fourthNotifictiaon.status.status, 'establishing') t.is(fourthNotifictiaon.status.progress, 'removing_data') t.is(fourthNotifictiaon.status.serverId, secondServerId) - await syncedSecond - // fifth one should eventually get back to active - t.is(shapeNotifications().length, 5) const fifthNotification = shapeNotifications()[4] - t.is(fifthNotification.key, shapeSubKey) t.is(fifthNotification.status.status, 'active') t.is(fifthNotification.status.serverId, secondServerId) // cancel subscription await satellite.unsubscribe([shapeSubKey]) + await sleepAsync(100) // sixth one first notifies of cancellation - t.is(shapeNotifications().length, 6) + t.is(shapeNotifications().length, 7) const sixthNotification = shapeNotifications()[5] t.is(sixthNotification.key, shapeSubKey) t.is(sixthNotification.status.status, 'cancelling') t.is(sixthNotification.status.serverId, secondServerId) - await sleepAsync(0) - // last one should indicate that it is gone - t.is(shapeNotifications().length, 7) const seventhNotification = shapeNotifications()[6] t.is(seventhNotification.key, shapeSubKey) t.is(seventhNotification.status, undefined) From 7d8eca2baf647eaa1d8f40551713c2b144ad02a8 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 16 May 2024 20:42:51 +0300 Subject: [PATCH 5/9] Add changeset --- .changeset/young-plants-try.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/young-plants-try.md diff --git a/.changeset/young-plants-try.md b/.changeset/young-plants-try.md new file mode 100644 index 0000000000..1a8fe7c928 --- /dev/null +++ b/.changeset/young-plants-try.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Add notifier method `subscribeToShapeSubscriptionSyncStatusChanges` for listening to shape subscription status updates From 0e549a74f3cafab1f13248d379d1de4ad34db27a Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 20 May 2024 11:08:03 +0300 Subject: [PATCH 6/9] Simplify notifier for shape sync status by adding callback to shape manager --- clients/typescript/src/satellite/process.ts | 55 ++----------------- .../src/satellite/shapes/shapeManager.ts | 27 ++++++++- clients/typescript/test/satellite/process.ts | 10 ++-- 3 files changed, 36 insertions(+), 56 deletions(-) diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index 4d5b58b1fa..aa5475f6ee 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -176,7 +176,12 @@ export class SatelliteProcess implements Satellite { this.relations = {} this.previousShapeSubscriptions = [] - this.subscriptionManager = new ShapeManager() + this.subscriptionManager = new ShapeManager( + this.notifier.shapeSubscriptionSyncStatusChanged.bind( + this.notifier, + this.dbName + ) + ) this._throttledSnapshot = throttle( this._mutexSnapshot.bind(this), @@ -446,13 +451,6 @@ export class SatelliteProcess implements Satellite { if (error) throw error - // notify subscribers of change - this.notifier.shapeSubscriptionSyncStatusChanged( - this.dbName, - request.key, - this.syncStatus(request.key) - ) - // persist subscription metadata await this._setMeta('subscriptions', this.subscriptionManager.serialize()) @@ -491,17 +489,6 @@ export class SatelliteProcess implements Satellite { // If the server didn't send an error, we persist the fact the subscription was deleted. this.subscriptionManager.unsubscribeMade(subscriptionIds) - // notify subscribers of change - subscriptionIds.forEach((subId) => { - const key = this.subscriptionManager.getKeyForServerID(subId) - if (!key) return - this.notifier.shapeSubscriptionSyncStatusChanged( - this.dbName, - key, - this.syncStatus(key) - ) - }) - // persist subscription metadata await this.adapter.run( this._setMetaStatement( @@ -523,21 +510,6 @@ export class SatelliteProcess implements Satellite { subsData.subscriptionId ) - // notify subscribers of change (if finished delivering data) - const key = this.subscriptionManager.getKeyForServerID( - subsData.subscriptionId - ) - if (key) { - const syncStatus = this.syncStatus(key) - if (syncStatus?.status === 'active') { - this.notifier.shapeSubscriptionSyncStatusChanged( - this.dbName, - key, - syncStatus - ) - } - } - const toBeUnsubbed = afterApply() if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed) } @@ -1587,23 +1559,8 @@ export class SatelliteProcess implements Satellite { ...this._enableTriggers(affectedTables) ) - // retrieve sub keys before they get removed - const subKeys = subscriptionIds.map((x) => - this.subscriptionManager.getKeyForServerID(x) - ) - this.subscriptionManager.goneBatchDelivered(subscriptionIds) - // notify subscribers of change - subKeys.forEach((key) => { - if (!key) return - this.notifier.shapeSubscriptionSyncStatusChanged( - this.dbName, - key, - this.syncStatus(key) - ) - }) - this._notifyChanges(fakeOplogEntries, 'remote') } diff --git a/clients/typescript/src/satellite/shapes/shapeManager.ts b/clients/typescript/src/satellite/shapes/shapeManager.ts index 145c9c8a42..2fdc67c8bf 100644 --- a/clients/typescript/src/satellite/shapes/shapeManager.ts +++ b/clients/typescript/src/satellite/shapes/shapeManager.ts @@ -16,6 +16,8 @@ interface RequestedSubscription { fullKey: string } +type OnShapeSyncStatusUpdated = (key: string, status: SyncStatus) => void + type OptionalRecord = Record export class ShapeManager { @@ -33,6 +35,8 @@ export class ShapeManager { private serverIds: Map = new Map() private incompleteUnsubs: Set = new Set() + constructor(private onShapeSyncStatusUpdated?: OnShapeSyncStatusUpdated) {} + /** Set internal state using a string returned from {@link ShapeManager#serialize}. */ public initialize(serializedState: string): void { const { unfulfilled, active, known, unsubscribes } = @@ -227,10 +231,18 @@ export class ShapeManager { this.requestedSubscriptions[keyOrHash] = fullKey + let notified = false + this.promises[fullKey] = emptyPromise() return { key: keyOrHash, - setServerId: (id) => this.setServerId(fullKey, id), + setServerId: (id) => { + this.setServerId(fullKey, id) + if (!notified) { + notified = true + this.onShapeSyncStatusUpdated?.(keyOrHash, this.status(keyOrHash)) + } + }, syncFailed: () => this.syncFailed(keyOrHash, fullKey), promise: this.promises[fullKey].promise, } @@ -293,6 +305,7 @@ export class ShapeManager { this.activeSubscriptions[key] = fullKey if (sub.overshadowsFullKeys.length === 0) { + this.onShapeSyncStatusUpdated?.(key, this.status(key)) return () => { this.promises[fullKey].resolve() delete this.promises[fullKey] @@ -307,7 +320,15 @@ export class ShapeManager { } public unsubscribeMade(serverIds: string[]) { - for (const id of serverIds) this.incompleteUnsubs.add(id) + for (const id of serverIds) { + this.incompleteUnsubs.add(id) + + if (this.onShapeSyncStatusUpdated) { + const key = this.getKeyForServerID(id) + if (!key) continue + this.onShapeSyncStatusUpdated(key, this.status(key)) + } + } } /** @@ -339,6 +360,8 @@ export class ShapeManager { this.promises[sub.fullKey].resolve() } } + + this.onShapeSyncStatusUpdated?.(key, this.status(key)) } } diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index f927609a92..5c3dc848d0 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -1683,11 +1683,11 @@ export const processTests = (test: TestFn) => { const { synced } = await satellite.subscribe([shapeDef]) await synced - // first notification is 'connected', second is establishing shape, - // third one is initial sync, last one is shape established + // first notification is 'connected', second and third is establishing shape, + // final one is initial sync t.is(notifier.notifications.length, 4) - t.is(notifier.notifications[2].changes.length, 1) - t.deepEqual(notifier.notifications[2].changes[0], { + t.is(notifier.notifications[3].changes.length, 1) + t.deepEqual(notifier.notifications[3].changes[0], { qualifiedTablename: qualifiedTableName, recordChanges: [ { @@ -2722,9 +2722,9 @@ export const processTests = (test: TestFn) => { await syncedSecond - // fourth one is another "mutation" one, removing old data t.is(shapeNotifications().length, 5) + // fourth one is another "mutation" one, removing old data const fourthNotifictiaon = shapeNotifications()[3] t.is(fourthNotifictiaon.key, shapeSubKey) t.is(fourthNotifictiaon.status.status, 'establishing') From 17a6efa04e0b76d8a5a2365ff968f3c4c31baea6 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 20 May 2024 11:25:42 +0300 Subject: [PATCH 7/9] Add shape sub lifecycle tests to shape manager tests --- .../satellite/shapes/shapeManager.test.ts | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/clients/typescript/test/satellite/shapes/shapeManager.test.ts b/clients/typescript/test/satellite/shapes/shapeManager.test.ts index c4f2bc232b..e29d8009de 100644 --- a/clients/typescript/test/satellite/shapes/shapeManager.test.ts +++ b/clients/typescript/test/satellite/shapes/shapeManager.test.ts @@ -2,6 +2,7 @@ import test, { ExecutionContext } from 'ava' import { ShapeManager } from '../../../src/satellite/shapes/shapeManager' import { sleepAsync } from '../../../src/util' +import { SyncStatus } from '../../../src/client/model/shapes' test('Shape manager stores the subscription and returns a promise', (t) => { // Setup @@ -132,6 +133,107 @@ test('Shape manager correctly rehydrates the state', async (t) => { t.deepEqual(mng.listContinuedSubscriptions(), ['id2']) }) +test('Shape manager notifies about shape sync status lifecycle', async (t) => { + // Setup + const syncStatusUpdates: { key: string; status: SyncStatus }[] = [] + const subKey = 'foo' + const serverId1 = 'testID' + const serverId2 = 'testID2' + const mng = new ShapeManager((key, status) => + syncStatusUpdates.push({ key, status }) + ) + + // Assertions + + // request shape sub + const firstResult = mng.syncRequested([{ tablename: 't1' }], subKey) + t.is(syncStatusUpdates.length, 0) + if ('existing' in firstResult) return void t.fail() + firstResult.setServerId(serverId1) + t.is(syncStatusUpdates.length, 1) + t.deepEqual(syncStatusUpdates[0], { + key: subKey, + status: { + status: 'establishing', + progress: 'receiving_data', + serverId: serverId1, + }, + }) + + // notify when shape data delivered + const cb = mng.dataDelivered(serverId1) + t.deepEqual(cb(), []) + t.is(syncStatusUpdates.length, 2) + t.deepEqual(syncStatusUpdates[1], { + key: subKey, + status: { + status: 'active', + serverId: serverId1, + }, + }) + + // request overshadowing shape for same key + const secondResult = mng.syncRequested([{ tablename: 't2' }], subKey) + if ('existing' in secondResult) return void t.fail() + secondResult.setServerId(serverId2) + t.is(syncStatusUpdates.length, 3) + t.deepEqual(syncStatusUpdates[2], { + key: subKey, + status: { + status: 'establishing', + progress: 'receiving_data', + serverId: serverId2, + oldServerId: serverId1, + }, + }) + + // notify when new shape data delivered once unsubscribe + // of previous shape is made + const cb2 = mng.dataDelivered(serverId2) + t.deepEqual(cb2(), [serverId1]) + t.is(syncStatusUpdates.length, 3) + mng.unsubscribeMade([serverId1]) + t.is(syncStatusUpdates.length, 4) + t.deepEqual(syncStatusUpdates[3], { + key: subKey, + status: { + status: 'establishing', + progress: 'removing_data', + serverId: serverId2, + }, + }) + + // notify when new shape is both delivered and old one cleaned up + mng.goneBatchDelivered([serverId1]) + t.is(syncStatusUpdates.length, 5) + t.deepEqual(syncStatusUpdates[4], { + key: subKey, + status: { + status: 'active', + serverId: serverId2, + }, + }) + + // notify when shape is being cancelled + mng.unsubscribeMade([serverId2]) + t.is(syncStatusUpdates.length, 6) + t.deepEqual(syncStatusUpdates[5], { + key: subKey, + status: { + status: 'cancelling', + serverId: serverId2, + }, + }) + + // notify when shape is completely gone + mng.goneBatchDelivered([serverId2]) + t.is(syncStatusUpdates.length, 7) + t.deepEqual(syncStatusUpdates[6], { + key: subKey, + status: undefined, + }) +}) + async function promiseResolved( t: ExecutionContext, promise: Promise, From b065be03825785516bedd4bc00805a9a30761b82 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 20 May 2024 11:29:22 +0300 Subject: [PATCH 8/9] Clean up old shape file entirely --- clients/typescript/src/satellite/mock.ts | 4 ++-- .../typescript/src/satellite/shapes/index.ts | 23 ------------------- .../src/satellite/shapes/shapeManager.ts | 17 ++++++++++---- 3 files changed, 14 insertions(+), 30 deletions(-) delete mode 100644 clients/typescript/src/satellite/shapes/index.ts diff --git a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index feb47029fd..a3503bf1d5 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -61,8 +61,8 @@ import { } from '../_generated/protocol/satellite' import { ShapeSubscription } from './process' import { DbSchema } from '../client/model/schema' -import { getAllTablesForShape } from './shapes' import { SyncStatus } from '../client/model/shapes' +import { getTableNamesForShapes } from './shapes/shapeManager' export const MOCK_BEHIND_WINDOW_LSN = 42 export const MOCK_INTERNAL_ERROR = 27 @@ -276,7 +276,7 @@ export class MockSatelliteClient const shapeReqToUuid: Record = {} for (const shape of shapes) { - const tables = getAllTablesForShape(shape.definition, 'main') + const tables = getTableNamesForShapes([shape.definition], 'main') for (const { tablename } of tables) { if (tablename === 'failure' || tablename === 'Items') { return Promise.resolve({ diff --git a/clients/typescript/src/satellite/shapes/index.ts b/clients/typescript/src/satellite/shapes/index.ts deleted file mode 100644 index bb998fa469..0000000000 --- a/clients/typescript/src/satellite/shapes/index.ts +++ /dev/null @@ -1,23 +0,0 @@ -import uniqWith from 'lodash.uniqwith' - -import { Shape } from './types' -import { QualifiedTablename } from '../../util' - -/** List all tables covered by a given shape */ -export function getAllTablesForShape( - shape: Shape, - schema = 'main' -): QualifiedTablename[] { - return uniqWith(doGetAllTablesForShape(shape, schema), (a, b) => a.isEqual(b)) -} - -function doGetAllTablesForShape( - shape: Shape, - schema: string -): QualifiedTablename[] { - const includes = - shape.include?.flatMap((x) => doGetAllTablesForShape(x.select, schema)) ?? - [] - includes.push(new QualifiedTablename(schema, shape.tablename)) - return includes -} diff --git a/clients/typescript/src/satellite/shapes/shapeManager.ts b/clients/typescript/src/satellite/shapes/shapeManager.ts index 2fdc67c8bf..b9277c1d34 100644 --- a/clients/typescript/src/satellite/shapes/shapeManager.ts +++ b/clients/typescript/src/satellite/shapes/shapeManager.ts @@ -71,7 +71,7 @@ export class ShapeManager { }): QualifiedTablename[] { const requested = Object.values(this.requestedSubscriptions) - const tables = getTableNames( + const tables = getTableNamesForShapes( Object.values(this.knownSubscriptions) .filter((x) => !requested.includes(x?.fullKey)) .flatMap((x) => x?.shapes) @@ -427,16 +427,23 @@ function splitOnce(str: string, on: string): [string, string] { else return [str.slice(0, found), str.slice(found + 1)] } -function getTableNames(shapes: Shape[], schema: string): QualifiedTablename[] { +export function getTableNamesForShapes( + shapes: Shape[], + schema: string +): QualifiedTablename[] { return uniqWith( - shapes.flatMap((x) => doGetTableNames(x, schema)), + shapes.flatMap((x) => doGetTableNamesForShape(x, schema)), (a, b) => a.isEqual(b) ) } -function doGetTableNames(shape: Shape, schema: string): QualifiedTablename[] { +function doGetTableNamesForShape( + shape: Shape, + schema: string +): QualifiedTablename[] { const includes = - shape.include?.flatMap((x) => doGetTableNames(x.select, schema)) ?? [] + shape.include?.flatMap((x) => doGetTableNamesForShape(x.select, schema)) ?? + [] includes.push(new QualifiedTablename(schema, shape.tablename)) return includes } From e545d2e37f3417fabb547144ba0b2c327b45279a Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 20 May 2024 18:20:16 +0300 Subject: [PATCH 9/9] Address PR comments --- clients/typescript/src/notifiers/event.ts | 6 +----- clients/typescript/src/notifiers/index.ts | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/clients/typescript/src/notifiers/event.ts b/clients/typescript/src/notifiers/event.ts index 60cc004c30..8ff9e2ff4e 100644 --- a/clients/typescript/src/notifiers/event.ts +++ b/clients/typescript/src/notifiers/event.ts @@ -305,11 +305,7 @@ export class EventNotifier implements Notifier { key: string, status: SyncStatus ): ShapeSubscriptionSyncStatusChangeNotification { - const notification = { - dbName: dbName, - key: key, - status: status, - } + const notification = { dbName, key, status } this._emit(EVENT_NAMES.shapeSubscriptionStatusChange, notification) diff --git a/clients/typescript/src/notifiers/index.ts b/clients/typescript/src/notifiers/index.ts index b681e324cc..c591afacd7 100644 --- a/clients/typescript/src/notifiers/index.ts +++ b/clients/typescript/src/notifiers/index.ts @@ -144,8 +144,8 @@ export interface Notifier { ): UnsubscribeFunction // Notification for shape subscription sync status changes. - // Each subscription is associated with a key a notification will fire - // on every status change + // Every notification will include a key that uniquely identifies the + // shape for which the sync status changed, as well as the new sync status. shapeSubscriptionSyncStatusChanged( dbName: DbName, key: string,