@@ -149,7 +151,11 @@
const doc = new yorkie.Document(documentKey, {
enableDevtools: true,
});
-
+ doc.subscribe(
+ 'connection',
+ new Network(clientElem.querySelector('.network-status'))
+ .statusListener,
+ );
const onlineClients = clientElem.querySelector('.online-clients');
doc.subscribe('presence', (event) => {
// Update online clients list
diff --git a/public/quill.html b/public/quill.html
index ab8e2592a..ff6522958 100644
--- a/public/quill.html
+++ b/public/quill.html
@@ -66,12 +66,12 @@
// 01. create client with RPCAddr then activate it.
const client = new yorkie.Client('http://localhost:8080');
await client.activate();
- client.subscribe(network.statusListener(networkStatusElem));
// 02. create a document then attach it into the client.
const doc = new yorkie.Document(documentKey, {
enableDevtools: true,
});
+ doc.subscribe('connection', new Network(statusHolder).statusListener);
doc.subscribe('presence', (event) => {
if (event.type === 'presence-changed') return;
displayOnlineClients(doc.getPresences(), client.getID());
diff --git a/public/style.css b/public/style.css
index 27773edfc..642881a16 100644
--- a/public/style.css
+++ b/public/style.css
@@ -35,12 +35,14 @@ button {
}
#network-status,
+.network-status,
#online-clients,
#log-holder {
margin: 1rem;
font-family: monospace;
}
-#network-status:before {
+#network-status:before,
+.network-status:before {
content: 'network: ';
}
#online-clients:before {
@@ -49,16 +51,19 @@ button {
#log-holder:before {
content: 'root: ';
}
-#network-status span {
+#network-status span,
+.network-status span {
display: inline-block;
height: 0.8rem;
width: 0.8rem;
border-radius: 0.4rem;
}
-#network-status .green {
+#network-status .green,
+.network-status .green {
background-color: green;
}
-#network-status .red {
+#network-status .red,
+.network-status .red {
background-color: red;
}
diff --git a/public/util.js b/public/util.js
index 042a2e8d8..df74bacc7 100644
--- a/public/util.js
+++ b/public/util.js
@@ -1,34 +1,36 @@
-const network = {
- isOnline: false,
- showOffline: (elem) => {
- network.isOnline = false;
- elem.innerHTML = '
';
- },
- showOnline: (elem) => {
- network.isOnline = true;
- elem.innerHTML = '
';
- },
- statusListener: (elem) => {
- return (event) => {
- if (
- network.isOnline &&
- ((event.type == 'status-changed' && event.value == 'deactivated') ||
- (event.type == 'stream-connection-status-changed' &&
- event.value == 'disconnected') ||
- (event.type == 'document-sync-result' &&
- event.value == 'sync-failed'))
- ) {
- network.showOffline(elem);
- } else if (
- !network.isOnline &&
- ((event.type == 'status-changed' && event.value == 'activated') ||
- (event.type == 'stream-connection-status-changed' &&
- event.value == 'connected') ||
- (event.type == 'document-sync-result' && event.value == 'synced') ||
- event.type == 'document-changed')
- ) {
- network.showOnline(elem);
- }
- };
- },
-};
+/**
+ * `Network` is a class that manages the network status.
+ */
+class Network {
+ constructor(elem) {
+ this.isOnline = false;
+ this.elem = elem;
+ }
+
+ /**
+ * Show offline status.
+ */
+ showOffline() {
+ this.isOnline = false;
+ this.elem.innerHTML = '
';
+ }
+
+ /**
+ * Show online status.
+ */
+ showOnline() {
+ this.isOnline = true;
+ this.elem.innerHTML = '
';
+ }
+
+ /**
+ * Listen to the network status changes.
+ */
+ statusListener = (event) => {
+ if (this.isOnline && event.value === 'disconnected') {
+ this.showOffline();
+ } else if (!this.isOnline && event.value === 'connected') {
+ this.showOnline();
+ }
+ };
+}
diff --git a/public/whiteboard.html b/public/whiteboard.html
index cbced1d19..414f993fd 100644
--- a/public/whiteboard.html
+++ b/public/whiteboard.html
@@ -255,7 +255,6 @@
try {
// 01. create client with RPCAddr then activate it.
const client = new yorkie.Client('http://localhost:8080');
- client.subscribe(network.statusListener(statusHolder));
await client.activate();
const myClientID = client.getID();
@@ -286,6 +285,7 @@
redoStackCount.textContent = doc.getRedoStackForTest().length;
};
+ doc.subscribe('connection', new Network(statusHolder).statusListener);
doc.subscribe('presence', (event) => {
if (event.type === 'presence-changed') {
renderShapes(doc, myClientID);
diff --git a/src/client/client.ts b/src/client/client.ts
index 8281be532..c7353cee6 100644
--- a/src/client/client.ts
+++ b/src/client/client.ts
@@ -15,7 +15,12 @@
*/
import { ActorID } from '@yorkie-js-sdk/src/document/time/actor_id';
-import { createPromiseClient, PromiseClient } from '@connectrpc/connect';
+import {
+ createPromiseClient,
+ PromiseClient,
+ ConnectError,
+ Code as ConnectErrorCode,
+} from '@connectrpc/connect';
import { createGrpcWebTransport } from '@connectrpc/connect-web';
import { YorkieService } from '../api/yorkie/v1/yorkie_connect';
import { WatchDocumentResponse } from '@yorkie-js-sdk/src/api/yorkie/v1/yorkie_pb';
@@ -30,6 +35,9 @@ import {
DocumentKey,
DocumentStatus,
Indexable,
+ DocEventType,
+ StreamConnectionStatus,
+ DocumentSyncStatus,
} from '@yorkie-js-sdk/src/document/document';
import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor';
import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor';
@@ -446,13 +454,7 @@ export class Client {
});
}
- return Promise.all(promises).catch((err) => {
- // this.eventStreamObserver.next({
- // type: ClientEventType.DocumentSynced,
- // value: DocumentSyncResultType.SyncFailed,
- // });
- throw err;
- });
+ return Promise.all(promises);
}
/**
@@ -544,10 +546,6 @@ export class Client {
.then(() => setTimeout(doLoop, this.syncLoopDuration))
.catch((err) => {
logger.error(`[SL] c:"${this.getKey()}" sync failed:`, err);
- // this.eventStreamObserver.next({
- // type: ClientEventType.DocumentSynced,
- // value: DocumentSyncResultType.SyncFailed,
- // });
setTimeout(doLoop, this.retrySyncLoopDelay);
});
};
@@ -585,10 +583,12 @@ export class Client {
},
);
- // this.eventStreamObserver.next({
- // type: ClientEventType.StreamConnectionStatusChanged,
- // value: StreamConnectionStatus.Connected,
- // });
+ attachment.doc.publish([
+ {
+ type: DocEventType.ConnectionChanged,
+ value: StreamConnectionStatus.Connected,
+ },
+ ]);
logger.info(`[WD] c:"${this.getKey()}" watches d:"${docKey}"`);
return new Promise((resolve, reject) => {
@@ -604,12 +604,20 @@ export class Client {
}
}
} catch (err) {
- // this.eventStreamObserver.next({
- // type: ClientEventType.StreamConnectionStatusChanged,
- // value: StreamConnectionStatus.Disconnected,
- // });
+ attachment.doc.publish([
+ {
+ type: DocEventType.ConnectionChanged,
+ value: StreamConnectionStatus.Disconnected,
+ },
+ ]);
logger.debug(`[WD] c:"${this.getKey()}" unwatches`);
- onDisconnect();
+
+ if (
+ err instanceof ConnectError &&
+ err.code != ConnectErrorCode.Canceled
+ ) {
+ onDisconnect();
+ }
reject(err);
}
@@ -649,11 +657,6 @@ export class Client {
attachment.cancelWatchStream();
logger.debug(`[WD] c:"${this.getKey()}" unwatches`);
- // this.eventStreamObserver.next({
- // type: ClientEventType.StreamConnectionStatusChanged,
- // value: StreamConnectionStatus.Disconnected,
- // });
-
this.attachmentMap.delete(docKey);
}
@@ -689,10 +692,12 @@ export class Client {
}
doc.applyChangePack(respPack);
- // this.eventStreamObserver.next({
- // type: ClientEventType.DocumentSynced,
- // value: DocumentSyncResultType.Synced,
- // });
+ attachment.doc.publish([
+ {
+ type: DocEventType.SyncStatusChanged,
+ value: DocumentSyncStatus.Synced,
+ },
+ ]);
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (doc.getStatus() === DocumentStatus.Removed) {
@@ -709,6 +714,12 @@ export class Client {
return doc;
})
.catch((err) => {
+ doc.publish([
+ {
+ type: DocEventType.SyncStatusChanged,
+ value: DocumentSyncStatus.SyncFailed,
+ },
+ ]);
logger.error(`[PP] c:"${this.getKey()}" err :`, err);
throw err;
});
diff --git a/src/document/document.ts b/src/document/document.ts
index 50902c8f0..2ef06e8a5 100644
--- a/src/document/document.ts
+++ b/src/document/document.ts
@@ -128,6 +128,16 @@ export enum DocEventType {
*/
StatusChanged = 'status-changed',
+ /**
+ * `ConnectionChanged` means that the watch stream connection status has changed.
+ */
+ ConnectionChanged = 'connection-changed',
+
+ /**
+ * `SyncStatusChanged` means that the document sync status has changed.
+ */
+ SyncStatusChanged = 'sync-status-changed',
+
/**
* snapshot event type
*/
@@ -173,6 +183,8 @@ export enum DocEventType {
*/
export type DocEvent
=
| StatusChangedEvent
+ | ConnectionChangedEvent
+ | SyncStatusChangedEvent
| SnapshotEvent
| LocalChangeEvent
| RemoteChangeEvent
@@ -194,7 +206,6 @@ export type TransactionEvent = Array<
*/
export interface BaseDocEvent {
type: DocEventType;
- source: OpSource;
}
/**
@@ -214,6 +225,62 @@ export interface StatusChangedEvent extends BaseDocEvent {
| { status: DocumentStatus.Removed };
}
+/**
+ * `StreamConnectionStatus` represents whether the stream connection is connected or not.
+ * @public
+ */
+export enum StreamConnectionStatus {
+ /**
+ * `Connected` means that the stream connection is connected.
+ */
+ Connected = 'connected',
+ /**
+ * `Disconnected` means that the stream connection is disconnected.
+ */
+ Disconnected = 'disconnected',
+}
+
+/**
+ * `ConnectionChangedEvent` is an event that occurs when the stream connection state changes.
+ *
+ * @public
+ */
+export interface ConnectionChangedEvent extends BaseDocEvent {
+ /**
+ * enum {@link DocEventType}.ConnectionChanged
+ */
+ type: DocEventType.ConnectionChanged;
+ value: StreamConnectionStatus;
+}
+
+/**
+ * `DocumentSyncStatus` represents the result of synchronizing the document with the server.
+ * @public
+ */
+export enum DocumentSyncStatus {
+ /**
+ * `Synced` means that document synced successfully.
+ */
+ Synced = 'synced',
+ /**
+ * `SyncFiled` means that document synchronization has failed.
+ */
+ SyncFailed = 'sync-failed',
+}
+
+/**
+ * `SyncStatusChangedEvent` is an event that occurs when document is synced with the server.
+ *
+ * @public
+ */
+export interface SyncStatusChangedEvent extends BaseDocEvent {
+ /**
+ * enum {@link DocEventType}.SyncStatusChanged
+ */
+ type: DocEventType.SyncStatusChanged;
+ value: DocumentSyncStatus;
+}
+
/**
* `SnapshotEvent` is an event that occurs when a snapshot is received from
* the server.
@@ -679,6 +746,26 @@ export class Document {
error?: ErrorFn,
complete?: CompleteFn,
): Unsubscribe;
+ /**
+ * `subscribe` registers a callback to subscribe to events on the document.
+ * The callback will be called when the stream connection status changes.
+ */
+ public subscribe(
+ type: 'connection',
+ next: NextFn>,
+ error?: ErrorFn,
+ complete?: CompleteFn,
+ ): Unsubscribe;
+ /**
+ * `subscribe` registers a callback to subscribe to events on the document.
+ * The callback will be called when the document is synced with the server.
+ */
+ public subscribe(
+ type: 'sync',
+ next: NextFn>,
+ error?: ErrorFn,
+ complete?: CompleteFn,
+ ): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
* The callback will be called when the targetPath or any of its nested values change.
@@ -792,6 +879,36 @@ export class Document {
arg4,
);
}
+ if (arg1 === 'connection') {
+ const callback = arg2 as NextFn>;
+ return this.eventStream.subscribe(
+ (event) => {
+ for (const docEvent of event) {
+ if (docEvent.type !== DocEventType.ConnectionChanged) {
+ continue;
+ }
+ callback(docEvent);
+ }
+ },
+ arg3,
+ arg4,
+ );
+ }
+ if (arg1 === 'sync') {
+ const callback = arg2 as NextFn>;
+ return this.eventStream.subscribe(
+ (event) => {
+ for (const docEvent of event) {
+ if (docEvent.type !== DocEventType.SyncStatusChanged) {
+ continue;
+ }
+ callback(docEvent);
+ }
+ },
+ arg3,
+ arg4,
+ );
+ }
if (arg1 === 'all') {
const callback = arg2 as NextFn>;
return this.eventStream.subscribe(callback, arg3, arg4);
diff --git a/src/yorkie.ts b/src/yorkie.ts
index 5b6460067..37d2fb695 100644
--- a/src/yorkie.ts
+++ b/src/yorkie.ts
@@ -24,16 +24,8 @@ import * as Devtools from '@yorkie-js-sdk/src/devtools/types';
export {
Client,
- ClientEvent,
ClientStatus,
SyncMode,
- StreamConnectionStatus,
- DocumentSyncResultType,
- ClientEventType,
- StatusChangedEvent,
- DocumentChangedEvent,
- StreamConnectionStatusChangedEvent,
- DocumentSyncedEvent,
ClientOptions,
} from '@yorkie-js-sdk/src/client/client';
export {
@@ -41,6 +33,15 @@ export {
SnapshotEvent,
LocalChangeEvent,
RemoteChangeEvent,
+ ConnectionChangedEvent,
+ SyncStatusChangedEvent,
+ WatchedEvent,
+ UnwatchedEvent,
+ PresenceChangedEvent,
+ InitializedEvent,
+ StreamConnectionStatus,
+ DocumentSyncStatus,
+ DocumentStatus,
Indexable,
DocEvent,
TransactionEvent,
diff --git a/test/integration/client_test.ts b/test/integration/client_test.ts
index 3bd4c4449..bc2a78979 100644
--- a/test/integration/client_test.ts
+++ b/test/integration/client_test.ts
@@ -3,9 +3,8 @@ import { describe, it, assert, vi, afterEach } from 'vitest';
import yorkie, {
Counter,
SyncMode,
- DocumentSyncResultType,
DocEventType,
- ClientEventType,
+ DocumentSyncStatus,
Tree,
} from '@yorkie-js-sdk/src/yorkie';
import { EventCollector } from '@yorkie-js-sdk/test/helper/helper';
@@ -136,24 +135,20 @@ describe.sequential('Client', function () {
const eventCollectorD1 = new EventCollector();
const eventCollectorD2 = new EventCollector();
- const eventCollectorC1 = new EventCollector();
- const eventCollectorC2 = new EventCollector();
+ const eventCollectorSync1 = new EventCollector();
+ const eventCollectorSync2 = new EventCollector();
const unsub1 = {
- client: c1.subscribe((event) => {
- if (event.type === ClientEventType.DocumentSynced) {
- eventCollectorC1.add(event.value);
- }
+ syncEvent: d1.subscribe('sync', (event) => {
+ eventCollectorSync1.add(event.value as DocumentSyncStatus);
}),
doc: d1.subscribe((event) => {
eventCollectorD1.add(event.type);
}),
};
const unsub2 = {
- client: c2.subscribe((event) => {
- if (event.type === ClientEventType.DocumentSynced) {
- eventCollectorC2.add(event.value);
- }
+ syncEvent: d2.subscribe('sync', (event) => {
+ eventCollectorSync2.add(event.value as DocumentSyncStatus);
}),
doc: d2.subscribe((event) => {
eventCollectorD2.add(event.type);
@@ -169,8 +164,8 @@ describe.sequential('Client', function () {
await eventCollectorD1.waitAndVerifyNthEvent(1, DocEventType.RemoteChange);
assert.equal(d1.toSortedJSON(), d2.toSortedJSON());
- eventCollectorC1.reset();
- eventCollectorC2.reset();
+ eventCollectorSync1.reset();
+ eventCollectorSync2.reset();
// Simulate network error
vi.stubGlobal('fetch', () => {
@@ -184,27 +179,27 @@ describe.sequential('Client', function () {
});
await eventCollectorD2.waitAndVerifyNthEvent(2, DocEventType.LocalChange);
- await eventCollectorC2.waitFor(DocumentSyncResultType.SyncFailed); // c2 should fail to sync
+ await eventCollectorSync2.waitFor(DocumentSyncStatus.SyncFailed); // c2 should fail to sync
await c1.sync().catch((err) => {
assert.equal(err.message, '[unknown] Failed to fetch'); // c1 should also fail to sync
});
- await eventCollectorC1.waitFor(DocumentSyncResultType.SyncFailed);
+ await eventCollectorSync1.waitFor(DocumentSyncStatus.SyncFailed);
assert.equal(d1.toSortedJSON(), '{"k1":"undefined"}');
assert.equal(d2.toSortedJSON(), '{"k1":"v1"}');
// Back to normal condition
- eventCollectorC1.reset();
- eventCollectorC2.reset();
+ eventCollectorSync1.reset();
+ eventCollectorSync2.reset();
vi.unstubAllGlobals();
- await eventCollectorC1.waitFor(DocumentSyncResultType.Synced); // wait for c1 to sync
- await eventCollectorC2.waitFor(DocumentSyncResultType.Synced);
+ await eventCollectorSync1.waitFor(DocumentSyncStatus.Synced); // wait for c1 to sync
+ await eventCollectorSync2.waitFor(DocumentSyncStatus.Synced);
await eventCollectorD1.waitAndVerifyNthEvent(2, DocEventType.RemoteChange);
assert.equal(d1.toSortedJSON(), '{"k1":"v1"}'); // d1 should be able to receive d2's update
- unsub1.client();
- unsub2.client();
+ unsub1.syncEvent();
+ unsub2.syncEvent();
unsub1.doc();
unsub2.doc();
@@ -240,11 +235,11 @@ describe.sequential('Client', function () {
// 02. c2 changes the sync mode to realtime sync mode.
const eventCollector = new EventCollector();
- const unsub1 = c2.subscribe((event) => {
- eventCollector.add(event.type);
+ const unsub1 = d2.subscribe('sync', (event) => {
+ eventCollector.add(event.value as DocumentSyncStatus);
});
await c2.changeSyncMode(d2, SyncMode.Realtime);
- await eventCollector.waitFor(ClientEventType.DocumentSynced); // sync occurs when resuming
+ await eventCollector.waitFor(DocumentSyncStatus.Synced); // sync occurs when resuming
eventCollector.reset();
d1.update((root) => {
@@ -252,7 +247,7 @@ describe.sequential('Client', function () {
});
await c1.sync();
- await eventCollector.waitFor(ClientEventType.DocumentSynced); // c2 should sync automatically
+ await eventCollector.waitFor(DocumentSyncStatus.Synced); // c2 should sync automatically
assert.equal(d1.toSortedJSON(), `{"version":"v2"}`, 'd1');
assert.equal(d2.toSortedJSON(), `{"version":"v2"}`, 'd2');
unsub1();
@@ -418,8 +413,8 @@ describe.sequential('Client', function () {
const d2 = new yorkie.Document<{ version: string }>(docKey);
const eventCollector = new EventCollector();
- const unsub1 = c2.subscribe((event) => {
- eventCollector.add(event.type);
+ const unsub1 = d2.subscribe('sync', (event) => {
+ eventCollector.add(event.value as DocumentSyncStatus);
});
// 01. c2 attach the doc with realtime sync mode at first.
@@ -430,7 +425,7 @@ describe.sequential('Client', function () {
});
await c1.sync();
assert.equal(d1.toSortedJSON(), `{"version":"v1"}`, 'd1');
- await eventCollector.waitFor(ClientEventType.DocumentSynced);
+ await eventCollector.waitFor(DocumentSyncStatus.Synced);
assert.equal(d2.toSortedJSON(), `{"version":"v1"}`, 'd2');
// 02. c2 is changed to manual sync. So, c2 doesn't get the changes of c1.
@@ -447,7 +442,7 @@ describe.sequential('Client', function () {
eventCollector.reset();
await c2.changeSyncMode(d2, SyncMode.Realtime);
- await eventCollector.waitFor(ClientEventType.DocumentSynced);
+ await eventCollector.waitFor(DocumentSyncStatus.Synced);
assert.equal(d2.toSortedJSON(), `{"version":"v2"}`, 'd2');
// 04. c2 should automatically synchronize changes.
@@ -457,7 +452,7 @@ describe.sequential('Client', function () {
});
await c1.sync();
- await eventCollector.waitFor(ClientEventType.DocumentSynced);
+ await eventCollector.waitFor(DocumentSyncStatus.Synced);
assert.equal(d1.toSortedJSON(), `{"version":"v3"}`, 'd1');
assert.equal(d2.toSortedJSON(), `{"version":"v3"}`, 'd2');
unsub1();
@@ -495,8 +490,8 @@ describe.sequential('Client', function () {
// 03. cli update the document with increasing the counter(0 -> 1)
// and sync with push-only mode: CP(2, 2) -> CP(3, 2)
const eventCollector = new EventCollector();
- const unsub = c1.subscribe((event) => {
- eventCollector.add(event.type);
+ const unsub = d1.subscribe('sync', (event) => {
+ eventCollector.add(event.value as DocumentSyncStatus);
});
d1.update((root) => {
root.counter.increase(1);
@@ -504,7 +499,7 @@ describe.sequential('Client', function () {
let changePack = d1.createChangePack();
assert.equal(changePack.getChangeSize(), 1);
await c1.changeSyncMode(d1, SyncMode.RealtimePushOnly);
- await eventCollector.waitFor(ClientEventType.DocumentSynced);
+ await eventCollector.waitFor(DocumentSyncStatus.Synced);
checkpoint = d1.getCheckpoint();
assert.equal(checkpoint.getClientSeq(), 3);
assert.equal(checkpoint.getServerSeq().toInt(), 2);