From 997a5f6bac191f61d4621ba50d3aa23298fe6ab7 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 8 Nov 2022 20:13:17 -0600 Subject: [PATCH 01/13] Create new Replication HTTP Servlet based on ReplicationUserDevicesResyncRestServlet. And wire it up to be able to start. --- synapse/replication/http/devices.py | 51 +++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 3d63645726b9..10de0641a2a0 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -18,6 +18,7 @@ from twisted.web.server import Request from synapse.http.server import HttpServer +from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict @@ -78,5 +79,55 @@ async def _handle_request( # type: ignore[override] return 200, user_devices +class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): + """Ask master to upload keys for the user and send them out over federation to + update other servers. + + This must happen on master so that the results can be correctly cached in + the database and streamed to workers.( Is this accurate?) + + Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on + master to accomplish this. 
+ """ + + NAME = "upload_keys_for_user" + PATH_ARGS = () + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.e2e_keys_handler = hs.get_e2e_keys_handler() + self.store = hs.get_datastores().main + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + user_id: str, device_id: str, keys: JsonDict + ) -> JsonDict: + + return { + "user_id": user_id, + "device_id": device_id, + "keys": keys, + } + + async def _handle_request( # type: ignore[override] + self, request: Request + ) -> Tuple[int, JsonDict]: + content = parse_json_object_from_request(request) + + user_id = content["user_id"] + device_id = content["device_id"] + keys = content["keys"] + + results = await self.e2e_keys_handler.upload_keys_for_user( + user_id, device_id, keys + ) + + return 200, results + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ReplicationUserDevicesResyncRestServlet(hs).register(http_server) + ReplicationUploadKeysForUserRestServlet(hs).register(http_server) From f708a96ccfada8e6b5187fb6ceb5683e705055d2 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 8 Nov 2022 20:20:08 -0600 Subject: [PATCH 02/13] Introduce KeyUploadServlet to the new Replication HTTP Servlet. 
--- synapse/rest/client/keys.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py index f653d2a3e174..4dc154122879 100644 --- a/synapse/rest/client/keys.py +++ b/synapse/rest/client/keys.py @@ -27,6 +27,7 @@ ) from synapse.http.site import SynapseRequest from synapse.logging.opentracing import log_kv, set_tag +from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet from synapse.rest.client._base import client_patterns, interactive_auth_handler from synapse.types import JsonDict, StreamToken from synapse.util.cancellation import cancellable @@ -71,6 +72,13 @@ def __init__(self, hs: "HomeServer"): self.e2e_keys_handler = hs.get_e2e_keys_handler() self.device_handler = hs.get_device_handler() + if hs.config.worker.worker_app is None: + # if main process + self.key_uploader = self.e2e_keys_handler.upload_keys_for_user + else: + # then a worker + self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs) + async def on_POST( self, request: SynapseRequest, device_id: Optional[str] ) -> Tuple[int, JsonDict]: @@ -109,8 +117,8 @@ async def on_POST( 400, "To upload keys, you must pass device_id when authenticating" ) - result = await self.e2e_keys_handler.upload_keys_for_user( - user_id, device_id, body + result = await self.key_uploader( + user_id=user_id, device_id=device_id, keys=body ) return 200, result From 6e20f4072aa0550fa25cbdd7ea36b5588a0d7f41 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 9 Nov 2022 03:27:19 -0600 Subject: [PATCH 03/13] Remove the skeleton version of KeyUploadServlet and use the only one left. 
--- synapse/app/generic_worker.py | 104 +--------------------------------- 1 file changed, 3 insertions(+), 101 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index cb5892f041e9..50b3bf25bb25 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -14,14 +14,12 @@ # limitations under the License. import logging import sys -from typing import Dict, List, Optional, Tuple +from typing import Dict, List -from twisted.internet import address from twisted.web.resource import Resource import synapse import synapse.events -from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, @@ -43,8 +41,7 @@ from synapse.config.server import ListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.server import JsonResource, OptionsResource -from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.http.site import SynapseRequest, SynapseSite +from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -76,12 +73,12 @@ versions, voip, ) -from synapse.rest.client._base import client_patterns from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet from synapse.rest.client.devices import DevicesRestServlet from synapse.rest.client.keys import ( KeyChangesServlet, KeyQueryServlet, + KeyUploadServlet, OneTimeKeyServlet, ) from synapse.rest.client.register import ( @@ -122,107 +119,12 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore -from synapse.types import 
JsonDict from synapse.util import SYNAPSE_VERSION from synapse.util.httpresourcetree import create_resource_tree logger = logging.getLogger("synapse.app.generic_worker") -class KeyUploadServlet(RestServlet): - """An implementation of the `KeyUploadServlet` that responds to read only - requests, but otherwise proxies through to the master instance. - """ - - PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") - - def __init__(self, hs: HomeServer): - """ - Args: - hs: server - """ - super().__init__() - self.auth = hs.get_auth() - self.store = hs.get_datastores().main - self.http_client = hs.get_simple_http_client() - self.main_uri = hs.config.worker.worker_main_http_uri - - async def on_POST( - self, request: SynapseRequest, device_id: Optional[str] - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - user_id = requester.user.to_string() - body = parse_json_object_from_request(request) - - if device_id is not None: - # passing the device_id here is deprecated; however, we allow it - # for now for compatibility with older clients. - if requester.device_id is not None and device_id != requester.device_id: - logger.warning( - "Client uploading keys for a different device " - "(logged in as %s, uploading for %s)", - requester.device_id, - device_id, - ) - else: - device_id = requester.device_id - - if device_id is None: - raise SynapseError( - 400, "To upload keys, you must pass device_id when authenticating" - ) - - if body: - # They're actually trying to upload something, proxy to main synapse. - - # Proxy headers from the original request, such as the auth headers - # (in case the access token is there) and the original IP / - # User-Agent of the request. - headers: Dict[bytes, List[bytes]] = { - header: list(request.requestHeaders.getRawHeaders(header, [])) - for header in (b"Authorization", b"User-Agent") - } - # Add the previous hop to the X-Forwarded-For header. 
- x_forwarded_for = list( - request.requestHeaders.getRawHeaders(b"X-Forwarded-For", []) - ) - # we use request.client here, since we want the previous hop, not the - # original client (as returned by request.getClientAddress()). - if isinstance(request.client, (address.IPv4Address, address.IPv6Address)): - previous_host = request.client.host.encode("ascii") - # If the header exists, add to the comma-separated list of the first - # instance of the header. Otherwise, generate a new header. - if x_forwarded_for: - x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host] - x_forwarded_for.extend(x_forwarded_for[1:]) - else: - x_forwarded_for = [previous_host] - headers[b"X-Forwarded-For"] = x_forwarded_for - - # Replicate the original X-Forwarded-Proto header. Note that - # XForwardedForRequest overrides isSecure() to give us the original protocol - # used by the client, as opposed to the protocol used by our upstream proxy - # - which is what we want here. - headers[b"X-Forwarded-Proto"] = [ - b"https" if request.isSecure() else b"http" - ] - - try: - result = await self.http_client.post_json_get_json( - self.main_uri + request.uri.decode("ascii"), body, headers=headers - ) - except HttpResponseException as e: - raise e.to_synapse_error() from e - except RequestSendFailed as e: - raise SynapseError(502, "Failed to talk to master") from e - - return 200, result - else: - # Just interested in counts. - result = await self.store.count_e2e_one_time_keys(user_id, device_id) - return 200, {"one_time_key_counts": result} - - class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. From f727192b3ed33ccb3eba96fdba4e0d4bc5ab0b6c Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 9 Nov 2022 04:37:31 -0600 Subject: [PATCH 04/13] Add long docstring to ReplicationUploadKeysForUserRestServlet. 
--- synapse/replication/http/devices.py | 52 +++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 10de0641a2a0..4e6170ef6d34 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -88,6 +88,58 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on master to accomplish this. + + Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload + Request format(borrowed and expanded from KeyUploadServlet): + + POST /_synapse/replication/upload_keys_for_user + + { + "user_id": "", + "device_id": "", + "keys": { + "device_keys": { + "user_id": "", + "device_id": "", + "valid_until_ts": , + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ] + "keys": { + ":": "", + }, + "signatures:" { + "" { + ":": "" + } + } + }, + "fallback_keys": { + ":": "", + "signed_:": { + "fallback": true, + "key": "", + "signatures": { + "": { + ":": "" + } + } + } + } + "one_time_keys": { + ":": "" + }, + } + } + Response is equivalent to ` /_matrix/client/v3/keys/upload` + response, e.g.: + + { + "one_time_key_counts": { + "curve25519": 10, + "signed_curve25519": 20 + } + } """ NAME = "upload_keys_for_user" From 1bd1e1359a2c0e2865dd4ff31acd1f2405532fc0 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 9 Nov 2022 04:39:17 -0600 Subject: [PATCH 05/13] Remove reading 'worker_main_http_uri' from config yaml, as the place it is consumed no longer exists. 
--- synapse/config/workers.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 0fb725dd8fc6..3005dc5d4062 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -155,8 +155,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.worker_name = config.get("worker_name", self.worker_app) self.instance_name = self.worker_name or "master" - self.worker_main_http_uri = config.get("worker_main_http_uri", None) - # This option is really only here to support `--manhole` command line # argument. manhole = config.get("worker_manhole") From 6fc961818d932fcb572e8f33ee98dab3fd6d3b7f Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 9 Nov 2022 04:42:28 -0600 Subject: [PATCH 06/13] Prove it works by totally removing 'worker_main_http_uri' from the bootstrapping script. --- docker/configure_workers_and_start.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index da259129d1c2..efeb9a401627 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -209,10 +209,7 @@ "listener_resources": ["client", "replication"], "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], "shared_extra_conf": {}, - "worker_extra_conf": ( - "worker_main_http_uri: http://127.0.0.1:%d" - % (MAIN_PROCESS_HTTP_LISTENER_PORT,) - ), + "worker_extra_conf": "", }, "account_data": { "app": "synapse.app.generic_worker", From c5ea4ae695cee068ff2515efd5827898ffbf185d Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 9 Nov 2022 15:12:00 -0600 Subject: [PATCH 07/13] Changelog --- changelog.d/14400.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14400.misc diff --git a/changelog.d/14400.misc b/changelog.d/14400.misc new file mode 100644 index 000000000000..6e025329c4e5 --- /dev/null +++ b/changelog.d/14400.misc @@ -0,0 +1 @@ 
+Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication. From badd36a261fdc92ad432e40506e729eddd69742d Mon Sep 17 00:00:00 2001 From: Jason Little Date: Wed, 9 Nov 2022 15:40:34 -0600 Subject: [PATCH 08/13] Give editing configuration manual a shot. --- docs/workers.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index 7ee8801161cf..9440b5335813 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -135,7 +135,7 @@ In the config file for each worker, you must specify: [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). * If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option with an `http` listener. - * If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for + * **Deprecated as of Synapse v1.72.** If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for the main process (`worker_main_http_uri`). For example: @@ -221,7 +221,6 @@ information. ^/_matrix/client/(api/v1|r0|v3|unstable)/search$ # Encryption requests - # Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri` ^/_matrix/client/(r0|v3|unstable)/keys/query$ ^/_matrix/client/(r0|v3|unstable)/keys/changes$ ^/_matrix/client/(r0|v3|unstable)/keys/claim$ @@ -376,7 +375,7 @@ responsible for - persisting them to the DB, and finally - updating the events stream. -Because load is sharded in this way, you *must* restart all worker instances when +Because load is sharded in this way, you *must* restart all worker instances when adding or removing event persisters. An `event_persister` should not be mistaken for an `event_creator`. 
From dd9d895eba63c4a453b4efa6be1b5b544fb0d453 Mon Sep 17 00:00:00 2001 From: realtyem Date: Mon, 14 Nov 2022 12:37:35 -0600 Subject: [PATCH 09/13] Update docs/workers.md Co-authored-by: David Robertson --- docs/workers.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index 9440b5335813..46046508037d 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -135,8 +135,8 @@ In the config file for each worker, you must specify: [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). * If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option with an `http` listener. - * **Deprecated as of Synapse v1.72.** If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for - the main process (`worker_main_http_uri`). + * **Synapse 1.71 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for + the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.72 and newer. For example: From 8b4e09925b4ef7625fb8688444b426d94371f6f5 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 14 Nov 2022 13:03:25 -0600 Subject: [PATCH 10/13] Put the reading from yaml back and add a logger warning per dmr's request. --- synapse/config/workers.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 3005dc5d4062..95ce99083257 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -155,6 +155,11 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.worker_name = config.get("worker_name", self.worker_app) self.instance_name = self.worker_name or "master" + # FIXME: Remove this check after a suitable amount of time. 
+ self.worker_main_http_uri = config.get("worker_main_http_uri", None) + if self.worker_main_http_uri is not None: + logger.warning("The config option worker_main_http_uri is unused since Synapse 1.72. It can be safely removed from your configuration.") + # This option is really only here to support `--manhole` command line # argument. manhole = config.get("worker_manhole") From f2c780bd8e22bd08e15fedbc454074267e5df7eb Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 14 Nov 2022 13:19:50 -0600 Subject: [PATCH 11/13] Rearrange comments to deduplicate and be smarter about layout per review. --- synapse/replication/http/devices.py | 44 +++-------------------- synapse/rest/client/keys.py | 56 ++++++++++++++++++++--------- 2 files changed, 44 insertions(+), 56 deletions(-) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 4e6170ef6d34..b733a737f1f4 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -87,7 +87,7 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): the database and streamed to workers.( Is this accurate?) Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on - master to accomplish this. + the main process to accomplish this. 
Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload Request format(borrowed and expanded from KeyUploadServlet): @@ -98,48 +98,12 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): "user_id": "", "device_id": "", "keys": { - "device_keys": { - "user_id": "", - "device_id": "", - "valid_until_ts": , - "algorithms": [ - "m.olm.curve25519-aes-sha2", - ] - "keys": { - ":": "", - }, - "signatures:" { - "" { - ":": "" - } - } - }, - "fallback_keys": { - ":": "", - "signed_:": { - "fallback": true, - "key": "", - "signatures": { - "": { - ":": "" - } - } - } - } - "one_time_keys": { - ":": "" - }, + ....this part can be found in KeyUploadServlet in rest/client/keys.py.... } } - Response is equivalent to ` /_matrix/client/v3/keys/upload` - response, e.g.: - { - "one_time_key_counts": { - "curve25519": 10, - "signed_curve25519": 20 - } - } + Response is equivalent to ` /_matrix/client/v3/keys/upload` found in KeyUploadServlet + """ NAME = "upload_keys_for_user" diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py index 4dc154122879..ee038c71921a 100644 --- a/synapse/rest/client/keys.py +++ b/synapse/rest/client/keys.py @@ -44,24 +44,48 @@ class KeyUploadServlet(RestServlet): Content-Type: application/json { - "device_keys": { - "user_id": "", - "device_id": "", - "valid_until_ts": , - "algorithms": [ - "m.olm.curve25519-aes-sha2", - ] - "keys": { - ":": "", + "device_keys": { + "user_id": "", + "device_id": "", + "valid_until_ts": , + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ] + "keys": { + ":": "", + }, + "signatures:" { + "" { + ":": "" + } + } }, - "signatures:" { - "" { - ":": "" - } } }, - "one_time_keys": { - ":": "" - }, + "fallback_keys": { + ":": "", + "signed_:": { + "fallback": true, + "key": "", + "signatures": { + "": { + ":": "" + } + } + } + } + "one_time_keys": { + ":": "" + }, + } + + response, e.g.: + + { + "one_time_key_counts": { + "curve25519": 10, + "signed_curve25519": 
20 + } } + """ PATTERNS = client_patterns("/keys/upload(/(?P[^/]+))?$") From 66ded784808f732340576bce01fd018679aedaca Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 14 Nov 2022 16:30:10 -0600 Subject: [PATCH 12/13] Linting --- synapse/config/workers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 95ce99083257..09ed8cc1e7cb 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -158,7 +158,10 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # FIXME: Remove this check after a suitable amount of time. self.worker_main_http_uri = config.get("worker_main_http_uri", None) if self.worker_main_http_uri is not None: - logger.warning("The config option worker_main_http_uri is unused since Synapse 1.72. It can be safely removed from your configuration.") + logger.warning( + "The config option worker_main_http_uri is unused since Synapse 1.72. " + "It can be safely removed from your configuration." + ) # This option is really only here to support `--manhole` command line # argument. From ad3d454852e0548b76c424d44d5cda5139893be9 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 16 Nov 2022 12:35:59 +0000 Subject: [PATCH 13/13] Adjust replication servlet comment --- synapse/replication/http/devices.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index b733a737f1f4..c21629def832 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -83,8 +83,8 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint): """Ask master to upload keys for the user and send them out over federation to update other servers. - This must happen on master so that the results can be correctly cached in - the database and streamed to workers.( Is this accurate?) 
+ For now, only the master is permitted to handle key upload requests; + any worker can handle key query requests (since they're read-only). Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on the main process to accomplish this.