From 51cf0b048a42b203a04bfe3d1c1e3cb715fc7ead Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Thu, 25 May 2023 17:32:14 -0300 Subject: [PATCH] Make ConnectionManager#onChannelMessages process messages asynchronously MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is preparation for #1293 (making ICipher.decrypt asynchronous). This will require us to make RealtimeChannel#processMessage asynchronous. Since RealtimeChannel#processMessage reads from and writes to the _decodingContext.baseEncodedPreviousPayload property of the channel, we need to ensure that, once this method becomes asynchronous, we serialise access to this method — that is, we wait for one call to complete before performing the next. To do this, we need to introduce a queue. I decided to put this queue at the level of the ConnectionManager instead of RealtimeChannel. This is because ConnectionManager also has its own logic for deciding whether a message should be processed — specifically, whether it comes from the current transport — and I thought it made sense to evaluate these conditions at the moment we pass the message to the channel. I’m not 100% sure this is the right choice, though, since it means that the synchronisation is now the concern of three components (ConnectionManager, Channels, RealtimeChannel) when it instead could be the concern of just RealtimeChannel. But we can always revisit this. The handling of the case where ConnectionManager#processChannelMessage throws an error is copied from the places where this error was previously handled — namely, WebSocketTransport.onWsData and CometTransport.onData, both of which log an error message without affecting the processing of subsequent messages. (Note also that this marks the first use of `async` or promises internally in the library. We have avoided this until now because we were not guaranteed to be running in browsers with Promise support, but since the library will _only_ be exposing a Promise API as of #1199, which is also scheduled for version 2.0, we’re fine to start doing so now.) --- src/common/lib/transport/connectionmanager.ts | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 14f713886d..0221c47ad2 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -215,6 +215,12 @@ class ConnectionManager extends EventEmitter { suspendTimer?: number | NodeJS.Timeout | null; retryTimer?: number | NodeJS.Timeout | null; disconnectedRetryCount: number = 0; + pendingChannelMessagesState: { + // Whether a message is currently being processed + isProcessing: boolean; + // The messages remaining to be processed (excluding any message currently being processed) + queue: { message: ProtocolMessage; transport: Transport }[]; + } = { isProcessing: false, queue: [] }; constructor(realtime: Realtime, options: ClientOptions) { super(); @@ -1966,6 +1972,34 @@ class ConnectionManager extends EventEmitter { } onChannelMessage(message: ProtocolMessage, transport: Transport): void { + this.pendingChannelMessagesState.queue.push({ message, transport }); + + if (!this.pendingChannelMessagesState.isProcessing) { + this.processNextPendingChannelMessage(); + } + } + + private processNextPendingChannelMessage() { + if (this.pendingChannelMessagesState.queue.length > 0) { + this.pendingChannelMessagesState.isProcessing = true; + + const pendingChannelMessage = this.pendingChannelMessagesState.queue.shift()!; + this.processChannelMessage(pendingChannelMessage.message, pendingChannelMessage.transport) + .catch((err) => { + Logger.logAction( + Logger.LOG_ERROR, + 'ConnectionManager.processNextPendingChannelMessage() received error ', + err + ); + }) + .finally(() => { + this.pendingChannelMessagesState.isProcessing = false; + this.processNextPendingChannelMessage(); + }); + } + } + + private async processChannelMessage(message: ProtocolMessage, transport: Transport) { const onActiveTransport = this.activeProtocol && transport === this.activeProtocol.getTransport(), onUpgradeTransport = Utils.arrIn(this.pendingTransports, transport) && this.state == this.states.synchronizing;