Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Upstream changes
Browse files Browse the repository at this point in the history
276149df - feat(client): Shape subscription status notifier (#1276)
2024-05-20 15:34:34 +0000 - Stefanos Mousafeiris
electric-sql/electric@276149df
  • Loading branch information
davidmartos96 committed May 21, 2024
1 parent 97d30ce commit 3be19eb
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 153 deletions.
15 changes: 14 additions & 1 deletion packages/electricsql/lib/src/client/model/shapes.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'package:electricsql/src/satellite/satellite.dart';
import 'package:electricsql/src/satellite/shapes/types.dart';
import 'package:equatable/equatable.dart';

typedef TableName = String;

Expand All @@ -10,13 +11,16 @@ enum SyncStatusType {
establishing,
}

sealed class SyncStatus {
sealed class SyncStatus with EquatableMixin {
SyncStatusType get statusType;
}

class SyncStatusUndefined extends SyncStatus {
@override
SyncStatusType get statusType => SyncStatusType.undefined;

@override
List<Object?> get props => [statusType];
}

class SyncStatusActive extends SyncStatus {
Expand All @@ -26,6 +30,9 @@ class SyncStatusActive extends SyncStatus {
SyncStatusType get statusType => SyncStatusType.active;

SyncStatusActive(this.serverId);

@override
List<Object?> get props => [statusType, serverId];
}

class SyncStatusCancelling extends SyncStatus {
Expand All @@ -35,6 +42,9 @@ class SyncStatusCancelling extends SyncStatus {
SyncStatusType get statusType => SyncStatusType.cancelling;

SyncStatusCancelling(this.serverId);

@override
List<Object?> get props => [statusType, serverId];
}

class SyncStatusEstablishing extends SyncStatus {
Expand All @@ -50,6 +60,9 @@ class SyncStatusEstablishing extends SyncStatus {
required this.progress,
this.oldServerId,
});

@override
List<Object?> get props => [statusType, serverId, progress, oldServerId];
}

abstract interface class IShapeManager {
Expand Down
55 changes: 54 additions & 1 deletion packages/electricsql/lib/src/notifiers/event.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'package:collection/collection.dart';
import 'package:electricsql/src/auth/auth.dart';
import 'package:electricsql/electricsql.dart';
import 'package:electricsql/src/notifiers/notifiers.dart';
import 'package:electricsql/src/util/debug/debug.dart';
import 'package:electricsql/src/util/tablename.dart';
Expand All @@ -12,6 +12,7 @@ class EventNames {
static const actualDataChange = 'data:actually:changed';
static const potentialDataChange = 'data:potentially:changed';
static const connectivityStateChange = 'network:connectivity:changed';
static const shapeSubscriptionStatusChange = 'shape:status:changed';
}

// Initialise global emitter to be shared between all
Expand Down Expand Up @@ -183,6 +184,41 @@ class EventNotifier implements Notifier {
return () => _unsubscribe(eventListener);
}

@override
void shapeSubscriptionSyncStatusChanged(
String dbName,
String key,
SyncStatus status,
) {
if (!_hasDbName(dbName)) {
return;
}

_emitShapeSubscriptionSyncStatusChange(dbName, key, status);
}

@override
UnsubscribeFunction subscribeToShapeSubscriptionSyncStatusChanges(
ShapeSubscriptionSyncStatusChangeCallback callback,
) {
void wrappedCallback(
ShapeSubscriptionSyncStatusChangeNotification notification,
) {
if (_hasDbName(notification.dbName)) {
callback(notification);
}
return;
}

final eventListener = EventListener(
EventNames.shapeSubscriptionStatusChange,
wrappedCallback,
);
_subscribe(eventListener);

return () => _unsubscribe(eventListener);
}

List<DbName> _getDbNames() {
final idx = attachedDbIndex;

Expand Down Expand Up @@ -247,6 +283,23 @@ class EventNotifier implements Notifier {
return notification;
}

ShapeSubscriptionSyncStatusChangeNotification
_emitShapeSubscriptionSyncStatusChange(
DbName dbName,
String key,
SyncStatus status,
) {
final notification = ShapeSubscriptionSyncStatusChangeNotification(
dbName: dbName,
key: key,
status: status,
);

emit(EventNames.shapeSubscriptionStatusChange, notification);

return notification;
}

@protected
void emit(String eventName, Notification notification) {
events.emit(eventName, notification);
Expand Down
98 changes: 65 additions & 33 deletions packages/electricsql/lib/src/notifiers/notifiers.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:electricsql/src/auth/auth.dart';
import 'package:electricsql/src/client/model/shapes.dart';
import 'package:electricsql/src/satellite/oplog.dart';
import 'package:electricsql/src/util/tablename.dart';
import 'package:electricsql/src/util/types.dart';
Expand Down Expand Up @@ -48,7 +49,8 @@ class RecordChange with EquatableMixin {

class Change with EquatableMixin {
final QualifiedTablename qualifiedTablename;
// rowid of each oplog entry for the changes - availiable only for local changes

/// rowid of each oplog entry for the changes - availiable only for local changes
List<RowId>? rowids;
List<RecordChange>? recordChanges;

Expand Down Expand Up @@ -106,6 +108,18 @@ class ConnectivityStateChangeNotification extends Notification {
});
}

class ShapeSubscriptionSyncStatusChangeNotification extends Notification {
final DbName dbName;
final String key;
final SyncStatus status;

ShapeSubscriptionSyncStatusChangeNotification({
required this.dbName,
required this.key,
required this.status,
});
}

abstract class Notification {}

typedef AuthStateCallback = void Function(AuthStateNotification notification);
Expand All @@ -118,73 +132,91 @@ typedef ConnectivityStateChangeCallback = void Function(
ConnectivityStateChangeNotification notification,
);

typedef ShapeSubscriptionSyncStatusChangeCallback = void Function(
ShapeSubscriptionSyncStatusChangeNotification notification,
);

typedef NotificationCallback = void Function(Notification notification);

typedef UnsubscribeFunction = void Function();

abstract class Notifier {
// The name of the primary database that components communicating via this
// notifier have open and are using.
/// The name of the primary database that components communicating via this
/// notifier have open and are using.
DbName get dbName;

// Some drivers can attach other open databases and reference them by alias
// (i.e.: first you `attach('foo.db')` then you can write SQL queries like
// `select * from foo.bars`. We keep track of attached databases and their
// aliases, so we can map the table namespaces in SQL queries to their real
// database names and thus emit and handle notifications to and from them.
/// Some drivers can attach other open databases and reference them by alias
/// (i.e.: first you `attach('foo.db')` then you can write SQL queries like
/// `select * from foo.bars`. We keep track of attached databases and their
/// aliases, so we can map the table namespaces in SQL queries to their real
/// database names and thus emit and handle notifications to and from them.
void attach(DbName dbName, String dbAlias);
void detach(String dbAlias);

// Technically, we keep track of the attached dbs in two mappings -- one is
// `alias: name`, the other `name: alias`.
/// Technically, we keep track of the attached dbs in two mappings -- one is
/// `alias: name`, the other `name: alias`.
AttachedDbIndex get attachedDbIndex;

// And we provide a helper method to alias changes in the form
// `{attachedDbName, tablenames}` to `aliasedTablenames`.
/// And we provide a helper method to alias changes in the form
/// `{attachedDbName, tablenames}` to `aliasedTablenames`.
List<QualifiedTablename> alias(ChangeNotification notification);

// Calling `authStateChanged` notifies the Satellite process that the
// user's authentication credentials have changed.
/// Calling `authStateChanged` notifies the Satellite process that the
/// user's authentication credentials have changed.
void authStateChanged(AuthState authState);
UnsubscribeFunction subscribeToAuthStateChanges(AuthStateCallback callback);

// The data change notification workflow starts by the electric database
// clients (or the user manually) calling `potentiallyChanged` whenever
// a write or transaction has been issued that may have changed the
// contents of either the primary or any of the attached databases.
/// The data change notification workflow starts by the electric database
/// clients (or the user manually) calling `potentiallyChanged` whenever
/// a write or transaction has been issued that may have changed the
/// contents of either the primary or any of the attached databases.
void potentiallyChanged();

// Satellite processes subscribe to these "data has potentially changed"
// notifications. When they get one, they check the `_oplog` table in the
// database for *actual* changes persisted by the triggers.
/// Satellite processes subscribe to these "data has potentially changed"
/// notifications. When they get one, they check the `_oplog` table in the
/// database for *actual* changes persisted by the triggers.
UnsubscribeFunction subscribeToPotentialDataChanges(
PotentialChangeCallback callback,
);

// When Satellite detects actual data changes in the oplog for a given
// database, it replicates it and calls `actuallyChanged` with the list
// of changes.
/// When Satellite detects actual data changes in the oplog for a given
/// database, it replicates it and calls `actuallyChanged` with the list
/// of changes.
void actuallyChanged(
DbName dbName,
List<Change> changes,
ChangeOrigin origin,
);

// Reactive hooks then subscribe to "data has actually changed" notifications,
// using the info to trigger re-queries, if the changes affect databases and
// tables that their queries depend on. This then trigger re-rendering iff
// the query results are actually affected by the data changes.
/// Reactive hooks then subscribe to "data has actually changed" notifications,
/// using the info to trigger re-queries, if the changes affect databases and
/// tables that their queries depend on. This then trigger re-rendering iff
/// the query results are actually affected by the data changes.
UnsubscribeFunction subscribeToDataChanges(ChangeCallback callback);

// Notification for network connectivity state changes.
// A connectivity change is automatically triggered in consequence of internal client events.
// 'connected': connection to Electric established
// 'disconnected': Electric is unreachable, or network is unavailable.
// A reason for the disconnection can be provided.
/// Notification for network connectivity state changes.
/// A connectivity change is automatically triggered in consequence of internal client events.
/// 'connected': connection to Electric established
/// 'disconnected': Electric is unreachable, or network is unavailable.
/// A reason for the disconnection can be provided.
void connectivityStateChanged(String dbName, ConnectivityState state);

UnsubscribeFunction subscribeToConnectivityStateChanges(
ConnectivityStateChangeCallback callback,
);

/// Notification for shape subscription sync status changes.
/// Every notification will include a key that uniquely identifies the
/// shape for which the sync status changed, as well as the new sync status.
void shapeSubscriptionSyncStatusChanged(
String dbName,
String key,
SyncStatus status,
);

UnsubscribeFunction subscribeToShapeSubscriptionSyncStatusChanges(
ShapeSubscriptionSyncStatusChangeCallback callback,
);
}

class AttachedDbIndex {
Expand Down
4 changes: 2 additions & 2 deletions packages/electricsql/lib/src/satellite/mock.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import 'package:electricsql/src/satellite/config.dart';
import 'package:electricsql/src/satellite/oplog.dart';
import 'package:electricsql/src/satellite/registry.dart';
import 'package:electricsql/src/satellite/satellite.dart';
import 'package:electricsql/src/satellite/shapes/shapes.dart';
import 'package:electricsql/src/satellite/shapes/shape_manager.dart';
import 'package:electricsql/src/satellite/shapes/types.dart';
import 'package:electricsql/src/sockets/sockets.dart';
import 'package:electricsql/src/util/common.dart';
Expand Down Expand Up @@ -254,7 +254,7 @@ class MockSatelliteClient extends AsyncEventEmitter implements Client {
final Map<String, String> shapeReqToUuid = {};

for (final shape in shapes) {
final tables = getAllTablesForShape(shape.definition, schema: 'main');
final tables = getTableNamesForShapes([shape.definition], 'main');

for (final qualTable in tables) {
final tablename = qualTable.tablename;
Expand Down
12 changes: 10 additions & 2 deletions packages/electricsql/lib/src/satellite/process.dart
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ class SatelliteProcess implements Satellite {
required this.notifier,
}) : _adapter = adapter,
builder = migrator.queryBuilder {
subscriptionManager = ShapeManager();
subscriptionManager = ShapeManager(
onShapeSyncStatusUpdated: (key, status) =>
notifier.shapeSubscriptionSyncStatusChanged(dbName, key, status),
);

throttledSnapshot = Throttle(
mutexSnapshot,
Expand Down Expand Up @@ -378,6 +381,7 @@ This means there is a notifier subscription leak.`''');

if (error != null) throw error;

// persist subscription metadata
await setMeta('subscriptions', subscriptionManager.serialize());

return ShapeSubscription(
Expand Down Expand Up @@ -405,7 +409,7 @@ This means there is a notifier subscription leak.`''');
} else if (key != null) {
return unsubscribeIds(subscriptionManager.getServerIDs([key]));
} else {
return unsubscribeIds(subscriptionManager.getServerID(shapes!));
return unsubscribeIds(subscriptionManager.getServerIDsForShapes(shapes!));
}
}

Expand All @@ -416,6 +420,8 @@ This means there is a notifier subscription leak.`''');

// If the server didn't send an error, we persist the fact the subscription was deleted.
subscriptionManager.unsubscribeMade(subscriptionIds);

// persist subscription metadata
await adapter.run(
_setMetaStatement('subscriptions', subscriptionManager.serialize()),
);
Expand All @@ -431,6 +437,7 @@ This means there is a notifier subscription leak.`''');
additionalStatements: [],
subscriptionId: subsData.subscriptionId,
);

final toBeUnsubbed = afterApply();
if (toBeUnsubbed.isNotEmpty) {
await unsubscribeIds(toBeUnsubbed);
Expand Down Expand Up @@ -1579,6 +1586,7 @@ INSERT $orIgnore INTO $qualifiedTableName (${columnNames.join(', ')}) VALUES '''
...stmts,
..._enableTriggers(affectedTables),
]);

subscriptionManager.goneBatchDelivered(subscriptionIds);

_notifyChanges(fakeOplogEntries, ChangeOrigin.remote);
Expand Down
Loading

0 comments on commit 3be19eb

Please sign in to comment.