From 346e5ebd37389651db30e989c8576ed1882f64e1 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Mon, 29 May 2023 08:53:42 -0300 Subject: [PATCH] Make RealtimeChannel#processMessage async Preparation for #1293 (making ICipher.decrypt asynchronous). --- src/common/lib/client/realtime.ts | 2 +- src/common/lib/client/realtimechannel.ts | 3 +- test/realtime/failure.test.js | 8 +- test/realtime/presence.test.js | 66 ++++++---- test/realtime/sync.test.js | 146 ++++++++++++----------- 5 files changed, 128 insertions(+), 97 deletions(-) diff --git a/src/common/lib/client/realtime.ts b/src/common/lib/client/realtime.ts index caba09bbbf..7911c90966 100644 --- a/src/common/lib/client/realtime.ts +++ b/src/common/lib/client/realtime.ts @@ -103,7 +103,7 @@ class Channels extends EventEmitter { ); return; } - channel.processMessage(msg); + await channel.processMessage(msg); } /* called when a transport becomes connected; reattempt attach/detach diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 0ce7d57cca..b1c096ad05 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -582,7 +582,8 @@ class RealtimeChannel extends Channel { this.sendMessage(msg, callback); } - processMessage(message: ProtocolMessage): void { + // Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext. + async processMessage(message: ProtocolMessage): Promise { if ( message.action === actions.ATTACHED || message.action === actions.MESSAGE || diff --git a/test/realtime/failure.test.js b/test/realtime/failure.test.js index d660d47c1a..142ac9c5f1 100644 --- a/test/realtime/failure.test.js +++ b/test/realtime/failure.test.js @@ -329,11 +329,11 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channel = realtime.channels.get('failed_attach'), originalProcessMessage = channel.processMessage.bind(channel); - channel.processMessage = function (message) { + channel.processMessage = async function (message) { if (message.action === 11) { return; } - originalProcessMessage(message); + await originalProcessMessage(message); }; realtime.connection.once('connected', function () { @@ -371,12 +371,12 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async var performance = isBrowser ? window.performance : require('perf_hooks').performance; - channel.processMessage = function (message) { + channel.processMessage = async function (message) { // Ignore ATTACHED messages if (message.action === 11) { return; } - originalProcessMessage(message); + await originalProcessMessage(message); }; realtime.connection.on('connected', function () { diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index de74a1df6f..1c90d988d9 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -1732,18 +1732,27 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }, function (cb) { /* Inject an additional member locally */ - channel.processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: utils.now(), - presence: [ - { - clientId: goneClientId, - action: 'enter', - }, - ], - }); + channel + .processMessage({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: utils.now(), + presence: [ + { + clientId: goneClientId, + action: 'enter', + }, + ], + }) + .then(function () { + cb(null); + }) + .catch(function (err) { + cb(err); + }); + }, + function (cb) { channel.presence.get(function (err, members) { try { expect(members && members.length).to.equal(2, 'Check two members present'); @@ -1812,18 +1821,27 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }, function (cb) { /* Inject a member locally */ - channel.processMessage({ - action: 14, - id: 'messageid:0', - connectionId: 'connid', - timestamp: utils.now(), - presence: [ - { - clientId: fakeClientId, - action: 'enter', - }, - ], - }); + channel + .processMessage({ + action: 14, + id: 'messageid:0', + connectionId: 'connid', + timestamp: utils.now(), + presence: [ + { + clientId: fakeClientId, + action: 'enter', + }, + ], + }) + .then(function () { + cb(); + }) + .catch(function () { + cb(err); + }); + }, + function (cb) { channel.presence.get(function (err, members) { try { expect(members && members.length).to.equal(1, 'Check one member present'); diff --git a/test/realtime/sync.test.js b/test/realtime/sync.test.js index e6f7eb10ae..5f62dabe98 100644 --- a/test/realtime/sync.test.js +++ b/test/realtime/sync.test.js @@ -47,7 +47,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channelName = 'syncexistingset', channel = realtime.channels.get(channelName); - channel.processMessage( + await channel.processMessage( createPM({ action: 11, channel: channelName, @@ -63,27 +63,33 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async async.series( [ function (cb) { - channel.processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - { - action: 'present', - clientId: 'two', - connectionId: 'two_connid', - id: 'two_connid:0:0', - timestamp: 1e12, - }, - ], - }); - cb(); + channel + .processMessage({ + action: 16, + channel: channelName, + presence: [ + { + action: 'present', + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + { + action: 'present', + clientId: 'two', + connectionId: 'two_connid', + id: 'two_connid:0:0', + timestamp: 1e12, + }, + ], + }) + .then(function () { + cb(); + }) + .catch(function (err) { + cb(err); + }); }, function (cb) { channel.presence.get(function (err, results) { @@ -100,27 +106,33 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }, function (cb) { /* Trigger another sync. Two has gone without so much as a `leave` message! */ - channel.processMessage({ - action: 16, - channel: channelName, - presence: [ - { - action: 'present', - clientId: 'one', - connectionId: 'one_connid', - id: 'one_connid:0:0', - timestamp: 1e12, - }, - { - action: 'present', - clientId: 'three', - connectionId: 'three_connid', - id: 'three_connid:0:0', - timestamp: 1e12, - }, - ], - }); - cb(); + channel + .processMessage({ + action: 16, + channel: channelName, + presence: [ + { + action: 'present', + clientId: 'one', + connectionId: 'one_connid', + id: 'one_connid:0:0', + timestamp: 1e12, + }, + { + action: 'present', + clientId: 'three', + connectionId: 'three_connid', + id: 'three_connid:0:0', + timestamp: 1e12, + }, + ], + }) + .then(function () { + cb(); + }) + .catch(function (err) { + cb(err); + }); }, function (cb) { channel.presence.get(function (err, results) { @@ -155,7 +167,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channelName = 'sync_member_arrives_in_middle', channel = realtime.channels.get(channelName); - channel.processMessage( + await channel.processMessage( createPM({ action: 11, channel: channelName, @@ -164,7 +176,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ); /* First sync */ - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, presence: [ @@ -179,7 +191,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); /* A second sync, this time in multiple parts, with a presence message in the middle */ - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, channelSerial: 'serial:cursor', @@ -194,7 +206,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ], }); - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, presence: [ @@ -208,7 +220,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ], }); - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, channelSerial: 'serial:', @@ -257,7 +269,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channelName = 'sync_member_arrives_normally_after_came_in_sync', channel = realtime.channels.get(channelName); - channel.processMessage( + await channel.processMessage( createPM({ action: 11, channel: channelName, @@ -265,7 +277,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }) ); - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, channelSerial: 'serial:cursor', @@ -280,7 +292,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ], }); - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, presence: [ @@ -294,7 +306,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ], }); - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, channelSerial: 'serial:', @@ -340,7 +352,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channelName = 'sync_member_arrives_normally_before_comes_in_sync', channel = realtime.channels.get(channelName); - channel.processMessage( + await channel.processMessage( createPM({ action: 11, channel: channelName, @@ -348,7 +360,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }) ); - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, channelSerial: 'serial:cursor', @@ -363,7 +375,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ], }); - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, presence: [ @@ -377,7 +389,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ], }); - channel.processMessage({ + await channel.processMessage({ action: 16, channel: channelName, channelSerial: 'serial:', @@ -424,7 +436,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async channelName = 'sync_ordering', channel = realtime.channels.get(channelName); - channel.processMessage( + await channel.processMessage( createPM({ action: 11, channel: channelName, @@ -432,7 +444,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async ); /* One enters */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, id: 'one_connid:1', @@ -447,7 +459,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); /* An earlier leave from one (should be ignored) */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, connectionId: 'one_connid', @@ -462,7 +474,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); /* One adds some data in a newer msgSerial */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, connectionId: 'one_connid', @@ -478,7 +490,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); /* Two enters */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, connectionId: 'two_connid', @@ -493,7 +505,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); /* Two updates twice in the same message */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, connectionId: 'two_connid', @@ -514,7 +526,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }); /* Three enters */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, connectionId: 'three_connid', @@ -530,7 +542,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async /* Synthesized leave for three (with earlier msgSerial, incompatible id, * and later timestamp) */ - channel.processMessage({ + await channel.processMessage({ action: 14, channel: channelName, connectionId: 'synthesized', @@ -614,12 +626,12 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }, function (cb) { var originalProcessMessage = syncerChannel.processMessage; - syncerChannel.processMessage = function (message) { - originalProcessMessage.apply(this, arguments); + syncerChannel.processMessage = async function (message) { + await originalProcessMessage.apply(this, arguments); /* Inject an additional presence message after the first sync */ if (message.action === 16) { syncerChannel.processMessage = originalProcessMessage; - syncerChannel.processMessage({ + await syncerChannel.processMessage({ action: 14, id: 'messageid:0', connectionId: 'connid',