Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Non lazy loading sync not blocking during fast join #14831

Closed
wants to merge 11 commits into from
1 change: 1 addition & 0 deletions changelog.d/14831.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Non lazy loading sync not blocking during fast join.
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
63 changes: 59 additions & 4 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1817,11 +1817,34 @@ async def _generate_sync_entry_for_rooms(
)
sync_result_builder.now_token = now_token

# Retrieve rooms that got un partial stated in the meantime, only useful in case
# of a non lazy-loading-members sync.
un_partial_stated_rooms = set()
if not sync_result_builder.sync_config.filter_collection.lazy_load_members():
un_partial_state_rooms_since = 0
if sync_result_builder.since_token is not None:
un_partial_state_rooms_since = int(
sync_result_builder.since_token.un_partial_state_rooms_key
)

un_partial_state_rooms_now = int(
sync_result_builder.now_token.un_partial_state_rooms_key
)
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
if un_partial_state_rooms_since != un_partial_state_rooms_now:
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
un_partial_state_rooms_since,
un_partial_state_rooms_now,
)
)

# 2. We check up front if anything has changed, if it hasn't then there is
# no point in going further.
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
have_changed = await self._have_rooms_changed(
sync_result_builder, un_partial_stated_rooms
)
Comment on lines +1846 to +1848
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every un-psed room during the sync period is considered to have "changed", i.e., client needs to be informed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rooms which are partial stated need to not sure up in a non-lazy sync. Where is this done? Should this be here?

EJ: this method is better called "should_we_tell_the_client_about_this_room_lol"

log_kv({"rooms_have_changed": have_changed})
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
Expand All @@ -1835,7 +1858,7 @@ async def _generate_sync_entry_for_rooms(
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
sync_result_builder, ignored_users, un_partial_stated_rooms
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
Expand Down Expand Up @@ -1888,7 +1911,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
)

async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
self,
sync_result_builder: "SyncResultBuilder",
un_partial_stated_rooms: Set[str],
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
Expand All @@ -1905,6 +1930,11 @@ async def _have_rooms_changed(

stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
# If a room has been un partial stated in the meantime,
# let's consider it has changes and deal with it accordingly
# in _get_rooms_changed.
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
if room_id in un_partial_stated_rooms:
return True
if self.store.has_room_changed_since(room_id, stream_id):
return True
return False
Expand All @@ -1913,6 +1943,7 @@ async def _get_rooms_changed(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
un_partial_stated_rooms: Set[str],
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.

Expand Down Expand Up @@ -2116,7 +2147,24 @@ async def _get_rooms_changed(
room_entry = room_to_events.get(room_id, None)

newly_joined = room_id in newly_joined_rooms
if room_entry:

# In case of a non lazy-loading-members sync we want to include
# rooms that got un partial stated in the meantime, and we need
# to include the full state of them.
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
if (
not sync_config.filter_collection.lazy_load_members()
and room_id in un_partial_stated_rooms
):
entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=None,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EJ: are we going to lose events from room_events? Maybe... but we should treat this like a newly_joined room (it is from the client's perspective) so I think this is fine

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EJ: Does newly_joined_rooms need to be updated to include un-partial-stated rooms and exclude those which are currently partial stated? See the return value of this function

newly_joined=True,
full_state=True,
since_token=None,
upto_token=now_token,
)
elif room_entry:
events, start_key = room_entry

prev_batch_token = now_token.copy_and_replace(
Expand Down Expand Up @@ -2186,6 +2234,13 @@ async def _get_all_rooms(
knocked = []

for event in room_list:
# Do not include rooms that we don't have the full state yet
# in case of non lazy-loading-members sync.
if (
not sync_config.filter_collection.lazy_load_members()
) and await self.store.is_partial_state_room(event.room_id):
continue

Comment on lines +2239 to +2245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an initial sync, we completely ignore rooms that are currently partially stated

if event.room_version_id not in KNOWN_ROOM_VERSIONS:
continue

Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_state_rooms_key=0,
)

return events[:limit], next_token
Expand Down
32 changes: 31 additions & 1 deletion synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -1285,10 +1286,39 @@ def get_un_partial_stated_rooms_token(self) -> int:
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()

async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int
) -> Set[str]:
"""Get all rooms that got un partial stated between `last_id` exclusive and
`current_id` inclusive.

Returns:
The list of rooms.
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
"""

if last_id == current_id:
return set()

def _get_un_partial_stated_rooms_between_txn(
txn: LoggingTransaction,
) -> Set[str]:
sql = """
SELECT DISTINCT room_id FROM un_partial_stated_room_stream
WHERE ? < stream_id AND stream_id <= ?
"""
txn.execute(sql, (last_id, current_id))

return {r[0] for r in txn}

return await self.db_pool.runInteraction(
"get_un_partial_stated_rooms_between",
_get_un_partial_stated_rooms_between_txn,
)

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
"""Get updates for un partial stated rooms replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
Expand Down
3 changes: 3 additions & 0 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken:
push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token()

token = StreamToken(
room_key=self.sources.room.get_current_key(),
Expand All @@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken:
device_list_key=device_list_key,
# Groups key is unused.
groups_key=0,
un_partial_state_rooms_key=un_partial_state_rooms_key,
)
return token

Expand Down Expand Up @@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_state_rooms_key=0,
)
return token
9 changes: 6 additions & 3 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,13 @@ class StreamToken:
7. `to_device_key`: `274711`
8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused)
10. `un_partial_state_rooms_key`: `379`
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

You can see how many of these keys correspond to the various
fields in a "/sync" response:
```json
{
"next_batch": "s12_4_0_1_1_1_1_4_1",
"next_batch": "s12_4_0_1_1_1_1_4_1_1",
"presence": {
"events": []
},
Expand All @@ -663,7 +664,7 @@ class StreamToken:
"!QrZlfIDQLNLdZHqTnt:hs1": {
"timeline": {
"events": [],
"prev_batch": "s10_4_0_1_1_1_1_4_1",
"prev_batch": "s10_4_0_1_1_1_1_4_1_1",
"limited": false
},
"state": {
Expand Down Expand Up @@ -699,6 +700,7 @@ class StreamToken:
device_list_key: int
# Note that the groups key is no longer used and may have bogus values.
groups_key: int
un_partial_state_rooms_key: int

_SEPARATOR = "_"
START: ClassVar["StreamToken"]
Expand Down Expand Up @@ -737,6 +739,7 @@ async def to_string(self, store: "DataStore") -> str:
# serialized so that there will not be confusion in the future
# if additional tokens are added.
str(self.groups_key),
str(self.un_partial_state_rooms_key),
]
)

Expand Down Expand Up @@ -769,7 +772,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
return attr.evolve(self, **{key: new_value})


StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down
4 changes: 2 additions & 2 deletions tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None:

def test_topo_token_is_accepted(self) -> None:
"""Test Topo Token is accepted."""
token = "t1-0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
Expand All @@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None:

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
"""Test that stream token is accepted for forward pagination."""
token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
Expand Down
10 changes: 5 additions & 5 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.room_id = self.helper.create_room_as(self.user_id)

def test_topo_token_is_accepted(self) -> None:
token = "t1-0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
Expand All @@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None:
self.assertTrue("end" in channel.json_body)

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
Expand Down Expand Up @@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None:
"""Test that we can filter by a label on a /messages request."""
self._send_labelled_messages_in_room()

token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
Expand All @@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None:
"""Test that we can filter by the absence of a label on a /messages request."""
self._send_labelled_messages_in_room()

token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
Expand All @@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None:
"""
self._send_labelled_messages_in_room()

token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
Expand Down