Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't send normal presence updates over federation replication stream #9828

Merged
merged 7 commits into from
Apr 19, 2021
29 changes: 24 additions & 5 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,11 @@ def __init__(self, hs: "HomeServer"):
self.state = hs.get_state_handler()

self._federation = None
if hs.should_send_federation():
if hs.should_send_federation() or not hs.config.worker_app:
self._federation = hs.get_federation_sender()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this bit going away again soon? If so, it's ok. If not, it could do with more comments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, it is.


self._send_federation = hs.should_send_federation()

self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

active_presence = self.store.take_presence_startup_info()
Expand Down Expand Up @@ -262,9 +264,12 @@ async def maybe_send_presence_to_interested_destinations(
destinations that are interested.
"""

if not self._federation:
if not self._send_federation:
return

# If this worker sends federation we must have a FederationSender.
assert self._federation

hosts_and_states = await get_interested_remotes(
self.store,
self.presence_router,
Expand Down Expand Up @@ -491,7 +496,6 @@ def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
self.federation = hs.get_federation_sender()
self._presence_enabled = hs.config.use_presence

federation_registry = hs.get_federation_registry()
Expand Down Expand Up @@ -698,6 +702,13 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
self.unpersisted_users_changes |= {s.user_id for s in new_states}
self.unpersisted_users_changes -= set(to_notify.keys())

# Check if we need to resend any presence states to remote hosts. We
# only do this for states that haven't been updated in a while to
# ensure that the remote host doesn't time the presence state out.
#
# Note that since these are states that have *not* been updated,
# they won't get sent down the normal presence replication stream,
# and so we have to explicitly send them via the federation stream.
to_federation_ping = {
user_id: state
for user_id, state in to_federation_ping.items()
Expand All @@ -713,8 +724,12 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
self.state,
)

# Since this is master we know that we have a federation sender or
# queue, and so this will be defined.
assert self._federation

for destinations, states in hosts_and_states:
self.federation.send_presence_to_destinations(states, destinations)
self._federation.send_presence_to_destinations(states, destinations)

async def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
Expand Down Expand Up @@ -1193,9 +1208,13 @@ async def _handle_state_delta(self, deltas):
user_presence_states
)

# Since this is master we know that we have a federation sender or
# queue, and so this will be defined.
assert self._federation

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self.federation.send_presence_to_destinations(
self._federation.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

Expand Down