diff --git a/changelog.d/9042.feature b/changelog.d/9042.feature new file mode 100644 index 000000000000..4ec319f1f291 --- /dev/null +++ b/changelog.d/9042.feature @@ -0,0 +1 @@ +Add experimental support for handling and persistence of to-device messages to happen on worker processes. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 35e345ce7077..e5339aca239d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random from typing import ( TYPE_CHECKING, Any, @@ -860,8 +861,10 @@ def __init__(self, hs: "HomeServer"): ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]] - # Map from type to instance name that we should route EDU handling to. - self._edu_type_to_instance = {} # type: Dict[str, str] + # Map from type to instance names that we should route EDU handling to. + # We randomly choose one instance from the list to route to for each new + # EDU received. + self._edu_type_to_instance = {} # type: Dict[str, List[str]] def register_edu_handler( self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] @@ -905,7 +908,12 @@ def register_query_handler( def register_instance_for_edu(self, edu_type: str, instance_name: str): """Register that the EDU handler is on a different instance than master. """ - self._edu_type_to_instance[edu_type] = instance_name + self._edu_type_to_instance[edu_type] = [instance_name] + + def register_instances_for_edu(self, edu_type: str, instance_names: List[str]): + """Register that the EDU handler is on multiple instances. + """ + self._edu_type_to_instance[edu_type] = instance_names async def on_edu(self, edu_type: str, origin: str, content: dict): if not self.config.use_presence and edu_type == "m.presence": @@ -924,8 +932,11 @@ async def on_edu(self, edu_type: str, origin: str, content: dict): return # Check if we can route it somewhere else that isn't us - route_to = self._edu_type_to_instance.get(edu_type, "master") - if route_to != self._instance_name: + instances = self._edu_type_to_instance.get(edu_type, ["master"]) + if self._instance_name not in instances: + # Pick an instance randomly so that we don't overload one. + route_to = random.choice(instances) + try: await self._send_edu( instance_name=route_to,