Skip to content

Commit

Permalink
Make ConnectionManager#processChannelMessage async
Browse files Browse the repository at this point in the history
This is preparation for #1293 (making ICipher.decrypt asynchronous).
This will require us to make RealtimeChannel#processMessage asynchronous.

Since RealtimeChannel#processMessage reads from and writes to the
_decodingContext.baseEncodedPreviousPayload property of the channel, we
need to ensure that, once this method becomes asynchronous, we serialise
access to this method — that is, we wait for one call to complete before
performing the next.

To do this, we need to introduce a queue. I decided to put this queue at
the level of the ConnectionManager instead of RealtimeChannel. This is
because ConnectionManager also has its own logic for deciding whether a
message should be processed — specifically, whether it comes from the
current transport — and I thought it made sense to evaluate these
conditions at the moment we pass the message to the channel. I’m not
100% sure this is the right choice, though, since it means that the
synchronisation is now the concern of three components
(ConnectionManager, Channels, RealtimeChannel) when it instead could be
the concern of just RealtimeChannel. But we can always revisit this.

The handling of the case where ConnectionManager#processChannelMessage
throws an error is copied from the places where this error was
previously handled — namely, WebSocketTransport.onWsData and
CometTransport.onData, both of which log an error message without
affecting the processing of subsequent messages.

(Note also that this marks the first use of `async` or promises
internally in the library. We have avoided this until now because we
were not guaranteed to be running in browsers with Promise support, but
since the library will _only_ be exposing a Promise API as of #1199,
which is also scheduled for version 2.0, we’re fine to start doing so
now.)
  • Loading branch information
lawrence-forooghian committed Jun 1, 2023
1 parent 046b8f7 commit 05cc23d
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions src/common/lib/transport/connectionmanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ class ConnectionManager extends EventEmitter {
suspendTimer?: number | NodeJS.Timeout | null;
retryTimer?: number | NodeJS.Timeout | null;
disconnectedRetryCount: number = 0;
pendingChannelMessagesState: {
// Whether a message is currently being processed
isProcessing: boolean;
// The messages remaining to be processed (excluding any message currently being processed)
queue: { message: ProtocolMessage; transport: Transport }[];
} = { isProcessing: false, queue: [] };

constructor(realtime: Realtime, options: ClientOptions) {
super();
Expand Down Expand Up @@ -1966,6 +1972,34 @@ class ConnectionManager extends EventEmitter {
}

onChannelMessage(message: ProtocolMessage, transport: Transport): void {
this.pendingChannelMessagesState.queue.push({ message, transport });

if (!this.pendingChannelMessagesState.isProcessing) {
this.processNextPendingChannelMessage();
}
}

private processNextPendingChannelMessage() {
if (this.pendingChannelMessagesState.queue.length > 0) {
this.pendingChannelMessagesState.isProcessing = true;

const pendingChannelMessage = this.pendingChannelMessagesState.queue.shift()!;
this.processChannelMessage(pendingChannelMessage.message, pendingChannelMessage.transport)
.catch((err) => {
Logger.logAction(
Logger.LOG_ERROR,
'ConnectionManager.processNextPendingChannelMessage() received error ',
err
);
})
.finally(() => {
this.pendingChannelMessagesState.isProcessing = false;
this.processNextPendingChannelMessage();
});
}
}

private async processChannelMessage(message: ProtocolMessage, transport: Transport) {
const onActiveTransport = this.activeProtocol && transport === this.activeProtocol.getTransport(),
onUpgradeTransport = Utils.arrIn(this.pendingTransports, transport) && this.state == this.states.synchronizing;

Expand Down

0 comments on commit 05cc23d

Please sign in to comment.