Skip to content

Commit

Permalink
conn: remove connection serial
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Aug 23, 2022
1 parent f512f21 commit 23de044
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 83 deletions.
4 changes: 2 additions & 2 deletions src/common/lib/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Connection extends EventEmitter {
state: string;
key?: string;
id?: string;
serial: undefined;
// serial: undefined;
errorReason: ErrorInfo | null;

constructor(ably: Realtime, options: NormalisedClientOptions) {
Expand All @@ -26,7 +26,7 @@ class Connection extends EventEmitter {
this.state = this.connectionManager.state.state;
this.key = undefined;
this.id = undefined;
this.serial = undefined;
// this.serial = undefined;
this.errorReason = null;

this.connectionManager.on('connectionstate', (stateChange: ConnectionStateChange) => {
Expand Down
1 change: 1 addition & 0 deletions src/common/lib/client/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class Channels extends EventEmitter {
}
}

// TODO(AD) is this still needed?
reattach(reason: ErrorInfo) {
for (const channelId in this.all) {
const channel = this.all[channelId];
Expand Down
168 changes: 87 additions & 81 deletions src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class TransportParams {
mode: string;
format?: Utils.Format;
connectionKey?: string;
connectionSerial?: number;
// connectionSerial?: number;
stream?: any;
heartbeats?: boolean;

Expand All @@ -100,7 +100,7 @@ export class TransportParams {
this.connectionKey = connectionKey;
this.format = options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json;

this.connectionSerial = undefined;
// this.connectionSerial = undefined;
}

getConnectParams(authParams: Record<string, unknown>): Record<string, string> {
Expand Down Expand Up @@ -153,9 +153,9 @@ export class TransportParams {
if (this.connectionKey) {
result += ',connectionKey=' + this.connectionKey;
}
if (this.connectionSerial !== undefined) {
result += ',connectionSerial=' + this.connectionSerial;
}
// if (this.connectionSerial !== undefined) {
// result += ',connectionSerial=' + this.connectionSerial;
// }
if (this.format) {
result += ',format=' + this.format;
}
Expand Down Expand Up @@ -237,7 +237,7 @@ class ConnectionManager extends EventEmitter {
connectionDetails?: Record<string, any>;
connectionId?: string;
connectionKey?: string;
connectionSerial?: number;
// connectionSerial?: number;
connectionStateTtl: number;
maxIdleInterval: number | null;
transports: string[];
Expand Down Expand Up @@ -336,7 +336,7 @@ class ConnectionManager extends EventEmitter {
this.connectionDetails = undefined;
this.connectionId = undefined;
this.connectionKey = undefined;
this.connectionSerial = undefined;
// this.connectionSerial = undefined;
this.connectionStateTtl = timeouts.connectionStateTtl;
this.maxIdleInterval = null;

Expand Down Expand Up @@ -442,11 +442,11 @@ class ConnectionManager extends EventEmitter {
}

createTransportParams(host: string | null, mode: string): TransportParams {
const params = new TransportParams(this.options, host, mode, this.connectionKey);
if (this.connectionSerial !== undefined) {
params.connectionSerial = this.connectionSerial;
}
return params;
return new TransportParams(this.options, host, mode, this.connectionKey);
// if (this.connectionSerial !== undefined) {
// params.connectionSerial = this.connectionSerial;
// }
// return params;
}

getTransportParams(callback: Function): void {
Expand Down Expand Up @@ -619,13 +619,13 @@ class ConnectionManager extends EventEmitter {
Utils.arrIn(this.getUpgradePossibilities(), optimalTransport)
) {
setTimeout(() => {
this.scheduleTransportActivation(error, transport, connectionId, connectionDetails, connectionPosition);
this.scheduleTransportActivation(error, transport, connectionId, connectionDetails);
}, this.options.timeouts.parallelUpgradeDelay);
} else {
this.scheduleTransportActivation(error, transport, connectionId, connectionDetails, connectionPosition);
this.scheduleTransportActivation(error, transport, connectionId, connectionDetails);
}
} else {
this.activateTransport(error, transport, connectionId, connectionDetails, connectionPosition);
this.activateTransport(error, transport, connectionId, connectionDetails);

/* allow connectImpl to start the upgrade process if needed, but allow
* other event handlers, including activating the transport, to run first */
Expand Down Expand Up @@ -658,14 +658,12 @@ class ConnectionManager extends EventEmitter {
* @param transport
* @param connectionId
* @param connectionDetails
* @param upgradeConnectionPosition
*/
scheduleTransportActivation(
error: ErrorInfo,
transport: Transport,
connectionId: string,
connectionDetails: Record<string, any>,
upgradeConnectionPosition?: ConnectionManager
): void {
const currentTransport = this.activeProtocol && this.activeProtocol.getTransport(),
abandon = () => {
Expand Down Expand Up @@ -750,19 +748,15 @@ class ConnectionManager extends EventEmitter {
* it's still an upgrade, realtime still expects a sync - it just needs to
* be a sync with the new connection position. (And it
* needs to be set in the library, which is done by activateTransport). */
const connectionReset = connectionId !== this.connectionId,
syncPosition = connectionReset ? upgradeConnectionPosition : this;
// TODO(AD) Does this even matter any more? We're still just reattaching
// so who cares if its a different connection?
const connectionReset = connectionId !== this.connectionId;

if (connectionReset) {
Logger.logAction(
Logger.LOG_ERROR,
'ConnectionManager.scheduleTransportActivation()',
'Upgrade resulted in new connectionId; resetting library connection position from ' +
this.connectionSerial +
' to ' +
(syncPosition as ConnectionManager).connectionSerial +
'; upgrade error was ' +
error
'Upgrade resulted in new connectionId; err = ' + error
);
}

Expand All @@ -779,9 +773,9 @@ class ConnectionManager extends EventEmitter {
'ConnectionManager.scheduleTransportActivation()',
'Activating transport; transport = ' + transport
);
// TODO(AD) why would connection ID change from SYNC?
// @ts-ignore
this.activateTransport(error, transport, this.connectionId, connectionDetails, { connectionSerial: this.connectionSerial });
// TODO(AD) fix old commit that passes this.connectionId rather than
// connectionId
this.activateTransport(error, transport, connectionId, connectionDetails);
/* Restore pre-sync state. If state has changed in the meantime,
* don't touch it -- since the websocket transport waits a tick before
* disposing itself, it's possible for it to have happily synced
Expand Down Expand Up @@ -834,14 +828,13 @@ class ConnectionManager extends EventEmitter {
* @param transport the transport instance
* @param connectionId the id of the new active connection
* @param connectionDetails the details of the new active connection
* @param connectionPosition the position at the point activation, {connectionSerial: <serial>}
*/
activateTransport(
error: ErrorInfo,
transport: Transport,
connectionId: string,
connectionDetails: Record<string, any>,
connectionPosition: ConnectionManager
// connectionPosition: ConnectionManager
): boolean {
Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.activateTransport()', 'transport = ' + transport);
if (error) {
Expand All @@ -857,13 +850,13 @@ class ConnectionManager extends EventEmitter {
'connectionDetails = ' + JSON.stringify(connectionDetails)
);
}
if (connectionPosition) {
Logger.logAction(
Logger.LOG_MICRO,
'ConnectionManager.activateTransport()',
'serial = ' + connectionPosition.connectionSerial
);
}
// if (connectionPosition) {
// Logger.logAction(
// Logger.LOG_MICRO,
// 'ConnectionManager.activateTransport()',
// 'serial = ' + connectionPosition.connectionSerial
// );
// }

this.persistTransportPreference(transport);

Expand Down Expand Up @@ -912,7 +905,7 @@ class ConnectionManager extends EventEmitter {

const connectionKey = connectionDetails.connectionKey;
if (connectionKey && this.connectionKey != connectionKey) {
this.setConnection(connectionId, connectionDetails, connectionPosition, !!error);
this.setConnection(connectionId, connectionDetails, !!error);
}

/* Rebroadcast any new connectionDetails from the active transport, which
Expand Down Expand Up @@ -1147,7 +1140,6 @@ class ConnectionManager extends EventEmitter {
setConnection(
connectionId: string,
connectionDetails: Record<string, any>,
connectionPosition: ConnectionManager,
hasConnectionError?: boolean
): void {
/* if connectionKey changes but connectionId stays the same, then just a
Expand Down Expand Up @@ -1176,6 +1168,8 @@ class ConnectionManager extends EventEmitter {
* state will be updated and so that it will be applied after
* Channels#onTransportUpdate, else channels will not have an ATTACHED
* sent twice (once from this and once from that). */
// TODO(AD) is this still needed given we will reattach on the
// new transport anyway
Platform.Config.nextTick(() => {
this.realtime.channels.reattach();
});
Expand All @@ -1192,61 +1186,66 @@ class ConnectionManager extends EventEmitter {
);
clearTimeout(this.channelResumeCheckTimer as number);
this.realtime.channels.resetAttachedMsgIndicators();
// TODO(AD) is this still needed given we're sending ATTACH anyway
this.channelResumeCheckTimer = setTimeout(() => {
this.realtime.channels.checkAttachedMsgIndicators(connectionId);
}, 30000);
}
this.realtime.connection.id = this.connectionId = connectionId;
this.realtime.connection.key = this.connectionKey = connectionDetails.connectionKey;
const forceResetMessageSerial = connIdChanged || !prevConnId;
this.setConnectionSerial(connectionPosition, forceResetMessageSerial, false);
// TODO(AD) will need to force channel to accept when connection ID changes?
// or I guess that won't matter anymore as the channel won't change - so
// no reason to force
// const forceResetMessageSerial = connIdChanged || !prevConnId;
// this.setConnectionSerial(connectionPosition, forceResetMessageSerial, false);
}

clearConnection(): void {
this.realtime.connection.id = this.connectionId = undefined;
this.realtime.connection.key = this.connectionKey = undefined;
this.clearConnectionSerial();
// this.clearConnectionSerial();
this.msgSerial = 0;
this.unpersistConnection();
}

/* force: set the connectionSerial even if it's less than the current
* connectionSerial. Used for new connections.
* Returns true iff the message was rejected as a duplicate. */
setConnectionSerial(connectionPosition: any, force?: boolean, fromChannelMessage?: boolean): void | true {
const connectionSerial = connectionPosition.connectionSerial;
Logger.logAction(
Logger.LOG_MICRO,
'ConnectionManager.setConnectionSerial()',
'Updating connection serial; serial = ' +
connectionSerial +
'; force = ' +
force +
'; previous = ' +
this.connectionSerial
);
if (connectionSerial !== undefined) {
if (connectionSerial <= (this.connectionSerial as number) && !force) {
if (fromChannelMessage) {
Logger.logAction(
Logger.LOG_ERROR,
'ConnectionManager.setConnectionSerial()',
'received message with connectionSerial ' +
connectionSerial +
', but current connectionSerial is ' +
this.connectionSerial +
'; assuming message is a duplicate and discarding it'
);
}
return true;
}
this.realtime.connection.serial = this.connectionSerial = connectionSerial;
}
}

clearConnectionSerial(): void {
this.realtime.connection.serial = this.connectionSerial = undefined;
}
// TODO remove
// setConnectionSerial(connectionPosition: any, force?: boolean, fromChannelMessage?: boolean): void | true {
// const connectionSerial = connectionPosition.connectionSerial;
// Logger.logAction(
// Logger.LOG_MICRO,
// 'ConnectionManager.setConnectionSerial()',
// 'Updating connection serial; serial = ' +
// connectionSerial +
// '; force = ' +
// force +
// '; previous = ' +
// this.connectionSerial
// );
// if (connectionSerial !== undefined) {
// if (connectionSerial <= (this.connectionSerial as number) && !force) {
// if (fromChannelMessage) {
// Logger.logAction(
// Logger.LOG_ERROR,
// 'ConnectionManager.setConnectionSerial()',
// 'received message with connectionSerial ' +
// connectionSerial +
// ', but current connectionSerial is ' +
// this.connectionSerial +
// '; assuming message is a duplicate and discarding it'
// );
// }
// return true;
// }
// this.realtime.connection.serial = this.connectionSerial = connectionSerial;
// }
// }

// clearConnectionSerial(): void {
// this.realtime.connection.serial = this.connectionSerial = undefined;
// }

getRecoveryKey(): string | null {
if (!this.connectionKey) {
Expand Down Expand Up @@ -2143,15 +2142,22 @@ class ConnectionManager extends EventEmitter {
* idle), message can validly arrive on it even though it isn't active */
if (onActiveTransport || onUpgradeTransport) {
if (notControlMsg) {
const suppressed = this.setConnectionSerial(message, false, true);
if (suppressed) {
return;
}
// TODO(AD) need setChannelSerial to check for duplicate given
// channelSerial
// const suppressed = this.setConnectionSerial(message, false, true);
// if (suppressed) {
// return;
// }

// TODO(AD) Keep this? Not sure why its needed though given we've already
// checked channel serial
// TODO(AD) This will not run before the channel serial is checked - is this
// ok?
if (ProtocolMessage.isDuplicate(message, this.mostRecentMsg)) {
Logger.logAction(
Logger.LOG_ERROR,
'ConnectionManager.onChannelMessage()',
'received message with different connectionSerial, but same message id as a previous; discarding; id = ' +
'received message with the same message id as a previous; discarding; id = ' +
message.id
);
return;
Expand Down

0 comments on commit 23de044

Please sign in to comment.