Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Handle remote device list updates during partial join
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Sep 27, 2022
1 parent f5aaa55 commit 8247a9f
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 0 deletions.
40 changes: 40 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,16 @@ async def incoming_device_list_update(
)
return

# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
partial_rooms = await self.store.get_partial_state_rooms_and_servers()
if partial_rooms:
await self.store.add_remote_device_list_to_pending(
user_id,
device_id,
)

room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
Expand Down Expand Up @@ -1175,3 +1185,33 @@ async def process_cross_signing_key_update(
device_ids.append(verify_key.version)

return device_ids

async def handle_room_un_partial_stated(self, room_id: str) -> None:
"""Handles sending appropriate device list updates in a room that has
gone from partial to full state.
"""

pending_updates = (
await self.store.get_pending_remote_device_list_updates_for_room(room_id)
)

for user_id, device_id in pending_updates:
logger.info(
"Got pending device list update in room %s: %s / %s",
room_id,
user_id,
device_id,
)
position = await self.store.add_device_change_to_streams(
user_id,
[device_id],
room_ids=[room_id],
)

if not position:
# This should only happen if there are no updates, so we bail.
continue

self.device_handler.notifier.on_new_event(
StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
)
4 changes: 4 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, hs: "HomeServer"):
self.http_client = hs.get_proxied_blacklisted_http_client()
self._replication = hs.get_replication_data_handler()
self._federation_event_handler = hs.get_federation_event_handler()
self._device_list_update = hs.get_device_handler().device_list_updater

self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
Expand Down Expand Up @@ -1624,6 +1625,9 @@ async def _sync_partial_state_room(
# https://github.com/matrix-org/synapse/issues/12994
await self.state_handler.update_current_state(room_id)

logger.info("Handling any pending device list updates")
await self._device_list_update.handle_room_un_partial_stated(room_id)

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
Expand Down
55 changes: 55 additions & 0 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,3 +1995,58 @@ def add_device_list_outbound_pokes_txn(
add_device_list_outbound_pokes_txn,
stream_ids,
)

async def add_remote_device_list_to_pending(
self, user_id: str, device_id: str
) -> None:
"""Add a device list update to the table tracking remote device list
updates during partial joins.
"""

async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
await self.db_pool.simple_upsert(
table="device_lists_remote_pending",
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={"stream_id": stream_id},
desc="add_remote_device_list_to_pending",
)

async def get_pending_remote_device_list_updates_for_room(
self, room_id: str
) -> Collection[Tuple[str, str]]:
"""Get the set of remote device list updates from the pending table for
the room.
"""

min_device_stream_id = await self.db_pool.simple_select_one_onecol(
table="partial_state_rooms",
keyvalues={
"room_id": room_id,
},
retcol="device_lists_stream_id",
desc="get_pending_remote_device_list_updates_for_room_device",
)

sql = """
SELECT user_id, device_id FROM device_lists_remote_pending AS d
INNER JOIN current_state_events AS c ON
type = 'm.room.member'
AND state_key = user_id
AND membership = 'join'
WHERE
room_id = ? AND stream_id > ?
"""

def get_pending_remote_device_list_updates_for_room_txn(
txn: LoggingTransaction,
) -> Collection[Tuple[str, str]]:
txn.execute(sql, (room_id, min_device_stream_id))
return cast(Collection[Tuple[str, str]], txn.fetchall())

return await self.db_pool.runInteraction(
"get_pending_remote_device_list_updates_for_room",
get_pending_remote_device_list_updates_for_room_txn,
)
20 changes: 20 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,26 @@ def _clear_partial_state_room_txn(
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))

# We now delete anything from `device_lists_remote_pending` with a
# stream ID less than the minimum
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
txn,
table="partial_state_rooms",
keyvalues={},
retcol="MIN(device_lists_stream_id)",
allow_none=True,
)
if device_lists_stream_id is None:
# There are no rooms being currently partially joined, so we delete everything.
txn.execute("DELETE FROM device_lists_remote_pending")
else:
sql = """
DELETE FROM device_lists_remote_pending
WHERE stream_id < ?
"""
txn.execute(sql, (device_lists_stream_id,))

@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Stores remote device lists we have received for remote users while a partial
-- join is in progress.
--
-- This allows us to replay any device list updates if it turns out the remote
-- user was in the partially joined room
CREATE TABLE device_lists_remote_pending(
stream_id BIGINT PRIMARY KEY,
user_id TEXT NOT NULL,
device_id TEXT NOT NULL
);

-- We only keep the most recent update for a given user/device pair.
CREATE UNIQUE INDEX device_lists_remote_pending_user_device_id ON device_lists_remote_pending(user_id, device_id);

0 comments on commit 8247a9f

Please sign in to comment.