Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support routing edu's to multiple instances #9042

Merged
merged 6 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9042.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for handling and persistence of to-device messages to happen on worker processes.
21 changes: 16 additions & 5 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -860,8 +861,10 @@ def __init__(self, hs: "HomeServer"):
) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]]

# Map from type to instance name that we should route EDU handling to.
self._edu_type_to_instance = {} # type: Dict[str, str]
# Map from type to instance names that we should route EDU handling to.
# We randomly choose one instance from the list to route to for each new
# EDU received.
self._edu_type_to_instance = {} # type: Dict[str, List[str]]

def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
Expand Down Expand Up @@ -905,7 +908,12 @@ def register_query_handler(
def register_instance_for_edu(self, edu_type: str, instance_name: str):
"""Register that the EDU handler is on a different instance than master.
"""
self._edu_type_to_instance[edu_type] = instance_name
self._edu_type_to_instance[edu_type] = [instance_name]

def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
"""Register that the EDU handler is on multiple instances.
"""
self._edu_type_to_instance[edu_type] = instance_names

async def on_edu(self, edu_type: str, origin: str, content: dict):
if not self.config.use_presence and edu_type == "m.presence":
Expand All @@ -924,8 +932,11 @@ async def on_edu(self, edu_type: str, origin: str, content: dict):
return

# Check if we can route it somewhere else that isn't us
route_to = self._edu_type_to_instance.get(edu_type, "master")
if route_to != self._instance_name:
instances = self._edu_type_to_instance.get(edu_type, ["master"])
if self._instance_name not in instances:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)

try:
await self._send_edu(
instance_name=route_to,
Expand Down