From bcf965f0b55ea949d3fedbbb4393a0966b574efd Mon Sep 17 00:00:00 2001 From: zak Date: Mon, 19 Feb 2024 12:03:55 +0000 Subject: [PATCH 1/9] Add channel groups Add a client side channel group that listens for active channels and subscribes/unsubscribes as the set of active changes. Add a consumer group based on presence set Add typing information for ChannelGroups Fix tests for v2 ChannelGroups is a default realtime module types: add channel group typings consumergroup: use modulo-based hashing scheme The `hashring` package is node-only as it depends on the native `crypto` package. Replaced with a simple modulo hash scheme for now. Fixes the case where the channel is already attached and the channel is obtained with new rewind options that require a re-attach. Updates the consumer group partitioning test to more robustly assert that channels are partitioned across consumers. channelgroup: make get sync, subscribe async This matches the pattern used by Channels, which is sync to obtain the channel and async on subscribe in order to await attachment. In the channel group case, awaiting the subscribe awaits the joining of the consumer group. consumergroup: make consumerId required field consumergroup: make hashring a required field format: apply prettier formatting rules jsdoc: add consumer group docs channelgroups: add active channel name option channelgroup: add explicit join method Exposes the join method on the channel group, which is analogous to attach on the channel. Uses this in tests for more robust assertions. Additionally avoids re-attaching to the consumer group channel if already attached, and obtains presence membership synchronously in the join. consumergroup: use subscribe over on The `on` method was not reliable across clients, despite being documented in https://ably.com/docs/presence-occupancy/presence?lang=java#synced, so use subscribe instead. channelgroup: fix assigned channel processing We need to keep the total set of active channels around as updating the assigned channel set when the membership changes requires computing the new assignments from the complete channel set, not the previously set of assigned channels. consumergroup: include consumerId in logs consumergroups: add test for consumer group resize test: remove explicit join from test lint: apply formatting and cleanup test: replace var with let test: remove unnecessary outer try-catch test: add prefix to channels Avoid channel name collisions causing tests to fail from concurrent test runs in CI. channelgroup: detach from channel on un-assignment channelgroup: add unsubscribe listener method test: fix rebalance test waits for consumers channelgroup: add leave method test: remove dangling console logs test: test consumer group scale down event test: prefix consumer group channel Similar to the active channel, we need to avoid conflicts. consumergroup: fix current member tracking We store the current active set of members in the hashring. test: fix done condition w/ at-least-once delivery Messages can be delivered more than once during a consumer group rescaling event, so deduplicate the results when checking the end condition. channelgroups: use Utils.inspectError in logs channelgroups: do not share channel object The Channels object used by the ChannelGroup for internal channel operations no longer shares the same object exposed on the client via the .channels() method. This is to ensure that independent usage of an individual channel that happens to be included in a channel group is not impacted by its usage in the channel group. test: tidy up leave test Now that we can correctly handle a channel group and channel being used independently from the same client, this tidies up the leave test to remove the additional client previously needed. test: rename waitForConsumers for clarity test: rename waitForConsumers for clarity channelgroups: do not share channel object The Channels object used by the ChannelGroup for internal channel operations no longer shares the same object exposed on the client via the .channels() method. This is to ensure that independent usage of an individual channel that happens to be included in a channel group is not impacted by its usage in the channel group. channelgroups: add module integration modules: update ChannelGroups module definitions channelgroup: add temp rewind channel group option channelgroup: unsubscribe channel after timeout In order to avoid keeping the channel alive, we add a configurable timeout after which the channel will be unsubscribed if no messages are received. This is to avoid keeping the channel active. This can lead to missed messages if the a message is published after the client unsubscribes and before the channel becomes inactive. This is an acceptable edge case for the client-side simulation, especially with the default 1h timeout. deps: remove unused hashring types pkg utils: remove arrIndexOf polyfill consumergroup: rename hashring to locator test: use async style tests for channel groups Replaces the use of the `done()` callback with an async function style test. This allows us to await channel publish results and more easily handle race conditions in tests. channelgroup: use qualifier options Previously we relied on a new BaseRealtime instance with it's own Channels object to separate usage of channels in the ChannelGroup from independent external usage of those channels from the regular client.channels.get() method. This led to various problems with shared Auth state such as nonces in token requests which caused connections to terminate and tests to fail. A simpler solution is to avoid creating a new client instance and instead share the Channel pool, but force the library to treat channels used from the ChannelGroup independently (with their own attachment) by setting dummy options in the qualifier, which is used as the key in the channel map. This implementation does not support channels in the channel group which already have a qualifier. This is acceptable for the experimental client-side simulation of the feature. --- .gitignore | 2 + ably.d.ts | 115 +++- modular.d.ts | 29 + package-lock.json | 4 +- scripts/moduleReport.ts | 1 + src/common/lib/client/baserealtime.ts | 351 +++++++++++- src/common/lib/client/defaultrealtime.ts | 3 +- src/common/lib/client/modularplugins.ts | 2 + src/common/lib/util/locator.ts | 44 ++ src/common/lib/util/utils.ts | 27 + src/platform/web/modular.ts | 1 + test/common/modules/shared_helper.js | 14 + test/realtime/channelgroup.test.js | 701 +++++++++++++++++++++++ 13 files changed, 1288 insertions(+), 6 deletions(-) create mode 100644 src/common/lib/util/locator.ts create mode 100644 test/realtime/channelgroup.test.js diff --git a/.gitignore b/.gitignore index 072dde41f2..561e218cb8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ ably-js.iml node_modules npm-debug.log .tool-versions +*.swp +*.swo build/ react/ typedoc/generated/ diff --git a/ably.d.ts b/ably.d.ts index fdf42676ca..d670ac978b 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -859,6 +859,56 @@ export interface ChannelOptions { modes?: ChannelMode[]; } +/** + * Describes the consumer group. Consumers in the same group will partition the channels of that group. + */ +interface ConsumerGroupOptions { + /** + * The name of the consumer group. + * Channels will be partitioned across consumers in the same group identified by this name. + */ + name: string; +} + +/** + * Allows specifying properties of a {@link ChannelGroup} + */ +interface ChannelGroupOptions { + /** + * Options for a consumer group used to partition the channels in the channel group across consumers in the consumer group. + */ + consumerGroup?: ConsumerGroupOptions; + /** + * The name of the channel that receives the set of active channels in the group. + * For correct behaviour, this channel should have persist-last enabled. + * Note that this is a temporary option only required for the client-side simulation of channel groups. + * + * @defaultValue $ably:active + */ + activeChannel?: string; + /** + * The rewind interval to use when attaching to the matched channels in the group. + * This faciltates at-least-once delivery in the event of a consumer group scaling event. + * Note that this is a temporary option only required for the client-side simulation of channel groups. + * + * @defaultValue 5s + */ + rewind?: string; + + /** + * The time for which the client will remain subscribed to a given channel after the last message is received. + * In the client-side simulation of channel groups, subscribing to the channel has the effect of keeping + * the channel active for the duration of the subscription. This timeout is used to determine how long to remain + * subscribed to a channel while no messages are received on it. It is possible that a message is sent after the + * client unsubscribes and before the channel becomes inactive, which means the client may not re-subscribe to the + * channel and could miss a message. + * Note that this is a temporary option only required for the client-side simulation of channel groups. + * + * @defaultValue 60 * 60 * 1000 (1 hour) + */ + subscriptionTimeout?: number; +} + /** * Passes additional properties to a {@link RealtimeChannel} name to produce a new derived channel */ @@ -1426,6 +1476,13 @@ export interface TokenRevocationFailureResult { * @param message - The message which triggered the callback. */ export type messageCallback = (message: T) => void; +/** + * A callback which returns a channel and message argument, used for {@link ChannelGroup} subscriptions. + * + * @param channel - The channel name on which the message was received. + * @param message - The message which triggered the callback. + */ +export type channelAndMessageCallback = (channel: string, message: T) => void; /** * The callback used for the events emitted by {@link RealtimeChannel}. * @@ -1958,6 +2015,41 @@ export declare interface Channel { status(): Promise; } +/** + * This is a preview feature and may change in a future non-major release. + * + * Enables messages to be subscribed to on a group of channels. + */ +export declare interface ChannelGroup { + /** + * Registers a listener for messages on this channel group. The caller supplies a listener function, which is called each time one or more messages arrives on the channels in the group. + * + * @param callback - An event listener function. + */ + subscribe(callback: channelAndMessageCallback): Promise; + + /** + * Deregisters a listener for messages on this channel group. + * + * @param callback - An event listener function. + */ + unsubscribe(callback: channelAndMessageCallback): void; + + /** + * Joins the consumer group if one was created for this channel group, + * and returns a promise that is resolved when the channel group is attached + * to the active channel. + */ + join(): Promise; + + /** + * Leaves the consumer group if one was created for this channel group, + * and returns a promise that is resolved when the channel group is detached + * from the active channel. + */ + leave(): Promise; +} + /** * Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {@link RealtimePresence} object of a channel. */ @@ -2168,7 +2260,8 @@ export declare interface Channels { */ get(name: string, channelOptions?: ChannelOptions): T; /** - * @experimental This is a preview feature and may change in a future non-major release. + * This is a preview feature and may change in a future non-major release. + * * This experimental method allows you to create custom realtime data feeds by selectively subscribing * to receive only part of the data from the channel. * See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information. @@ -2190,6 +2283,22 @@ export declare interface Channels { release(name: string): void; } +/** + * This is a preview feature and may change in a future non-major release. + * + * Creates and destroys {@link ChannelGroup} objects. + */ +export declare interface ChannelGroups { + /** + * Creates a new {@link ChannelGroup} object, with the specified {@link ChannelGroupOptions}, or returns the existing channel group object. + * + * @param filter - A regular expression defining the channel group. Only wildcards are supported. + * @param options - A {@link ChannelGroupOptions} object. + * @returns A {@link ChannelGroup} object. + */ + get(filter: string, options?: ChannelGroupOptions): ChannelGroup; +} + /** * Contains an individual message that is sent to, or received from, Ably. */ @@ -2725,6 +2834,10 @@ export declare class Realtime implements RealtimeClient { connect(): void; auth: Auth; channels: Channels; + /** + * This is a preview feature and may change in a future non-major release. + */ + channelGroups: ChannelGroups; connection: Connection; request( method: string, diff --git a/modular.d.ts b/modular.d.ts index 1e03425c8a..936d192a2e 100644 --- a/modular.d.ts +++ b/modular.d.ts @@ -32,6 +32,7 @@ import { RealtimeClient, Auth, Channels, + ChannelGroups as ChannelGroupsImpl, Channel, HttpPaginatedResponse, StatsParams, @@ -131,6 +132,25 @@ export declare const MsgPack: unknown; */ export declare const RealtimePresence: unknown; +/** + * The module is experimental and the API is subject to change. + * + * Provides a {@link BaseRealtime} instance with the ability to interact with the experiemental channel groups feature. + * + * To create a client that includes this module, include it in the `ModulesMap` that you pass to the {@link BaseRealtime.constructor}: + * + * ```javascript + * import { BaseRealtime, WebSocketTransport, FetchRequest, RealtimePresence, ChannelGroups } from 'ably/modules'; + * const realtime = new BaseRealtime(options, { WebSocketTransport, FetchRequest, RealtimePresence, ChannelGroups }); + * ``` + * + * If you do not provide this module, then attempting to use the functionality of channel groups will cause a runtime error. + * + * Note that in the experimental client-side simulation of ChannelGroups, you must provide the RealtimePresence module + * which is required for internal coordination among consumers. + */ +export declare const ChannelGroups: unknown; + /** * Provides a {@link BaseRealtime} instance with the ability to establish a connection with the Ably realtime service using a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) connection. * @@ -240,6 +260,11 @@ export interface ModularPlugins { */ RealtimePresence?: typeof RealtimePresence; + /** + * See {@link ChannelGroups | documentation for the `ChannelGroups` module}. + */ + ChannelGroups?: typeof ChannelGroups; + /** * See {@link WebSocketTransport | documentation for the `WebSocketTransport` plugin}. */ @@ -353,6 +378,10 @@ export declare class BaseRealtime implements RealtimeClient { connect(): void; auth: Auth; channels: Channels; + /** + * This is a preview feature and may change in a future non-major release. + */ + channelGroups: ChannelGroupsImpl; connection: Connection; request( method: string, diff --git a/package-lock.json b/package-lock.json index cabe3808df..73165dbcb6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2946,7 +2946,7 @@ "node_modules/concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", - "integrity": "sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==", + "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=", "dev": true }, "node_modules/content-disposition": { @@ -13007,7 +13007,7 @@ "concat-map": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", - "integrity": "sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==", + "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=", "dev": true }, "content-disposition": { diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 2d72004951..cbcd5e96b9 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -22,6 +22,7 @@ const pluginNames = [ 'XHRRequest', 'FetchRequest', 'MessageInteractions', + 'ChannelGroups', ]; // List of all free-standing functions exported by the library along with the diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index 359ad7bc6f..38ab54dbbf 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -13,6 +13,8 @@ import { ModularPlugins, RealtimePresencePlugin } from './modularplugins'; import { TransportNames } from 'common/constants/TransportName'; import { TransportImplementations } from 'common/platform'; import Defaults from '../util/defaults'; +import Locator from '../util/locator'; +import ChannelStateChange from './channelstatechange'; /** `BaseRealtime` is an export of the tree-shakable version of the SDK, and acts as the base class for the `DefaultRealtime` class exported by the non tree-shakable version. @@ -21,8 +23,9 @@ class BaseRealtime extends BaseClient { readonly _RealtimePresence: RealtimePresencePlugin | null; // Extra transport implementations available to this client, in addition to those in Platform.Transports.bundledImplementations readonly _additionalTransportImplementations: TransportImplementations; - _channels: any; + _channels: Channels; connection: Connection; + readonly _channelGroups: ChannelGroups | null; constructor(options: ClientOptions | string) { super(Defaults.objectifyOptions(options)); @@ -31,6 +34,11 @@ class BaseRealtime extends BaseClient { this._RealtimePresence = this.options.plugins?.RealtimePresence ?? null; this.connection = new Connection(this, this.options); this._channels = new Channels(this); + if (this.options.plugins?.ChannelGroups) { + this._channelGroups = new this.options.plugins.ChannelGroups(this._channels); + } else { + this._channelGroups = null; + } if (this.options.autoConnect !== false) this.connect(); } @@ -51,7 +59,14 @@ class BaseRealtime extends BaseClient { } get channels() { - return this._channels; + return this._channels as any; + } + + get channelGroups() { + if (!this._channelGroups) { + Utils.throwMissingPluginError('ChannelGroups'); + } + return this._channelGroups; } connect(): void { @@ -65,6 +80,337 @@ class BaseRealtime extends BaseClient { } } +class ChannelGroups { + groups: Record = {}; + + constructor(readonly channels: Channels) {} + + get(filter: string, options?: API.ChannelGroupOptions): ChannelGroup { + let group = this.groups[filter]; + if (group) { + return group; + } + this.groups[filter] = new ChannelGroup(this.channels, filter, options); + return this.groups[filter]; + } +} + +class ConsumerGroup extends EventEmitter { + consumerId: string; + + private channel?: RealtimeChannel; + private locator: Locator; + private computeMembershipListener: () => Promise; + + constructor(readonly channels: Channels, readonly consumerGroupName?: string) { + super(); + // The client ID can in fact be set on receipt of CONNECTED event from realtime, + // but for these purposes we must rely on the client ID being explicitly provided + // by the user as we do not have a mechanism to listen for or handle changes. + // It must be provided when the consumer group is instantiated due to the use of a new + // base realtime client instance for the channel group. + if (!this.channels.realtime.options.clientId || this.channels.realtime.options.clientId === '*') { + throw new ErrorInfo('clientId must be explicitly specified to use consumer groups', 40000, 400); + } + this.consumerId = this.channels.realtime.options.clientId; + this.locator = new Locator([this.consumerId]); + this.computeMembershipListener = this.computeMembership.bind(this); // we need the exact reference to unsubscribe + } + + async join() { + if (!this.consumerGroupName) { + // If no name is specified, then don't enforce the consumer group + // this is the equivalent of a no-op group where every channel + // is considered to be assigned to this client. + return; + } + + try { + Logger.logAction( + Logger.LOG_MAJOR, + 'ConsumerGroup.join()', + 'joining consumer group ' + this.consumerGroupName + ' as ' + this.consumerId, + ); + if (this.channel) { + await this.computeMembership(); + return; + } + this.channel = this.channels.get(this.consumerGroupName); + await this.channel.attach(); + await this.channel.presence.enter(null); + await this.computeMembership(); + this.channel.presence.subscribe(this.computeMembershipListener); + } catch (err) { + Logger.logAction( + Logger.LOG_ERROR, + 'ConsumerGroup.join()', + 'failed to enter presence set on consumer group channel; err = ' + Utils.inspectError(err), + ); + throw err; + } + } + + async leave() { + if (!this.consumerGroupName) { + return; + } + try { + Logger.logAction( + Logger.LOG_MAJOR, + 'ConsumerGroup.leave()', + 'leaving consumer group ' + this.consumerGroupName + ' as ' + this.consumerId, + ); + if (!this.channel) { + Logger.logAction(Logger.LOG_ERROR, 'ConsumerGroup.leave()', 'leave called with no channel initialised'); + return; + } + this.channel.presence.unsubscribe(this.computeMembershipListener); + await this.channel.presence.leave(null); + await this.channel.detach(); + this.channel = undefined; + } catch (err) { + Logger.logAction( + Logger.LOG_ERROR, + 'ConsumerGroup.leave()', + 'failed to leave presence set on consumer group channel; err = ' + Utils.inspectError(err), + ); + } + } + + async computeMembership() { + if (!this.channel) { + Logger.logAction( + Logger.LOG_ERROR, + 'ConsumerGroup.computeMembership()', + 'compute membership called with no channel initialised', + ); + return; + } + try { + const result = await this.channel.presence.get({ waitForSync: true }); + + const memberIds = result?.filter((member) => member.clientId).map((member) => member.clientId!) || []; + const { add, remove } = diffSets(this.locator.getNodes(), memberIds); + + Logger.logAction( + Logger.LOG_DEBUG, + 'ConsumerGroup.computeMembership()', + 'computed member diffs add=' + add + ' remove=' + remove + ' consumerId=' + this.consumerId, + ); + + add.forEach((member) => { + this.locator.add(member); + }); + + remove.forEach((member) => { + this.locator.remove(member); + }); + + this.emit('membership'); + Logger.logAction( + Logger.LOG_MAJOR, + 'ConsumerGroup.computeMembership()', + 'membership computed, consumerId=' + this.consumerId + ' state=' + this.locator.getNodes(), + ); + } catch (err) { + Logger.logAction( + Logger.LOG_ERROR, + 'ConsumerGroup.computeMembership()', + 'failed to get presence set on consumer group channel; err = ' + Utils.inspectError(err), + ); + } + } + + assigned(channel: string): boolean { + if (!this.consumerGroupName) { + // Consumer group is not enabled, every channel + // is considered assigned to this client + return true; + } + return this.consumerId === this.locator.get(channel); + } +} + +class ChannelGroup { + activeChannels: string[] = []; + assignedChannels: string[] = []; + active: RealtimeChannel; + subscriptions: EventEmitter; + subscribedChannels: Record = {}; + channelTimers: Record = {}; + expression: RegExp; + consumerGroup: ConsumerGroup; + + constructor(readonly channels: Channels, readonly filter: string, readonly options?: API.ChannelGroupOptions) { + this.subscriptions = new EventEmitter(); + this.active = channels.get(this.safeChannelName(options?.activeChannel || '$ably:active')); + this.consumerGroup = new ConsumerGroup(channels, options?.consumerGroup?.name); + this.consumerGroup.on('membership', () => this.updateAssignedChannels()); + this.expression = new RegExp(filter); // eslint-disable-line security/detect-non-literal-regexp + } + + // Add dummy options to the channel name so that it is treated as an independent channel + // in the channel pool. This avoids any conflicts with external, independent use of individual + // channels that happen to also be included in a channel group. + private safeChannelName(name: string) { + // base64 encode to ensure only allowed characters are used in the qualifier + return `[?x-ably-channelgroup=${Utils.toBase64(this.filter)}]${name}`; + } + + async join() { + await this.consumerGroup.join(); + await this.active.setOptions({ params: { rewind: '1' } }); + await this.active.subscribe((msg: any) => { + this.activeChannels = msg.data.active; + this.updateAssignedChannels(); + }); + } + + async leave() { + this.active.unsubscribe(); + await this.active.detach(); + await this.consumerGroup.leave(); + this.assignedChannels = []; + this.removeSubscriptions(Object.keys(this.subscribedChannels)); + } + + private updateAssignedChannels() { + Logger.logAction( + Logger.LOG_DEBUG, + 'ChannelGroups.updateAssignedChannels', + 'activeChannels=' + + this.activeChannels + + ' assignedChannels=' + + this.assignedChannels + + ' consumerId=' + + this.consumerGroup.consumerId, + ); + + const matched = this.activeChannels + .filter((x) => this.expression.test(x)) + .filter((x) => this.consumerGroup.assigned(x)); + + const { add, remove } = diffSets(this.assignedChannels, matched); + this.assignedChannels = matched; + + Logger.logAction( + Logger.LOG_MAJOR, + 'ChannelGroups.updateAssignedChannels', + 'computed channel diffs: add=' + add + ' remove=' + remove + ' consumerId=' + this.consumerGroup.consumerId, + ); + + this.removeSubscriptions(remove); + this.addSubscriptions(add); + Logger.logAction( + Logger.LOG_MAJOR, + 'ChannelGroups.updateAssignedChannels', + 'assignedChannels=' + this.assignedChannels + ' consumerId=' + this.consumerGroup.consumerId, + ); + } + + private unsubscribeTimeout(channel: string) { + const timeout = this.options?.subscriptionTimeout || 60 * 60 * 1000; + Logger.logAction( + Logger.LOG_MAJOR, + 'ChannelGroups.addSubscriptions()', + 'subscription timeout started on channel ' + channel + ' timeout = ' + timeout, + ); + return setTimeout(() => { + Logger.logAction( + Logger.LOG_MAJOR, + 'ChannelGroups.addSubscriptions()', + 'subscription timeout expired on channel ' + channel, + ); + this.removeSubscriptions([channel]); + }, timeout); + } + + private async subscribeChannel(channel: string) { + try { + Logger.logAction( + Logger.LOG_MAJOR, + 'ChannelGroups.subscribeChannel()', + 'setting up subscription to channel ' + channel, + ); + this.subscribedChannels[channel] = this.channels.get(this.safeChannelName(channel)); + this.subscribedChannels[channel].on(['detached', 'failed'], (event: ChannelStateChange) => { + Logger.logAction( + Logger.LOG_MAJOR, + 'ChannelGroups.subscribeChannel()', + 'clearing subscribe timeout; event = ' + event.current, + ); + clearTimeout(this.channelTimers[channel]); + delete this.channelTimers[channel]; + }); + this.channelTimers[channel] = this.unsubscribeTimeout(channel); + await this.subscribedChannels[channel].setOptions({ params: { rewind: this.options?.rewind || '5s' } }); + await this.subscribedChannels[channel].attach(); + await this.subscribedChannels[channel].subscribe((msg: any) => { + clearTimeout(this.channelTimers[channel]); + this.channelTimers[channel] = this.unsubscribeTimeout(channel); + this.subscriptions.emit('message', channel, msg); + }); + } catch (err) { + Logger.logAction( + Logger.LOG_ERROR, + 'ChannelGroups.subscribeChannel()', + 'setup channel subscription failed on channel ' + channel + '; err = ' + Utils.inspectError(err), + ); + } + } + + private async unsubscribeChannel(channel: string) { + try { + Logger.logAction(Logger.LOG_MAJOR, 'ChannelGroups.unsubscribeChannel()', 'unsubscribing from channel ' + channel); + clearTimeout(this.channelTimers[channel]); + delete this.channelTimers[channel]; + this.subscribedChannels[channel].unsubscribe(); + await this.subscribedChannels[channel].detach(); + delete this.subscribedChannels[channel]; + } catch (err) { + Logger.logAction( + Logger.LOG_ERROR, + 'ChannelGroups.unsubscribeChannel()', + 'failed to unsubscribe from channel ' + channel + '; err = ' + Utils.inspectError(err), + ); + } + } + + private addSubscriptions(channels: string[]) { + channels.forEach((channel) => { + if (this.subscribedChannels[channel]) { + return; + } + this.subscribeChannel(channel); + }); + } + + private removeSubscriptions(channels: string[]) { + channels.forEach((channel) => { + if (!this.subscribedChannels[channel]) { + return; + } + this.unsubscribeChannel(channel); + }); + } + + async subscribe(cb: (channel: string, msg: any) => void): Promise { + await this.join(); + this.subscriptions.on('message', cb); + } + + unsubscribe(cb: (channel: string, msg: any) => void): void { + this.subscriptions.off('message', cb); + } +} + +function diffSets(current: string[], updated: string[]): { add: string[]; remove: string[] } { + const remove = current.filter((x) => !updated.includes(x)); + const add = updated.filter((x) => !current.includes(x)); + + return { add, remove }; +} + class Channels extends EventEmitter { realtime: BaseRealtime; all: Record; @@ -202,3 +548,4 @@ class Channels extends EventEmitter { } export default BaseRealtime; +export { ChannelGroups }; diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index d1fa3f4ce2..d78478eea1 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -1,4 +1,4 @@ -import BaseRealtime from './baserealtime'; +import BaseRealtime, { ChannelGroups } from './baserealtime'; import ClientOptions from '../../types/ClientOptions'; import { allCommonModularPlugins } from './modularplugins'; import * as Utils from '../util/utils'; @@ -40,6 +40,7 @@ export class DefaultRealtime extends BaseRealtime { }, WebSocketTransport: initialiseWebSocketTransport, MessageInteractions: FilteredSubscriptions, + ChannelGroups: ChannelGroups, }), ); } diff --git a/src/common/lib/client/modularplugins.ts b/src/common/lib/client/modularplugins.ts index d3189aa85a..7ae68b11d4 100644 --- a/src/common/lib/client/modularplugins.ts +++ b/src/common/lib/client/modularplugins.ts @@ -10,6 +10,7 @@ import { fromValues as presenceMessageFromValues, fromValuesArray as presenceMessagesFromValuesArray, } from '../types/presencemessage'; +import { ChannelGroups } from './baserealtime'; export interface PresenceMessagePlugin { presenceMessageFromValues: typeof presenceMessageFromValues; @@ -31,6 +32,7 @@ export interface ModularPlugins { XHRRequest?: typeof XHRRequest; FetchRequest?: typeof fetchRequest; MessageInteractions?: typeof FilteredSubscriptions; + ChannelGroups?: typeof ChannelGroups; } export const allCommonModularPlugins: ModularPlugins = { Rest }; diff --git a/src/common/lib/util/locator.ts b/src/common/lib/util/locator.ts new file mode 100644 index 0000000000..afa3c41c63 --- /dev/null +++ b/src/common/lib/util/locator.ts @@ -0,0 +1,44 @@ +// Locator currently implements a modulo-based hash partitioning scheme. +// It is used to distribute keys across a set of nodes. +// TODO(mschristensen): use consistent hashing instead of modulo-based hashing +// to avoid re-assigning all keys when nodes are added or removed. +export default class Locator { + private nodes: string[] = []; + + constructor(nodes?: string[]) { + if (nodes) { + this.nodes = nodes; + } + } + + add(node: string): void { + if (this.nodes.includes(node)) { + return; + } + this.nodes.push(node); + this.nodes.sort(); + } + + remove(node: string): void { + this.nodes = this.nodes.filter((n) => n !== node); + } + + get(key: string): string { + const hash = this.hash(key); + const index = hash % this.nodes.length; + return this.nodes[index]; + } + + hash(key: string): number { + let hash = 0; + for (let i = 0; i < key.length; i++) { + hash = (hash << 5) - hash + key.charCodeAt(i); + hash = hash >>> 0; // convert to 32 bit unsigned integer + } + return hash; + } + + getNodes(): string[] { + return this.nodes; + } +} diff --git a/src/common/lib/util/utils.ts b/src/common/lib/util/utils.ts index 1761ca03cb..78367d5ff5 100644 --- a/src/common/lib/util/utils.ts +++ b/src/common/lib/util/utils.ts @@ -174,6 +174,33 @@ export function arrSubtract(arr1: Array, arr2: Array): Array { return result; } +export function arrIndexOfBy( + arr: Array, + iteratee: (value: T) => unknown, + elem: unknown, + fromIndex: number = 0, +): number { + const len = arr.length; + for (; fromIndex < len; fromIndex++) { + if (iteratee(arr[fromIndex]) === elem) { + return fromIndex; + } + } + return -1; +} + +export function arrUnique(arr: Array): Array { + return arr.filter((value, index) => arr.indexOf(value) === index); +} + +export function arrUniqueBy(arr: Array, iteratee: (value: T) => unknown): Array { + return arr.filter((value, index) => arrIndexOfBy(arr, iteratee, iteratee(value)) === index); +} + +export function arrIn(arr: Array, val: unknown): boolean { + return arr.indexOf(val) !== -1; +} + export function arrDeleteValue(arr: Array, val: T): boolean { const idx = arr.indexOf(val); const res = idx != -1; diff --git a/src/platform/web/modular.ts b/src/platform/web/modular.ts index 19a9b513a5..1388a753a5 100644 --- a/src/platform/web/modular.ts +++ b/src/platform/web/modular.ts @@ -43,4 +43,5 @@ export * from './modular/transports'; export * from './modular/http'; export { Rest } from '../../common/lib/client/rest'; export { FilteredSubscriptions as MessageInteractions } from '../../common/lib/client/filteredsubscriptions'; +export { ChannelGroups } from '../../common/lib/client/baserealtime'; export { BaseRest, BaseRealtime, ErrorInfo }; diff --git a/test/common/modules/shared_helper.js b/test/common/modules/shared_helper.js index 833bbfa114..ba50449c90 100644 --- a/test/common/modules/shared_helper.js +++ b/test/common/modules/shared_helper.js @@ -44,6 +44,20 @@ define([ async function monitorConnectionAsync(action, realtime, states) { const monitoringResultPromise = new Promise((resolve, reject) => { + if (Object.prototype.toString.call(realtime) == '[object Array]') { + realtime = realtime.filter(function (rt) { + return rt !== undefined; + }); + Promise.race( + realtime.map( + (rt) => + new Promise((resolve, reject) => monitorConnection((err) => (err ? reject(err) : resolve()), rt, states)), + ), + ) + .then(resolve) + .catch(reject); + return; + } monitorConnection((err) => (err ? reject(err) : resolve()), realtime, states); }); const actionResultPromise = Promise.resolve(action()); diff --git a/test/realtime/channelgroup.test.js b/test/realtime/channelgroup.test.js new file mode 100644 index 0000000000..e07536632d --- /dev/null +++ b/test/realtime/channelgroup.test.js @@ -0,0 +1,701 @@ +'use strict'; + +class Deferred { + promise; + resolve; + reject; + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + } +} + +class DeferredMany { + constructor(count) { + this.items = Array.from({ length: count }, () => new Deferred()); + } +} + +define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async, chai) { + let expect = chai.expect; + let closeAndFinishAsync = helper.closeAndFinishAsync; + let monitorConnectionAsync = helper.monitorConnectionAsync; + let utils = helper.Utils; + + describe('realtime/channelgroup', function () { + this.timeout(60 * 1000); + before(function (done) { + helper.setupApp(function (err) { + if (err) { + done(err); + } + done(); + }); + }); + + it('subscribes to active', async function () { + const realtime = helper.AblyRealtime({ clientId: 'testclient' }); + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + + // connect and attach + await new Promise((resolve) => realtime.connection.on('connected', resolve)); + + const channelGroup = realtime.channelGroups.get(`${prefix}.*`, { activeChannel: activeChannelName }); + const activeChannel = realtime.channels.get(activeChannelName); + const dataChannel1 = realtime.channels.get(`${prefix}:channel1`); + const dataChannel2 = realtime.channels.get(`${prefix}:channel2`); + + // subscribe to channel group and assert results + let events = 0; + const result = new Deferred(); + await channelGroup.subscribe(async (channel, msg) => { + events++; + try { + expect(msg.data).to.equal(`test data ${events}`, 'Unexpected msg text received'); + expect(channel).to.equal(`${prefix}:channel${events}`, 'Unexpected channel name'); + } catch (err) { + result.reject(err); + return; + } + + if (events === 1) { + await dataChannel2.publish('event0', 'test data 2'); + return; + } + result.resolve(); + }); + + // publish active channels + await activeChannel.attach(); + await activeChannel.publish('event0', { + active: [`${prefix}:channel1`, `${prefix}:channel2`], + }); + + // publish first message + await dataChannel1.publish('event0', 'test data 1'); + + return result.promise; + } + + try { + await monitorConnectionAsync(test, realtime); + await closeAndFinishAsync(realtime); + } catch (err) { + await closeAndFinishAsync(realtime, err); + return; + } + }); + + it('unsubscribes a listener', async function () { + const realtime = helper.AblyRealtime({ clientId: 'testclient' }); + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + + await new Promise((resolve) => realtime.connection.on('connected', resolve)); + + const channelGroup = realtime.channelGroups.get(`${prefix}.*`, { activeChannel: activeChannelName }); + const activeChannel = realtime.channels.get(activeChannelName); + const channel = realtime.channels.get(`${prefix}:channel1`); + + // subscribe to channel group and assert results + let events = 0; + let messages = new DeferredMany(3); + let result = new Deferred(); + const listener1 = () => { + expect(events).to.be.lessThan(3, 'Unexpected number of messages received'); + }; + const listener2 = () => { + messages.items[events].resolve(); + events++; + if (events < 3) { + return; + } + result.resolve(); + }; + await channelGroup.subscribe(listener1); + await channelGroup.subscribe(listener2); + + // publish active channels + await activeChannel.attach(); + await activeChannel.publish('event0', { + active: [`${prefix}:channel1`], + }); + // publish first message + await channel.publish('event0', 'test data'); + await channel.publish('event1', 'test data'); + await messages.items[0].promise; + await messages.items[1].promise; + + // unsubscribe the first listener and publish the last message to end the test + channelGroup.unsubscribe(listener1); + await channel.publish('event2', 'test data'); + await messages.items[2].promise; + + return result.promise; + } + + try { + await monitorConnectionAsync(test, realtime); + await closeAndFinishAsync(realtime); + } catch (err) { + await closeAndFinishAsync(realtime, err); + return; + } + }); + + it('leaves the channel group', async function () { + const realtime = helper.AblyRealtime({ clientId: 'testclient' }); + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + + await new Promise((resolve) => realtime.connection.on('connected', resolve)); + + const channelGroup = realtime.channelGroups.get(`${prefix}.*`, { activeChannel: activeChannelName }); + const activeChannel = realtime.channels.get(activeChannelName); + const channel = realtime.channels.get(`${prefix}:channel1`); + + // subscribe to channel group and assert results + let events = 0; + await channelGroup.subscribe(() => { + expect(events).to.be.lessThan(3, 'Unexpected number of messages received'); + }); + let result = new Deferred(); + await channel.subscribe(() => { + events++; + if (events < 3) { + return; + } + result.resolve(); + }); + + // publish active channels + await activeChannel.attach(); + await activeChannel.publish('event0', { + active: [`${prefix}:channel1`], + }); + + // publish first message + await channel.publish('event0', 'test data'); + await channel.publish('event1', 'test data'); + + // leave the first channel group and publish the last message to end the test + await channelGroup.leave(); + await channel.publish('event2', 'test data'); + + return result.promise; + } + + try { + await monitorConnectionAsync(test, realtime); + await closeAndFinishAsync(realtime); + } catch (err) { + await closeAndFinishAsync(realtime, err); + return; + } + }); + + it('ignores channels not matched on filter', async function () { + let realtime = helper.AblyRealtime({ clientId: 'testclient' }); + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + + await new Promise((resolve) => realtime.connection.on('connected', resolve)); + + const channelGroup = realtime.channelGroups.get(`${prefix}:include:.*`, { activeChannel: activeChannelName }); + + const activeChannel = realtime.channels.get(activeChannelName); + const dataChannel1 = realtime.channels.get(`${prefix}:include:channel1`); + const streamChannel3 = realtime.channels.get(`${prefix}:stream3`); + + // subscribe to channel group and assert results + let result = new Deferred(); + await channelGroup.subscribe((channel, msg) => { + try { + expect(msg.data).to.equal('test data 1', 'Unexpected msg text received'); + expect(channel).to.equal(`${prefix}:include:channel1`, 'Unexpected channel name'); + } catch (err) { + result.reject(err); + return; + } + result.resolve(); + }); + + // publish active channels + await activeChannel.attach(); + await activeChannel.publish('event0', { active: [`${prefix}:include:channel1`, `${prefix}:stream3`] }); + + // publish messages + await streamChannel3.publish('event0', 'should not be subscribed to this message'); + await dataChannel1.publish('event0', 'test data 1'); + + return result.promise; + } + + try { + await monitorConnectionAsync(test, realtime); + await closeAndFinishAsync(realtime); + } catch (err) { + await closeAndFinishAsync(realtime, err); + return; + } + }); + + it('reacts to changing active channels', async function () { + let realtime = helper.AblyRealtime({ clientId: 'testclient' }); + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + + await new Promise((resolve) => realtime.connection.on('connected', resolve)); + + const channelGroup = realtime.channelGroups.get(`${prefix}:group:.*`, { activeChannel: activeChannelName }); + + const activeChannel = realtime.channels.get(activeChannelName); + const dataChannel1 = realtime.channels.get(`${prefix}:group:channel1`); + const dataChannel2 = realtime.channels.get(`${prefix}:group:channel2`); + const dataChannel4 = realtime.channels.get(`${prefix}:group:channel4`); + const dataChannel5 = realtime.channels.get(`${prefix}:group:channel5`); + + // subscribe to channel group and assert results + let events = 1; + let result = new Deferred(); + await channelGroup.subscribe(async (channel, msg) => { + try { + expect(msg.data).to.equal(`test data ${events}`, 'Unexpected msg text received'); + expect(channel).to.equal(`${prefix}:group:channel${events}`, 'Unexpected channel name'); + } catch (err) { + result.reject(err); + return; + } + + if (events === 1) { + events = 4; + await activeChannel.publish('event0', { active: [`${prefix}:group:channel4`, `${prefix}:group:channel5`] }); + await dataChannel4.publish('event0', 'test data 4'); + return; + } + + if (events === 4) { + events++; + await dataChannel2.publish('event 0', 'should be ignored test data'); + await dataChannel5.publish('event0', 'test data 5'); + return; + } + + result.resolve(); + }); + + // publish active channels + await activeChannel.attach(); + await activeChannel.publish('event0', { + active: [`${prefix}:group:channel1`, `${prefix}:group:channel2`, `${prefix}:group:channel3`], + }); + + // publish first message + await dataChannel1.publish('event0', 'test data 1'); + + return result.promise; + } + + try { + await monitorConnectionAsync(test, realtime); + await closeAndFinishAsync(realtime); + } catch (err) { + await closeAndFinishAsync(realtime, err); + return; + } + }); + + // wait for n consumers to appear in the given channel's presence set + function waitForNConsumersInPresenceSet(channel, n) { + return new Promise(async (resolve, reject) => { + const interval = setInterval(async () => { + try { + const result = await channel.presence.get({ waitForSync: true }); + if (result.length === n) { + resolve(); + channel.presence.unsubscribe(); + clearInterval(interval); + } + } catch (err) { + reject(err); + channel.presence.unsubscribe(); + clearInterval(interval); + } + }, 100); + }); + } + + function uniqueResults(results) { + return utils.arrUniqueBy(results, (elem) => `${elem.channel}:${elem.name}`); + } + + function assertResults(results, channelPrefix, allowDuplicates) { + // expect each consumer to have received at least 1 message + for (let i = 0; i < results.length; i++) { + expect(results[i].length).to.be.greaterThan(0, `consumer ${i} received no messages`); + } + // sort first on channel, then on message name + let allResults = results.flat().sort((a, b) => { + if (a.channel < b.channel) return -1; + if (a.channel > b.channel) return 1; + if (a.name < b.name) return -1; + if (a.name > b.name) return 1; + return 0; + }); + // remove duplicates if allowed + if (allowDuplicates) { + allResults = uniqueResults(allResults); + } + // expect to have received 2 messages from each channel across all consumers + for (let i = 0; i < allResults.length; i += 2) { + const baseIndex = i / 2; + const data = `test data ${baseIndex}`; + const channel = `${channelPrefix}:${baseIndex}`; + + const first = allResults[i]; + const second = allResults[i + 1]; + + expect(first.channel).to.equal(channel); + expect(second.channel).to.equal(channel); + expect(first.name).to.equal('event0'); + expect(second.name).to.equal('event1'); + expect(first.data).to.equal(data); + expect(second.data).to.equal(data); + } + } + + it('partitions over consumer group', async function () { + let realtime1 = helper.AblyRealtime({ clientId: 'testclient' }); + let realtime2 = helper.AblyRealtime({ clientId: 'consumer1' }); + let realtime3 = helper.AblyRealtime({ clientId: 'consumer2' }); + + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + const consumerGroupName = `${prefix}:testgroup`; + + await Promise.all([ + new Promise((resolve) => realtime1.connection.on('connected', resolve)), + new Promise((resolve) => realtime2.connection.on('connected', resolve)), + new Promise((resolve) => realtime3.connection.on('connected', resolve)), + ]); + + // create 2 consumers in one group + const consumers = [ + realtime2.channelGroups.get(`${prefix}:.*`, { + activeChannel: activeChannelName, + consumerGroup: { name: consumerGroupName }, + }), + realtime3.channelGroups.get(`${prefix}:.*`, { + activeChannel: activeChannelName, + consumerGroup: { name: consumerGroupName }, + }), + ]; + + // create 5 channels + const channels = []; + const channelNames = Array.from({ length: 5 }, (_, i) => `${prefix}:` + i); + for (const name of channelNames) { + const channel = realtime1.channels.get(name); + await channel.attach(); + channels.push(channel); + } + + // subscribe each consumer and collect results + let result = new Deferred(); + const messages = Array.from({ length: consumers.length }, () => []); + for (let i = 0; i < consumers.length; i++) { + await consumers[i].subscribe((channel, msg) => { + messages[i].push({ channel, name: msg.name, data: msg.data }); + if (messages.flat().length === 2 * channels.length) { + assertResults(messages, prefix, false); + result.resolve(); + } + }); + } + + // publish active channels + let activeChannel = realtime1.channels.get(activeChannelName); + await activeChannel.attach(); + await activeChannel.publish('event0', { active: channelNames }); + + // wait for all consumers to appear in the group + await waitForNConsumersInPresenceSet(realtime1.channels.get(consumerGroupName), consumers.length); + + // send 2 messages to each channel + for (let i = 0; i < channels.length; i++) { + await channels[i].publish('event0', `test data ${i}`); + await channels[i].publish('event1', `test data ${i}`); + } + + return result.promise; + } + + try { + await monitorConnectionAsync(test, [realtime1, realtime2, realtime3]); + await closeAndFinishAsync([realtime1, realtime2, realtime3]); + } catch (err) { + await closeAndFinishAsync([realtime1, realtime2, realtime3], err); + return; + } + }); + + it('dynamically rebalances the consumer group', async function () { + let realtime1 = helper.AblyRealtime({ clientId: 'testclient' }); + let realtime2 = helper.AblyRealtime({ clientId: 'consumer1' }); + let realtime3 = helper.AblyRealtime({ clientId: 'consumer2' }); + const deliveryTimeout = 3000; + + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + const consumerGroupName = `${prefix}:testgroup`; + + await Promise.all([ + new Promise((resolve) => realtime1.connection.on('connected', resolve)), + new Promise((resolve) => realtime2.connection.on('connected', resolve)), + new Promise((resolve) => realtime3.connection.on('connected', resolve)), + ]); + + // create 2 consumers in one group + const consumer1 = realtime2.channelGroups.get(`${prefix}:.*`, { + activeChannel: activeChannelName, + consumerGroup: { name: consumerGroupName }, + }); + const consumer2 = realtime3.channelGroups.get(`${prefix}:.*`, { + activeChannel: activeChannelName, + consumerGroup: { name: consumerGroupName }, + }); + + // create 10 channels + const channels = []; + const channelNames = Array.from({ length: 10 }, (_, i) => `${prefix}:` + i); + for (const name of channelNames) { + const channel = realtime1.channels.get(name); + await channel.attach(); + channels.push(channel); + } + + // subscribe the first consumer and collect results + const consumer1Results = []; + let hasLeft = false; + await consumer1.subscribe((channel, msg) => { + expect(hasLeft).to.be.false; + consumer1Results.push({ channel, name: msg.name, data: msg.data }); + }); + + // publish active channels + let activeChannel = realtime1.channels.get(activeChannelName); + await activeChannel.attach(); + await activeChannel.publish('event0', { active: channelNames }); + + // wait for first consumer to appear in the group + await waitForNConsumersInPresenceSet(realtime1.channels.get(consumerGroupName), 1); + + // send 2 messages to the first third of the channels + for (let i = 0; i < Math.floor(channels.length / 3); i++) { + await channels[i].publish('event0', `test data ${i}`); + await channels[i].publish('event1', `test data ${i}`); + } + + // wait for the messages to be delivered + await new Promise((resolve) => setTimeout(resolve, deliveryTimeout)); + + // subscribe the second consumer and collect results + let consumer2Results = []; + await consumer2.subscribe((channel, msg) => { + consumer2Results.push({ channel, name: msg.name, data: msg.data }); + }); + + // wait for second consumer to appear in the group + await waitForNConsumersInPresenceSet(realtime1.channels.get(consumerGroupName), 2); + + // send 2 messages to the second third of the channels + for (let i = Math.floor(channels.length / 3); i < 2 * Math.floor(channels.length / 3); i++) { + await channels[i].publish('event0', `test data ${i}`); + await channels[i].publish('event1', `test data ${i}`); + } + + // wait for the messages to be delivered + await new Promise((resolve) => setTimeout(resolve, deliveryTimeout)); + + // the first consumer leaves the group, remaining messages should be received by the second consumer + await consumer1.leave(); + hasLeft = true; + await waitForNConsumersInPresenceSet(realtime1.channels.get(consumerGroupName), 1); + + // send 2 messages to the final third of the channels + for (let i = 2 * Math.floor(channels.length / 3); i < channels.length; i++) { + await channels[i].publish('event0', `test data ${i}`); + await channels[i].publish('event1', `test data ${i}`); + } + + // wait for the messages to be delivered + await new Promise((resolve) => setTimeout(resolve, deliveryTimeout)); + + assertResults([consumer1Results, consumer2Results], prefix, true); + } + + try { + await monitorConnectionAsync(test, [realtime1, realtime2, realtime3]); + await closeAndFinishAsync([realtime1, realtime2, realtime3]); + } catch (err) { + await closeAndFinishAsync([realtime1, realtime2, realtime3], err); + return; + } + }); + + it('does not conflict with regular use of channel', async function () { + let realtime1 = helper.AblyRealtime({ clientId: 'testclient' }); + let realtime2 = helper.AblyRealtime({ clientId: 'consumer' }); + + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + const consumerGroupName = `${prefix}:testgroup`; + + await Promise.all([ + new Promise((resolve) => realtime1.connection.on('connected', resolve)), + new Promise((resolve) => realtime2.connection.on('connected', resolve)), + ]); + + // create a group with a single consumer + const consumer = realtime2.channelGroups.get(`${prefix}:.*`, { + activeChannel: activeChannelName, + consumerGroup: { name: consumerGroupName }, + }); + + let events = 0; + const messages = new DeferredMany(2); + await consumer.subscribe((channel, msg) => { + messages.items[events].resolve(); + events++; + expect(msg.data).to.equal('test data', 'Unexpected msg text received'); + expect(channel).to.equal(`${prefix}:channel`, 'Unexpected channel name'); + }); + + // publish active channels + const channelName = `${prefix}:channel`; + let activeChannel = realtime1.channels.get(activeChannelName); + await activeChannel.attach(); + await activeChannel.publish('event0', { active: [channelName] }); + + // wait for the consumer to appear in the group + await waitForNConsumersInPresenceSet(realtime1.channels.get(consumerGroupName), 1); + + // send a message to the channel + // channel.publish('event0', 'test data'); + realtime1.channels.get(channelName).publish('event0', 'test data'); + + // wait for the consumer to receive the message + await messages.items[0].promise; + + // Create channel on same client as consumer group and rewind, + // as the attach will fail as options have changed if the channel group + // uses the same channel object exposed on the client. + // For example, if the BaseRealtime._channelGroups object was instantiated as: + // ``` + // this._channelGroups = new modules.ChannelGroups(this._channels) + // ``` + // We would see: + // ``` + // Error: Channels.get() cannot be used to set channel options that would cause the channel to reattach. Please, use RealtimeChannel.setOptions() instead. + // ``` + const channel = realtime2.channels.get(channelName, { params: { rewind: 5 } }); + + await channel.subscribe((msg) => { + messages.items[events].resolve(); + events++; + expect(msg.data).to.equal('test data', 'Unexpected msg text received'); + }); + + // wait for the channel to receive the message via rewind + await messages.items[1].promise; + } + + try { + await monitorConnectionAsync(test, [realtime1, realtime2]); + await closeAndFinishAsync([realtime1, realtime2]); + } catch (err) { + await closeAndFinishAsync([realtime1, realtime2], err); + return; + } + }); + + it('unsubscribes channels after timeout', async function () { + let realtime = helper.AblyRealtime({ clientId: 'testclient' }); + + async function test() { + const prefix = utils.cheapRandStr(); + const activeChannelName = `${prefix}:active`; + const consumerGroupName = `${prefix}:testgroup`; + + await new Promise((resolve) => realtime.connection.on('connected', resolve)); + + // create a group with a single consumer + const consumer = realtime.channelGroups.get(`${prefix}:.*`, { + activeChannel: activeChannelName, + consumerGroup: { name: consumerGroupName }, + subscriptionTimeout: 1000, + }); + + let events = 0; + let messages = new DeferredMany(2); + await consumer.subscribe((channel, msg) => { + events++; + expect(events).to.equal(1, 'Unexpected number of messages received'); + messages.items[0].resolve(); + expect(msg.data).to.equal('test data', 'Unexpected msg text received'); + expect(channel).to.equal(`${prefix}:channel`, 'Unexpected channel name'); + }); + + // publish active channels + const channelName = `${prefix}:channel`; + let activeChannel = realtime.channels.get(activeChannelName); + await activeChannel.attach(); + await activeChannel.publish('event0', { active: [channelName] }); + + // wait for the consumer to appear in the group + await waitForNConsumersInPresenceSet(realtime.channels.get(consumerGroupName), 1); + + // send a message to the channel + realtime.channels.get(channelName).publish('event0', 'test data'); + + // wait for the consumer to receive the message + await messages.items[0].promise; + + await realtime.channels.get(channelName).subscribe((msg) => { + events++; + expect(events).to.equal(2, 'Unexpected number of messages received'); + messages.items[1].resolve(); + expect(msg.data).to.equal('test data', 'Unexpected msg text received'); + }); + + // wait for the consumer to unsubscribe from the channel after timeout + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // send a message to the channel; the channel group should not receive it + realtime.channels.get(channelName).publish('event0', 'test data'); + + // wait for the channel to receive the message + await messages.items[1].promise; + } + + try { + await monitorConnectionAsync(test, realtime); + await closeAndFinishAsync(realtime); + } catch (err) { + await closeAndFinishAsync(realtime, err); + return; + } + }); + }); +}); From dc3fb7ed94540e9af49085e06995c92c34935a5c Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Fri, 8 Mar 2024 10:13:44 +0000 Subject: [PATCH 2/9] utils: remove arrIn polyfill --- src/common/lib/util/utils.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/common/lib/util/utils.ts b/src/common/lib/util/utils.ts index 78367d5ff5..4d03282635 100644 --- a/src/common/lib/util/utils.ts +++ b/src/common/lib/util/utils.ts @@ -197,10 +197,6 @@ export function arrUniqueBy(arr: Array, iteratee: (value: T) => unknown): return arr.filter((value, index) => arrIndexOfBy(arr, iteratee, iteratee(value)) === index); } -export function arrIn(arr: Array, val: unknown): boolean { - return arr.indexOf(val) !== -1; -} - export function arrDeleteValue(arr: Array, val: T): boolean { const idx = arr.indexOf(val); const res = idx != -1; From 1ccee4a2dab3c6898d905ba275de380985a61550 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Fri, 8 Mar 2024 10:17:35 +0000 Subject: [PATCH 3/9] docs: rename module to plugin --- modular.d.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modular.d.ts b/modular.d.ts index 936d192a2e..b4b5a3b812 100644 --- a/modular.d.ts +++ b/modular.d.ts @@ -261,7 +261,7 @@ export interface ModularPlugins { RealtimePresence?: typeof RealtimePresence; /** - * See {@link ChannelGroups | documentation for the `ChannelGroups` module}. + * See {@link ChannelGroups | documentation for the `ChannelGroups` plugin}. */ ChannelGroups?: typeof ChannelGroups; From fb78dd7d7ee78ce5aa20a57af9120c643a3db533 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Fri, 8 Mar 2024 11:10:58 +0000 Subject: [PATCH 4/9] test: add channelgroup test to browserlist --- test/support/browser_file_list.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/support/browser_file_list.js b/test/support/browser_file_list.js index 03ff54169b..dc11c72c99 100644 --- a/test/support/browser_file_list.js +++ b/test/support/browser_file_list.js @@ -29,6 +29,7 @@ window.__testFiles__.files = { 'test/support/test_helper.js': true, 'test/realtime/auth.test.js': true, 'test/realtime/channel.test.js': true, + 'test/realtime/channelgroup.test.js': true, 'test/realtime/connection.test.js': true, 'test/realtime/connectivity.test.js': true, 'test/realtime/crypto.test.js': true, From d3b727272c6573c718fb1d7d1da43969e96fa873 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Tue, 12 Mar 2024 08:53:06 +0000 Subject: [PATCH 5/9] channelgroup: deduplicate presence set by clientId Each client will be entered in the presence set once per connection, so we need to deduplicate the presence set by clientId. --- src/common/lib/client/baserealtime.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index 38ab54dbbf..e4fec2281c 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -189,7 +189,11 @@ class ConsumerGroup extends EventEmitter { try { const result = await this.channel.presence.get({ waitForSync: true }); - const memberIds = result?.filter((member) => member.clientId).map((member) => member.clientId!) || []; + let memberIds = result?.filter((member) => member.clientId).map((member) => member.clientId!) || []; + // each member will be entered into the presence set once per connection, + // so we need to deduplicate the members by client ID + memberIds = memberIds.filter((id, index) => memberIds.indexOf(id) === index); + const { add, remove } = diffSets(this.locator.getNodes(), memberIds); Logger.logAction( From 3a5caae7b44976fc1f89ea9ee0fb7bc8ddf2ef55 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Mon, 8 Apr 2024 20:36:33 +0100 Subject: [PATCH 6/9] channelgroup: emit channel assigned/active events --- ably.d.ts | 13 ++++++++++++- src/common/lib/client/baserealtime.ts | 6 +++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index d670ac978b..4c0f929359 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -106,6 +106,11 @@ export type ChannelEvent = | ChannelEvents.DETACHING | ChannelEvents.UPDATE; +/** + * Describes the events emitted by a {@link ChannelGroup} object. + */ +export type ChannelGroupEvent = 'active.updated' | 'assigned.updated'; + /** * The `ConnectionStates` namespace describes the possible values of the {@link ConnectionState} type. */ @@ -1489,6 +1494,12 @@ export type channelAndMessageCallback = (channel: string, message: T) => void * @param changeStateChange - The state change that occurred. */ export type channelEventCallback = (changeStateChange: ChannelStateChange) => void; +/** + * The callback used for the events emitted by {@link ChannelGroup}. + * + * @param channels - The set of channels. + */ +export type channelGroupCallback = (channels: string[]) => void; /** * The callback used for the events emitted by {@link Connection}. * @@ -2020,7 +2031,7 @@ export declare interface Channel { * * Enables messages to be subscribed to on a group of channels. */ -export declare interface ChannelGroup { +export declare interface ChannelGroup extends EventEmitter { /** * Registers a listener for messages on this channel group. The caller supplies a listener function, which is called each time one or more messages arrives on the channels in the group. * diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index e4fec2281c..28a1bb2fb1 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -235,7 +235,7 @@ class ConsumerGroup extends EventEmitter { } } -class ChannelGroup { +class ChannelGroup extends EventEmitter { activeChannels: string[] = []; assignedChannels: string[] = []; active: RealtimeChannel; @@ -246,6 +246,7 @@ class ChannelGroup { consumerGroup: ConsumerGroup; constructor(readonly channels: Channels, readonly filter: string, readonly options?: API.ChannelGroupOptions) { + super(); this.subscriptions = new EventEmitter(); this.active = channels.get(this.safeChannelName(options?.activeChannel || '$ably:active')); this.consumerGroup = new ConsumerGroup(channels, options?.consumerGroup?.name); @@ -266,6 +267,7 @@ class ChannelGroup { await this.active.setOptions({ params: { rewind: '1' } }); await this.active.subscribe((msg: any) => { this.activeChannels = msg.data.active; + this.emit('active.updated', this.activeChannels); this.updateAssignedChannels(); }); } @@ -275,6 +277,7 @@ class ChannelGroup { await this.active.detach(); await this.consumerGroup.leave(); this.assignedChannels = []; + this.emit('assigned.updated', this.assignedChannels); this.removeSubscriptions(Object.keys(this.subscribedChannels)); } @@ -310,6 +313,7 @@ class ChannelGroup { 'ChannelGroups.updateAssignedChannels', 'assignedChannels=' + this.assignedChannels + ' consumerId=' + this.consumerGroup.consumerId, ); + this.emit('assigned.updated', this.assignedChannels); } private unsubscribeTimeout(channel: string) { From 2dab4c72ef8247a47024368da974c89fe553d890 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Wed, 10 Apr 2024 16:58:24 +0100 Subject: [PATCH 7/9] consumergroup: emit membership updated event --- ably.d.ts | 2 +- src/common/lib/client/baserealtime.ts | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index 4c0f929359..3e5effddc6 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -109,7 +109,7 @@ export type ChannelEvent = /** * Describes the events emitted by a {@link ChannelGroup} object. */ -export type ChannelGroupEvent = 'active.updated' | 'assigned.updated'; +export type ChannelGroupEvent = 'active.updated' | 'assigned.updated' | 'membership.updated'; /** * The `ConnectionStates` namespace describes the possible values of the {@link ConnectionState} type. diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index 28a1bb2fb1..663d324b62 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -210,7 +210,7 @@ class ConsumerGroup extends EventEmitter { this.locator.remove(member); }); - this.emit('membership'); + this.emit('membership', memberIds); Logger.logAction( Logger.LOG_MAJOR, 'ConsumerGroup.computeMembership()', @@ -250,7 +250,10 @@ class ChannelGroup extends EventEmitter { this.subscriptions = new EventEmitter(); this.active = channels.get(this.safeChannelName(options?.activeChannel || '$ably:active')); this.consumerGroup = new ConsumerGroup(channels, options?.consumerGroup?.name); - this.consumerGroup.on('membership', () => this.updateAssignedChannels()); + this.consumerGroup.on('membership', (memberIds: string[]) => { + this.emit('membership.updated', memberIds); + this.updateAssignedChannels(); + }); this.expression = new RegExp(filter); // eslint-disable-line security/detect-non-literal-regexp } From 029f5f523508741516b357ebb6aa7d4950e13953 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Fri, 26 Apr 2024 08:50:47 +0100 Subject: [PATCH 8/9] channelgroup: only emit active channels on changed Since the set of active channels are surfaced on $ably:active is a superset of those specified by a particular channel group expression, this commit ensures we only emit an active.updated event when the set of channels matching the consumer group expression have actually changed. --- src/common/lib/client/baserealtime.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index 663d324b62..29d9d78842 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -269,7 +269,15 @@ class ChannelGroup extends EventEmitter { await this.consumerGroup.join(); await this.active.setOptions({ params: { rewind: '1' } }); await this.active.subscribe((msg: any) => { - this.activeChannels = msg.data.active; + if (!msg.data || !msg.data.active || !Array.isArray(msg.data.active)) { + return; + } + const newActive = msg.data.active.filter((x: string) => this.expression.test(x)); + newActive.sort(); + if (Utils.arrEquals(newActive, this.activeChannels)) { + return; + } + this.activeChannels = newActive; this.emit('active.updated', this.activeChannels); this.updateAssignedChannels(); }); @@ -296,9 +304,7 @@ class ChannelGroup extends EventEmitter { this.consumerGroup.consumerId, ); - const matched = this.activeChannels - .filter((x) => this.expression.test(x)) - .filter((x) => this.consumerGroup.assigned(x)); + const matched = this.activeChannels.filter((channel) => this.consumerGroup.assigned(channel)); const { add, remove } = diffSets(this.assignedChannels, matched); this.assignedChannels = matched; From 746280646c78ddd1acfb3518fa22745d59ab2ba9 Mon Sep 17 00:00:00 2001 From: Mike Christensen Date: Fri, 26 Apr 2024 09:26:56 +0100 Subject: [PATCH 9/9] channelgroups: emit empty active channels on leave --- src/common/lib/client/baserealtime.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/common/lib/client/baserealtime.ts b/src/common/lib/client/baserealtime.ts index 29d9d78842..2bcda79186 100644 --- a/src/common/lib/client/baserealtime.ts +++ b/src/common/lib/client/baserealtime.ts @@ -288,7 +288,9 @@ class ChannelGroup extends EventEmitter { await this.active.detach(); await this.consumerGroup.leave(); this.assignedChannels = []; + this.activeChannels = []; this.emit('assigned.updated', this.assignedChannels); + this.emit('active.updated', this.activeChannels); this.removeSubscriptions(Object.keys(this.subscribedChannels)); }