From f6ec48a176470431449dd3af8ae8dfc0c4392bae Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 21 Aug 2024 08:02:08 +0900 Subject: [PATCH 01/19] Implement subscription to broadcast events --- packages/sdk/src/document/document.ts | 72 ++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 9343b6353..b48a2a82a 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -173,6 +173,11 @@ export enum DocEventType { * `PresenceChanged` means that the presences of the client has updated. */ PresenceChanged = 'presence-changed', + + /** + * `Broadcast` means that the message is broadcasted to clients who subscribe to the event. + */ + Broadcast = 'broadcast', } /** @@ -191,7 +196,8 @@ export type DocEvent

= | InitializedEvent

| WatchedEvent

| UnwatchedEvent

- | PresenceChangedEvent

; + | PresenceChangedEvent

+ | BroadcastEvent; /** * `TransactionEvent` represents document events that occur within @@ -371,6 +377,11 @@ export interface PresenceChangedEvent

value: { clientID: ActorID; presence: P }; } +export interface BroadcastEvent extends BaseDocEvent { + type: DocEventType.Broadcast; + value: { topic: string; payload: any }; +} + type DocEventCallbackMap

= { default: NextFn< | SnapshotEvent @@ -589,6 +600,15 @@ export class Document { */ private isUpdating: boolean; + /** + * `broadcastEventHandlers` is a map of broadcast event handlers. + * The key is the topic of the broadcast event, and the value is the handler. + */ + private broadcastEventHandlers: Map< + string, + (topic: string, payload: any) => void + >; + constructor(key: string, opts?: DocumentOptions) { this.opts = opts || {}; @@ -616,6 +636,8 @@ export class Document { redo: this.redo.bind(this), }; + this.broadcastEventHandlers = new Map(); + setupDevtools(this); } @@ -1027,6 +1049,35 @@ export class Document { throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); } + /** + * subscribeBroadcastEvent registers a callback to subscribe to broadcast events + * on the document. The callback will be called when the document receives a + * broadcast event with the given topic. + */ + public subscribeBroadcastEvent( + topic: string, + handler: (topic: string, payload: any) => void, + error?: ErrorFn, + ): Unsubscribe { + this.broadcastEventHandlers.set(topic, handler); + + this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.Broadcast) { + continue; + } + + if (docEvent.value.topic === topic) { + handler(topic, docEvent.value.payload); + } + } + }, error); + + return () => { + this.broadcastEventHandlers.delete(topic); + }; + } + /** * `publish` triggers an event in this document, which can be received by * callback functions from document.subscribe(). @@ -1468,7 +1519,8 @@ export class Document { if (resp.body.case === 'event') { const { type, publisher } = resp.body.value; - const event: Array | UnwatchedEvent

> = []; + const event: Array | UnwatchedEvent

| BroadcastEvent> = + []; if (type === PbDocEventType.DOCUMENT_WATCHED) { this.addOnlineClient(publisher); // NOTE(chacha912): We added to onlineClients, but we won't trigger watched event @@ -1495,6 +1547,14 @@ export class Document { value: { clientID: publisher, presence }, }); } + } else if (type === PbDocEventType.DOCUMENT_BROADCAST) { + if (resp.body.value.body) { + const { topic, payload } = resp.body.value.body; + event.push({ + type: DocEventType.Broadcast, + value: { topic, payload }, + }); + } } if (event.length > 0) { @@ -1584,6 +1644,14 @@ export class Document { const { clientID, presence } = event.value; this.presences.set(clientID, presence); } + + if (event.type === DocEventType.Broadcast) { + const { topic, payload } = event.value; + const handler = this.broadcastEventHandlers.get(topic); + if (handler) { + handler(topic, payload); + } + } } /** From 6284cfea363f6dc1e50b237dffe46b205051e845 Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 21 Aug 2024 08:27:06 +0900 Subject: [PATCH 02/19] Implement publishing broadcast events --- packages/sdk/src/client/client.ts | 46 +++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 03338bc80..95437a28f 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -584,6 +584,52 @@ export class Client { return this.conditions[condition]; } + /** + * `broadcast` broadcasts the given payload to the given topic. + */ + public broadcast( + doc: Document, + topic: string, + payload: any, + ): Promise { + if (!this.isActive()) { + throw new YorkieError( + Code.ErrClientNotActivated, + `${this.key} is not active`, + ); + } + const attachment = this.attachmentMap.get(doc.getKey()); + if (!attachment) { + throw new YorkieError( + Code.ErrDocumentNotAttached, + `${doc.getKey()} is not attached`, + ); + } + + return this.enqueueTask(async () => { + return this.rpcClient + .broadcast( + { + clientId: this.id!, + documentId: attachment.docID, + topic, + payload: new TextEncoder().encode(JSON.stringify(payload)), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then(() => { + logger.info( + `[BC] c:"${this.getKey()}" broadcasts d:"${doc.getKey()}" t:"${topic}"`, + ); + }) + .catch((err) => { + logger.error(`[BC] c:"${this.getKey()}" err :`, err); + this.handleConnectError(err); + throw err; + }); + }); + } + /** * `runSyncLoop` runs the sync loop. The sync loop pushes local changes to * the server and pulls remote changes from the server. From 0472c0a9b3660a428589b86cdea1e349e4efd33d Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 21 Aug 2024 09:25:45 +0900 Subject: [PATCH 03/19] Decode broadcasted payload --- packages/sdk/src/document/document.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index b48a2a82a..4583db233 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -1550,9 +1550,11 @@ export class Document { } else if (type === PbDocEventType.DOCUMENT_BROADCAST) { if (resp.body.value.body) { const { topic, payload } = resp.body.value.body; + const decoder = new TextDecoder(); + event.push({ type: DocEventType.Broadcast, - value: { topic, payload }, + value: { topic, payload: JSON.parse(decoder.decode(payload)) }, }); } } From a1442b7b9f564f54ec7152e23177036ba576ce1d Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 21 Aug 2024 09:56:15 +0900 Subject: [PATCH 04/19] Add validation logic for serializable payload --- packages/sdk/src/client/client.ts | 8 ++++++++ packages/sdk/src/util/validator.ts | 31 ++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 packages/sdk/src/util/validator.ts diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 95437a28f..5dbf4bfb6 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -42,6 +42,7 @@ import { import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation'; import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor'; import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor'; +import { validateSerializable } from '../util/validator'; /** * `SyncMode` defines synchronization modes for the PushPullChanges API. @@ -606,6 +607,13 @@ export class Client { ); } + if (!validateSerializable(payload)) { + throw new YorkieError( + Code.ErrInvalidArgument, + 'payload is not serializable', + ); + } + return this.enqueueTask(async () => { return this.rpcClient .broadcast( diff --git a/packages/sdk/src/util/validator.ts b/packages/sdk/src/util/validator.ts new file mode 100644 index 000000000..33625d837 --- /dev/null +++ b/packages/sdk/src/util/validator.ts @@ -0,0 +1,31 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * `validateSerializable` returns whether the given value is serializable or not. + */ +export const validateSerializable = (value: any): boolean => { + try { + const serialized = JSON.stringify(value); + + if (serialized === undefined) { + return false; + } + } catch (error) { + return false; + } + return true; +}; From 323f98181895080496e6bc814310e830e3c83a79 Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 21 Aug 2024 09:58:17 +0900 Subject: [PATCH 05/19] Add test case for throwing error when trying to broadcast unserializeable payload --- packages/sdk/test/integration/client_test.ts | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index b315408f4..c8c7ec019 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -863,4 +863,24 @@ describe.sequential('Client', function () { assert.equal(d1.toSortedJSON(), d2.toSortedJSON()); }, task.name); }); + + it('Throw error when broadcasting unserializeable payload', async ({ + task, + }) => { + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + // broadcast unserializable payload + const payload = () => {}; + const broadcastTopic = 'test'; + + expect(async () => + cli.broadcast(doc, broadcastTopic, payload), + ).rejects.toThrowErrorCode(Code.ErrInvalidArgument); + + await cli.deactivate(); + }); }); From b6924411f983b4d071ee9ee757e807668c0eb8bc Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 21 Aug 2024 18:04:33 +0900 Subject: [PATCH 06/19] Fix bug in subscribeBroadcastEvent method --- packages/sdk/src/document/document.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 4583db233..b78fa3eb2 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -1061,7 +1061,7 @@ export class Document { ): Unsubscribe { this.broadcastEventHandlers.set(topic, handler); - this.eventStream.subscribe((event) => { + const unsubscribe = this.eventStream.subscribe((event) => { for (const docEvent of event) { if (docEvent.type !== DocEventType.Broadcast) { continue; @@ -1074,6 +1074,7 @@ export class Document { }, error); return () => { + unsubscribe(); this.broadcastEventHandlers.delete(topic); }; } From 66c996a7e6c665b4531a5216c46c3a70877d4fab Mon Sep 17 00:00:00 2001 From: Paik Date: Thu, 22 Aug 2024 09:24:34 +0900 Subject: [PATCH 07/19] Add test case for successfully broadcast serializeable payload --- packages/sdk/test/integration/client_test.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index c8c7ec019..565ca16ea 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -864,6 +864,23 @@ describe.sequential('Client', function () { }, task.name); }); + it('Successfully broadcast serializeable payload', async ({ task }) => { + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + const broadcastTopic = 'test'; + const payload = { a: 1, b: '2' }; + + expect(async () => + cli.broadcast(doc, broadcastTopic, payload), + ).not.toThrow(); + + await cli.deactivate(); + }); + it('Throw error when broadcasting unserializeable payload', async ({ task, }) => { From b0b2766b811ff6d6513bad204f3bb2a786c65040 Mon Sep 17 00:00:00 2001 From: Paik Date: Fri, 23 Aug 2024 08:37:58 +0900 Subject: [PATCH 08/19] Add test cases for subscribing and unsubscribing broadcast events --- packages/sdk/test/integration/client_test.ts | 93 ++++++++++++++++++- .../test/integration/integration_helper.ts | 5 +- 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 565ca16ea..94a7bdad3 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -864,7 +864,9 @@ describe.sequential('Client', function () { }, task.name); }); - it('Successfully broadcast serializeable payload', async ({ task }) => { + it('Should successfully broadcast serializeable payload', async ({ + task, + }) => { const cli = new yorkie.Client(testRPCAddr); await cli.activate(); @@ -881,7 +883,7 @@ describe.sequential('Client', function () { await cli.deactivate(); }); - it('Throw error when broadcasting unserializeable payload', async ({ + it('Should throw error when broadcasting unserializeable payload', async ({ task, }) => { const cli = new yorkie.Client(testRPCAddr); @@ -901,3 +903,90 @@ describe.sequential('Client', function () { await cli.deactivate(); }); }); + +it('Should trigger the handler for a subscribed broadcast event', async ({ + task, +}) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const spy = vi.fn(); + + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic, spy); + + const payload = { a: 1, b: '2' }; + await c1.broadcast(d1, broadcastTopic, payload); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + expect(spy.mock.calls.length).toBe(1); + expect(spy.mock.calls[0][0]).toBe(broadcastTopic); + expect(spy.mock.calls[0][1]).toEqual(payload); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); +}); + +it('Should not trigger the handler for an unsubscribed broadcast event', async ({ + task, +}) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const spy = vi.fn(); + + const broadcastTopic1 = 'test1'; + const broadcastTopic2 = 'test2'; + + const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic2, spy); + + const payload = { a: 1, b: '2' }; + await c1.broadcast(d1, broadcastTopic1, payload); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + expect(spy.mock.calls.length).toBe(0); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); +}); + +it('Should not trigger the handler for a broadcast event after unsubscribing', async ({ + task, +}) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const spy = vi.fn(); + + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic, spy); + + const payload = { a: 1, b: '2' }; + await c1.broadcast(d1, broadcastTopic, payload); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + expect(spy.mock.calls.length).toBe(1); + + unsubscribe(); + + await c1.broadcast(d1, broadcastTopic, payload); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + // No change in the number of calls + expect(spy.mock.calls.length).toBe(1); + }, + task.name, + SyncMode.Realtime, + ); +}); diff --git a/packages/sdk/test/integration/integration_helper.ts b/packages/sdk/test/integration/integration_helper.ts index 7db44e2ad..f0189e390 100644 --- a/packages/sdk/test/integration/integration_helper.ts +++ b/packages/sdk/test/integration/integration_helper.ts @@ -21,6 +21,7 @@ export async function withTwoClientsAndDocuments( d2: Document, ) => Promise, title: string, + syncMode: SyncMode = SyncMode.Manual, ): Promise { const client1 = new yorkie.Client(testRPCAddr); const client2 = new yorkie.Client(testRPCAddr); @@ -31,8 +32,8 @@ export async function withTwoClientsAndDocuments( const doc1 = new yorkie.Document(docKey); const doc2 = new yorkie.Document(docKey); - await client1.attach(doc1, { syncMode: SyncMode.Manual }); - await client2.attach(doc2, { syncMode: SyncMode.Manual }); + await client1.attach(doc1, { syncMode }); + await client2.attach(doc2, { syncMode }); await callback(client1, doc1, client2, doc2); From 06d37fa5c60bf9e431c49b655a68681d1dc0a3ba Mon Sep 17 00:00:00 2001 From: Paik Date: Mon, 26 Aug 2024 09:33:42 +0900 Subject: [PATCH 09/19] Modify interface for subscribing broadcast events --- packages/sdk/src/document/document.ts | 88 +++++++++++++------- packages/sdk/test/integration/client_test.ts | 15 +++- 2 files changed, 70 insertions(+), 33 deletions(-) diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index b78fa3eb2..8ccf3726f 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -399,6 +399,7 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; + broadcast: (topic: string, payload: any) => void; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -550,6 +551,21 @@ type PathOf = PathOfInternal< Depth >; +/* + * `SubscribePair` represents the type of the subscribe pair. + */ +type SubscribePair = { + type: string; +}; + +/* + * `BroadcastSubscribePair` represents the type of the broadcast subscribe pair. + */ +type BroadcastSubscribePair = { + type: 'broadcast'; + topic: string; +} & SubscribePair; + /** * `Document` is a CRDT-based data type. We can represent the model * of the application and edit it even while offline. @@ -606,7 +622,7 @@ export class Document { */ private broadcastEventHandlers: Map< string, - (topic: string, payload: any) => void + DocEventCallbackMap

['broadcast'] >; constructor(key: string, opts?: DocumentOptions) { @@ -840,6 +856,18 @@ export class Document { error?: ErrorFn, complete?: CompleteFn, ): Unsubscribe; + /** + * `subscribe` registers a callback to subscribe to events on the document. + * The callback will be called when the document is changed. + */ + public subscribe( + type: { + type: 'broadcast'; + topic: string; + }, + next: DocEventCallbackMap

['broadcast'], + error?: ErrorFn, + ): Unsubscribe; /** * `subscribe` registers a callback to subscribe to events on the document. */ @@ -856,7 +884,11 @@ export class Document { TPath extends PathOf, TOperationInfo extends OperationInfoOf, >( - arg1: TPath | DocEventTopic | DocEventCallbackMap

['default'], + arg1: + | TPath + | DocEventTopic + | DocEventCallbackMap

['default'] + | SubscribePair, arg2?: | NextFn< | LocalChangeEvent @@ -1046,37 +1078,33 @@ export class Document { complete, ); } - throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); - } - - /** - * subscribeBroadcastEvent registers a callback to subscribe to broadcast events - * on the document. The callback will be called when the document receives a - * broadcast event with the given topic. - */ - public subscribeBroadcastEvent( - topic: string, - handler: (topic: string, payload: any) => void, - error?: ErrorFn, - ): Unsubscribe { - this.broadcastEventHandlers.set(topic, handler); + if (typeof arg1 === 'object') { + const { type } = arg1 as SubscribePair; + + if (type === 'broadcast') { + const { topic } = arg1 as BroadcastSubscribePair; + const handler = arg2 as DocEventCallbackMap

['broadcast']; + const error = arg3 as ErrorFn; + this.broadcastEventHandlers.set(topic, handler); + const unsubscribe = this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.Broadcast) { + continue; + } - const unsubscribe = this.eventStream.subscribe((event) => { - for (const docEvent of event) { - if (docEvent.type !== DocEventType.Broadcast) { - continue; - } + if (docEvent.value.topic === topic) { + handler(topic, docEvent.value.payload); + } + } + }, error); - if (docEvent.value.topic === topic) { - handler(topic, docEvent.value.payload); - } + return () => { + unsubscribe(); + this.broadcastEventHandlers.delete(topic); + }; } - }, error); - - return () => { - unsubscribe(); - this.broadcastEventHandlers.delete(topic); - }; + } + throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); } /** diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 94a7bdad3..f721b9faf 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -912,7 +912,10 @@ it('Should trigger the handler for a subscribed broadcast event', async ({ const spy = vi.fn(); const broadcastTopic = 'test'; - const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic, spy); + const unsubscribe = d2.subscribe( + { type: 'broadcast', topic: broadcastTopic }, + spy, + ); const payload = { a: 1, b: '2' }; await c1.broadcast(d1, broadcastTopic, payload); @@ -941,7 +944,10 @@ it('Should not trigger the handler for an unsubscribed broadcast event', async ( const broadcastTopic1 = 'test1'; const broadcastTopic2 = 'test2'; - const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic2, spy); + const unsubscribe = d2.subscribe( + { type: 'broadcast', topic: broadcastTopic2 }, + spy, + ); const payload = { a: 1, b: '2' }; await c1.broadcast(d1, broadcastTopic1, payload); @@ -966,7 +972,10 @@ it('Should not trigger the handler for a broadcast event after unsubscribing', a const spy = vi.fn(); const broadcastTopic = 'test'; - const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic, spy); + const unsubscribe = d2.subscribe( + { type: 'broadcast', topic: broadcastTopic }, + spy, + ); const payload = { a: 1, b: '2' }; await c1.broadcast(d1, broadcastTopic, payload); From 80609858fde2dc104a9a50dec5a4e339d1c07477 Mon Sep 17 00:00:00 2001 From: Paik Date: Tue, 27 Aug 2024 08:43:09 +0900 Subject: [PATCH 10/19] Refactor the broadcast method to be called directly by the document object --- packages/sdk/src/client/attachment.ts | 7 +++++ packages/sdk/src/client/client.ts | 14 +++++---- packages/sdk/src/document/document.ts | 30 ++++++++++++++++++++ packages/sdk/test/integration/client_test.ts | 14 ++++----- 4 files changed, 51 insertions(+), 14 deletions(-) diff --git a/packages/sdk/src/client/attachment.ts b/packages/sdk/src/client/attachment.ts index 1a020570d..b2ddbff2c 100644 --- a/packages/sdk/src/client/attachment.ts +++ b/packages/sdk/src/client/attachment.ts @@ -107,4 +107,11 @@ export class Attachment { clearTimeout(this.watchLoopTimerID); this.watchLoopTimerID = undefined; } + + /** + * `unsetClient` unsets the client of the document. + */ + public unsetClient(): void { + this.doc.setClient(null); + } } diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 5dbf4bfb6..f2a4c6614 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -304,6 +304,7 @@ export class Client { } doc.setActor(this.id!); doc.update((_, p) => p.set(options.initialPresence || {})); + doc.setClient(this); const syncMode = options.syncMode ?? SyncMode.Realtime; return this.enqueueTask(async () => { @@ -588,8 +589,8 @@ export class Client { /** * `broadcast` broadcasts the given payload to the given topic. */ - public broadcast( - doc: Document, + public broadcast( + docKey: DocumentKey, topic: string, payload: any, ): Promise { @@ -599,11 +600,11 @@ export class Client { `${this.key} is not active`, ); } - const attachment = this.attachmentMap.get(doc.getKey()); + const attachment = this.attachmentMap.get(docKey); if (!attachment) { throw new YorkieError( Code.ErrDocumentNotAttached, - `${doc.getKey()} is not attached`, + `${docKey} is not attached`, ); } @@ -623,11 +624,11 @@ export class Client { topic, payload: new TextEncoder().encode(JSON.stringify(payload)), }, - { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, ) .then(() => { logger.info( - `[BC] c:"${this.getKey()}" broadcasts d:"${doc.getKey()}" t:"${topic}"`, + `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, ); }) .catch((err) => { @@ -801,6 +802,7 @@ export class Client { return; } + attachment.unsetClient(); attachment.cancelWatchStream(); this.attachmentMap.delete(docKey); } diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 8ccf3726f..4e2944d1a 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -78,6 +78,7 @@ import { import { History, HistoryOperation } from '@yorkie-js-sdk/src/document/history'; import { setupDevtools } from '@yorkie-js-sdk/src/devtools'; import * as Devtools from '@yorkie-js-sdk/src/devtools/types'; +import { Client } from '@yorkie-js-sdk/src/client/client'; /** * `DocumentOptions` are the options to create a new document. @@ -625,6 +626,8 @@ export class Document { DocEventCallbackMap

['broadcast'] >; + private client: Client | null = null; + constructor(key: string, opts?: DocumentOptions) { this.opts = opts || {}; @@ -1330,6 +1333,15 @@ export class Document { return this.root.getGarbageLen(); } + /* + * `setClient` sets the client of this document. + * + * @internal + */ + public setClient(client: Client | null): void { + this.client = client; + } + /** * `getGarbageLenFromClone` returns the length of elements should be purged from clone. */ @@ -2069,4 +2081,22 @@ export class Document { public getRedoStackForTest(): Array>> { return this.internalHistory.getRedoStackForTest(); } + + /** + * `broadcast` the payload to the given topic. + */ + public broadcast(topic: string, payload: any): Promise { + if (this.client) { + try { + return this.client.broadcast(this.getKey(), topic, payload); + } catch (e) { + throw e; + } + } + + throw new YorkieError( + Code.ErrClientNotFound, + 'Document is not attached to a client', + ); + } } diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index f721b9faf..c94000840 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -876,9 +876,7 @@ describe.sequential('Client', function () { const broadcastTopic = 'test'; const payload = { a: 1, b: '2' }; - expect(async () => - cli.broadcast(doc, broadcastTopic, payload), - ).not.toThrow(); + expect(async () => doc.broadcast(broadcastTopic, payload)).not.toThrow(); await cli.deactivate(); }); @@ -897,7 +895,7 @@ describe.sequential('Client', function () { const broadcastTopic = 'test'; expect(async () => - cli.broadcast(doc, broadcastTopic, payload), + doc.broadcast(broadcastTopic, payload), ).rejects.toThrowErrorCode(Code.ErrInvalidArgument); await cli.deactivate(); @@ -918,7 +916,7 @@ it('Should trigger the handler for a subscribed broadcast event', async ({ ); const payload = { a: 1, b: '2' }; - await c1.broadcast(d1, broadcastTopic, payload); + await d1.broadcast(broadcastTopic, payload); // Assuming that every subscriber can receive the broadcast event within 1000ms. await new Promise((res) => setTimeout(res, 1000)); @@ -950,7 +948,7 @@ it('Should not trigger the handler for an unsubscribed broadcast event', async ( ); const payload = { a: 1, b: '2' }; - await c1.broadcast(d1, broadcastTopic1, payload); + await d1.broadcast(broadcastTopic1, payload); // Assuming that every subscriber can receive the broadcast event within 1000ms. await new Promise((res) => setTimeout(res, 1000)); @@ -978,7 +976,7 @@ it('Should not trigger the handler for a broadcast event after unsubscribing', a ); const payload = { a: 1, b: '2' }; - await c1.broadcast(d1, broadcastTopic, payload); + await d1.broadcast(broadcastTopic, payload); // Assuming that every subscriber can receive the broadcast event within 1000ms. await new Promise((res) => setTimeout(res, 1000)); @@ -987,7 +985,7 @@ it('Should not trigger the handler for a broadcast event after unsubscribing', a unsubscribe(); - await c1.broadcast(d1, broadcastTopic, payload); + await d1.broadcast(broadcastTopic, payload); // Assuming that every subscriber can receive the broadcast event within 1000ms. await new Promise((res) => setTimeout(res, 1000)); From 4d516be605807e697956ef0a7f435fc5a5e1cbf7 Mon Sep 17 00:00:00 2001 From: Paik Date: Tue, 27 Aug 2024 08:53:13 +0900 Subject: [PATCH 11/19] Fix lint errors --- packages/sdk/src/client/attachment.ts | 2 +- packages/sdk/src/document/document.ts | 17 +++++------------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/packages/sdk/src/client/attachment.ts b/packages/sdk/src/client/attachment.ts index b2ddbff2c..ef3ca3be3 100644 --- a/packages/sdk/src/client/attachment.ts +++ b/packages/sdk/src/client/attachment.ts @@ -112,6 +112,6 @@ export class Attachment { * `unsetClient` unsets the client of the document. */ public unsetClient(): void { - this.doc.setClient(null); + this.doc.setClient(); } } diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 4e2944d1a..1fcf8842b 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -626,7 +626,7 @@ export class Document { DocEventCallbackMap

['broadcast'] >; - private client: Client | null = null; + private client?: Client; constructor(key: string, opts?: DocumentOptions) { this.opts = opts || {}; @@ -864,10 +864,7 @@ export class Document { * The callback will be called when the document is changed. */ public subscribe( - type: { - type: 'broadcast'; - topic: string; - }, + type: BroadcastSubscribePair, next: DocEventCallbackMap

['broadcast'], error?: ErrorFn, ): Unsubscribe; @@ -1333,12 +1330,12 @@ export class Document { return this.root.getGarbageLen(); } - /* + /** * `setClient` sets the client of this document. * * @internal */ - public setClient(client: Client | null): void { + public setClient(client?: Client): void { this.client = client; } @@ -2087,11 +2084,7 @@ export class Document { */ public broadcast(topic: string, payload: any): Promise { if (this.client) { - try { - return this.client.broadcast(this.getKey(), topic, payload); - } catch (e) { - throw e; - } + return this.client.broadcast(this.getKey(), topic, payload); } throw new YorkieError( From 69650c091aa471c89741945f99aeeaafdc1a135c Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 28 Aug 2024 08:30:24 +0900 Subject: [PATCH 12/19] Refactor test code to use EventCollector --- packages/sdk/test/integration/client_test.ts | 27 ++++++++------------ 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index c94000840..59f4a3000 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -907,23 +907,18 @@ it('Should trigger the handler for a subscribed broadcast event', async ({ }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const spy = vi.fn(); - + const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe( { type: 'broadcast', topic: broadcastTopic }, - spy, + (topic, payload) => { + eventCollector.add([topic, payload]); + }, ); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic, payload); - - // Assuming that every subscriber can receive the broadcast event within 1000ms. - await new Promise((res) => setTimeout(res, 1000)); - - expect(spy.mock.calls.length).toBe(1); - expect(spy.mock.calls[0][0]).toBe(broadcastTopic); - expect(spy.mock.calls[0][1]).toEqual(payload); + await eventCollector.waitAndVerifyNthEvent(1, [broadcastTopic, payload]); unsubscribe(); }, @@ -969,19 +964,19 @@ it('Should not trigger the handler for a broadcast event after unsubscribing', a async (c1, d1, c2, d2) => { const spy = vi.fn(); + const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe( { type: 'broadcast', topic: broadcastTopic }, - spy, + (topic, payload) => { + spy(); + eventCollector.add([topic, payload]); + }, ); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic, payload); - - // Assuming that every subscriber can receive the broadcast event within 1000ms. - await new Promise((res) => setTimeout(res, 1000)); - - expect(spy.mock.calls.length).toBe(1); + await eventCollector.waitAndVerifyNthEvent(1, [broadcastTopic, payload]); unsubscribe(); From 86035b8b00b9f19a09ca5f2e6f82c39494ada5af Mon Sep 17 00:00:00 2001 From: Paik Date: Wed, 28 Aug 2024 09:19:20 +0900 Subject: [PATCH 13/19] Refactor by removing unnecessary broadcastEventHanlders --- packages/sdk/src/document/document.ts | 32 +++----------------- packages/sdk/test/integration/client_test.ts | 16 +++++----- 2 files changed, 12 insertions(+), 36 deletions(-) diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 1fcf8842b..d4b637f3d 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -400,7 +400,7 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; - broadcast: (topic: string, payload: any) => void; + broadcast: (payload: any) => void; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -617,15 +617,6 @@ export class Document { */ private isUpdating: boolean; - /** - * `broadcastEventHandlers` is a map of broadcast event handlers. - * The key is the topic of the broadcast event, and the value is the handler. - */ - private broadcastEventHandlers: Map< - string, - DocEventCallbackMap

['broadcast'] - >; - private client?: Client; constructor(key: string, opts?: DocumentOptions) { @@ -655,8 +646,6 @@ export class Document { redo: this.redo.bind(this), }; - this.broadcastEventHandlers = new Map(); - setupDevtools(this); } @@ -1085,23 +1074,18 @@ export class Document { const { topic } = arg1 as BroadcastSubscribePair; const handler = arg2 as DocEventCallbackMap

['broadcast']; const error = arg3 as ErrorFn; - this.broadcastEventHandlers.set(topic, handler); - const unsubscribe = this.eventStream.subscribe((event) => { + + return this.eventStream.subscribe((event) => { for (const docEvent of event) { if (docEvent.type !== DocEventType.Broadcast) { continue; } if (docEvent.value.topic === topic) { - handler(topic, docEvent.value.payload); + handler(docEvent.value.payload); } } }, error); - - return () => { - unsubscribe(); - this.broadcastEventHandlers.delete(topic); - }; } } throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); @@ -1684,14 +1668,6 @@ export class Document { const { clientID, presence } = event.value; this.presences.set(clientID, presence); } - - if (event.type === DocEventType.Broadcast) { - const { topic, payload } = event.value; - const handler = this.broadcastEventHandlers.get(topic); - if (handler) { - handler(topic, payload); - } - } } /** diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 59f4a3000..e6c991998 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -907,18 +907,18 @@ it('Should trigger the handler for a subscribed broadcast event', async ({ }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[any]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe( { type: 'broadcast', topic: broadcastTopic }, - (topic, payload) => { - eventCollector.add([topic, payload]); + (payload) => { + eventCollector.add([payload]); }, ); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic, payload); - await eventCollector.waitAndVerifyNthEvent(1, [broadcastTopic, payload]); + await eventCollector.waitAndVerifyNthEvent(1, [payload]); unsubscribe(); }, @@ -964,19 +964,19 @@ it('Should not trigger the handler for a broadcast event after unsubscribing', a async (c1, d1, c2, d2) => { const spy = vi.fn(); - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[any]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe( { type: 'broadcast', topic: broadcastTopic }, - (topic, payload) => { + (payload) => { spy(); - eventCollector.add([topic, payload]); + eventCollector.add([payload]); }, ); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic, payload); - await eventCollector.waitAndVerifyNthEvent(1, [broadcastTopic, payload]); + await eventCollector.waitAndVerifyNthEvent(1, [payload]); unsubscribe(); From 41da599f00b0e7c08ed9323b2a64b278bdc6e129 Mon Sep 17 00:00:00 2001 From: Yourim Cha Date: Wed, 28 Aug 2024 12:21:50 +0900 Subject: [PATCH 14/19] Refactor test codes --- packages/sdk/test/integration/client_test.ts | 147 +++++++++---------- 1 file changed, 72 insertions(+), 75 deletions(-) diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index e6c991998..487a616a5 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -900,95 +900,92 @@ describe.sequential('Client', function () { await cli.deactivate(); }); -}); -it('Should trigger the handler for a subscribed broadcast event', async ({ - task, -}) => { - await withTwoClientsAndDocuments<{ t: Text }>( - async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[any]>(); - const broadcastTopic = 'test'; - const unsubscribe = d2.subscribe( - { type: 'broadcast', topic: broadcastTopic }, - (payload) => { - eventCollector.add([payload]); - }, - ); + it('Should trigger the handler for a subscribed broadcast event', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe( + { type: 'broadcast', topic: broadcastTopic }, + (payload) => { + eventCollector.add(payload); + }, + ); - const payload = { a: 1, b: '2' }; - await d1.broadcast(broadcastTopic, payload); - await eventCollector.waitAndVerifyNthEvent(1, [payload]); + const payload = { a: 1, b: '2' }; + await d1.broadcast(broadcastTopic, payload); + await eventCollector.waitAndVerifyNthEvent(1, payload); - unsubscribe(); - }, - task.name, - SyncMode.Realtime, - ); -}); + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); -it('Should not trigger the handler for an unsubscribed broadcast event', async ({ - task, -}) => { - await withTwoClientsAndDocuments<{ t: Text }>( - async (c1, d1, c2, d2) => { - const spy = vi.fn(); + it('Should not trigger the handler for an unsubscribed broadcast event', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const spy = vi.fn(); - const broadcastTopic1 = 'test1'; - const broadcastTopic2 = 'test2'; + const broadcastTopic1 = 'test1'; + const broadcastTopic2 = 'test2'; - const unsubscribe = d2.subscribe( - { type: 'broadcast', topic: broadcastTopic2 }, - spy, - ); + const unsubscribe = d2.subscribe( + { type: 'broadcast', topic: broadcastTopic2 }, + spy, + ); - const payload = { a: 1, b: '2' }; - await d1.broadcast(broadcastTopic1, payload); + const payload = { a: 1, b: '2' }; + await d1.broadcast(broadcastTopic1, payload); - // Assuming that every subscriber can receive the broadcast event within 1000ms. - await new Promise((res) => setTimeout(res, 1000)); + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); - expect(spy.mock.calls.length).toBe(0); + expect(spy.mock.calls.length).toBe(0); - unsubscribe(); - }, - task.name, - SyncMode.Realtime, - ); -}); - -it('Should not trigger the handler for a broadcast event after unsubscribing', async ({ - task, -}) => { - await withTwoClientsAndDocuments<{ t: Text }>( - async (c1, d1, c2, d2) => { - const spy = vi.fn(); + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); - const eventCollector = new EventCollector<[any]>(); - const broadcastTopic = 'test'; - const unsubscribe = d2.subscribe( - { type: 'broadcast', topic: broadcastTopic }, - (payload) => { - spy(); - eventCollector.add([payload]); - }, - ); + it('Should not trigger the handler for a broadcast event after unsubscribing', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe( + { type: 'broadcast', topic: broadcastTopic }, + (payload) => { + eventCollector.add(payload); + }, + ); - const payload = { a: 1, b: '2' }; - await d1.broadcast(broadcastTopic, payload); - await eventCollector.waitAndVerifyNthEvent(1, [payload]); + const payload = { a: 1, b: '2' }; + await d1.broadcast(broadcastTopic, payload); + await eventCollector.waitAndVerifyNthEvent(1, payload); - unsubscribe(); + unsubscribe(); - await d1.broadcast(broadcastTopic, payload); + await d1.broadcast(broadcastTopic, payload); - // Assuming that every subscriber can receive the broadcast event within 1000ms. - await new Promise((res) => setTimeout(res, 1000)); + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); - // No change in the number of calls - expect(spy.mock.calls.length).toBe(1); - }, - task.name, - SyncMode.Realtime, - ); + // No change in the number of calls + assert.equal(eventCollector.getLength(), 1); + }, + task.name, + SyncMode.Realtime, + ); + }); }); From 5ac5e2b516211723e70d10b7128a402a1e6c3e6e Mon Sep 17 00:00:00 2001 From: Paik Date: Thu, 29 Aug 2024 09:24:13 +0900 Subject: [PATCH 15/19] Refactor Broadcast Subscription Interface to Enable Manual Topic Comparison in Handlers --- packages/sdk/src/document/document.ts | 56 +++++-------------- packages/sdk/test/integration/client_test.ts | 59 +++++++++++--------- 2 files changed, 47 insertions(+), 68 deletions(-) diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index d4b637f3d..197b93f4f 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -400,7 +400,7 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; - broadcast: (payload: any) => void; + broadcast: (topic: string, payload: any) => void; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -552,21 +552,6 @@ type PathOf = PathOfInternal< Depth >; -/* - * `SubscribePair` represents the type of the subscribe pair. - */ -type SubscribePair = { - type: string; -}; - -/* - * `BroadcastSubscribePair` represents the type of the broadcast subscribe pair. - */ -type BroadcastSubscribePair = { - type: 'broadcast'; - topic: string; -} & SubscribePair; - /** * `Document` is a CRDT-based data type. We can represent the model * of the application and edit it even while offline. @@ -853,7 +838,7 @@ export class Document { * The callback will be called when the document is changed. */ public subscribe( - type: BroadcastSubscribePair, + type: 'broadcast', next: DocEventCallbackMap

['broadcast'], error?: ErrorFn, ): Unsubscribe; @@ -873,11 +858,7 @@ export class Document { TPath extends PathOf, TOperationInfo extends OperationInfoOf, >( - arg1: - | TPath - | DocEventTopic - | DocEventCallbackMap

['default'] - | SubscribePair, + arg1: TPath | DocEventTopic | DocEventCallbackMap

['default'], arg2?: | NextFn< | LocalChangeEvent @@ -1009,6 +990,17 @@ export class Document { arg4, ); } + if (arg1 === 'broadcast') { + const callback = arg2 as DocEventCallbackMap

['broadcast']; + return this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.Broadcast) { + continue; + } + callback(docEvent.value.topic, docEvent.value.payload); + } + }, arg3); + } if (arg1 === 'all') { const callback = arg2 as DocEventCallbackMap

['all']; return this.eventStream.subscribe(callback, arg3, arg4); @@ -1067,27 +1059,7 @@ export class Document { complete, ); } - if (typeof arg1 === 'object') { - const { type } = arg1 as SubscribePair; - - if (type === 'broadcast') { - const { topic } = arg1 as BroadcastSubscribePair; - const handler = arg2 as DocEventCallbackMap

['broadcast']; - const error = arg3 as ErrorFn; - - return this.eventStream.subscribe((event) => { - for (const docEvent of event) { - if (docEvent.type !== DocEventType.Broadcast) { - continue; - } - if (docEvent.value.topic === topic) { - handler(docEvent.value.payload); - } - } - }, error); - } - } throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); } diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 487a616a5..598a6d384 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -906,18 +906,20 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector(); + const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic = 'test'; - const unsubscribe = d2.subscribe( - { type: 'broadcast', topic: broadcastTopic }, - (payload) => { - eventCollector.add(payload); - }, - ); + const unsubscribe = d2.subscribe('broadcast', (topic, payload) => { + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic, payload); - await eventCollector.waitAndVerifyNthEvent(1, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); unsubscribe(); }, @@ -931,23 +933,26 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const spy = vi.fn(); - + const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic1 = 'test1'; const broadcastTopic2 = 'test2'; - const unsubscribe = d2.subscribe( - { type: 'broadcast', topic: broadcastTopic2 }, - spy, - ); + const unsubscribe = d2.subscribe('broadcast', (topic, payload) => { + if (topic === broadcastTopic1) { + eventCollector.add([topic, payload]); + } else if (topic === broadcastTopic2) { + eventCollector.add([topic, payload]); + } + }); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic1, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic1, + payload, + ]); - // Assuming that every subscriber can receive the broadcast event within 1000ms. - await new Promise((res) => setTimeout(res, 1000)); - - expect(spy.mock.calls.length).toBe(0); + assert.equal(eventCollector.getLength(), 1); unsubscribe(); }, @@ -961,18 +966,20 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector(); + const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic = 'test'; - const unsubscribe = d2.subscribe( - { type: 'broadcast', topic: broadcastTopic }, - (payload) => { - eventCollector.add(payload); - }, - ); + const unsubscribe = d2.subscribe('broadcast', (topic, payload) => { + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); const payload = { a: 1, b: '2' }; await d1.broadcast(broadcastTopic, payload); - await eventCollector.waitAndVerifyNthEvent(1, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); unsubscribe(); From fe3c0e9ca19d5ef7181b22f066a3bac3e4b61ec0 Mon Sep 17 00:00:00 2001 From: Paik Date: Fri, 30 Aug 2024 08:17:19 +0900 Subject: [PATCH 16/19] Remove client from document to prevent circular references Refactor to have Client subscribe to Document's broadcast events and handle them by calling Client's broadcast method --- packages/sdk/src/client/attachment.ts | 12 ++++---- packages/sdk/src/client/client.ts | 10 +++++-- packages/sdk/src/document/document.ts | 26 ++++-------------- packages/sdk/test/integration/client_test.ts | 29 ++++---------------- 4 files changed, 24 insertions(+), 53 deletions(-) diff --git a/packages/sdk/src/client/attachment.ts b/packages/sdk/src/client/attachment.ts index ef3ca3be3..18aec7899 100644 --- a/packages/sdk/src/client/attachment.ts +++ b/packages/sdk/src/client/attachment.ts @@ -1,5 +1,6 @@ import { Document, Indexable } from '@yorkie-js-sdk/src/document/document'; import { SyncMode } from '@yorkie-js-sdk/src/client/client'; +import { Unsubscribe } from '../yorkie'; /** * `WatchStream` is a stream that watches the changes of the document. @@ -21,17 +22,21 @@ export class Attachment { watchLoopTimerID?: ReturnType; watchAbortController?: AbortController; + unsubscribeBroadcastEvent: Unsubscribe; + constructor( reconnectStreamDelay: number, doc: Document, docID: string, syncMode: SyncMode, + unsubscribeBroacastEvent: Unsubscribe, ) { this.reconnectStreamDelay = reconnectStreamDelay; this.doc = doc; this.docID = docID; this.syncMode = syncMode; this.remoteChangeEventReceived = false; + this.unsubscribeBroadcastEvent = unsubscribeBroacastEvent; } /** @@ -107,11 +112,4 @@ export class Attachment { clearTimeout(this.watchLoopTimerID); this.watchLoopTimerID = undefined; } - - /** - * `unsetClient` unsets the client of the document. - */ - public unsetClient(): void { - this.doc.setClient(); - } } diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index f2a4c6614..48ee73da9 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -304,7 +304,12 @@ export class Client { } doc.setActor(this.id!); doc.update((_, p) => p.set(options.initialPresence || {})); - doc.setClient(this); + const unsubscribeBroacastEvent = doc.subscribe( + 'broadcast', + (topic, payload) => { + this.broadcast(doc.getKey(), topic, payload); + }, + ); const syncMode = options.syncMode ?? SyncMode.Realtime; return this.enqueueTask(async () => { @@ -331,6 +336,7 @@ export class Client { doc, res.documentId, syncMode, + unsubscribeBroacastEvent, ), ); @@ -802,8 +808,8 @@ export class Client { return; } - attachment.unsetClient(); attachment.cancelWatchStream(); + attachment.unsubscribeBroadcastEvent(); this.attachmentMap.delete(docKey); } diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 197b93f4f..1cabd94a7 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -78,7 +78,6 @@ import { import { History, HistoryOperation } from '@yorkie-js-sdk/src/document/history'; import { setupDevtools } from '@yorkie-js-sdk/src/devtools'; import * as Devtools from '@yorkie-js-sdk/src/devtools/types'; -import { Client } from '@yorkie-js-sdk/src/client/client'; /** * `DocumentOptions` are the options to create a new document. @@ -602,8 +601,6 @@ export class Document { */ private isUpdating: boolean; - private client?: Client; - constructor(key: string, opts?: DocumentOptions) { this.opts = opts || {}; @@ -1286,15 +1283,6 @@ export class Document { return this.root.getGarbageLen(); } - /** - * `setClient` sets the client of this document. - * - * @internal - */ - public setClient(client?: Client): void { - this.client = client; - } - /** * `getGarbageLenFromClone` returns the length of elements should be purged from clone. */ @@ -2030,14 +2018,12 @@ export class Document { /** * `broadcast` the payload to the given topic. */ - public broadcast(topic: string, payload: any): Promise { - if (this.client) { - return this.client.broadcast(this.getKey(), topic, payload); - } + public broadcast(topic: string, payload: any) { + const broadcastEvent: BroadcastEvent = { + type: DocEventType.Broadcast, + value: { topic, payload }, + }; - throw new YorkieError( - Code.ErrClientNotFound, - 'Document is not attached to a client', - ); + this.publish([broadcastEvent]); } } diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 598a6d384..a6d3cc69e 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -881,26 +881,6 @@ describe.sequential('Client', function () { await cli.deactivate(); }); - it('Should throw error when broadcasting unserializeable payload', async ({ - task, - }) => { - const cli = new yorkie.Client(testRPCAddr); - await cli.activate(); - - const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); - await cli.attach(doc); - - // broadcast unserializable payload - const payload = () => {}; - const broadcastTopic = 'test'; - - expect(async () => - doc.broadcast(broadcastTopic, payload), - ).rejects.toThrowErrorCode(Code.ErrInvalidArgument); - - await cli.deactivate(); - }); - it('Should trigger the handler for a subscribed broadcast event', async ({ task, }) => { @@ -915,7 +895,7 @@ describe.sequential('Client', function () { }); const payload = { a: 1, b: '2' }; - await d1.broadcast(broadcastTopic, payload); + d1.broadcast(broadcastTopic, payload); await eventCollector.waitAndVerifyNthEvent(1, [ broadcastTopic, payload, @@ -946,7 +926,7 @@ describe.sequential('Client', function () { }); const payload = { a: 1, b: '2' }; - await d1.broadcast(broadcastTopic1, payload); + d1.broadcast(broadcastTopic1, payload); await eventCollector.waitAndVerifyNthEvent(1, [ broadcastTopic1, payload, @@ -975,7 +955,8 @@ describe.sequential('Client', function () { }); const payload = { a: 1, b: '2' }; - await d1.broadcast(broadcastTopic, payload); + + d1.broadcast(broadcastTopic, payload); await eventCollector.waitAndVerifyNthEvent(1, [ broadcastTopic, payload, @@ -983,7 +964,7 @@ describe.sequential('Client', function () { unsubscribe(); - await d1.broadcast(broadcastTopic, payload); + d1.broadcast(broadcastTopic, payload); // Assuming that every subscriber can receive the broadcast event within 1000ms. await new Promise((res) => setTimeout(res, 1000)); From e88e3c108a95d04d9981b41d8f901e81f9cf8e3b Mon Sep 17 00:00:00 2001 From: Paik Date: Fri, 30 Aug 2024 09:45:04 +0900 Subject: [PATCH 17/19] Handle the case when broadcast event fails This commit handles the case when broadcast event fails such as when trying to broadcast unserializable payload --- packages/sdk/src/client/client.ts | 10 ++++++-- packages/sdk/src/document/document.ts | 12 +++++++--- packages/sdk/test/integration/client_test.ts | 24 ++++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 48ee73da9..7f0a63990 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -306,8 +306,14 @@ export class Client { doc.update((_, p) => p.set(options.initialPresence || {})); const unsubscribeBroacastEvent = doc.subscribe( 'broadcast', - (topic, payload) => { - this.broadcast(doc.getKey(), topic, payload); + (topic, payload, onBroadcastError) => { + try { + this.broadcast(doc.getKey(), topic, payload); + } catch (e: unknown) { + if (e instanceof Error) { + onBroadcastError?.(e); + } + } }, ); diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 1cabd94a7..efdf34d22 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -380,6 +380,7 @@ export interface PresenceChangedEvent

export interface BroadcastEvent extends BaseDocEvent { type: DocEventType.Broadcast; value: { topic: string; payload: any }; + error?: ErrorFn; } type DocEventCallbackMap

= { @@ -399,7 +400,7 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; - broadcast: (topic: string, payload: any) => void; + broadcast: (topic: string, payload: any, error?: ErrorFn) => void; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -994,7 +995,11 @@ export class Document { if (docEvent.type !== DocEventType.Broadcast) { continue; } - callback(docEvent.value.topic, docEvent.value.payload); + callback( + docEvent.value.topic, + docEvent.value.payload, + docEvent.error, + ); } }, arg3); } @@ -2018,10 +2023,11 @@ export class Document { /** * `broadcast` the payload to the given topic. */ - public broadcast(topic: string, payload: any) { + public broadcast(topic: string, payload: any, error?: ErrorFn) { const broadcastEvent: BroadcastEvent = { type: DocEventType.Broadcast, value: { topic, payload }, + error, }; this.publish([broadcastEvent]); diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index a6d3cc69e..249619505 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -881,6 +881,30 @@ describe.sequential('Client', function () { await cli.deactivate(); }); + it('Should throw error when broadcasting unserializeable payload', async ({ + task, + }) => { + const spy = vi.fn(); + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + // broadcast unserializable payload + const payload = () => {}; + const broadcastTopic = 'test'; + + doc.broadcast(broadcastTopic, payload, spy); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + expect(spy).toBeCalledTimes(1); + + await cli.deactivate(); + }); + it('Should trigger the handler for a subscribed broadcast event', async ({ task, }) => { From 43ae22c6eae0ccbe2126e4875fdc65cae64d7305 Mon Sep 17 00:00:00 2001 From: Paik Date: Fri, 30 Aug 2024 11:32:40 +0900 Subject: [PATCH 18/19] Refactor test code to remove undeterministic Promise --- packages/sdk/test/integration/client_test.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 249619505..b0614ac7a 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -884,7 +884,7 @@ describe.sequential('Client', function () { it('Should throw error when broadcasting unserializeable payload', async ({ task, }) => { - const spy = vi.fn(); + const eventCollector = new EventCollector(); const cli = new yorkie.Client(testRPCAddr); await cli.activate(); @@ -894,13 +894,15 @@ describe.sequential('Client', function () { // broadcast unserializable payload const payload = () => {}; const broadcastTopic = 'test'; + const broadcastErrMessage = 'payload is not serializable'; - doc.broadcast(broadcastTopic, payload, spy); + const errorHandler = (error: Error) => { + eventCollector.add(error.message); + }; - // Assuming that every subscriber can receive the broadcast event within 1000ms. - await new Promise((res) => setTimeout(res, 1000)); + doc.broadcast(broadcastTopic, payload, errorHandler); - expect(spy).toBeCalledTimes(1); + await eventCollector.waitAndVerifyNthEvent(1, broadcastErrMessage); await cli.deactivate(); }); From ffe525fe8cee9e7a5aa7b5382cb198b2426e8055 Mon Sep 17 00:00:00 2001 From: Paik Date: Sun, 1 Sep 2024 11:55:28 +0900 Subject: [PATCH 19/19] Fix bug where publisher receives its own broadcast event --- packages/sdk/src/client/client.ts | 13 +++-- packages/sdk/src/document/document.ts | 59 +++++++++++++++---- packages/sdk/test/integration/client_test.ts | 60 +++++++++++++++++++- 3 files changed, 112 insertions(+), 20 deletions(-) diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 7f0a63990..aa406ed40 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -305,13 +305,16 @@ export class Client { doc.setActor(this.id!); doc.update((_, p) => p.set(options.initialPresence || {})); const unsubscribeBroacastEvent = doc.subscribe( - 'broadcast', - (topic, payload, onBroadcastError) => { + 'local-broadcast', + (event) => { + const { topic, payload } = event.value; + const errorFn = event.error; + try { this.broadcast(doc.getKey(), topic, payload); - } catch (e: unknown) { - if (e instanceof Error) { - onBroadcastError?.(e); + } catch (error: unknown) { + if (error instanceof Error) { + errorFn?.(error); } } }, diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index efdf34d22..39007afe5 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -175,9 +175,14 @@ export enum DocEventType { PresenceChanged = 'presence-changed', /** - * `Broadcast` means that the message is broadcasted to clients who subscribe to the event. + * `Broadcast` means that the broadcast event is received from the remote client. */ Broadcast = 'broadcast', + + /** + * `LocalBroadcast` means that the broadcast event is sent from the local client. + */ + LocalBroadcast = 'local-broadcast', } /** @@ -197,7 +202,8 @@ export type DocEvent

= | WatchedEvent

| UnwatchedEvent

| PresenceChangedEvent

- | BroadcastEvent; + | BroadcastEvent + | LocalBroadcastEvent; /** * `TransactionEvent` represents document events that occur within @@ -379,6 +385,12 @@ export interface PresenceChangedEvent

export interface BroadcastEvent extends BaseDocEvent { type: DocEventType.Broadcast; + value: { clientID: ActorID; topic: string; payload: any }; + error?: ErrorFn; +} + +export interface LocalBroadcastEvent extends BaseDocEvent { + type: DocEventType.LocalBroadcast; value: { topic: string; payload: any }; error?: ErrorFn; } @@ -400,7 +412,8 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; - broadcast: (topic: string, payload: any, error?: ErrorFn) => void; + broadcast: NextFn; + 'local-broadcast': NextFn; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -833,13 +846,22 @@ export class Document { ): Unsubscribe; /** * `subscribe` registers a callback to subscribe to events on the document. - * The callback will be called when the document is changed. + * The callback will be called when the broadcast event is received from the remote client. */ public subscribe( type: 'broadcast', next: DocEventCallbackMap

['broadcast'], error?: ErrorFn, ): Unsubscribe; + /** + * `subscribe` registers a callback to subscribe to events on the document. + * The callback will be called when the local client sends a broadcast event. + */ + public subscribe( + type: 'local-broadcast', + next: DocEventCallbackMap

['local-broadcast'], + error?: ErrorFn, + ): Unsubscribe; /** * `subscribe` registers a callback to subscribe to events on the document. */ @@ -988,6 +1010,18 @@ export class Document { arg4, ); } + if (arg1 === 'local-broadcast') { + const callback = arg2 as DocEventCallbackMap

['local-broadcast']; + return this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.LocalBroadcast) { + continue; + } + + callback(docEvent); + } + }, arg3); + } if (arg1 === 'broadcast') { const callback = arg2 as DocEventCallbackMap

['broadcast']; return this.eventStream.subscribe((event) => { @@ -995,11 +1029,8 @@ export class Document { if (docEvent.type !== DocEventType.Broadcast) { continue; } - callback( - docEvent.value.topic, - docEvent.value.payload, - docEvent.error, - ); + + callback(docEvent); } }, arg3); } @@ -1541,7 +1572,11 @@ export class Document { event.push({ type: DocEventType.Broadcast, - value: { topic, payload: JSON.parse(decoder.decode(payload)) }, + value: { + clientID: publisher, + topic, + payload: JSON.parse(decoder.decode(payload)), + }, }); } } @@ -2024,8 +2059,8 @@ export class Document { * `broadcast` the payload to the given topic. */ public broadcast(topic: string, payload: any, error?: ErrorFn) { - const broadcastEvent: BroadcastEvent = { - type: DocEventType.Broadcast, + const broadcastEvent: LocalBroadcastEvent = { + type: DocEventType.LocalBroadcast, value: { topic, payload }, error, }; diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index b0614ac7a..004e0b506 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -914,7 +914,9 @@ describe.sequential('Client', function () { async (c1, d1, c2, d2) => { const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic = 'test'; - const unsubscribe = d2.subscribe('broadcast', (topic, payload) => { + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + if (topic === broadcastTopic) { eventCollector.add([topic, payload]); } @@ -927,6 +929,8 @@ describe.sequential('Client', function () { payload, ]); + assert.equal(eventCollector.getLength(), 1); + unsubscribe(); }, task.name, @@ -943,7 +947,9 @@ describe.sequential('Client', function () { const broadcastTopic1 = 'test1'; const broadcastTopic2 = 'test2'; - const unsubscribe = d2.subscribe('broadcast', (topic, payload) => { + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + if (topic === broadcastTopic1) { eventCollector.add([topic, payload]); } else if (topic === broadcastTopic2) { @@ -974,7 +980,9 @@ describe.sequential('Client', function () { async (c1, d1, c2, d2) => { const eventCollector = new EventCollector<[string, any]>(); const broadcastTopic = 'test'; - const unsubscribe = d2.subscribe('broadcast', (topic, payload) => { + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + if (topic === broadcastTopic) { eventCollector.add([topic, payload]); } @@ -1002,4 +1010,50 @@ describe.sequential('Client', function () { SyncMode.Realtime, ); }); + + it('Should not trigger the handler for a broadcast event sent by the publisher to itself', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector1 = new EventCollector<[string, any]>(); + const eventCollector2 = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const payload = { a: 1, b: '2' }; + + // Publisher subscribes to the broadcast event + const unsubscribe1 = d1.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector1.add([topic, payload]); + } + }); + + const unsubscribe2 = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector2.add([topic, payload]); + } + }); + + d1.broadcast(broadcastTopic, payload); + + // Assuming that D2 takes longer to receive the broadcast event compared to D1 + await eventCollector2.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe1(); + unsubscribe2(); + + assert.equal(eventCollector1.getLength(), 0); + assert.equal(eventCollector2.getLength(), 1); + }, + task.name, + SyncMode.Realtime, + ); + }); });