Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster joins: Avoid starting duplicate partial state syncs #14844

Merged
merged 8 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14844.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add check to avoid starting duplicate partial state syncs.
90 changes: 82 additions & 8 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
Expand Down Expand Up @@ -171,12 +172,23 @@ def __init__(self, hs: "HomeServer"):

self.third_party_event_rules = hs.get_third_party_event_rules()

# Tracks running partial state syncs by room ID.
# Partial state syncs currently only run on the main process, so it's okay to
# track them in-memory for now.
self._active_partial_state_syncs: Set[str] = set()
# Tracks partial state syncs we may want to restart.
# A dictionary mapping room IDs to (initial destination, other destinations)
# tuples.
self._partial_state_syncs_to_restart: Dict[
str, Tuple[Optional[str], Collection[str]]
] = {}

# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
run_as_background_process(
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
)

@trace
Expand Down Expand Up @@ -679,9 +691,7 @@ async def do_invite_join(
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
Expand Down Expand Up @@ -1660,20 +1670,84 @@ async def get_room_complexity(
# well.
return None

async def _resume_sync_partial_state_room(self) -> None:
async def _resume_partial_state_room_sync(self) -> None:
"""Resumes resyncing of all partial-state rooms after a restart."""
assert not self.config.worker.worker_app

partial_state_rooms = await self.store.get_partial_state_room_resync_info()
for room_id, resync_info in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
self._start_partial_state_room_sync(
initial_destination=resync_info.joined_via,
other_destinations=resync_info.servers_in_room,
room_id=room_id,
)

def _start_partial_state_room_sync(
self,
initial_destination: Optional[str],
other_destinations: Collection[str],
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
room_id: str,
) -> None:
"""Starts the background process to resync the state of a partial-state room,
if it is not already running.

Args:
initial_destination: the initial homeserver to pull the state from
other_destinations: other homeservers to try to pull the state from, if
`initial_destination` is unavailable
room_id: room to be resynced
"""

async def _sync_partial_state_room_wrapper() -> None:
if room_id in self._active_partial_state_syncs:
# Mark the partial state sync as possibly needing a restart.
# We want to do this when the partial state sync is about to fail
# because we've been kicked from the room, but we rejoin before the sync
# finishes falling over.
self._partial_state_syncs_to_restart[room_id] = (
initial_destination,
other_destinations,
)
return

self._active_partial_state_syncs.add(room_id)

try:
await self._sync_partial_state_room(
initial_destination=initial_destination,
other_destinations=other_destinations,
room_id=room_id,
)
finally:
# Check whether the room is still partial stated, while we still claim
# to be the active sync. Usually, the partial state flag will be gone,
# unless we left and rejoined the room, or the sync failed.
is_still_partial_state_room = await self.store.is_partial_state_room(
room_id
)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
self._active_partial_state_syncs.remove(room_id)

# Check if we need to restart the sync.
if room_id in self._partial_state_syncs_to_restart:
(
restart_initial_destination,
restart_other_destinations,
) = self._partial_state_syncs_to_restart[room_id]

# Clear the restart flag.
self._partial_state_syncs_to_restart.pop(room_id, None)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

if is_still_partial_state_room:
self._start_partial_state_room_sync(
initial_destination=restart_initial_destination,
other_destinations=restart_other_destinations,
room_id=room_id,
)

DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
run_as_background_process(
desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
)

async def _sync_partial_state_room(
self,
initial_destination: Optional[str],
Expand Down