Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce broadcast API for event sharing #884

Merged
merged 21 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f6ec48a
Implement subscription to broadcast events
gwbaik9717 Aug 20, 2024
6284cfe
Implement publishing broadcast events
gwbaik9717 Aug 20, 2024
0472c0a
Decode broadcasted payload
gwbaik9717 Aug 21, 2024
a1442b7
Add validation logic for serializable payload
gwbaik9717 Aug 21, 2024
323f981
Add test case for throwing error when trying to broadcast unserialize…
gwbaik9717 Aug 21, 2024
b692441
Fix bug in subscribeBroadcastEvent method
gwbaik9717 Aug 21, 2024
66c996a
Add test case for successfully broadcast serializeable payload
gwbaik9717 Aug 22, 2024
bc8b66c
Merge branch 'main' into broadcast-api
gwbaik9717 Aug 22, 2024
b0b2766
Add test cases for subscribing and unsubscribing broadcast events
gwbaik9717 Aug 22, 2024
06d37fa
Modify interface for subscribing broadcast events
gwbaik9717 Aug 26, 2024
8060985
Refactor the broadcast method to be called directly by the document o…
gwbaik9717 Aug 26, 2024
4d516be
Fix lint errors
gwbaik9717 Aug 26, 2024
69650c0
Refactor test code to use EventCollector
gwbaik9717 Aug 27, 2024
86035b8
Refactor by removing unnecessary broadcastEventHanlders
gwbaik9717 Aug 28, 2024
41da599
Refactor test codes
chacha912 Aug 28, 2024
bc49062
Merge branch 'main' into broadcast-api
gwbaik9717 Aug 28, 2024
5ac5e2b
Refactor Broadcast Subscription Interface to Enable Manual Topic Comp…
gwbaik9717 Aug 29, 2024
fe3c0e9
Remove client from document to prevent circular references
gwbaik9717 Aug 29, 2024
e88e3c1
Handle the case when broadcast event fails
gwbaik9717 Aug 30, 2024
43ae22c
Refactor test code to remove undeterministic Promise
gwbaik9717 Aug 30, 2024
ffe525f
Fix bug where publisher receives its own broadcast event
gwbaik9717 Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/sdk/src/client/attachment.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Document, Indexable } from '@yorkie-js-sdk/src/document/document';
import { SyncMode } from '@yorkie-js-sdk/src/client/client';
import { Unsubscribe } from '../yorkie';

/**
* `WatchStream` is a stream that watches the changes of the document.
Expand All @@ -21,17 +22,21 @@ export class Attachment<T, P extends Indexable> {
watchLoopTimerID?: ReturnType<typeof setTimeout>;
watchAbortController?: AbortController;

unsubscribeBroadcastEvent: Unsubscribe;

constructor(
reconnectStreamDelay: number,
doc: Document<T, P>,
docID: string,
syncMode: SyncMode,
unsubscribeBroacastEvent: Unsubscribe,
) {
this.reconnectStreamDelay = reconnectStreamDelay;
this.doc = doc;
this.docID = docID;
this.syncMode = syncMode;
this.remoteChangeEventReceived = false;
this.unsubscribeBroadcastEvent = unsubscribeBroacastEvent;
}

/**
Expand Down
68 changes: 68 additions & 0 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation';
import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor';
import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor';
import { validateSerializable } from '../util/validator';

/**
* `SyncMode` defines synchronization modes for the PushPullChanges API.
Expand Down Expand Up @@ -303,6 +304,18 @@ export class Client {
}
doc.setActor(this.id!);
doc.update((_, p) => p.set(options.initialPresence || {}));
const unsubscribeBroacastEvent = doc.subscribe(
'broadcast',
(topic, payload, onBroadcastError) => {
try {
this.broadcast(doc.getKey(), topic, payload);
} catch (e: unknown) {
if (e instanceof Error) {
onBroadcastError?.(e);
}
}
},
);

const syncMode = options.syncMode ?? SyncMode.Realtime;
return this.enqueueTask(async () => {
Expand All @@ -329,6 +342,7 @@ export class Client {
doc,
res.documentId,
syncMode,
unsubscribeBroacastEvent,
),
);

Expand Down Expand Up @@ -584,6 +598,59 @@ export class Client {
return this.conditions[condition];
}

/**
* `broadcast` broadcasts the given payload to the given topic.
*/
public broadcast(
docKey: DocumentKey,
topic: string,
payload: any,
): Promise<void> {
if (!this.isActive()) {
throw new YorkieError(
Code.ErrClientNotActivated,
`${this.key} is not active`,
);
}
const attachment = this.attachmentMap.get(docKey);
if (!attachment) {
throw new YorkieError(
Code.ErrDocumentNotAttached,
`${docKey} is not attached`,
);
}

if (!validateSerializable(payload)) {
throw new YorkieError(
Code.ErrInvalidArgument,
'payload is not serializable',
);
}

return this.enqueueTask(async () => {
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`,
);
})
.catch((err) => {
logger.error(`[BC] c:"${this.getKey()}" err :`, err);
this.handleConnectError(err);
throw err;
});
});
}

/**
* `runSyncLoop` runs the sync loop. The sync loop pushes local changes to
* the server and pulls remote changes from the server.
Expand Down Expand Up @@ -748,6 +815,7 @@ export class Client {
}

attachment.cancelWatchStream();
attachment.unsubscribeBroadcastEvent();
this.attachmentMap.delete(docKey);
}

Expand Down
66 changes: 64 additions & 2 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ export enum DocEventType {
* `PresenceChanged` means that the presences of the client has updated.
*/
PresenceChanged = 'presence-changed',

/**
* `Broadcast` means that the message is broadcasted to clients who subscribe to the event.
*/
Broadcast = 'broadcast',
}

/**
Expand All @@ -191,7 +196,8 @@ export type DocEvent<P extends Indexable = Indexable, T = OperationInfo> =
| InitializedEvent<P>
| WatchedEvent<P>
| UnwatchedEvent<P>
| PresenceChangedEvent<P>;
| PresenceChangedEvent<P>
| BroadcastEvent;

/**
* `TransactionEvent` represents document events that occur within
Expand Down Expand Up @@ -371,6 +377,12 @@ export interface PresenceChangedEvent<P extends Indexable>
value: { clientID: ActorID; presence: P };
}

export interface BroadcastEvent extends BaseDocEvent {
type: DocEventType.Broadcast;
value: { topic: string; payload: any };
error?: ErrorFn;
}

type DocEventCallbackMap<P extends Indexable> = {
default: NextFn<
| SnapshotEvent
Expand All @@ -388,6 +400,7 @@ type DocEventCallbackMap<P extends Indexable> = {
connection: NextFn<ConnectionChangedEvent>;
status: NextFn<StatusChangedEvent>;
sync: NextFn<SyncStatusChangedEvent>;
broadcast: (topic: string, payload: any, error?: ErrorFn) => void;
all: NextFn<TransactionEvent<P>>;
};
export type DocEventTopic = keyof DocEventCallbackMap<never>;
Expand Down Expand Up @@ -818,6 +831,15 @@ export class Document<T, P extends Indexable = Indexable> {
error?: ErrorFn,
complete?: CompleteFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
* The callback will be called when the document is changed.
*/
public subscribe(
type: 'broadcast',
next: DocEventCallbackMap<P>['broadcast'],
error?: ErrorFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
*/
Expand Down Expand Up @@ -966,6 +988,21 @@ export class Document<T, P extends Indexable = Indexable> {
arg4,
);
}
if (arg1 === 'broadcast') {
const callback = arg2 as DocEventCallbackMap<P>['broadcast'];
return this.eventStream.subscribe((event) => {
for (const docEvent of event) {
if (docEvent.type !== DocEventType.Broadcast) {
continue;
}
callback(
docEvent.value.topic,
docEvent.value.payload,
docEvent.error,
);
}
}, arg3);
}
if (arg1 === 'all') {
const callback = arg2 as DocEventCallbackMap<P>['all'];
return this.eventStream.subscribe(callback, arg3, arg4);
Expand Down Expand Up @@ -1024,6 +1061,7 @@ export class Document<T, P extends Indexable = Indexable> {
complete,
);
}

throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`);
}

Expand Down Expand Up @@ -1468,7 +1506,8 @@ export class Document<T, P extends Indexable = Indexable> {

if (resp.body.case === 'event') {
const { type, publisher } = resp.body.value;
const event: Array<WatchedEvent<P> | UnwatchedEvent<P>> = [];
const event: Array<WatchedEvent<P> | UnwatchedEvent<P> | BroadcastEvent> =
[];
if (type === PbDocEventType.DOCUMENT_WATCHED) {
this.addOnlineClient(publisher);
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
Expand All @@ -1495,6 +1534,16 @@ export class Document<T, P extends Indexable = Indexable> {
value: { clientID: publisher, presence },
});
}
} else if (type === PbDocEventType.DOCUMENT_BROADCAST) {
if (resp.body.value.body) {
const { topic, payload } = resp.body.value.body;
const decoder = new TextDecoder();

event.push({
type: DocEventType.Broadcast,
value: { topic, payload: JSON.parse(decoder.decode(payload)) },
});
}
}

if (event.length > 0) {
Expand Down Expand Up @@ -1970,4 +2019,17 @@ export class Document<T, P extends Indexable = Indexable> {
public getRedoStackForTest(): Array<Array<HistoryOperation<P>>> {
return this.internalHistory.getRedoStackForTest();
}

/**
* `broadcast` the payload to the given topic.
*/
public broadcast(topic: string, payload: any, error?: ErrorFn) {
const broadcastEvent: BroadcastEvent = {
type: DocEventType.Broadcast,
value: { topic, payload },
error,
};

this.publish([broadcastEvent]);
}
}
31 changes: 31 additions & 0 deletions packages/sdk/src/util/validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* `validateSerializable` returns whether the given value is serializable or not.
*/
export const validateSerializable = (value: any): boolean => {
try {
const serialized = JSON.stringify(value);

if (serialized === undefined) {
return false;
}
} catch (error) {
return false;
}
return true;
};
Loading
Loading