Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Non lazy loading sync not blocking during fast join
Browse files Browse the repository at this point in the history
Signed-off-by: Mathieu Velten <mathieuv@matrix.org>
  • Loading branch information
Mathieu Velten committed Jan 16, 2023
1 parent 4db3331 commit 39a0ddc
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 32 deletions.
1 change: 1 addition & 0 deletions changelog.d/14831.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Non lazy loading sync not blocking during fast join.
84 changes: 63 additions & 21 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1817,11 +1817,32 @@ async def _generate_sync_entry_for_rooms(
)
sync_result_builder.now_token = now_token

# Retrieve rooms that got un partial stated in the meantime, only useful in case
# of a non lazy-loading-members sync.
un_partial_stated_rooms = set()
if not sync_result_builder.sync_config.filter_collection.lazy_load_members():
un_partial_state_rooms_since = 0
if sync_result_builder.since_token is not None:
un_partial_state_rooms_since = int(
sync_result_builder.since_token.un_partial_state_rooms_key
)

un_partial_state_rooms_now = int(now_token.un_partial_state_rooms_key)
if un_partial_state_rooms_since != un_partial_state_rooms_now:
un_partial_stated_rooms = (
await self.store.get_un_partial_stated_rooms_between(
un_partial_state_rooms_since,
un_partial_state_rooms_now,
)
)

# 2. We check up front if anything has changed, if it hasn't then there is
# no point in going further.
if not sync_result_builder.full_state:
if since_token and not ephemeral_by_room and not account_data_by_room:
have_changed = await self._have_rooms_changed(sync_result_builder)
have_changed = await self._have_rooms_changed(
sync_result_builder, un_partial_stated_rooms
)
log_kv({"rooms_have_changed": have_changed})
if not have_changed:
tags_by_room = await self.store.get_updated_tags(
Expand All @@ -1835,7 +1856,7 @@ async def _generate_sync_entry_for_rooms(
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
sync_result_builder, ignored_users, un_partial_stated_rooms
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
Expand Down Expand Up @@ -1888,7 +1909,9 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
)

async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
self,
sync_result_builder: "SyncResultBuilder",
un_partial_stated_rooms: Set[str],
) -> bool:
"""Returns whether there may be any new events that should be sent down
the sync. Returns True if there are.
Expand All @@ -1905,6 +1928,8 @@ async def _have_rooms_changed(

stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if room_id in un_partial_stated_rooms:
return True
if self.store.has_room_changed_since(room_id, stream_id):
return True
return False
Expand All @@ -1913,6 +1938,7 @@ async def _get_rooms_changed(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
un_partial_stated_rooms: Set[str],
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.
Expand Down Expand Up @@ -2113,37 +2139,46 @@ async def _get_rooms_changed(
# We loop through all room ids, even if there are no new events, in case
# there are non room events that we need to notify about.
for room_id in sync_result_builder.joined_room_ids:
full_state = False
new_since_token: Optional[StreamToken] = since_token
newly_joined = room_id in newly_joined_rooms

room_entry = room_to_events.get(room_id, None)

newly_joined = room_id in newly_joined_rooms
if room_entry:
# In case of a non lazy-loading-members sync we want to include
# rooms that got un partial stated in the meantime, and we need
# to include the full state of them.
if (
not sync_config.filter_collection.lazy_load_members()
and room_id in un_partial_stated_rooms
):
newly_joined = True
full_state = True
new_since_token = None
upto_token = now_token
elif room_entry:
events, start_key = room_entry

prev_batch_token = now_token.copy_and_replace(
StreamKeyType.ROOM, start_key
)

entry = RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=events,
newly_joined=newly_joined,
full_state=False,
since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
)
new_since_token = None if newly_joined else since_token
upto_token = prev_batch_token
else:
entry = RoomSyncResultBuilder(
upto_token = since_token

room_entries.append(
RoomSyncResultBuilder(
room_id=room_id,
rtype="joined",
events=[],
events=None,
newly_joined=newly_joined,
full_state=False,
since_token=since_token,
upto_token=since_token,
full_state=full_state,
since_token=new_since_token,
upto_token=upto_token,
)

room_entries.append(entry)
)

return _RoomChanges(
room_entries,
Expand Down Expand Up @@ -2186,6 +2221,13 @@ async def _get_all_rooms(
knocked = []

for event in room_list:
# Do not include rooms that we don't have the full state yet
# in case of non lazy-loading-members sync.
if (
not sync_config.filter_collection.lazy_load_members()
) and await self.store.is_partial_state_room(event.room_id):
continue

if event.room_version_id not in KNOWN_ROOM_VERSIONS:
continue

Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn(
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_state_rooms_key=0,
)

return events[:limit], next_token
Expand Down
32 changes: 31 additions & 1 deletion synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -1285,10 +1286,39 @@ def get_un_partial_stated_rooms_token(self) -> int:
# explanation.)
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()

async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int
) -> Set[str]:
"""Get all rooms that got un partial stated between `last_id` exclusive and
`current_id` inclusive.
Returns:
The list of rooms.
"""

if last_id == current_id:
return set()

def _get_un_partial_stated_rooms_between_txn(
txn: LoggingTransaction,
) -> Set[str]:
sql = """
SELECT DISTINCT room_id FROM un_partial_stated_room_stream
WHERE ? < stream_id AND stream_id <= ?
"""
txn.execute(sql, (last_id, current_id))

return set([r[0] for r in txn])

return await self.db_pool.runInteraction(
"get_un_partial_stated_rooms_between",
_get_un_partial_stated_rooms_between_txn,
)

async def get_un_partial_stated_rooms_from_stream(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
"""Get updates for caches replication stream.
"""Get updates for un partial stated rooms replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
Expand Down
3 changes: 3 additions & 0 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken:
push_rules_key = self.store.get_max_push_rules_stream_id()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
un_partial_state_rooms_key = self.store.get_un_partial_stated_rooms_token()

token = StreamToken(
room_key=self.sources.room.get_current_key(),
Expand All @@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken:
device_list_key=device_list_key,
# Groups key is unused.
groups_key=0,
un_partial_state_rooms_key=un_partial_state_rooms_key,
)
return token

Expand Down Expand Up @@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
to_device_key=0,
device_list_key=0,
groups_key=0,
un_partial_state_rooms_key=0,
)
return token
9 changes: 6 additions & 3 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,13 @@ class StreamToken:
7. `to_device_key`: `274711`
8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused)
10. `un_partial_state_rooms_key`: `379`
You can see how many of these keys correspond to the various
fields in a "/sync" response:
```json
{
"next_batch": "s12_4_0_1_1_1_1_4_1",
"next_batch": "s12_4_0_1_1_1_1_4_1_1",
"presence": {
"events": []
},
Expand All @@ -663,7 +664,7 @@ class StreamToken:
"!QrZlfIDQLNLdZHqTnt:hs1": {
"timeline": {
"events": [],
"prev_batch": "s10_4_0_1_1_1_1_4_1",
"prev_batch": "s10_4_0_1_1_1_1_4_1_1",
"limited": false
},
"state": {
Expand Down Expand Up @@ -699,6 +700,7 @@ class StreamToken:
device_list_key: int
# Note that the groups key is no longer used and may have bogus values.
groups_key: int
un_partial_state_rooms_key: int

_SEPARATOR = "_"
START: ClassVar["StreamToken"]
Expand Down Expand Up @@ -737,6 +739,7 @@ async def to_string(self, store: "DataStore") -> str:
# serialized so that there will not be confusion in the future
# if additional tokens are added.
str(self.groups_key),
str(self.un_partial_state_rooms_key),
]
)

Expand Down Expand Up @@ -769,7 +772,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
return attr.evolve(self, **{key: new_value})


StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down
4 changes: 2 additions & 2 deletions tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None:

def test_topo_token_is_accepted(self) -> None:
"""Test Topo Token is accepted."""
token = "t1-0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
Expand All @@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None:

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
"""Test that stream token is accepted for forward pagination."""
token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
Expand Down
10 changes: 5 additions & 5 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.room_id = self.helper.create_room_as(self.user_id)

def test_topo_token_is_accepted(self) -> None:
token = "t1-0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
Expand All @@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None:
self.assertTrue("end" in channel.json_body)

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
Expand Down Expand Up @@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None:
"""Test that we can filter by a label on a /messages request."""
self._send_labelled_messages_in_room()

token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
Expand All @@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None:
"""Test that we can filter by the absence of a label on a /messages request."""
self._send_labelled_messages_in_room()

token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
Expand All @@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None:
"""
self._send_labelled_messages_in_room()

token = "s0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
Expand Down

0 comments on commit 39a0ddc

Please sign in to comment.