
Non lazy loading sync not blocking during fast join #14831

Closed
wants to merge 11 commits
1 change: 1 addition & 0 deletions changelog.d/14831.feature
@@ -0,0 +1 @@
Faster joins: non-lazy-loading syncs will return immediately after a faster join, omitting partial-state rooms until we acquire their full state.
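For orientation, here is a minimal, self-contained sketch of the behaviour this changelog entry describes, using hypothetical names rather than Synapse's real classes: a non-lazy-loading sync simply drops rooms that are still partially stated, and picks them up again once the resync clears the flag.

from dataclasses import dataclass, field
from typing import Set


@dataclass
class ToyStore:
    """Toy stand-in for the data store, tracking partial-state rooms."""

    partial_state_rooms: Set[str] = field(default_factory=set)

    def is_partial_state_room(self, room_id: str) -> bool:
        return room_id in self.partial_state_rooms

    def clear_partial_state_room(self, room_id: str) -> None:
        # Resync finished: the room is now fully stated.
        self.partial_state_rooms.discard(room_id)


def rooms_for_sync(store: ToyStore, joined: Set[str], lazy_load_members: bool) -> Set[str]:
    """Rooms to include in a sync response under this feature."""
    if lazy_load_members:
        # Lazy-loading syncs are unaffected.
        return set(joined)
    # Non-lazy-loading syncs omit partial-state rooms, so they can return
    # immediately after a faster join instead of waiting for the resync.
    return {r for r in joined if not store.is_partial_state_room(r)}


store = ToyStore(partial_state_rooms={"!fastjoined:example.org"})
joined = {"!fastjoined:example.org", "!old:example.org"}

print(rooms_for_sync(store, joined, lazy_load_members=False))  # {'!old:example.org'}
store.clear_partial_state_room("!fastjoined:example.org")
print(rooms_for_sync(store, joined, lazy_load_members=False))  # both rooms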
11 changes: 6 additions & 5 deletions synapse/handlers/federation.py
@@ -1726,15 +1726,16 @@ async def _sync_partial_state_room(
await self._device_handler.handle_room_un_partial_stated(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
new_stream_id = await self.store.clear_partial_state_room(room_id)
if new_stream_id is not None:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# Poke the notifier so that other workers see the write to
# the un-partial-stated rooms stream.
self._notifier.notify_replication()

await self._notifier.on_un_partial_stated_room(
room_id, new_stream_id
)

# TODO(faster_joins) update room stats and user directory?
# https://github.com/matrix-org/synapse/issues/12814
71 changes: 64 additions & 7 deletions synapse/handlers/sync.py
@@ -1613,9 +1613,9 @@ async def _generate_sync_entry_for_to_device(
now_token = sync_result_builder.now_token
since_stream_id = 0
if sync_result_builder.since_token is not None:
since_stream_id = int(sync_result_builder.since_token.to_device_key)
since_stream_id = sync_result_builder.since_token.to_device_key

if device_id is not None and since_stream_id != int(now_token.to_device_key):
if device_id is not None and since_stream_id != now_token.to_device_key:
messages, stream_id = await self.store.get_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
)
@@ -1684,7 +1684,7 @@ async def _generate_sync_entry_for_account_data(
)

push_rules_changed = await self.store.have_push_rules_changed_for_user(
user_id, int(since_token.push_rules_key)
user_id, since_token.push_rules_key
)

if push_rules_changed:
@@ -1817,11 +1817,35 @@ async def _generate_sync_entry_for_rooms(
)
sync_result_builder.now_token = now_token

# Retrieve rooms that were un-partial-stated in the meantime; this is only
# useful for a non-lazy-loading-members sync.
# We also skip this calculation for an initial sync, since it is not needed there.
DMRobertson (Contributor), Jan 18, 2023:

I'm not following you here. I thought we wanted to exclude partially-joined rooms from all non-lazy-load syncs. Why should that not also apply to an initial, non-lazy sync?

Author (Contributor):

We skip calculating the rooms that were un-partial-stated because, for an initial sync, another piece of code already excludes any partial-state rooms. The un-partial-stated rooms stream is only useful for knowing which rooms were un-partial-stated between two syncs; for an initial sync we just check the DB directly.
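To make the two paths concrete, here is a rough sketch of the contrast described above. The function names are illustrative assumptions; `store` is assumed to expose the `is_partial_state_room` and `get_un_partial_stated_rooms_between` methods touched in this PR, and the token attribute is the new `un_partial_stated_rooms_key`.

async def partial_state_rooms_to_omit_on_initial_sync(store, joined_room_ids):
    # Initial, non-lazy sync: no stream needed; just ask the DB which joined
    # rooms are still partially stated and omit them from the response.
    return {
        room_id
        for room_id in joined_room_ids
        if await store.is_partial_state_room(room_id)
    }


async def newly_disclosable_rooms_on_incremental_sync(
    store, since_token, now_token, joined_room_ids
):
    # Incremental, non-lazy sync: the un-partial-stated rooms stream tells us
    # which previously omitted rooms became fully stated between the two sync
    # positions, so they can now be sent to the client with full state.
    return await store.get_un_partial_stated_rooms_between(
        since_token.un_partial_stated_rooms_key,
        now_token.un_partial_stated_rooms_key,
        joined_room_ids,
    )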

Comment on lines +1820 to +1822

Contributor:

For non-lazy, incremental syncs: find all the rooms we excluded in previous non-lazy syncs that we can now disclose to the user.

un_partial_stated_rooms = set()
if (
since_token
and not sync_result_builder.sync_config.filter_collection.lazy_load_members()
):
un_partial_stated_rooms_since = 0
if sync_result_builder.since_token is not None:
un_partial_stated_rooms_since = (
sync_result_builder.since_token.un_partial_stated_rooms_key
)

un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
un_partial_stated_rooms_since,
sync_result_builder.now_token.un_partial_stated_rooms_key,
sync_result_builder.joined_room_ids,
)
)

# 2. We check up front if anything has changed, if it hasn't then there is
# no point in going further.
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
have_changed = await self._have_rooms_changed(
sync_result_builder, un_partial_stated_rooms
)
Comment on lines +1846 to +1848

Contributor:

Every un-partial-stated room during the sync period is considered to have "changed", i.e. the client needs to be informed.

Contributor:

Rooms which are partially stated must not show up in a non-lazy sync. Where is this done? Should it be here?

EJ: this method is better called "should_we_tell_the_client_about_this_room_lol"

log_kv({"rooms_have_changed": have_changed})
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
@@ -1835,7 +1859,7 @@ async def _generate_sync_entry_for_rooms(
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
sync_result_builder, ignored_users, un_partial_stated_rooms
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
@@ -1888,7 +1912,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
)

async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
self,
sync_result_builder: "SyncResultBuilder",
un_partial_stated_rooms: AbstractSet[str],
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
Expand All @@ -1905,6 +1931,11 @@ async def _have_rooms_changed(

stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
# If a room has been un-partial-stated during the sync period,
# assume it has seen some kind of change. We'll process that
# change later, in _get_rooms_changed.
if room_id in un_partial_stated_rooms:
return True
if self.store.has_room_changed_since(room_id, stream_id):
return True
return False
@@ -1913,6 +1944,7 @@ async def _get_rooms_changed(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
un_partial_stated_rooms: AbstractSet[str],
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.

@@ -2116,7 +2148,25 @@ async def _get_rooms_changed(
room_entry = room_to_events.get(room_id, None)

newly_joined = room_id in newly_joined_rooms
if room_entry:

# Partially joined rooms are omitted from non-lazy-loading-members
# syncs until the resync completes and the room is fully stated.
# When that happens, we need to include their full state in
# the next non-lazy-loading sync.
if (
not sync_config.filter_collection.lazy_load_members()
and room_id in un_partial_stated_rooms
):
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=None,
Contributor:

EJ: are we going to lose events from room_events? Maybe... but we should treat this like a newly_joined room (it is from the client's perspective), so I think this is fine.

Contributor:

EJ: Does newly_joined_rooms need to be updated to include un-partial-stated rooms and exclude those which are currently partially stated? See the return value of this function.
newly_joined=True,
full_state=True,
since_token=None,
upto_token=now_token,
)
elif room_entry:
events, start_key = room_entry

prev_batch_token = now_token.copy_and_replace(
@@ -2186,6 +2236,13 @@ async def _get_all_rooms(
knocked = []

for event in room_list:
# Do not include rooms for which we do not yet have the full state,
# in the case of a non-lazy-loading-members sync.
if (
not sync_config.filter_collection.lazy_load_members()
) and await self.store.is_partial_state_room(event.room_id):
continue

Comment on lines +2239 to +2245

Contributor:

In an initial sync, we completely ignore rooms that are currently partially stated.

if event.room_version_id not in KNOWN_ROOM_VERSIONS:
continue

26 changes: 26 additions & 0 deletions synapse/notifier.py
@@ -315,6 +315,32 @@ async def on_new_room_events(
event_entries.append((entry, event.event_id))
await self.notify_new_room_events(event_entries, max_room_stream_token)

async def on_un_partial_stated_room(
self,
room_id: str,
new_token: int,
) -> None:
"""Used by the resync background processes to wake up all listeners
of this room that it just got un-partial-stated.

It will also notify replication listeners of the change in stream.
"""

# Wake up all related user stream notifiers
user_streams = self.room_to_user_streams.get(room_id, set())
time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:
user_stream.notify(
StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
)
except Exception:
logger.exception("Failed to notify listener")

# Poke the replication so that other workers also see the write to
# the un-partial-stated rooms stream.
self.notify_replication()

async def notify_new_room_events(
self,
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
1 change: 1 addition & 0 deletions synapse/storage/databases/main/relations.py
@@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_stated_rooms_key=0,
)

return events[:limit], next_token
47 changes: 41 additions & 6 deletions synapse/storage/databases/main/room.py
@@ -26,6 +26,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
@@ -1285,10 +1286,44 @@ def get_un_partial_stated_rooms_token(self) -> int:
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()

async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int, room_ids: Collection[str]
) -> Set[str]:
"""Get all rooms that got un partial stated between `last_id` exclusive and
`current_id` inclusive.

Returns:
The list of room ids.
"""

if last_id == current_id:
return set()

def _get_un_partial_stated_rooms_between_txn(
txn: LoggingTransaction,
) -> Set[str]:
sql = """
SELECT DISTINCT room_id FROM un_partial_stated_room_stream
WHERE ? < stream_id AND stream_id <= ? AND
"""

clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", room_ids
)

txn.execute(sql + clause, [last_id, current_id] + list(args))

return {r[0] for r in txn}

return await self.db_pool.runInteraction(
"get_un_partial_stated_rooms_between",
_get_un_partial_stated_rooms_between_txn,
)

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
"""Get updates for un partial stated rooms replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
@@ -2295,16 +2330,16 @@ async def unblock_room(self, room_id: str) -> None:
(room_id,),
)

async def clear_partial_state_room(self, room_id: str) -> bool:
async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
"""Clears the partial state flag for a room.

Args:
room_id: The room whose partial state flag is to be cleared.

Returns:
`True` if the partial state flag has been cleared successfully.
The corresponding stream id for the un-partial-stated rooms stream.

`False` if the partial state flag could not be cleared because the room
`None` if the partial state flag could not be cleared because the room
still contains events with partial state.
"""
try:
@@ -2315,15 +2350,15 @@ async def clear_partial_state_room(self, room_id: str) -> bool:
room_id,
un_partial_state_room_stream_id,
)
return True
return un_partial_state_room_stream_id
except self.db_pool.engine.module.IntegrityError as e:
# Assume that any `IntegrityError`s are due to partial state events.
logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False
return None

def _clear_partial_state_room_txn(
self,
3 changes: 3 additions & 0 deletions synapse/streams/events.py
@@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken:
push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token()

token = StreamToken(
room_key=self.sources.room.get_current_key(),
@@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken:
device_list_key=device_list_key,
# Groups key is unused.
groups_key=0,
un_partial_stated_rooms_key=un_partial_stated_rooms_key,
)
return token

@@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_stated_rooms_key=0,
)
return token
12 changes: 8 additions & 4 deletions synapse/types/__init__.py
@@ -627,14 +627,15 @@ class StreamKeyType:
PUSH_RULES: Final = "push_rules_key"
TO_DEVICE: Final = "to_device_key"
DEVICE_LIST: Final = "device_list_key"
UN_PARTIAL_STATED_ROOMS: Final = "un_partial_stated_rooms_key"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class StreamToken:
"""A collection of keys joined together by underscores in the following
order and which represent the position in their respective streams.

ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
1. `room_key`: `s2633508` which is a `RoomStreamToken`
- `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
- See the docstring for `RoomStreamToken` for more details.
@@ -646,12 +647,13 @@ class StreamToken:
7. `to_device_key`: `274711`
8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused)
10. `un_partial_stated_rooms_key`: `379`

You can see how many of these keys correspond to the various
fields in a "/sync" response:
```json
{
"next_batch": "s12_4_0_1_1_1_1_4_1",
"next_batch": "s12_4_0_1_1_1_1_4_1_1",
"presence": {
"events": []
},
@@ -663,7 +665,7 @@ class StreamToken:
"!QrZlfIDQLNLdZHqTnt:hs1": {
"timeline": {
"events": [],
"prev_batch": "s10_4_0_1_1_1_1_4_1",
"prev_batch": "s10_4_0_1_1_1_1_4_1_1",
"limited": false
},
"state": {
@@ -699,6 +701,7 @@ class StreamToken:
device_list_key: int
# Note that the groups key is no longer used and may have bogus values.
groups_key: int
un_partial_stated_rooms_key: int

_SEPARATOR = "_"
START: ClassVar["StreamToken"]
@@ -737,6 +740,7 @@ async def to_string(self, store: "DataStore") -> str:
# serialized so that there will not be confusion in the future
# if additional tokens are added.
str(self.groups_key),
str(self.un_partial_stated_rooms_key),
]
)
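As a sanity check on the serialised form, the sketch below round-trips the example token from the docstring above. It is a toy illustration, not Synapse's real StreamToken.to_string/from_string; the field names follow the StreamToken attributes, with the new un_partial_stated_rooms_key as the tenth component.

# Field order as enumerated in the StreamToken docstring.
keys = [
    "room_key",
    "presence_key",
    "typing_key",
    "receipt_key",
    "account_data_key",
    "push_rules_key",
    "to_device_key",
    "device_list_key",
    "groups_key",  # unused, kept so existing tokens stay parseable
    "un_partial_stated_rooms_key",
]

token = "s2633508_17_338_6732159_1082514_541479_274711_265584_1_379"
parts = token.split("_")
assert len(parts) == len(keys) == 10

parsed = dict(zip(keys, parts))
print(parsed["un_partial_stated_rooms_key"])  # prints "379"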

@@ -769,7 +773,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
return attr.evolve(self, **{key: new_value})


StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down
Loading