Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental v2/channel and consumer group #1661

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ ably-js.iml
node_modules
npm-debug.log
.tool-versions
*.swp
*.swo
build/
react/
typedoc/generated/
Expand Down
126 changes: 125 additions & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ export type ChannelEvent =
| ChannelEvents.DETACHING
| ChannelEvents.UPDATE;

/**
* Describes the events emitted by a {@link ChannelGroup} object.
*/
export type ChannelGroupEvent = 'active.updated' | 'assigned.updated' | 'membership.updated';

/**
* The `ConnectionStates` namespace describes the possible values of the {@link ConnectionState} type.
*/
Expand Down Expand Up @@ -859,6 +864,56 @@ export interface ChannelOptions {
modes?: ChannelMode[];
}

/**
* Describes the consumer group. Consumers in the same group will partition the channels of that group.
*/
interface ConsumerGroupOptions {
/**
* The name of the consumer group.
* Channels will be partitioned across consumers in the same group identified by this name.
*/
name: string;
}

/**
* Allows specifying properties of a {@link ChannelGroup}
*/
interface ChannelGroupOptions {
/**
* Options for a consumer group used to partition the channels in the channel group across consumers in the consumer group.
*/
consumerGroup?: ConsumerGroupOptions;
/**
* The name of the channel that receives the set of active channels in the group.
* For correct behaviour, this channel should have persist-last enabled.
* Note that this is a temporary option only required for the client-side simulation of channel groups.
*
* @defaultValue $ably:active
*/
activeChannel?: string;
/**
* The rewind interval to use when attaching to the matched channels in the group.
* This faciltates at-least-once delivery in the event of a consumer group scaling event.
* Note that this is a temporary option only required for the client-side simulation of channel groups.
*
* @defaultValue 5s
*/
rewind?: string;

/**
* The time for which the client will remain subscribed to a given channel after the last message is received.
* In the client-side simulation of channel groups, subscribing to the channel has the effect of keeping
* the channel active for the duration of the subscription. This timeout is used to determine how long to remain
* subscribed to a channel while no messages are received on it. It is possible that a message is sent after the
* client unsubscribes and before the channel becomes inactive, which means the client may not re-subscribe to the
* channel and could miss a message.
* Note that this is a temporary option only required for the client-side simulation of channel groups.
*
* @defaultValue 60 * 60 * 1000 (1 hour)
*/
subscriptionTimeout?: number;
}

/**
* Passes additional properties to a {@link RealtimeChannel} name to produce a new derived channel
*/
Expand Down Expand Up @@ -1426,12 +1481,25 @@ export interface TokenRevocationFailureResult {
* @param message - The message which triggered the callback.
*/
export type messageCallback<T> = (message: T) => void;
/**
* A callback which returns a channel and message argument, used for {@link ChannelGroup} subscriptions.
*
* @param channel - The channel name on which the message was received.
* @param message - The message which triggered the callback.
*/
export type channelAndMessageCallback<T> = (channel: string, message: T) => void;
/**
* The callback used for the events emitted by {@link RealtimeChannel}.
*
* @param changeStateChange - The state change that occurred.
*/
export type channelEventCallback = (changeStateChange: ChannelStateChange) => void;
/**
* The callback used for the events emitted by {@link ChannelGroup}.
*
* @param channels - The set of channels.
*/
export type channelGroupCallback = (channels: string[]) => void;
/**
* The callback used for the events emitted by {@link Connection}.
*
Expand Down Expand Up @@ -1958,6 +2026,41 @@ export declare interface Channel {
status(): Promise<ChannelDetails>;
}

/**
* This is a preview feature and may change in a future non-major release.
*
* Enables messages to be subscribed to on a group of channels.
*/
export declare interface ChannelGroup extends EventEmitter<channelGroupCallback, string[], ChannelGroupEvent> {
/**
* Registers a listener for messages on this channel group. The caller supplies a listener function, which is called each time one or more messages arrives on the channels in the group.
*
* @param callback - An event listener function.
*/
subscribe(callback: channelAndMessageCallback<InboundMessage>): Promise<void>;

/**
* Deregisters a listener for messages on this channel group.
*
* @param callback - An event listener function.
*/
unsubscribe(callback: channelAndMessageCallback<InboundMessage>): void;

/**
* Joins the consumer group if one was created for this channel group,
* and returns a promise that is resolved when the channel group is attached
* to the active channel.
*/
join(): Promise<void>;

/**
* Leaves the consumer group if one was created for this channel group,
* and returns a promise that is resolved when the channel group is detached
* from the active channel.
*/
leave(): Promise<void>;
}

/**
* Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {@link RealtimePresence} object of a channel.
*/
Expand Down Expand Up @@ -2168,7 +2271,8 @@ export declare interface Channels<T> {
*/
get(name: string, channelOptions?: ChannelOptions): T;
/**
* @experimental This is a preview feature and may change in a future non-major release.
* This is a preview feature and may change in a future non-major release.
*
* This experimental method allows you to create custom realtime data feeds by selectively subscribing
* to receive only part of the data from the channel.
* See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information.
Expand All @@ -2190,6 +2294,22 @@ export declare interface Channels<T> {
release(name: string): void;
}

/**
* This is a preview feature and may change in a future non-major release.
*
* Creates and destroys {@link ChannelGroup} objects.
*/
export declare interface ChannelGroups {
/**
* Creates a new {@link ChannelGroup} object, with the specified {@link ChannelGroupOptions}, or returns the existing channel group object.
*
* @param filter - A regular expression defining the channel group. Only wildcards are supported.
* @param options - A {@link ChannelGroupOptions} object.
* @returns A {@link ChannelGroup} object.
*/
get(filter: string, options?: ChannelGroupOptions): ChannelGroup;
}

/**
* Contains an individual message that is sent to, or received from, Ably.
*/
Expand Down Expand Up @@ -2725,6 +2845,10 @@ export declare class Realtime implements RealtimeClient {
connect(): void;
auth: Auth;
channels: Channels<RealtimeChannel>;
/**
* This is a preview feature and may change in a future non-major release.
*/
channelGroups: ChannelGroups;
connection: Connection;
request<T = any>(
method: string,
Expand Down
29 changes: 29 additions & 0 deletions modular.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
RealtimeClient,
Auth,
Channels,
ChannelGroups as ChannelGroupsImpl,
Channel,
HttpPaginatedResponse,
StatsParams,
Expand Down Expand Up @@ -131,6 +132,25 @@ export declare const MsgPack: unknown;
*/
export declare const RealtimePresence: unknown;

/**
* The module is experimental and the API is subject to change.
mschristensen marked this conversation as resolved.
Show resolved Hide resolved
*
* Provides a {@link BaseRealtime} instance with the ability to interact with the experiemental channel groups feature.
*
* To create a client that includes this module, include it in the `ModulesMap` that you pass to the {@link BaseRealtime.constructor}:
*
* ```javascript
* import { BaseRealtime, WebSocketTransport, FetchRequest, RealtimePresence, ChannelGroups } from 'ably/modules';
* const realtime = new BaseRealtime(options, { WebSocketTransport, FetchRequest, RealtimePresence, ChannelGroups });
* ```
*
* If you do not provide this module, then attempting to use the functionality of channel groups will cause a runtime error.
*
* Note that in the experimental client-side simulation of ChannelGroups, you must provide the RealtimePresence module
* which is required for internal coordination among consumers.
*/
export declare const ChannelGroups: unknown;

/**
* Provides a {@link BaseRealtime} instance with the ability to establish a connection with the Ably realtime service using a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) connection.
*
Expand Down Expand Up @@ -240,6 +260,11 @@ export interface ModularPlugins {
*/
RealtimePresence?: typeof RealtimePresence;

/**
* See {@link ChannelGroups | documentation for the `ChannelGroups` plugin}.
*/
ChannelGroups?: typeof ChannelGroups;

/**
* See {@link WebSocketTransport | documentation for the `WebSocketTransport` plugin}.
*/
Expand Down Expand Up @@ -353,6 +378,10 @@ export declare class BaseRealtime implements RealtimeClient {
connect(): void;
auth: Auth;
channels: Channels<RealtimeChannel>;
/**
* This is a preview feature and may change in a future non-major release.
*/
channelGroups: ChannelGroupsImpl;
connection: Connection;
request<T = any>(
method: string,
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const pluginNames = [
'XHRRequest',
'FetchRequest',
'MessageInteractions',
'ChannelGroups',
];

// List of all free-standing functions exported by the library along with the
Expand Down
Loading