From 276149df52dab1b20cf848ae8ba926651c026d53 Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Mon, 20 May 2024 18:34:34 +0300 Subject: [PATCH] feat(client): Shape subscription status notifier (#1276) The usual notifier pattern but for shape subscription status Added callback to shape manager to internalize notifying logic entirely to the shape manager rather than having arbitrary calls within the satellite process We could add additionall utility to subscribe to particular subscription with a given key but I think for now just having this general notifier method will cover use cases that need it, we don't need to expose/document it. --- .changeset/young-plants-try.md | 5 + clients/typescript/src/notifiers/event.ts | 51 +++++++++ clients/typescript/src/notifiers/index.ts | 26 +++++ clients/typescript/src/satellite/mock.ts | 4 +- clients/typescript/src/satellite/process.ts | 14 ++- .../typescript/src/satellite/shapes/index.ts | 108 ------------------ .../src/satellite/shapes/shapeManager.ts | 69 ++++++++--- .../typescript/test/notifiers/event.test.ts | 53 ++++++++- clients/typescript/test/satellite/process.ts | 101 +++++++++++++++- .../satellite/shapes/shapeManager.test.ts | 102 +++++++++++++++++ 10 files changed, 402 insertions(+), 131 deletions(-) create mode 100644 .changeset/young-plants-try.md delete mode 100644 clients/typescript/src/satellite/shapes/index.ts diff --git a/.changeset/young-plants-try.md b/.changeset/young-plants-try.md new file mode 100644 index 0000000000..1a8fe7c928 --- /dev/null +++ b/.changeset/young-plants-try.md @@ -0,0 +1,5 @@ +--- +"electric-sql": patch +--- + +Add notifier method `subscribeToShapeSubscriptionSyncStatusChanges` for listening to shape subscription status updates diff --git a/clients/typescript/src/notifiers/event.ts b/clients/typescript/src/notifiers/event.ts index 7dd408aee1..8ff9e2ff4e 100644 --- a/clients/typescript/src/notifiers/event.ts +++ b/clients/typescript/src/notifiers/event.ts @@ -19,14 +19,18 @@ import { Notifier, PotentialChangeCallback, PotentialChangeNotification, + ShapeSubscriptionSyncStatusChangeCallback, + ShapeSubscriptionSyncStatusChangeNotification, UnsubscribeFunction, } from './index' +import { SyncStatus } from '../client/model/shapes' export const EVENT_NAMES = { authChange: 'auth:changed', actualDataChange: 'data:actually:changed', potentialDataChange: 'data:potentially:changed', connectivityStateChange: 'network:connectivity:changed', + shapeSubscriptionStatusChange: 'shape:status:changed', } // Initialise global emitter to be shared between all @@ -201,6 +205,41 @@ export class EventNotifier implements Notifier { } } + shapeSubscriptionSyncStatusChanged( + dbName: string, + key: string, + status: SyncStatus + ): void { + if (!this._hasDbName(dbName)) { + return + } + + this._emitShapeSubscriptionSyncStatusChange(dbName, key, status) + } + + subscribeToShapeSubscriptionSyncStatusChanges( + callback: ShapeSubscriptionSyncStatusChangeCallback + ): UnsubscribeFunction { + const thisHasDbName = this._hasDbName.bind(this) + + const wrappedCallback = ( + notification: ShapeSubscriptionSyncStatusChangeNotification + ) => { + if (thisHasDbName(notification.dbName)) { + callback(notification) + } + } + + this._subscribe(EVENT_NAMES.shapeSubscriptionStatusChange, wrappedCallback) + + return () => { + this._unsubscribe( + EVENT_NAMES.shapeSubscriptionStatusChange, + wrappedCallback + ) + } + } + _getDbNames(): DbName[] { const idx = this.attachedDbIndex @@ -261,6 +300,18 @@ export class EventNotifier implements Notifier { return notification } + _emitShapeSubscriptionSyncStatusChange( + dbName: DbName, + key: string, + status: SyncStatus + ): ShapeSubscriptionSyncStatusChangeNotification { + const notification = { dbName, key, status } + + this._emit(EVENT_NAMES.shapeSubscriptionStatusChange, notification) + + return notification + } + _emit(eventName: string, notification: Notification) { this.events.emit(eventName, notification) } diff --git a/clients/typescript/src/notifiers/index.ts b/clients/typescript/src/notifiers/index.ts index 0f6a6bfefa..c591afacd7 100644 --- a/clients/typescript/src/notifiers/index.ts +++ b/clients/typescript/src/notifiers/index.ts @@ -1,4 +1,5 @@ import { AuthState } from '../auth/index' +import { SyncStatus } from '../client/model/shapes' import { QualifiedTablename } from '../util/tablename' import { ConnectivityState, @@ -39,11 +40,18 @@ export interface ConnectivityStateChangeNotification { connectivityState: ConnectivityState } +export interface ShapeSubscriptionSyncStatusChangeNotification { + dbName: DbName + key: string + status: SyncStatus +} + export type Notification = | AuthStateNotification | ChangeNotification | PotentialChangeNotification | ConnectivityStateChangeNotification + | ShapeSubscriptionSyncStatusChangeNotification export type AuthStateCallback = (notification: AuthStateNotification) => void export type ChangeCallback = (notification: ChangeNotification) => void @@ -54,11 +62,16 @@ export type ConnectivityStateChangeCallback = ( notification: ConnectivityStateChangeNotification ) => void +export type ShapeSubscriptionSyncStatusChangeCallback = ( + notification: ShapeSubscriptionSyncStatusChangeNotification +) => void + export type NotificationCallback = | AuthStateCallback | ChangeCallback | PotentialChangeCallback | ConnectivityStateChangeCallback + | ShapeSubscriptionSyncStatusChangeCallback export type UnsubscribeFunction = () => void @@ -129,4 +142,17 @@ export interface Notifier { subscribeToConnectivityStateChanges( callback: ConnectivityStateChangeCallback ): UnsubscribeFunction + + // Notification for shape subscription sync status changes. + // Every notification will include a key that uniquely identifies the + // shape for which the sync status changed, as well as the new sync status. + shapeSubscriptionSyncStatusChanged( + dbName: DbName, + key: string, + status: SyncStatus + ): void + + subscribeToShapeSubscriptionSyncStatusChanges( + callback: ShapeSubscriptionSyncStatusChangeCallback + ): UnsubscribeFunction } diff --git a/clients/typescript/src/satellite/mock.ts b/clients/typescript/src/satellite/mock.ts index feb47029fd..a3503bf1d5 100644 --- a/clients/typescript/src/satellite/mock.ts +++ b/clients/typescript/src/satellite/mock.ts @@ -61,8 +61,8 @@ import { } from '../_generated/protocol/satellite' import { ShapeSubscription } from './process' import { DbSchema } from '../client/model/schema' -import { getAllTablesForShape } from './shapes' import { SyncStatus } from '../client/model/shapes' +import { getTableNamesForShapes } from './shapes/shapeManager' export const MOCK_BEHIND_WINDOW_LSN = 42 export const MOCK_INTERNAL_ERROR = 27 @@ -276,7 +276,7 @@ export class MockSatelliteClient const shapeReqToUuid: Record = {} for (const shape of shapes) { - const tables = getAllTablesForShape(shape.definition, 'main') + const tables = getTableNamesForShapes([shape.definition], 'main') for (const { tablename } of tables) { if (tablename === 'failure' || tablename === 'Items') { return Promise.resolve({ diff --git a/clients/typescript/src/satellite/process.ts b/clients/typescript/src/satellite/process.ts index e8404329a5..aa5475f6ee 100644 --- a/clients/typescript/src/satellite/process.ts +++ b/clients/typescript/src/satellite/process.ts @@ -176,7 +176,12 @@ export class SatelliteProcess implements Satellite { this.relations = {} this.previousShapeSubscriptions = [] - this.subscriptionManager = new ShapeManager() + this.subscriptionManager = new ShapeManager( + this.notifier.shapeSubscriptionSyncStatusChanged.bind( + this.notifier, + this.dbName + ) + ) this._throttledSnapshot = throttle( this._mutexSnapshot.bind(this), @@ -446,6 +451,7 @@ export class SatelliteProcess implements Satellite { if (error) throw error + // persist subscription metadata await this._setMeta('subscriptions', this.subscriptionManager.serialize()) return { @@ -470,7 +476,7 @@ export class SatelliteProcess implements Satellite { ) } else { return this.unsubscribeIds( - this.subscriptionManager.getServerID(target.shapes) + this.subscriptionManager.getServerIDsForShapes(target.shapes) ) } } @@ -482,6 +488,8 @@ export class SatelliteProcess implements Satellite { // If the server didn't send an error, we persist the fact the subscription was deleted. this.subscriptionManager.unsubscribeMade(subscriptionIds) + + // persist subscription metadata await this.adapter.run( this._setMetaStatement( 'subscriptions', @@ -501,6 +509,7 @@ export class SatelliteProcess implements Satellite { [], subsData.subscriptionId ) + const toBeUnsubbed = afterApply() if (toBeUnsubbed.length > 0) await this.unsubscribeIds(toBeUnsubbed) } @@ -1549,6 +1558,7 @@ export class SatelliteProcess implements Satellite { ...stmts, ...this._enableTriggers(affectedTables) ) + this.subscriptionManager.goneBatchDelivered(subscriptionIds) this._notifyChanges(fakeOplogEntries, 'remote') diff --git a/clients/typescript/src/satellite/shapes/index.ts b/clients/typescript/src/satellite/shapes/index.ts deleted file mode 100644 index 917c8e8573..0000000000 --- a/clients/typescript/src/satellite/shapes/index.ts +++ /dev/null @@ -1,108 +0,0 @@ -import uniqWith from 'lodash.uniqwith' - -import { - Shape, - ShapeDefinition, - ShapeRequest, - SubscriptionData, - SubscriptionId, -} from './types' -import { QualifiedTablename } from '../../util' - -/** - * Manages the state of satellite shape subscriptions - */ -export interface SubscriptionsManager { - /** - * Stores the identifier for a subscription - * request that was accepted by the server - * - * @param subId the identifier of the subscription - * @param shapeRequests the shapes definitions of the request - */ - subscriptionRequested(subId: string, shapeRequests: ShapeRequest[]): void - - /** - * Cancel the subscription with the given subscription id - * - * @param subId the identifier of the subscription - */ - subscriptionCancelled(subId: string): void - - /** - * Registers that a subsciption that was in-flight is now - * delivered. - * @param data the data for the subscription - */ - subscriptionDelivered(data: SubscriptionData): void - - /** - * Returns the shape definitions for subscriptions avalailable locally - * @param subId the identifier of the subscription - */ - shapesForActiveSubscription(subId: string): ShapeDefinition[] | undefined - - /** - * @returns An array of fulfilled subscriptions that are active. - */ - getFulfilledSubscriptions(): SubscriptionId[] - - /** - * Check if a subscription with exactly the same shape requests has already been issued - * @param shapes Shapes for a potential request - */ - getDuplicatingSubscription( - shapes: Shape[] - ): null | { inFlight: string } | { fulfilled: string } - - /** - * Deletes the subscription(s) from the manager. - * @param subId the identifier of the subscription or an array of subscription identifiers - */ - unsubscribe(subId: SubscriptionId[]): void - - /** - * Delete the subscriptions from the manager and call a GC function - */ - unsubscribeAndGC(subIds: SubscriptionId[]): Promise - - /** - * Deletes all subscriptions from the manager. Useful to - * reset the state of the manager. Calls the configured GC. - * Returns the subscription identifiers of all subscriptions - * that were deleted. - */ - unsubscribeAllAndGC(): Promise - - /** - * Converts the state of the manager to a string format that - * can be used to persist it - */ - serialize(): string - - /** - * loads the subscription manager state from a text representation - */ - setState(serialized: string): void - - hash(shapes: Shape[]): string -} - -/** List all tables covered by a given shape */ -export function getAllTablesForShape( - shape: Shape, - schema = 'main' -): QualifiedTablename[] { - return uniqWith(doGetAllTablesForShape(shape, schema), (a, b) => a.isEqual(b)) -} - -function doGetAllTablesForShape( - shape: Shape, - schema: string -): QualifiedTablename[] { - const includes = - shape.include?.flatMap((x) => doGetAllTablesForShape(x.select, schema)) ?? - [] - includes.push(new QualifiedTablename(schema, shape.tablename)) - return includes -} diff --git a/clients/typescript/src/satellite/shapes/shapeManager.ts b/clients/typescript/src/satellite/shapes/shapeManager.ts index 19bec6e23b..c27ef2225f 100644 --- a/clients/typescript/src/satellite/shapes/shapeManager.ts +++ b/clients/typescript/src/satellite/shapes/shapeManager.ts @@ -16,6 +16,8 @@ interface RequestedSubscription { fullKey: string } +type OnShapeSyncStatusUpdated = (key: string, status: SyncStatus) => void + type OptionalRecord = Record export class ShapeManager { @@ -33,6 +35,8 @@ export class ShapeManager { private serverIds: Map = new Map() private incompleteUnsubs: Set = new Set() + constructor(private onShapeSyncStatusUpdated?: OnShapeSyncStatusUpdated) {} + /** Set internal state using a string returned from {@link ShapeManager#serialize}. */ public initialize(serializedState: string): void { const { unfulfilled, active, known, unsubscribes } = @@ -67,7 +71,7 @@ export class ShapeManager { }): QualifiedTablename[] { const requested = Object.values(this.requestedSubscriptions) - const tables = getTableNames( + const tables = getTableNamesForShapes( Object.values(this.knownSubscriptions) .filter((x) => !requested.includes(x?.fullKey)) .flatMap((x) => x?.shapes) @@ -212,11 +216,7 @@ export class ShapeManager { syncFailed: () => void promise: Promise } { - // TODO: This sorts the shapes objects for hashing to make sure that order of includes - // does not affect the hash. This has the unfortunate consequence of sorting the FK spec, - // but the chance of a table having two multi-column FKs over same columns BUT in a - // different order feels much lower than people using includes in an arbitrary order. - const shapeHash = hash(shapes, { unorderedArrays: true }) + const shapeHash = this.hashShapes(shapes) const keyOrHash = key ?? shapeHash /* Since multiple requests may have the same key, we'll need to differentiate them * based on both hash and key. We use `:` to join them because hash is base64 that @@ -252,10 +252,18 @@ export class ShapeManager { this.requestedSubscriptions[keyOrHash] = fullKey + let notified = false + this.promises[fullKey] = emptyPromise() return { key: keyOrHash, - setServerId: (id) => this.setServerId(fullKey, id), + setServerId: (id) => { + this.setServerId(fullKey, id) + if (!notified) { + notified = true + this.onShapeSyncStatusUpdated?.(keyOrHash, this.status(keyOrHash)) + } + }, syncFailed: () => this.syncFailed(keyOrHash, fullKey), promise: this.promises[fullKey].promise, } @@ -318,6 +326,7 @@ export class ShapeManager { this.activeSubscriptions[key] = fullKey if (sub.overshadowsFullKeys.length === 0) { + this.onShapeSyncStatusUpdated?.(key, this.status(key)) return () => { this.promises[fullKey].resolve() delete this.promises[fullKey] @@ -332,7 +341,15 @@ export class ShapeManager { } public unsubscribeMade(serverIds: string[]) { - for (const id of serverIds) this.incompleteUnsubs.add(id) + for (const id of serverIds) { + this.incompleteUnsubs.add(id) + + if (this.onShapeSyncStatusUpdated) { + const key = this.getKeyForServerID(id) + if (!key) continue + this.onShapeSyncStatusUpdated(key, this.status(key)) + } + } } /** @@ -364,6 +381,8 @@ export class ShapeManager { this.promises[sub.fullKey].resolve() } } + + this.onShapeSyncStatusUpdated?.(key, this.status(key)) } } @@ -388,12 +407,27 @@ export class ShapeManager { .filter(onlyDefined) } - public getServerID(shapes: Shape[]): string[] { - const shapeHash = hash(shapes, { unorderedArrays: true }) + public getServerIDsForShapes(shapes: Shape[]): string[] { + const shapeHash = this.hashShapes(shapes) const fullKey = makeFullKey(shapeHash, shapeHash) const serverId = this.knownSubscriptions[fullKey]?.serverId return serverId ? [serverId] : [] } + + public getKeyForServerID(serverId: string): string | undefined { + const fullKey = this.serverIds.get(serverId) + if (fullKey === undefined) return + const [_hash, key] = splitFullKey(fullKey) + return key + } + + public hashShapes(shapes: Shape[]): string { + // TODO: This sorts the shapes objects for hashing to make sure that order of includes + // does not affect the hash. This has the unfortunate consequence of sorting the FK spec, + // but the chance of a table having two multi-column FKs over same columns BUT in a + // different order feels much lower than people using includes in an arbitrary order. + return hash(shapes, { unorderedArrays: true }) + } } function onlyDefined(x: T | undefined): x is T { @@ -414,16 +448,23 @@ function splitOnce(str: string, on: string): [string, string] { else return [str.slice(0, found), str.slice(found + 1)] } -function getTableNames(shapes: Shape[], schema: string): QualifiedTablename[] { +export function getTableNamesForShapes( + shapes: Shape[], + schema: string +): QualifiedTablename[] { return uniqWith( - shapes.flatMap((x) => doGetTableNames(x, schema)), + shapes.flatMap((x) => doGetTableNamesForShape(x, schema)), (a, b) => a.isEqual(b) ) } -function doGetTableNames(shape: Shape, schema: string): QualifiedTablename[] { +function doGetTableNamesForShape( + shape: Shape, + schema: string +): QualifiedTablename[] { const includes = - shape.include?.flatMap((x) => doGetTableNames(x.select, schema)) ?? [] + shape.include?.flatMap((x) => doGetTableNamesForShape(x.select, schema)) ?? + [] includes.push(new QualifiedTablename(schema, shape.tablename)) return includes } diff --git a/clients/typescript/test/notifiers/event.test.ts b/clients/typescript/test/notifiers/event.test.ts index 19cdfb3739..600fe66118 100644 --- a/clients/typescript/test/notifiers/event.test.ts +++ b/clients/typescript/test/notifiers/event.test.ts @@ -1,5 +1,8 @@ import test from 'ava' -import { ConnectivityStateChangeNotification } from '../../src/notifiers' +import { + ConnectivityStateChangeNotification, + ShapeSubscriptionSyncStatusChangeNotification, +} from '../../src/notifiers' import { EventNotifier } from '../../src/notifiers/event' import { QualifiedTablename } from '../../src/util/tablename' @@ -152,3 +155,51 @@ test('empty changes should not emit', async (t) => { source.actuallyChanged('foo.db', [], 'local') t.is(notifications.length, 0) }) + +test('subscribe to shape subscription status changes', async (t) => { + const eventEmitter = new EventEmitter() + const source = new EventNotifier('test.db', eventEmitter) + const target = new EventNotifier('test.db', eventEmitter) + + const notifications: ShapeSubscriptionSyncStatusChangeNotification[] = [] + + target.subscribeToShapeSubscriptionSyncStatusChanges((x) => { + notifications.push(x) + }) + + source.shapeSubscriptionSyncStatusChanged('test.db', 'foo', { + status: 'establishing', + serverId: 'foo', + progress: 'receiving_data', + }) + + t.is(notifications.length, 1) +}) + +test('subscribe to shape subscription status changes is scoped by db name', async (t) => { + const eventEmitter = new EventEmitter() + const source = new EventNotifier('test.db', eventEmitter) + const target = new EventNotifier('test.db', eventEmitter) + + const notifications: ShapeSubscriptionSyncStatusChangeNotification[] = [] + + target.subscribeToShapeSubscriptionSyncStatusChanges((x) => { + notifications.push(x) + }) + + source.shapeSubscriptionSyncStatusChanged('test.db', 'foo', { + status: 'establishing', + serverId: 'foo', + progress: 'receiving_data', + }) + + t.is(notifications.length, 1) + + source.shapeSubscriptionSyncStatusChanged('non-existing.db', 'foo', { + status: 'establishing', + serverId: 'foo', + progress: 'receiving_data', + }) + + t.is(notifications.length, 1) +}) diff --git a/clients/typescript/test/satellite/process.ts b/clients/typescript/test/satellite/process.ts index 85d03b0c0f..5c3dc848d0 100644 --- a/clients/typescript/test/satellite/process.ts +++ b/clients/typescript/test/satellite/process.ts @@ -1683,10 +1683,11 @@ export const processTests = (test: TestFn) => { const { synced } = await satellite.subscribe([shapeDef]) await synced - // first notification is 'connected' - t.is(notifier.notifications.length, 2) - t.is(notifier.notifications[1].changes.length, 1) - t.deepEqual(notifier.notifications[1].changes[0], { + // first notification is 'connected', second and third is establishing shape, + // final one is initial sync + t.is(notifier.notifications.length, 4) + t.is(notifier.notifications[3].changes.length, 1) + t.deepEqual(notifier.notifications[3].changes[0], { qualifiedTablename: qualifiedTableName, recordChanges: [ { @@ -2660,4 +2661,96 @@ export const processTests = (test: TestFn) => { t.pass() }) + + test('notifies for shape lifecycle', async (t) => { + const { client, satellite, notifier, token } = t.context + const { runMigrations, authState } = t.context + await runMigrations() + + const shapeSubKey = 'foo' + const shapeNotifications = () => + notifier.notifications.filter((n) => n.key !== undefined) + + // relations must be present at subscription delivery + client.setRelations(relations) + client.setRelationData('parent', parentRecord) + client.setRelationData('child', childRecord) + + const conn = await startSatellite(satellite, authState, token) + await conn.connectionPromise + + satellite!.relations = relations + const { synced: syncedFirst } = await satellite.subscribe( + [{ tablename: 'parent' }], + shapeSubKey + ) + + // first one is establishing + + t.is(shapeNotifications().length, 1) + const firstNotification = shapeNotifications()[0] + t.is(firstNotification.key, shapeSubKey) + t.is(firstNotification.status.status, 'establishing') + t.is(firstNotification.status.progress, 'receiving_data') + const firstServerId = firstNotification.status.serverId + t.true(typeof firstServerId === 'string') + + await syncedFirst + + // second one is active + t.is(shapeNotifications().length, 2) + const secondNotification = shapeNotifications()[1] + t.is(secondNotification.key, shapeSubKey) + t.is(secondNotification.status.status, 'active') + t.is(secondNotification.status.serverId, firstServerId) + + // change existing sub to different shape + const { synced: syncedSecond } = await satellite.subscribe( + [{ tablename: 'child' }], + shapeSubKey + ) + + // third one is a "mutation" one, receiving new data + t.is(shapeNotifications().length, 3) + const thirdNotifictiaon = shapeNotifications()[2] + t.is(thirdNotifictiaon.key, shapeSubKey) + t.is(thirdNotifictiaon.status.status, 'establishing') + t.is(thirdNotifictiaon.status.progress, 'receiving_data') + t.is(thirdNotifictiaon.status.oldServerId, firstServerId) + const secondServerId = thirdNotifictiaon.status.serverId + t.true(typeof secondServerId === 'string') + + await syncedSecond + + t.is(shapeNotifications().length, 5) + + // fourth one is another "mutation" one, removing old data + const fourthNotifictiaon = shapeNotifications()[3] + t.is(fourthNotifictiaon.key, shapeSubKey) + t.is(fourthNotifictiaon.status.status, 'establishing') + t.is(fourthNotifictiaon.status.progress, 'removing_data') + t.is(fourthNotifictiaon.status.serverId, secondServerId) + + // fifth one should eventually get back to active + const fifthNotification = shapeNotifications()[4] + t.is(fifthNotification.key, shapeSubKey) + t.is(fifthNotification.status.status, 'active') + t.is(fifthNotification.status.serverId, secondServerId) + + // cancel subscription + await satellite.unsubscribe([shapeSubKey]) + await sleepAsync(100) + + // sixth one first notifies of cancellation + t.is(shapeNotifications().length, 7) + const sixthNotification = shapeNotifications()[5] + t.is(sixthNotification.key, shapeSubKey) + t.is(sixthNotification.status.status, 'cancelling') + t.is(sixthNotification.status.serverId, secondServerId) + + // last one should indicate that it is gone + const seventhNotification = shapeNotifications()[6] + t.is(seventhNotification.key, shapeSubKey) + t.is(seventhNotification.status, undefined) + }) } diff --git a/clients/typescript/test/satellite/shapes/shapeManager.test.ts b/clients/typescript/test/satellite/shapes/shapeManager.test.ts index c4f2bc232b..e29d8009de 100644 --- a/clients/typescript/test/satellite/shapes/shapeManager.test.ts +++ b/clients/typescript/test/satellite/shapes/shapeManager.test.ts @@ -2,6 +2,7 @@ import test, { ExecutionContext } from 'ava' import { ShapeManager } from '../../../src/satellite/shapes/shapeManager' import { sleepAsync } from '../../../src/util' +import { SyncStatus } from '../../../src/client/model/shapes' test('Shape manager stores the subscription and returns a promise', (t) => { // Setup @@ -132,6 +133,107 @@ test('Shape manager correctly rehydrates the state', async (t) => { t.deepEqual(mng.listContinuedSubscriptions(), ['id2']) }) +test('Shape manager notifies about shape sync status lifecycle', async (t) => { + // Setup + const syncStatusUpdates: { key: string; status: SyncStatus }[] = [] + const subKey = 'foo' + const serverId1 = 'testID' + const serverId2 = 'testID2' + const mng = new ShapeManager((key, status) => + syncStatusUpdates.push({ key, status }) + ) + + // Assertions + + // request shape sub + const firstResult = mng.syncRequested([{ tablename: 't1' }], subKey) + t.is(syncStatusUpdates.length, 0) + if ('existing' in firstResult) return void t.fail() + firstResult.setServerId(serverId1) + t.is(syncStatusUpdates.length, 1) + t.deepEqual(syncStatusUpdates[0], { + key: subKey, + status: { + status: 'establishing', + progress: 'receiving_data', + serverId: serverId1, + }, + }) + + // notify when shape data delivered + const cb = mng.dataDelivered(serverId1) + t.deepEqual(cb(), []) + t.is(syncStatusUpdates.length, 2) + t.deepEqual(syncStatusUpdates[1], { + key: subKey, + status: { + status: 'active', + serverId: serverId1, + }, + }) + + // request overshadowing shape for same key + const secondResult = mng.syncRequested([{ tablename: 't2' }], subKey) + if ('existing' in secondResult) return void t.fail() + secondResult.setServerId(serverId2) + t.is(syncStatusUpdates.length, 3) + t.deepEqual(syncStatusUpdates[2], { + key: subKey, + status: { + status: 'establishing', + progress: 'receiving_data', + serverId: serverId2, + oldServerId: serverId1, + }, + }) + + // notify when new shape data delivered once unsubscribe + // of previous shape is made + const cb2 = mng.dataDelivered(serverId2) + t.deepEqual(cb2(), [serverId1]) + t.is(syncStatusUpdates.length, 3) + mng.unsubscribeMade([serverId1]) + t.is(syncStatusUpdates.length, 4) + t.deepEqual(syncStatusUpdates[3], { + key: subKey, + status: { + status: 'establishing', + progress: 'removing_data', + serverId: serverId2, + }, + }) + + // notify when new shape is both delivered and old one cleaned up + mng.goneBatchDelivered([serverId1]) + t.is(syncStatusUpdates.length, 5) + t.deepEqual(syncStatusUpdates[4], { + key: subKey, + status: { + status: 'active', + serverId: serverId2, + }, + }) + + // notify when shape is being cancelled + mng.unsubscribeMade([serverId2]) + t.is(syncStatusUpdates.length, 6) + t.deepEqual(syncStatusUpdates[5], { + key: subKey, + status: { + status: 'cancelling', + serverId: serverId2, + }, + }) + + // notify when shape is completely gone + mng.goneBatchDelivered([serverId2]) + t.is(syncStatusUpdates.length, 7) + t.deepEqual(syncStatusUpdates[6], { + key: subKey, + status: undefined, + }) +}) + async function promiseResolved( t: ExecutionContext, promise: Promise,