From 0536d0c9beb15fce5f2ca24b4565f79f3140943f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:35:40 +0000 Subject: [PATCH 01/18] make FederationClient.backfill async --- synapse/federation/federation_client.py | 26 +++++++++++-------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f99d17a7de96..297292f389f2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,7 @@ import copy import itertools import logging -from typing import Dict, Iterable +from typing import Dict, Iterable, List, Tuple from prometheus_client import Counter @@ -37,7 +37,7 @@ EventFormatVersions, RoomVersions, ) -from synapse.events import builder, room_version_to_event_format +from synapse.events import EventBase, builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function @@ -170,21 +170,17 @@ def claim_client_keys(self, destination, content, timeout): sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys(destination, content, timeout) - @defer.inlineCallbacks - @log_function - def backfill(self, dest, room_id, limit, extremities): - """Requests some more historic PDUs for the given context from the + async def backfill( + self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + ) -> List[EventBase]: + """Requests some more historic PDUs for the given room from the given destination server. Args: dest (str): The remote homeserver to ask. room_id (str): The room_id to backfill. - limit (int): The maximum number of PDUs to return. - extremities (list): List of PDU id and origins of the first pdus - we have seen from the context - - Returns: - Deferred: Results in the received PDUs. + limit (int): The maximum number of events to return. + extremities (list): our current backwards extremities, to backfill from """ logger.debug("backfill extrem=%s", extremities) @@ -192,13 +188,13 @@ def backfill(self, dest, room_id, limit, extremities): if not extremities: return - transaction_data = yield self.transport_layer.backfill( + transaction_data = await self.transport_layer.backfill( dest, room_id, extremities, limit ) logger.debug("backfill transaction_data=%r", transaction_data) - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) pdus = [ @@ -207,7 +203,7 @@ def backfill(self, dest, room_id, limit, extremities): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield make_deferred_yieldable( + pdus[:] = await make_deferred_yieldable( defer.gatherResults( self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True ).addErrback(unwrapFirstError) From 0cb0c7bcd529b0dda92aff0dae6277dfcb6f6e29 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:41:54 +0000 Subject: [PATCH 02/18] make FederationClient.get_pdu async --- synapse/federation/federation_client.py | 32 +++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 297292f389f2..b9c8d8e32597 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,7 @@ import copy import itertools import logging -from typing import Dict, Iterable, List, Tuple +from typing import Dict, Iterable, List, Optional, Tuple from prometheus_client import Counter @@ -211,11 +211,14 @@ async def backfill( return pdus - @defer.inlineCallbacks - @log_function - def get_pdu( - self, destinations, event_id, room_version, outlier=False, timeout=None - ): + async def get_pdu( + self, + destinations: Iterable[str], + event_id: str, + room_version: str, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home servers. @@ -223,18 +226,17 @@ def get_pdu( one succeeds. Args: - destinations (list): Which homeservers to query - event_id (str): event to fetch - room_version (str): version of the room - outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if + destinations: Which homeservers to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` - timeout (int): How long to try (in ms) each destination for before + timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. Returns: - Deferred: Results in the requested PDU, or None if we were unable to find - it. + The requested PDU, or None if we were unable to find it. """ # TODO: Rate limit the number of times we try and get the same event. @@ -255,7 +257,7 @@ def get_pdu( continue try: - transaction_data = yield self.transport_layer.get_event( + transaction_data = await self.transport_layer.get_event( destination, event_id, timeout=timeout ) @@ -275,7 +277,7 @@ def get_pdu( pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hash(room_version, pdu) + signed_pdu = await self._check_sigs_and_hash(room_version, pdu) break From d73683c363a9177951b8078dd1628c1ade8de508 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:42:52 +0000 Subject: [PATCH 03/18] make FederationClient.get_room_state_ids async --- synapse/federation/federation_client.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b9c8d8e32597..c3e556e7d92f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -307,15 +307,16 @@ async def get_pdu( return signed_pdu - @defer.inlineCallbacks - def get_room_state_ids(self, destination: str, room_id: str, event_id: str): + async def get_room_state_ids( + self, destination: str, room_id: str, event_id: str + ) -> Tuple[List[str], List[str]]: """Calls the /state_ids endpoint to fetch the state at a particular point in the room, and the auth events for the given event Returns: - Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) + a tuple of (state event_ids, auth event_ids) """ - result = yield self.transport_layer.get_room_state_ids( + result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) From 24d814ca238c40fef61c7826d1a9d9f9dd3a144d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:43:40 +0000 Subject: [PATCH 04/18] make FederationClient.get_event_auth async --- synapse/federation/federation_client.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c3e556e7d92f..1da33a7a64ab 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -330,19 +330,17 @@ async def get_room_state_ids( return state_event_ids, auth_event_ids - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, room_id, event_id): - res = yield self.transport_layer.get_event_auth(destination, room_id, event_id) + async def get_event_auth(self, destination, room_id, event_id): + res = await self.transport_layer.get_event_auth(destination, room_id, event_id) - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) auth_chain = [ event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"] ] - signed_auth = yield self._check_sigs_and_hash_and_fetch( + signed_auth = await self._check_sigs_and_hash_and_fetch( destination, auth_chain, outlier=True, room_version=room_version ) From 3f11cbb40494f0ac9622a5b686d3cf8379a44b5d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:51:26 +0000 Subject: [PATCH 05/18] make FederationClient.make_membership_event async --- synapse/federation/federation_client.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1da33a7a64ab..b69aac90413b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -35,6 +35,7 @@ from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, + RoomVersion, RoomVersions, ) from synapse.events import EventBase, builder, room_version_to_event_format @@ -404,7 +405,7 @@ def _try_destination_list(self, description, destinations, callback): raise SynapseError(502, "Failed to %s via any server" % (description,)) - def make_membership_event( + async def make_membership_event( self, destinations: Iterable[str], room_id: str, @@ -412,7 +413,7 @@ def make_membership_event( membership: str, content: dict, params: Dict[str, str], - ): + ) -> Tuple[str, EventBase, RoomVersion]: """ Creates an m.room.member event, with context, without participating in the room. @@ -433,19 +434,19 @@ def make_membership_event( content: Any additional data to put into the content field of the event. params: Query parameters to include in the request. - Return: - Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of + + Returns: `(origin, event, room_version)` where origin is the remote homeserver which generated the event, and room_version is the version of the room. - Fails with a `UnsupportedRoomVersionError` if remote responds with - a room version we don't understand. + Raises: + UnsupportedRoomVersionError: if remote responds with + a room version we don't understand. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: @@ -491,7 +492,7 @@ def send_request(destination): return (destination, ev, room_version) - return self._try_destination_list( + return await self._try_destination_list( "make_" + membership, destinations, send_request ) From 8af9f11bea8d36188d7f8ff65f227b3cfdf9fa17 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:55:00 +0000 Subject: [PATCH 06/18] make FederationClient.send_join async --- synapse/federation/federation_client.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b69aac90413b..c98b27680541 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,7 @@ import copy import itertools import logging -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple from prometheus_client import Counter @@ -496,27 +496,29 @@ def send_request(destination): "make_" + membership, destinations, send_request ) - def send_join(self, destinations, pdu, event_format_version): + async def send_join( + self, destinations: Iterable[str], pdu: EventBase, event_format_version: int + ) -> Dict[str, Any]: """Sends a join event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, and send the event out to the rest of the federation. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent - event_format_version (int): The event format version + pdu: event to be sent + event_format_version: The event format version - Return: - Deferred: resolves to a dict with members ``origin`` (a string + Returns: + a dict with members ``origin`` (a string giving the serer the event was sent to, ``state`` (?) and ``auth_chain``. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ def check_authchain_validity(signed_auth_chain): @@ -603,7 +605,7 @@ def send_request(destination): "origin": destination, } - return self._try_destination_list("send_join", destinations, send_request) + return await self._try_destination_list("send_join", destinations, send_request) @defer.inlineCallbacks def _do_send_join(self, destination, pdu): From a46fabf17bca51eb3a73feb5b3de4072030033e4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:55:11 +0000 Subject: [PATCH 07/18] make FederationClient.send_leave async --- synapse/federation/federation_client.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c98b27680541..8d33d27137d4 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -730,7 +730,7 @@ def _do_send_invite(self, destination, pdu, room_version): ) return content - def send_leave(self, destinations, pdu): + async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None: """Sends a leave event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, @@ -739,17 +739,14 @@ def send_leave(self, destinations, pdu): This is mostly useful to reject received invites. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent - - Return: - Deferred: resolves to None. + pdu: event to be sent - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError if no servers were reachable. """ @defer.inlineCallbacks @@ -759,7 +756,9 @@ def send_request(destination): logger.debug("Got content: %s", content) return None - return self._try_destination_list("send_leave", destinations, send_request) + return await self._try_destination_list( + "send_leave", destinations, send_request + ) @defer.inlineCallbacks def _do_send_leave(self, destination, pdu): From 1330c311b79bc4a9b4d3349e72a2353bb54dcd90 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 20:59:10 +0000 Subject: [PATCH 08/18] make FederationClient._try_destination_list async --- synapse/federation/federation_client.py | 36 ++++++++++++++++++------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8d33d27137d4..11802dad0f31 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,17 @@ import copy import itertools import logging -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -53,6 +63,8 @@ PDU_RETRY_TIME_MS = 1 * 60 * 1000 +T = TypeVar("T") + class InvalidResponseError(RuntimeError): """Helper for _try_destination_list: indicates that the server returned a response @@ -349,17 +361,21 @@ async def get_event_auth(self, destination, room_id, event_id): return signed_auth - @defer.inlineCallbacks - def _try_destination_list(self, description, destinations, callback): + async def _try_destination_list( + self, + description: str, + destinations: Iterable[str], + callback: Callable[[str], Awaitable[T]], + ) -> T: """Try an operation on a series of servers, until it succeeds Args: - description (unicode): description of the operation we're doing, for logging + description: description of the operation we're doing, for logging - destinations (Iterable[unicode]): list of server_names to try + destinations: list of server_names to try - callback (callable): Function to run for each server. Passed a single - argument: the server_name to try. May return a deferred. + callback: Function to run for each server. Passed a single + argument: the server_name to try. If the callback raises a CodeMessageException with a 300/400 code, attempts to perform the operation stop immediately and the exception is @@ -370,7 +386,7 @@ def _try_destination_list(self, description, destinations, callback): suppressed if the exception is an InvalidResponseError. Returns: - The [Deferred] result of callback, if it succeeds + The result of callback, if it succeeds Raises: SynapseError if the chosen remote server returns a 300/400 code, or @@ -381,7 +397,7 @@ def _try_destination_list(self, description, destinations, callback): continue try: - res = yield callback(destination) + res = await callback(destination) return res except InvalidResponseError as e: logger.warning("Failed to %s via %s: %s", description, destination, e) @@ -400,7 +416,7 @@ def _try_destination_list(self, description, destinations, callback): ) except Exception: logger.warning( - "Failed to %s via %s", description, destination, exc_info=1 + "Failed to %s via %s", description, destination, exc_info=True ) raise SynapseError(502, "Failed to %s via any server" % (description,)) From ad09ee92622e0d37502fb49336f1e1474af458df Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:07:13 +0000 Subject: [PATCH 09/18] make FederationClient.make_membership_event.send_request async --- synapse/federation/federation_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 11802dad0f31..b59d08c4ae81 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -471,9 +471,8 @@ async def make_membership_event( % (membership, ",".join(valid_memberships)) ) - @defer.inlineCallbacks - def send_request(destination): - ret = yield self.transport_layer.make_membership_event( + async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]: + ret = await self.transport_layer.make_membership_event( destination, room_id, user_id, membership, params ) @@ -506,7 +505,7 @@ def send_request(destination): event_dict=pdu_dict, ) - return (destination, ev, room_version) + return destination, ev, room_version return await self._try_destination_list( "make_" + membership, destinations, send_request From 3960527c2e1d9bbeb2f0f7b6218de06cd32bdcb4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:07:38 +0000 Subject: [PATCH 10/18] make FederationClient.send_join.send_request async --- synapse/federation/federation_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b59d08c4ae81..8ca36c4d32f7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -553,9 +553,8 @@ def check_authchain_validity(signed_auth_chain): "room appears to have unsupported version %s" % (room_version,) ) - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_join(destination, pdu) + async def send_request(destination) -> Dict[str, Any]: + content = await self._do_send_join(destination, pdu) logger.debug("Got content: %s", content) @@ -584,7 +583,7 @@ def send_request(destination): # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") - valid_pdus = yield self._check_sigs_and_hash_and_fetch( + valid_pdus = await self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, From 638001116de3e472d916f8e809e15c658bab282a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:08:24 +0000 Subject: [PATCH 11/18] make FederationClient._do_send_join async --- synapse/federation/federation_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8ca36c4d32f7..b4609b78cb45 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -621,12 +621,11 @@ async def send_request(destination) -> Dict[str, Any]: return await self._try_destination_list("send_join", destinations, send_request) - @defer.inlineCallbacks - def _do_send_join(self, destination, pdu): + async def _do_send_join(self, destination: str, pdu: EventBase): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_join_v2( + content = await self.transport_layer.send_join_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -648,7 +647,7 @@ def _do_send_join(self, destination, pdu): logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_join_v1( + resp = await self.transport_layer.send_join_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, From e88b90aaebd19822e310b708696036dc0b1f17f6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:08:51 +0000 Subject: [PATCH 12/18] make FederationClient.send_leave.send_request async --- synapse/federation/federation_client.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b4609b78cb45..f98c36039d49 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -762,12 +762,9 @@ async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None: RuntimeError if no servers were reachable. """ - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_leave(destination, pdu) - + async def send_request(destination: str) -> None: + content = await self._do_send_leave(destination, pdu) logger.debug("Got content: %s", content) - return None return await self._try_destination_list( "send_leave", destinations, send_request From abadf44eb2c32c9a6f1da84239043755f854b327 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:09:07 +0000 Subject: [PATCH 13/18] make FederationClient._do_send_leave async --- synapse/federation/federation_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f98c36039d49..5043220d1489 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -770,12 +770,11 @@ async def send_request(destination: str) -> None: "send_leave", destinations, send_request ) - @defer.inlineCallbacks - def _do_send_leave(self, destination, pdu): + async def _do_send_leave(self, destination, pdu): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_leave_v2( + content = await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -797,7 +796,7 @@ def _do_send_leave(self, destination, pdu): logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_leave_v1( + resp = await self.transport_layer.send_leave_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, From 6deeefb68cbd2bc54d8e108b16e6427454644019 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:14:30 +0000 Subject: [PATCH 14/18] make FederationClient.get_missing_events async --- synapse/federation/federation_client.py | 40 ++++++++++++------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5043220d1489..1e9b207a2496 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,6 +25,7 @@ Iterable, List, Optional, + Sequence, Tuple, TypeVar, ) @@ -828,34 +829,33 @@ def get_public_rooms( third_party_instance_id=third_party_instance_id, ) - @defer.inlineCallbacks - def get_missing_events( + async def get_missing_events( self, - destination, - room_id, - earliest_events_ids, - latest_events, - limit, - min_depth, - timeout, - ): + destination: str, + room_id: str, + earliest_events_ids: Sequence[str], + latest_events: Iterable[EventBase], + limit: int, + min_depth: int, + timeout: int, + ) -> List[EventBase]: """Tries to fetch events we are missing. This is called when we receive an event without having received all of its ancestors. Args: - destination (str) - room_id (str) - earliest_events_ids (list): List of event ids. Effectively the + destination + room_id + earliest_events_ids: List of event ids. Effectively the events we expected to receive, but haven't. `get_missing_events` should only return events that didn't happen before these. - latest_events (list): List of events we have received that we don't + latest_events: List of events we have received that we don't have all previous events for. - limit (int): Maximum number of events to return. - min_depth (int): Minimum depth of events tor return. - timeout (int): Max time to wait in ms + limit: Maximum number of events to return. + min_depth: Minimum depth of events to return. + timeout: Max time to wait in ms """ try: - content = yield self.transport_layer.get_missing_events( + content = await self.transport_layer.get_missing_events( destination=destination, room_id=room_id, earliest_events=earliest_events_ids, @@ -865,14 +865,14 @@ def get_missing_events( timeout=timeout, ) - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) events = [ event_from_pdu_json(e, format_ver) for e in content.get("events", []) ] - signed_events = yield self._check_sigs_and_hash_and_fetch( + signed_events = await self._check_sigs_and_hash_and_fetch( destination, events, outlier=False, room_version=room_version ) except HttpResponseException as e: From 4b4536dd02cc1128335383b6cc36afc1f0f6d71c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 21:15:08 +0000 Subject: [PATCH 15/18] newsfile --- changelog.d/6840.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6840.misc diff --git a/changelog.d/6840.misc b/changelog.d/6840.misc new file mode 100644 index 000000000000..0496f12de801 --- /dev/null +++ b/changelog.d/6840.misc @@ -0,0 +1 @@ +Port much of `synapse.handlers.federation` to async/await. From ea23210b2dcc75489d9369b13636a56b7449d765 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 3 Feb 2020 22:29:49 +0000 Subject: [PATCH 16/18] make FederationClient.send_invite async --- synapse/federation/federation_client.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1e9b207a2496..51f9b1d8d7ba 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -659,23 +659,22 @@ async def _do_send_join(self, destination: str, pdu: EventBase): # content. return resp[1] - @defer.inlineCallbacks - def send_invite(self, destination, room_id, event_id, pdu): - room_version = yield self.store.get_room_version_id(room_id) + async def send_invite(self, destination, room_id, event_id, pdu): + room_version = await self.store.get_room_version_id(room_id) - content = yield self._do_send_invite(destination, pdu, room_version) + content = await self._do_send_invite(destination, pdu, room_version) pdu_dict = content["event"] logger.debug("Got response to send_invite: %s", pdu_dict) - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) format_ver = room_version_to_event_format(room_version) pdu = event_from_pdu_json(pdu_dict, format_ver) # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) # FIXME: We should handle signature failures more gracefully. From 146fec08208144c8566da5ab6c2683229d162c1a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 5 Feb 2020 15:47:00 +0000 Subject: [PATCH 17/18] Apply suggestions from code review Co-Authored-By: Erik Johnston --- synapse/federation/federation_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 51f9b1d8d7ba..b4525d28c2b6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -528,7 +528,7 @@ async def send_join( Returns: a dict with members ``origin`` (a string - giving the serer the event was sent to, ``state`` (?) and + giving the server the event was sent to, ``state`` (?) and ``auth_chain``. Raises: @@ -659,7 +659,9 @@ async def _do_send_join(self, destination: str, pdu: EventBase): # content. return resp[1] - async def send_invite(self, destination, room_id, event_id, pdu): + async def send_invite( + self, destination: str, room_id: str, event_id: str, pdu: EventBase, + ) -> EventBase: room_version = await self.store.get_room_version_id(room_id) content = await self._do_send_invite(destination, pdu, room_version) From 6bbd890f05b59ffa33769ae15ea73bdf3c8dc908 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 5 Feb 2020 15:49:42 +0000 Subject: [PATCH 18/18] make FederationClient._do_send_invite async --- synapse/federation/federation_client.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b4525d28c2b6..3a840e068bd2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -53,6 +53,7 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function +from synapse.types import JsonDict from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -682,23 +683,19 @@ async def send_invite( return pdu - @defer.inlineCallbacks - def _do_send_invite(self, destination, pdu, room_version): + async def _do_send_invite( + self, destination: str, pdu: EventBase, room_version: str + ) -> JsonDict: """Actually sends the invite, first trying v2 API and falling back to v1 API if necessary. - Args: - destination (str): Target server - pdu (FrozenEvent) - room_version (str) - Returns: - dict: The event as a dict as returned by the remote server + The event as a dict as returned by the remote server """ time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_invite_v2( + content = await self.transport_layer.send_invite_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -737,7 +734,7 @@ def _do_send_invite(self, destination, pdu, room_version): # Didn't work, try v1 API. # Note the v1 API returns a tuple of `(200, content)` - _, content = yield self.transport_layer.send_invite_v1( + _, content = await self.transport_layer.send_invite_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id,