From 1f2a2a0d41034b12f612b01a5fb816fb220a8b41 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 4 Aug 2023 11:24:22 -0400 Subject: [PATCH] Track presence state per-device and amalgamate to a user state. Tracks presence on an individual per-device basis and combines the per-device state into a per-user state. This should help in situations where a user has two devices with conflicting status (e.g. one is syncing with unavailable and one is syncing with online). The tie-breaking is done by priority: BUSY > ONLINE > UNAVAILABLE > OFFLINE --- changelog.d/16066.bugfix | 2 + synapse/api/presence.py | 10 ++++ synapse/handlers/events.py | 1 + synapse/handlers/presence.py | 77 ++++++++++++++++++++++++--- synapse/replication/http/presence.py | 5 +- synapse/rest/client/presence.py | 2 +- synapse/rest/client/sync.py | 1 + tests/handlers/test_presence.py | 78 ++++++++++++++++++++-------- 8 files changed, 147 insertions(+), 29 deletions(-) create mode 100644 changelog.d/16066.bugfix diff --git a/changelog.d/16066.bugfix b/changelog.d/16066.bugfix new file mode 100644 index 000000000000..a2c4e7ffeb35 --- /dev/null +++ b/changelog.d/16066.bugfix @@ -0,0 +1,2 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. + diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b80aa83cb3d6..70b9bbe17974 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -20,6 +20,16 @@ from synapse.types import JsonDict +@attr.s(slots=True, auto_attribs=True) +class UserDevicePresenceState: + user_id: str + device_id: Optional[str] + state: str + last_active_ts: int + last_user_sync_ts: int + status_msg: Optional[str] + + @attr.s(slots=True, frozen=True, auto_attribs=True) class UserPresenceState: """Represents the current presence state of the user. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 33359f6ed748..d12803bf0f31 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -67,6 +67,7 @@ async def get_stream( context = await presence_handler.user_syncing( requester.user.to_string(), + requester.device_id, affect_presence=affect_presence, presence_state=PresenceState.ONLINE, ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cd7df0525f4f..02405a4847d4 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -49,7 +49,7 @@ import synapse.metrics from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError -from synapse.api.presence import UserPresenceState +from synapse.api.presence import UserDevicePresenceState, UserPresenceState from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background @@ -150,11 +150,16 @@ def __init__(self, hs: "HomeServer"): self._busy_presence_enabled = hs.config.experimental.msc3026_enabled active_presence = self.store.take_presence_startup_info() + # The combine status across all user devices. self.user_to_current_state = {state.user_id: state for state in active_presence} @abc.abstractmethod async def user_syncing( - self, user_id: str, affect_presence: bool, presence_state: str + self, + user_id: str, + device_id: Optional[str], + affect_presence: bool, + presence_state: str, ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -241,6 +246,7 @@ async def current_state_for_user(self, user_id: str) -> UserPresenceState: async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False, @@ -368,7 +374,9 @@ async def send_full_presence_to_users(self, user_ids: StrCollection) -> None: # We set force_notify=True here so that this presence update is guaranteed to # increment the presence stream ID (which resending the current user's presence # otherwise would not do). - await self.set_state(UserID.from_string(user_id), state, force_notify=True) + await self.set_state( + UserID.from_string(user_id), None, state, force_notify=True + ) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: raise NotImplementedError( @@ -472,7 +480,11 @@ def send_stop_syncing(self) -> None: self.send_user_sync(user_id, False, last_sync_ms) async def user_syncing( - self, user_id: str, affect_presence: bool, presence_state: str + self, + user_id: str, + device_id: Optional[str], + affect_presence: bool, + presence_state: str, ) -> ContextManager[None]: """Record that a user is syncing. @@ -490,7 +502,10 @@ async def user_syncing( # what the spec wants: see comment in the BasePresenceHandler version # of this function. await self.set_state( - UserID.from_string(user_id), {"presence": presence_state}, True + UserID.from_string(user_id), + device_id, + {"presence": presence_state}, + True, ) curr_sync = self._user_to_num_current_syncs.get(user_id, 0) @@ -586,6 +601,7 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]: async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False, @@ -623,6 +639,7 @@ async def set_state( await self._set_state_client( instance_name=self._presence_writer_instance, user_id=user_id, + device_id=device_id, state=state, ignore_status_msg=ignore_status_msg, force_notify=force_notify, @@ -755,6 +772,11 @@ def run_persister() -> Awaitable[None]: self._event_pos = self.store.get_room_max_stream_ordering() self._event_processing = False + # The per-device presence state, maps user to devices to per-device presence state. + self.user_to_device_to_current_state: Dict[ + str, Dict[Optional[str], UserDevicePresenceState] + ] = {} + async def _on_shutdown(self) -> None: """Gets called when shutting down. This lets us persist any updates that we haven't yet persisted, e.g. updates that only changes some internal @@ -973,6 +995,7 @@ async def bump_presence_active_time(self, user: UserID) -> None: async def user_syncing( self, user_id: str, + device_id: Optional[str], affect_presence: bool = True, presence_state: str = PresenceState.ONLINE, ) -> ContextManager[None]: @@ -985,6 +1008,7 @@ async def user_syncing( Args: user_id + device_id affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. @@ -1010,7 +1034,10 @@ async def user_syncing( # updated always, which is not what the spec calls for, but synapse has done # this for... forever, I think. await self.set_state( - UserID.from_string(user_id), {"presence": presence_state}, True + UserID.from_string(user_id), + device_id, + {"presence": presence_state}, + True, ) # Retrieve the new state for the logic below. This should come from the # in-memory cache. @@ -1213,6 +1240,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None: async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False, @@ -1221,6 +1249,7 @@ async def set_state( Args: target_user: The ID of the user to set the presence state of. + device_id: The optional device ID. state: The presence state as a JSON dictionary. ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. If False, the user's current status will be updated. @@ -1249,6 +1278,41 @@ async def set_state( prev_state = await self.current_state_for_user(user_id) + # Always update the device specific information. + device_state = self.user_to_device_to_current_state.setdefault( + user_id, {} + ).setdefault( + device_id, + UserDevicePresenceState( + user_id, + device_id, + presence, + last_active_ts=self.clock.time_msec(), + last_user_sync_ts=self.clock.time_msec(), + status_msg=None, + ), + ) + device_state.state = presence + if presence: + device_state.status_msg = status_msg + device_state.last_active_ts = self.clock.time_msec() + device_state.last_user_sync_ts = self.clock.time_msec() + + # Based on (all) the user's devices calculate the new presence state. + presence_by_priority = { + PresenceState.BUSY: 4, + PresenceState.ONLINE: 3, + PresenceState.UNAVAILABLE: 2, + PresenceState.OFFLINE: 1, + } + for device_state in self.user_to_device_to_current_state[user_id].values(): + if ( + presence_by_priority[device_state.state] + > presence_by_priority[presence] + ): + presence = device_state.state + + # The newly updated status as an amalgamation of all the device statuses. new_fields = {"state": presence} if not ignore_status_msg: @@ -1962,6 +2026,7 @@ def handle_update( # If the users are ours then we want to set up a bunch of timers # to time things out. if is_mine: + # TODO Maybe don't do this if currently active? if new_state.state == PresenceState.ONLINE: # Idle timer wheel_timer.insert( diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index db16aac9c206..dd5943dff799 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from twisted.web.server import Request @@ -95,11 +95,13 @@ def __init__(self, hs: "HomeServer"): @staticmethod async def _serialize_payload( # type: ignore[override] user_id: str, + device_id: Optional[str], state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False, ) -> JsonDict: return { + "device_id": device_id, "state": state, "ignore_status_msg": ignore_status_msg, "force_notify": force_notify, @@ -110,6 +112,7 @@ async def _handle_request( # type: ignore[override] ) -> Tuple[int, JsonDict]: await self._presence_handler.set_state( UserID.from_string(user_id), + content["device_id"], content["state"], content["ignore_status_msg"], content["force_notify"], diff --git a/synapse/rest/client/presence.py b/synapse/rest/client/presence.py index 8e193330f8bc..d578faa96984 100644 --- a/synapse/rest/client/presence.py +++ b/synapse/rest/client/presence.py @@ -97,7 +97,7 @@ async def on_PUT( raise SynapseError(400, "Unable to parse state") if self._use_presence: - await self.presence_handler.set_state(user, state) + await self.presence_handler.set_state(user, requester.device_id, state) return 200, {} diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d7854ed4fd9d..42bdd3bb108b 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -205,6 +205,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: context = await self.presence_handler.user_syncing( user.to_string(), + requester.device_id, affect_presence=affect_presence, presence_state=set_presence, ) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index fd66d573d221..72cc2947c811 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -559,7 +559,7 @@ def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None: # Mark user as online self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg + user_id, None, PresenceState.ONLINE, status_msg ) # Check that if we wait a while without telling the handler the user has @@ -591,13 +591,13 @@ def test_user_goes_offline_manually_with_no_status_msg(self) -> None: # Mark user as online self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg + user_id, None, PresenceState.ONLINE, status_msg ) # Mark user as offline self.get_success( self.presence_handler.set_state( - UserID.from_string(user_id), {"presence": PresenceState.OFFLINE} + UserID.from_string(user_id), None, {"presence": PresenceState.OFFLINE} ) ) @@ -616,12 +616,12 @@ def test_user_goes_offline_manually_with_status_msg(self) -> None: # Mark user as online self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg + user_id, None, PresenceState.ONLINE, status_msg ) # Mark user as offline self._set_presencestate_with_status_msg( - user_id, PresenceState.OFFLINE, "And now here." + user_id, None, PresenceState.OFFLINE, "And now here." ) def test_user_reset_online_with_no_status(self) -> None: @@ -633,13 +633,13 @@ def test_user_reset_online_with_no_status(self) -> None: # Mark user as online self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg + user_id, None, PresenceState.ONLINE, status_msg ) # Mark user as online again self.get_success( self.presence_handler.set_state( - UserID.from_string(user_id), {"presence": PresenceState.ONLINE} + UserID.from_string(user_id), None, {"presence": PresenceState.ONLINE} ) ) @@ -659,11 +659,11 @@ def test_set_presence_with_status_msg_none(self) -> None: # Mark user as online self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg + user_id, None, PresenceState.ONLINE, status_msg ) # Mark user as online and `status_msg = None` - self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None) + self._set_presencestate_with_status_msg(user_id, None, PresenceState.ONLINE, None) def test_set_presence_from_syncing_not_set(self) -> None: """Test that presence is not set by syncing if affect_presence is false""" @@ -671,11 +671,13 @@ def test_set_presence_from_syncing_not_set(self) -> None: status_msg = "I'm here!" self._set_presencestate_with_status_msg( - user_id, PresenceState.UNAVAILABLE, status_msg + user_id, None, PresenceState.UNAVAILABLE, status_msg ) self.get_success( - self.presence_handler.user_syncing(user_id, False, PresenceState.ONLINE) + self.presence_handler.user_syncing( + user_id, None, False, PresenceState.ONLINE + ) ) state = self.get_success( @@ -692,11 +694,35 @@ def test_set_presence_from_syncing_is_set(self) -> None: status_msg = "I'm here!" self._set_presencestate_with_status_msg( - user_id, PresenceState.UNAVAILABLE, status_msg + user_id, None, PresenceState.UNAVAILABLE, status_msg ) self.get_success( - self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing( + user_id, None, True, PresenceState.ONLINE + ) + ) + + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + # we should now be online + self.assertEqual(state.state, PresenceState.ONLINE) + + def test_set_presence_from_syncing_multi_device(self) -> None: + """Test that presence is set to the highest priority of all devices.""" + user_id = "@test:server" + + self.get_success( + self.presence_handler.user_syncing( + user_id, "dev-1", True, PresenceState.ONLINE + ) + ) + + self.get_success( + self.presence_handler.user_syncing( + user_id, "dev-2", True, PresenceState.UNAVAILABLE + ) ) state = self.get_success( @@ -711,11 +737,13 @@ def test_set_presence_from_syncing_keeps_status(self) -> None: status_msg = "I'm here!" self._set_presencestate_with_status_msg( - user_id, PresenceState.UNAVAILABLE, status_msg + user_id, None, PresenceState.UNAVAILABLE, status_msg ) self.get_success( - self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing( + user_id, None, True, PresenceState.ONLINE + ) ) state = self.get_success( @@ -755,14 +783,14 @@ def test_set_presence_from_syncing_keeps_busy( ) # Set presence to BUSY - self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg) + self._set_presencestate_with_status_msg(user_id, None, PresenceState.BUSY, status_msg) # Perform a sync with a presence state other than busy. This should NOT change # our presence status; we only change from busy if we explicitly set it via # /presence/*. self.get_success( worker_to_sync_against.get_presence_handler().user_syncing( - user_id, True, PresenceState.ONLINE + user_id, None, True, PresenceState.ONLINE ) ) @@ -774,18 +802,20 @@ def test_set_presence_from_syncing_keeps_busy( self.assertEqual(state.state, PresenceState.BUSY) def _set_presencestate_with_status_msg( - self, user_id: str, state: str, status_msg: Optional[str] + self, user_id: str, device_id: Optional[str], state: str, status_msg: Optional[str] ) -> None: """Set a PresenceState and status_msg and check the result. Args: user_id: User for that the status is to be set. + device_id: state: The new PresenceState. status_msg: Status message that is to be set. """ self.get_success( self.presence_handler.set_state( UserID.from_string(user_id), + device_id, {"presence": state, "status_msg": status_msg}, ) ) @@ -1032,7 +1062,9 @@ def test_remote_joins(self) -> None: # Mark test2 as online, test will be offline with a last_active of 0 self.get_success( self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test2:server"), + None, + {"presence": PresenceState.ONLINE}, ) ) self.reactor.pump([0]) # Wait for presence updates to be handled @@ -1079,7 +1111,9 @@ def test_remote_gets_presence_when_local_user_joins(self) -> None: # Mark test as online self.get_success( self.presence_handler.set_state( - UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test:server"), + None, + {"presence": PresenceState.ONLINE}, ) ) @@ -1087,7 +1121,9 @@ def test_remote_gets_presence_when_local_user_joins(self) -> None: # Note we don't join them to the room yet self.get_success( self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test2:server"), + None, + {"presence": PresenceState.ONLINE}, ) )