Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

materialisation: Added new message fields. #1888

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2281,7 +2281,7 @@
* and {@link ChannelOptions}, or returns the existing channel object.
*
* @experimental This is a preview feature and may change in a future non-major release.
* This experimental method allows you to create custom realtime data feeds by selectively subscribing

Check warning on line 2284 in ably.d.ts

View workflow job for this annotation

GitHub Actions / lint

Expected no lines between tags
* to receive only part of the data from the channel.
* See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information.
*
Expand Down Expand Up @@ -2335,12 +2335,104 @@
* Timestamp of when the message was received by Ably, as milliseconds since the Unix epoch.
*/
timestamp?: number;
/**
* The action type of the message, one of the {@link MessageAction} enum values.
*/
action?: MessageAction;
/**
* This message's unique serial.
*/
serial?: string;
/**
* The serial of the message that this message is a reference to.
*/
refSerial?: string;
/**
* The type of reference this message is, in relation to the message it references.
*/
refType?: string;
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
/**
* If an `update` operation was applied to this message, this will be the timestamp the update occurred.
*/
updatedAt?: number;
/**
* If a `deletion` operation was applied to this message, this will be the timestamp the deletion occurred.
*/
deletedAt?: number;
/**
* If this message resulted from an operation, this will contain the operation details.
*/
operation?: Operation;
}

/**
* Contains the details of an operation, such as update or deletion, supplied by the actioning client.
*/
export interface Operation {
/**
* The client ID of the client that initiated the operation.
*/
clientId?: string;
/**
* The description provided by the client that initiated the operation.
*/
description?: string;
/**
* A JSON object of string key-value pairs that may contain metadata associated with the operation.
*/
metadata?: Record<string, string>;
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
}
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved

/**
* The namespace containing the different types of message actions.
*/
declare namespace MessageActions {
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Message action has not been set.
*/
type MESSAGE_UNSET = 'message.unset';
/**
* Message action for a newly created message.
*/
type MESSAGE_CREATE = 'message.create';
/**
* Message action for an updated message.
*/
type MESSAGE_UPDATE = 'message.update';
/**
* Message action for a deleted message.
*/
type MESSAGE_DELETE = 'message.delete';
/**
* Message action for a newly created annotation.
*/
type ANNOTATION_CREATE = 'annotation.create';
/**
* Message action for a deleted annotation.
*/
type ANNOTATION_DELETE = 'annotation.delete';
/**
* Message action for a meta-message that contains channel occupancy information.
*/
type META_OCCUPANCY = 'meta.occupancy';
}

/**
* Describes the possible action types used on an {@link Message}.
*/
export type MessageAction =
| MessageActions.MESSAGE_UNSET
| MessageActions.MESSAGE_CREATE
| MessageActions.MESSAGE_UPDATE
| MessageActions.MESSAGE_DELETE
| MessageActions.ANNOTATION_CREATE
| MessageActions.ANNOTATION_DELETE
| MessageActions.META_OCCUPANCY;
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved

/**
* A message received from Ably.
*/
export type InboundMessage = Message & Required<Pick<Message, 'id' | 'timestamp'>>;
export type InboundMessage = Message & Required<Pick<Message, 'id' | 'timestamp' | 'serial' | 'action'>>;
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Static utilities related to messages.
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
"grunt": "grunt",
"test": "npm run test:node",
"test:node": "npm run build:node && npm run build:push && mocha",
"test:grep": "npm run build:node && npm run build:push && mocha --grep",
"test:node:skip-build": "mocha",
"test:webserver": "grunt test:webserver",
"test:playwright": "node test/support/runPlaywrightTests.js",
Expand Down
2 changes: 1 addition & 1 deletion scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { gzip } from 'zlib';
import Table from 'cli-table';

// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 98, gzip: 30 };
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 };

const baseClientNames = ['BaseRest', 'BaseRealtime'];

Expand Down
4 changes: 2 additions & 2 deletions src/common/lib/client/restchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as Utils from '../util/utils';
import Logger from '../util/logger';
import RestPresence from './restpresence';
import Message, {
fromValues as messageFromValues,
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
serialize as serializeMessage,
getMessagesSize,
CipherOptions,
fromValues as messageFromValues,
fromValuesArray as messagesFromValuesArray,
} from '../types/message';
import ErrorInfo from '../types/errorinfo';
import { PaginatedResult } from './paginatedresource';
Expand Down
11 changes: 6 additions & 5 deletions src/common/lib/types/defaultmessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import Message, {
CipherOptions,
fromEncoded,
fromEncodedArray,
encode,
decode,
encode,
EncodingDecodingContext,
fromEncoded,
fromEncodedArray,
fromValues,
} from './message';
import * as API from '../../../../ably';
import Platform from 'common/platform';
Expand All @@ -25,8 +26,8 @@ export class DefaultMessage extends Message {
}

// Used by tests
static fromValues(values: unknown): Message {
return Object.assign(new Message(), values);
static fromValues(values: Message | Record<string, unknown>, options?: { stringifyAction?: boolean }): Message {
return fromValues(values, options);
}

// Used by tests
Expand Down
62 changes: 58 additions & 4 deletions src/common/lib/types/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@ import * as API from '../../../../ably';
import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic';
import { MsgPack } from 'common/types/msgpack';

const MessageActionArray: API.MessageAction[] = [
'message.unset',
'message.create',
'message.update',
'message.delete',
'annotation.create',
'annotation.delete',
'meta.occupancy',
];

const MessageActionMap = new Map<API.MessageAction, number>(MessageActionArray.map((action, index) => [action, index]));

const ReverseMessageActionMap = new Map<number, API.MessageAction>(
MessageActionArray.map((action, index) => [index, action]),
);

function toMessageActionString(actionNumber: number): API.MessageAction | undefined {
return ReverseMessageActionMap.get(actionNumber);
}

function toMessageActionNumber(messageAction?: API.MessageAction): number | undefined {
return messageAction ? MessageActionMap.get(messageAction) : undefined;
}
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved

export type CipherOptions = {
channelCipher: {
encrypt: Function;
Expand Down Expand Up @@ -82,7 +106,7 @@ export async function fromEncoded(
encoded: unknown,
inputOptions?: API.ChannelOptions,
): Promise<Message> {
const msg = fromValues(encoded);
const msg = fromValues(encoded as Message | Record<string, unknown>, { stringifyAction: true });
const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null);
/* if decoding fails at any point, catch and return the message decoded to
* the fullest extent possible */
Expand Down Expand Up @@ -260,7 +284,7 @@ export async function fromResponseBody(
}

for (let i = 0; i < body.length; i++) {
const msg = (body[i] = fromValues(body[i]));
const msg = (body[i] = fromValues(body[i], { stringifyAction: true }));
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
try {
await decode(msg, options);
} catch (e) {
Expand All @@ -270,14 +294,22 @@ export async function fromResponseBody(
return body;
}

export function fromValues(values: unknown): Message {
export function fromValues(
values: Message | Record<string, unknown>,
options?: { stringifyAction?: boolean },
): Message {
const stringifyAction = options?.stringifyAction;
if (stringifyAction) {
const action = toMessageActionString(values.action as number) || values.action;
return Object.assign(new Message(), { ...values, action });
}
return Object.assign(new Message(), values);
}

export function fromValuesArray(values: unknown[]): Message[] {
const count = values.length,
result = new Array(count);
for (let i = 0; i < count; i++) result[i] = fromValues(values[i]);
for (let i = 0; i < count; i++) result[i] = fromValues(values[i] as Record<string, unknown>);
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

Expand All @@ -304,6 +336,13 @@ class Message {
encoding?: string | null;
extras?: any;
size?: number;
action?: API.MessageAction | number;
serial?: string;
refSerial?: string;
refType?: string;
updatedAt?: number;
deletedAt?: number;
operation?: API.Operation;

/**
* Overload toJSON() to intercept JSON.stringify()
Expand Down Expand Up @@ -334,6 +373,13 @@ class Message {
connectionId: this.connectionId,
connectionKey: this.connectionKey,
extras: this.extras,
serial: this.serial,
action: toMessageActionNumber(this.action as API.MessageAction) || this.action,
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
refSerial: this.refSerial,
refType: this.refType,
updatedAt: this.updatedAt,
deletedAt: this.deletedAt,
operation: this.operation,
encoding,
data,
};
Expand All @@ -355,6 +401,14 @@ class Message {
else result += '; data (json)=' + JSON.stringify(this.data);
}
if (this.extras) result += '; extras=' + JSON.stringify(this.extras);

if (this.action) result += '; action=' + this.action;
if (this.serial) result += '; serial=' + this.serial;
if (this.refSerial) result += '; refSerial=' + this.refSerial;
if (this.refType) result += '; refType=' + this.refType;
if (this.updatedAt) result += '; updatedAt=' + this.updatedAt;
if (this.deletedAt) result += '; deletedAt=' + this.deletedAt;
if (this.operation) result += '; operation=' + JSON.stringify(this.operation);
splindsay-92 marked this conversation as resolved.
Show resolved Hide resolved
result += ']';
return result;
}
Expand Down
18 changes: 14 additions & 4 deletions src/common/lib/types/protocolmessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,26 @@ export function fromDeserialized(
presenceMessagePlugin: PresenceMessagePlugin | null,
): ProtocolMessage {
const error = deserialized.error;
if (error) deserialized.error = ErrorInfo.fromValues(error as ErrorInfo);
if (error) {
deserialized.error = ErrorInfo.fromValues(error as ErrorInfo);
}

const messages = deserialized.messages as Message[];
if (messages) for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i]);
if (messages) {
for (let i = 0; i < messages.length; i++) {
messages[i] = messageFromValues(messages[i], { stringifyAction: true });
}
}

const presence = presenceMessagePlugin ? (deserialized.presence as PresenceMessage[]) : undefined;
if (presenceMessagePlugin) {
if (presence && presenceMessagePlugin)
for (let i = 0; i < presence.length; i++)
if (presence && presenceMessagePlugin) {
for (let i = 0; i < presence.length; i++) {
presence[i] = presenceMessagePlugin.presenceMessageFromValues(presence[i], true);
}
}
}

return Object.assign(new ProtocolMessage(), { ...deserialized, presence });
}

Expand Down
2 changes: 2 additions & 0 deletions test/realtime/crypto.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
helper.recordPrivateApi('call.msgpack.decode');
var messageFromMsgpack = Message.fromValues(
msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)),
{ stringifyAction: true },
);

try {
Expand Down Expand Up @@ -439,6 +440,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
helper.recordPrivateApi('call.msgpack.decode');
var messageFromMsgpack = Message.fromValues(
msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)),
{ stringifyAction: true },
);

try {
Expand Down
51 changes: 51 additions & 0 deletions test/realtime/message.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
var expect = chai.expect;
let config = Ably.Realtime.Platform.Config;
var createPM = Ably.protocolMessageFromDeserialized;
var Message = Ably.Realtime.Message;

var publishIntervalHelper = function (currentMessageNum, channel, dataFn, onPublish) {
return function () {
Expand Down Expand Up @@ -1271,6 +1272,56 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async
channel.publish('end', null);
});
});
/**
* @spec TM2j
*/
describe('DefaultMessage.fromValues stringify action', function () {
const testCases = [
{
description: 'should stringify the numeric action',
action: 1,
options: { stringifyAction: true },
expectedString: '[Message; action=message.create]',
expectedJSON: { action: 1 },
},
{
description: 'should not stringify the numeric action',
action: 1,
options: { stringifyAction: false },
expectedString: '[Message; action=1]',
expectedJSON: { action: 1 },
},
{
description: 'should accept an already stringified action',
action: 'message.update',
options: { stringifyAction: true },
expectedString: '[Message; action=message.update]',
expectedJSON: { action: 2 },
},
{
description: 'should handle no action provided',
action: undefined,
options: { stringifyAction: true },
expectedString: '[Message]',
expectedJSON: { action: undefined },
},
{
description: 'should handle unknown action provided',
action: 10,
options: { stringifyAction: true },
expectedString: '[Message; action=10]',
expectedJSON: { action: 10 },
},
];
testCases.forEach(({ description, action, options, expectedString, expectedJSON }) => {
it(description, function () {
const values = { action };
const message = Message.fromValues(values, options);
expect(message.toString()).to.equal(expectedString);
expect(message.toJSON()).to.deep.contains(expectedJSON);
});
});
});

/**
* @spec RTS5
Expand Down
Loading
Loading