Faster joins: omit partial rooms from eager syncs until the resync completes (#14870)

* Allow `AbstractSet` in `StrCollection`

Otherwise frozensets are excluded. This will be useful in an upcoming
commit, where I plan to change a function that accepts `List[str]` to
accept `StrCollection` instead. (A sketch of the widened alias appears
after this list of changes.)

* `rooms_to_exclude` -> `rooms_to_exclude_globally`

I am about to make use of this exclusion mechanism to exclude rooms for
a specific user and a specific sync. This rename helps to clarify the
distinction between the global config and the rooms to exclude for a
specific sync.

* Better function names for internal sync methods

* Track a list of excluded rooms on SyncResultBuilder

I plan to feed it a list of partial-state rooms that this sync should ignore.

* Exclude partial state rooms during eager sync

using the mechanism established in the previous commit

* Track un-partial-state stream in sync tokens

So that we can work out which rooms have become fully stated during a
given sync period. (An illustrative sketch of the new token field
appears after this list of changes.)

* Fix mutation of `@cached` return value

This was fouling up a Complement test added alongside this PR.
Excluding a room would extend the set of forgotten rooms held in the
cache, meaning the room could be erroneously considered forgotten in the
future. (A standalone illustration of this class of bug appears after
this list of changes.)

Introduced in #12310, Synapse 1.57.0. I don't think this had any
user-visible side effects (until now).

* SyncResultBuilder: track rooms to force as newly joined

Similar plan as before. We've omitted rooms from certain sync responses;
now we establish the mechanism to reintroduce them into future syncs.

* Read new field, to present rooms as newly joined

* Force un-partial-stated rooms to be newly-joined

for eager incremental syncs only, provided they're still fully stated

* Notify user stream listeners to wake up long polling syncs

* Changelog

* Typo fix

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Unnecessary list cast

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Rephrase comment

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Another comment

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>

* Fixup merge(?)

* Poke notifier when receiving un-partial-stated msg over replication

* Fixup merge whoops

Thanks MV :)

Co-authored-by: Mathieu Velten <mathieuv@matrix.org>

Co-authored-by: Mathieu Velten <mathieuv@matrix.org>
Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
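
As a point of reference for the first bullet, here is a minimal sketch of
the widened alias. It assumes `StrCollection` is a plain `Union` of string
collections defined in `synapse/types.py`; the exact members of the real
definition may differ.

```python
from typing import AbstractSet, List, Tuple, Union

# A collection of strings that deliberately excludes `str` itself
# (a bare `str` is a Sequence[str], which is easy to pass by mistake).
# Including AbstractSet[str] means frozensets and other set types match.
StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]
```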
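
To illustrate "track un-partial-state stream in sync tokens", here is a
self-contained sketch. `MiniStreamToken` and its values are hypothetical
stand-ins; only the field name `un_partial_stated_rooms_key` mirrors the
diff below.

```python
import attr


@attr.s(slots=True, frozen=True, auto_attribs=True)
class MiniStreamToken:
    """Simplified stand-in for a sync token; real tokens carry more keys."""

    room_key: int
    # Position on the un-partial-stated rooms stream.
    un_partial_stated_rooms_key: int


since = MiniStreamToken(room_key=100, un_partial_stated_rooms_key=5)
now = MiniStreamToken(room_key=130, un_partial_stated_rooms_key=8)

# Any room whose partial-state flag was cleared at a stream position in
# (since, now] became fully stated during this sync period, so an eager
# incremental sync can re-introduce it to the client as newly joined.
lower = since.un_partial_stated_rooms_key
upper = now.un_partial_stated_rooms_key
print(f"look for rooms un-partial-stated in stream range ({lower}, {upper}]")
```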
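
To illustrate the `@cached` mutation fix, here is a standalone sketch of
the bug class using `functools.lru_cache` in place of Synapse's cache
decorator. The helper names are illustrative; the real fix (in
`roommember.py` below) likewise copies the cached set before updating it.

```python
from functools import lru_cache
from typing import Set


@lru_cache(maxsize=None)
def get_forgotten_rooms_for_user(user_id: str) -> Set[str]:
    # Pretend this hits the database. The returned object is shared by
    # every caller, because the cache stores and returns the same set.
    return {"!forgotten:example.org"}


def rooms_to_exclude_buggy(user_id: str, excluded_rooms: Set[str]) -> Set[str]:
    rooms = get_forgotten_rooms_for_user(user_id)
    # BUG: this mutates the object sitting in the cache, so rooms excluded
    # for this one sync are treated as "forgotten" by later callers too.
    rooms.update(excluded_rooms)
    return rooms


def rooms_to_exclude_fixed(user_id: str, excluded_rooms: Set[str]) -> Set[str]:
    # Fix: take a copy so the in-cache value is never mutated.
    rooms = set(get_forgotten_rooms_for_user(user_id))
    rooms.update(excluded_rooms)
    return rooms


# Demonstrating the bug: after the buggy call, the cache itself has grown.
rooms_to_exclude_buggy("@alice:example.org", {"!partial:example.org"})
assert "!partial:example.org" in get_forgotten_rooms_for_user("@alice:example.org")
```
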
3 people authored Jan 23, 2023
1 parent 5e75771 commit 80d4406
Showing 13 changed files with 170 additions and 44 deletions.
1 change: 1 addition & 0 deletions changelog.d/14870.feature
@@ -0,0 +1 @@
Faster joins: allow non-lazy-loading ("eager") syncs to complete after a partial join by omitting partial state rooms until they become fully stated.
15 changes: 5 additions & 10 deletions synapse/handlers/federation.py
@@ -1868,22 +1868,17 @@ async def _sync_partial_state_room(

async with self._is_partial_state_room_linearizer.queue(room_id):
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
new_stream_id = await self.store.clear_partial_state_room(room_id)

# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

if success:
if new_stream_id is not None:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)

# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

await self._notifier.on_un_partial_stated_room(
room_id, new_stream_id
)
return

# we raced against more events arriving with partial state. Go round
65 changes: 55 additions & 10 deletions synapse/handlers/sync.py
@@ -290,7 +290,7 @@ def __init__(self, hs: "HomeServer"):
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)

self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync

async def wait_for_sync_for_user(
self,
@@ -1340,7 +1340,10 @@ async def generate_sync_result(
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
user_id,
since_token.room_key,
now_token.room_key,
self.rooms_to_exclude_globally,
)

mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1375,12 +1378,39 @@ async def generate_sync_result(
else:
mutable_joined_room_ids.discard(room_id)

# Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
if not sync_config.filter_collection.lazy_load_members():
# Non-lazy syncs should never include partially stated rooms.
# Exclude all partially stated rooms from this sync.
for room_id in mutable_joined_room_ids:
if await self.store.is_partial_state_room(room_id):
mutable_rooms_to_exclude.add(room_id)

# Incremental eager syncs should additionally include rooms that
# - we are joined to
# - are full-stated
# - became fully-stated at some point during the sync period
# (These rooms will have been omitted during a previous eager sync.)
forced_newly_joined_room_ids = set()
if since_token and not sync_config.filter_collection.lazy_load_members():
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
since_token.un_partial_stated_rooms_key,
now_token.un_partial_stated_rooms_key,
mutable_joined_room_ids,
)
)
for room_id in un_partial_stated_rooms:
if not await self.store.is_partial_state_room(room_id):
forced_newly_joined_room_ids.add(room_id)

# Now we have our list of joined room IDs, exclude as configured and freeze
joined_room_ids = frozenset(
(
room_id
for room_id in mutable_joined_room_ids
if room_id not in self.rooms_to_exclude
if room_id not in mutable_rooms_to_exclude
)
)

@@ -1397,6 +1427,8 @@ async def generate_sync_result(
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
excluded_room_ids=frozenset(mutable_rooms_to_exclude),
forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
membership_change_events=membership_change_events,
)

@@ -1834,14 +1866,16 @@ async def _generate_sync_entry_for_rooms(
# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
room_changes = await self._get_room_changes_for_initial_sync(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_tags_for_user(user_id)

log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1900,7 +1934,7 @@ async def _have_rooms_changed(

assert since_token

if membership_change_events:
if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True

stream_id = since_token.room_key.stream
@@ -1909,7 +1943,7 @@
return True
return False

async def _get_rooms_changed(
async def _get_room_changes_for_incremental_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
@@ -1947,7 +1981,9 @@ async def _get_rooms_changed(
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)

newly_joined_rooms: List[str] = []
newly_joined_rooms: List[str] = list(
sync_result_builder.forced_newly_joined_room_ids
)
newly_left_rooms: List[str] = []
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
@@ -2153,7 +2189,7 @@ async def _get_rooms_changed(
newly_left_rooms,
)

async def _get_all_rooms(
async def _get_room_changes_for_initial_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
@@ -2178,7 +2214,7 @@ async def _get_all_rooms(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
excluded_rooms=self.rooms_to_exclude,
excluded_rooms=sync_result_builder.excluded_room_ids,
)

room_entries = []
@@ -2549,6 +2585,13 @@ class SyncResultBuilder:
since_token: The token supplied by user, or None.
now_token: The token to sync up to.
joined_room_ids: List of rooms the user is joined to
excluded_room_ids: Set of room ids we should omit from the /sync response.
forced_newly_joined_room_ids:
Rooms that should be presented in the /sync response as if they were
newly joined during the sync period, even if that's not the case.
(This is useful if the room was previously excluded from a /sync response,
and now the client should be made aware of it.)
Only used by incremental syncs.
# The following mirror the fields in a sync response
presence
@@ -2565,6 +2608,8 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
excluded_room_ids: FrozenSet[str]
forced_newly_joined_room_ids: FrozenSet[str]
membership_change_events: List[EventBase]

presence: List[UserPresenceState] = attr.Factory(list)
26 changes: 26 additions & 0 deletions synapse/notifier.py
@@ -314,6 +314,32 @@ async def on_new_room_events(
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)

async def on_un_partial_stated_room(
self,
room_id: str,
new_token: int,
) -> None:
"""Used by the resync background processes to wake up all listeners
of this room when it is un-partial-stated.
It will also notify replication listeners of the change in stream.
"""

# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
)
except Exception:
logger.exception("Failed to notify listener")

# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()

async def notify_new_room_events(
self,
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
1 change: 1 addition & 0 deletions synapse/replication/tcp/client.py
@@ -260,6 +260,7 @@ async def on_rdata(
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
await self.notifier.on_un_partial_stated_room(row.room_id, token)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
1 change: 1 addition & 0 deletions synapse/storage/databases/main/relations.py
@@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_stated_rooms_key=0,
)

return events[:limit], next_token
47 changes: 41 additions & 6 deletions synapse/storage/databases/main/room.py
@@ -26,6 +26,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
@@ -1294,10 +1295,44 @@ def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
instance_name
)

async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int, room_ids: Collection[str]
) -> Set[str]:
"""Get all rooms that got un partial stated between `last_id` exclusive and
`current_id` inclusive.
Returns:
The list of room ids.
"""

if last_id == current_id:
return set()

def _get_un_partial_stated_rooms_between_txn(
txn: LoggingTransaction,
) -> Set[str]:
sql = """
SELECT DISTINCT room_id FROM un_partial_stated_room_stream
WHERE ? < stream_id AND stream_id <= ? AND
"""

clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", room_ids
)

txn.execute(sql + clause, [last_id, current_id] + args)

return {r[0] for r in txn}

return await self.db_pool.runInteraction(
"get_un_partial_stated_rooms_between",
_get_un_partial_stated_rooms_between_txn,
)

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
"""Get updates for un partial stated rooms replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
@@ -2304,16 +2339,16 @@ async def unblock_room(self, room_id: str) -> None:
(room_id,),
)

async def clear_partial_state_room(self, room_id: str) -> bool:
async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
"""Clears the partial state flag for a room.
Args:
room_id: The room whose partial state flag is to be cleared.
Returns:
`True` if the partial state flag has been cleared successfully.
The corresponding stream id for the un-partial-stated rooms stream.
`False` if the partial state flag could not be cleared because the room
`None` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
@@ -2324,15 +2359,15 @@ async def clear_partial_state_room(self, room_id: str) -> bool:
room_id,
un_partial_state_room_stream_id,
)
return True
return un_partial_state_room_stream_id
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False
return None

def _clear_partial_state_room_txn(
self,
19 changes: 14 additions & 5 deletions synapse/storage/databases/main/roommember.py
@@ -15,6 +15,7 @@
import logging
from typing import (
TYPE_CHECKING,
AbstractSet,
Collection,
Dict,
FrozenSet,
@@ -47,7 +48,13 @@
ProfileInfo,
RoomsForUser,
)
from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id
from synapse.types import (
JsonDict,
PersistedEventPosition,
StateMap,
StrCollection,
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -385,7 +392,7 @@ async def get_rooms_for_local_user_where_membership_is(
self,
user_id: str,
membership_list: Collection[str],
excluded_rooms: Optional[List[str]] = None,
excluded_rooms: StrCollection = (),
) -> List[RoomsForUser]:
"""Get all the rooms for this *local* user where the membership for this user
matches one in the membership list.
@@ -412,10 +419,12 @@ async def get_rooms_for_local_user_where_membership_is(
)

# Now we filter out forgotten and excluded rooms
rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id)
rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id)

if excluded_rooms is not None:
rooms_to_exclude.update(set(excluded_rooms))
# Take a copy to avoid mutating the in-cache set
rooms_to_exclude = set(rooms_to_exclude)
rooms_to_exclude.update(excluded_rooms)

return [room for room in rooms if room.room_id not in rooms_to_exclude]

@@ -1169,7 +1178,7 @@ def f(txn: LoggingTransaction) -> int:
return count == 0

@cached()
async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]:
async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]:
"""Gets all rooms the user has forgotten.
Args:
(Diffs for the remaining changed files are not shown.)
