Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove legacy replication endpoint for resyncing devices #14932

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 8 additions & 37 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,11 @@ class DeviceListWorkerUpdater:
def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
ReplicationUserDevicesResyncRestServlet,
ReplicationMultiUserDevicesResyncRestServlet,
)

self._user_device_resync_client = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
Expand Down Expand Up @@ -957,27 +957,10 @@ async def multi_user_device_resync(
# Fall back to single requests
result: Dict[str, Optional[JsonDict]] = {}
for user_id in user_ids:
result[user_id] = await self._user_device_resync_client(user_id=user_id)
result[user_id] = await self._multi_user_device_resync_client(user_id=user_id)
return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
"""Fetches all devices for a user and updates the device cache with them.

Args:
user_id: The user's id whose device_list will be updated.
mark_failed_as_stale: Whether to mark the user's device list as stale
if the attempt to resync failed.
Returns:
A dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
None when we weren't able to fetch the device info for some reason,
e.g. due to a connection problem.
"""
return (await self.multi_user_device_resync([user_id]))[user_id]



class DeviceListUpdater(DeviceListWorkerUpdater):
"Handles incoming device list updates from federation and updates the DB"
Expand Down Expand Up @@ -1129,7 +1112,7 @@ async def _handle_device_updates(self, user_id: str) -> None:
)

if resync:
await self.user_device_resync(user_id)
await self.multi_user_device_resync(user_id)
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
Expand Down Expand Up @@ -1196,9 +1179,8 @@ async def _maybe_retry_device_resync(self) -> None:
for user_id in need_resync:
try:
# Try to resync the current user's devices list.
result = await self.user_device_resync(
user_id=user_id,
mark_failed_as_stale=False,
result = await self.multi_user_device_resync(
user_id, mark_failed_as_stale=False
)

# user_device_resync only returns a result if it managed to
Expand Down Expand Up @@ -1258,17 +1240,6 @@ async def multi_user_device_resync(

return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
result, failed = await self._user_device_resync_returning_failed(user_id)

if failed and mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_users_device_caches_as_stale((user_id,))

return result

async def _user_device_resync_returning_failed(
self, user_id: str
Expand Down
12 changes: 6 additions & 6 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
log_kv,
set_tag,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.devices import ReplicationMultiUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string
Expand Down Expand Up @@ -71,12 +71,12 @@ def __init__(self, hs: "HomeServer"):
# sync. We do all device list resyncing on the master instance, so if
# we're on a worker we hit the device resync replication API.
if hs.config.worker.worker_app is None:
self._user_device_resync = (
hs.get_device_handler().device_list_updater.user_device_resync
self._multi_user_device_resync = (
hs.get_device_handler().device_list_updater.multi_user_device_resync
)
else:
self._user_device_resync = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
self._multi_user_device_resync = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)

# a rate limiter for room key requests. The keys are
Expand Down Expand Up @@ -198,7 +198,7 @@ async def _check_for_unknown_devices(
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))

# Immediately attempt a resync in the background
run_in_background(self._user_device_resync, user_id=sender_user_id)
run_in_background(self._multi_user_device_resync, user_id=sender_user_id)

async def send_device_message(
self,
Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.devices import ReplicationMultiUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
Expand Down Expand Up @@ -167,8 +167,8 @@ def __init__(self, hs: "HomeServer"):

self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
if hs.config.worker.worker_app:
self._user_device_resync = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
self._multi_user_device_resync = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
else:
self._device_list_updater = hs.get_device_handler().device_list_updater
Expand Down Expand Up @@ -1487,9 +1487,9 @@ async def _resync_device(self, sender: str) -> None:

# Immediately attempt a resync in the background
if self._config.worker.worker_app:
await self._user_device_resync(user_id=sender)
await self._multi_user_device_resync(user_id=sender)
else:
await self._device_list_updater.user_device_resync(sender)
await self._device_list_updater.multi_user_device_resync(sender)
except Exception:
logger.exception("Failed to resync device for %s", sender)

Expand Down
126 changes: 0 additions & 126 deletions synapse/replication/http/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,131 +28,6 @@
logger = logging.getLogger(__name__)


class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.

This must happen on master so that the results can be correctly cached in
the database and streamed to workers.

Request format:

POST /_synapse/replication/user_device_resync/:user_id

{}

Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:

{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""

NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

from synapse.handlers.device import DeviceHandler

handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_list_updater = handler.device_list_updater

self.store = hs.get_datastores().main
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}

async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, Optional[JsonDict]]:
user_devices = await self.device_list_updater.user_device_resync(user_id)

return 200, user_devices


class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for multiple users from the same
remote server by contacting their server.

This must happen on master so that the results can be correctly cached in
the database and streamed to workers.

Request format:

POST /_synapse/replication/multi_user_device_resync

{
"user_ids": ["@alice:example.org", "@bob:example.org", ...]
}

Response is roughly equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, but there is a map from user ID to response, e.g.:

{
"@alice:example.org": {
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
},
...
}
"""

NAME = "multi_user_device_resync"
PATH_ARGS = ()
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

from synapse.handlers.device import DeviceHandler

handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_list_updater = handler.device_list_updater

self.store = hs.get_datastores().main
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_ids: List[str]) -> JsonDict: # type: ignore[override]
return {"user_ids": user_ids}

async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, Dict[str, Optional[JsonDict]]]:
user_ids: List[str] = content["user_ids"]

logger.info("Resync for %r", user_ids)
span = active_span()
if span:
span.set_tag("user_ids", f"{user_ids!r}")

multi_user_devices = await self.device_list_updater.multi_user_device_resync(
user_ids
)

return 200, multi_user_devices


class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
"""Ask master to upload keys for the user and send them out over federation to
update other servers.
Expand Down Expand Up @@ -216,6 +91,5 @@ async def _handle_request( # type: ignore[override]


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
2 changes: 1 addition & 1 deletion tests/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def test_cross_signing_keys_retry(self) -> None:
# Resync the device list.
device_handler = self.hs.get_device_handler()
self.get_success(
device_handler.device_list_updater.user_device_resync(remote_user_id),
device_handler.device_list_updater.multi_user_device_resync(remote_user_id),
)

# Retrieve the cross-signing keys for this user.
Expand Down