Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow running sendToDevice on workers #9044

Merged
merged 25 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8f7e6b7
Refactor add_messages_to_device_inbox
erikjohnston Jan 7, 2021
c132b39
Support routing edu's to multiple instances
erikjohnston Jan 7, 2021
691c373
Newsfile
erikjohnston Jan 7, 2021
124f415
Support resync clients off master
erikjohnston Jan 7, 2021
e81d693
Newsfile
erikjohnston Jan 7, 2021
40d9869
Only define _last_device_delete_cache once
erikjohnston Jan 7, 2021
6da60bf
Newsfile
erikjohnston Jan 7, 2021
7386f09
Merge branch 'erikj/resync_off_master' into erikj/split_to_device_sen…
erikjohnston Jan 7, 2021
08a4f88
Merge branch 'erikj/allow_routing_edus' into erikj/split_to_device_se…
erikjohnston Jan 7, 2021
1236ac3
Use MultiWriterIDGenerator for device inbox
erikjohnston Jan 7, 2021
248832e
Allow configuring which workers handle to_device messages
erikjohnston Jan 7, 2021
811cb59
Move DB write functions to worker store
erikjohnston Jan 7, 2021
eb6121a
Assert writing to device inbox only happens on appropriate workers
erikjohnston Jan 7, 2021
7d43cb7
Wire up SendToDeviceRestServlet to work on workers
erikjohnston Jan 7, 2021
5420f01
Newsfile
erikjohnston Jan 7, 2021
36f0d2b
Merge remote-tracking branch 'origin/develop' into erikj/split_to_dev…
erikjohnston Jan 7, 2021
9a91c2d
Ensure you can only config one instance to handle to device messages,…
erikjohnston Jan 7, 2021
82286cc
Merge remote-tracking branch 'origin/develop' into erikj/split_to_dev…
erikjohnston Jan 7, 2021
d371d96
Update changelog
erikjohnston Jan 7, 2021
fc38115
Fix port script to handle new sequences
erikjohnston Jan 7, 2021
2667875
Remove redundant '_device_inbox_id_gen'
erikjohnston Jan 7, 2021
171f360
Expand comment on federation_sender
erikjohnston Jan 7, 2021
867a21f
Remove spurious changelog
erikjohnston Jan 7, 2021
07f2d9a
Fixup comment
erikjohnston Jan 7, 2021
14624e7
Fix newsfile
erikjohnston Jan 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9042.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support routing EDUs to multiple instances.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably lost?

1 change: 1 addition & 0 deletions changelog.d/9043.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow handling `/sendToDevice` API endpoints on workers.
1 change: 1 addition & 0 deletions changelog.d/9044.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow handling `/sendToDevice` API endpoints on workers.
3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
)
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
Expand Down Expand Up @@ -520,6 +521,8 @@ def _listen_http(self, listener_config: ListenerConfig):
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)

SendToDeviceRestServlet(self).register(resource)

user_directory.register_servlets(self, resource)

# If presence is disabled, use the stub servlet that does
Expand Down
10 changes: 9 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class WriterLocations:
default=["master"], type=List[str], converter=_instance_to_list_converter
)
typing = attr.ib(default="master", type=str)
to_device = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter,
)


class WorkerConfig(Config):
Expand Down Expand Up @@ -124,7 +127,7 @@ def read_config(self, config, **kwargs):

# Check that the configured writers for events, typing and to_device also appear in
# `instance_map`.
for stream in ("events", "typing"):
for stream in ("events", "typing", "to_device"):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
if instance != "master" and instance not in self.instance_map:
Expand All @@ -133,6 +136,11 @@ def read_config(self, config, **kwargs):
% (instance, stream)
)

if len(self.writers.to_device) != 1:
raise ConfigError(
"Must only specify one instance to handle `to_device` messages."
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

# Whether this worker should run background tasks or not.
Expand Down
17 changes: 13 additions & 4 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -861,7 +862,7 @@ def __init__(self, hs: "HomeServer"):
self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]]

# Map from type to instance name that we should route EDU handling to.
self._edu_type_to_instance = {} # type: Dict[str, str]
self._edu_type_to_instance = {} # type: Dict[str, List[str]]

def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
Expand Down Expand Up @@ -905,7 +906,12 @@ def register_query_handler(
def register_instance_for_edu(self, edu_type: str, instance_name: str):
"""Register that the EDU handler is on a different instance than master.
"""
self._edu_type_to_instance[edu_type] = instance_name
self._edu_type_to_instance[edu_type] = [instance_name]

def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
"""Register that the EDU handler is on multiple instances.
"""
self._edu_type_to_instance[edu_type] = instance_names

async def on_edu(self, edu_type: str, origin: str, content: dict):
if not self.config.use_presence and edu_type == "m.presence":
Expand All @@ -924,8 +930,11 @@ async def on_edu(self, edu_type: str, origin: str, content: dict):
return

# Check if we can route it somewhere else that isn't us
route_to = self._edu_type_to_instance.get(edu_type, "master")
if route_to != self._instance_name:
instances = self._edu_type_to_instance.get(edu_type, ["master"])
if self._instance_name not in instances:
# Pick an instance randomly so that we don't overload one.
route_to = random.choice(instances)

try:
await self._send_edu(
instance_name=route_to,
Expand Down
43 changes: 31 additions & 12 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
set_tag,
start_active_span,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string
Expand All @@ -44,13 +45,32 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()

hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
# We only need to poke the federation sender explicitly if it's on the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# We only need to poke the federation sender explicitly if its on the
# We only need to poke the federation sender explicitly if it's on the

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... how does it work if it's on a different instance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've expanded the comment to:

We only need to poke the federation sender explicitly if its on the
same instance. Other federation sender instances will get notified by
the generic_worker when it sees it in the to-device replication
stream.

# same instance.
self.federation_sender = None
if hs.should_send_federation():
self.federation_sender = hs.get_federation_sender()

# If we can handle the to device EDUs we do so, otherwise we route them
# to the appropriate worker.
if hs.get_instance_name() in hs.config.worker.writers.to_device:
hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
else:
hs.get_federation_registry().register_instances_for_edu(
"m.direct_to_device", hs.config.worker.writers.to_device,
)

self._device_list_updater = hs.get_device_handler().device_list_updater
if hs.config.worker.worker_app is None:
self._user_device_resync = (
hs.get_device_handler().device_list_updater.user_device_resync
)
else:
self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client(
hs
)

async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
local_messages = {}
Expand Down Expand Up @@ -138,9 +158,7 @@ async def _check_for_unknown_devices(
await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)

# Immediately attempt a resync in the background
run_in_background(
self._device_list_updater.user_device_resync, sender_user_id
)
run_in_background(self._user_device_resync, sender_user_id)

async def send_device_message(
self,
Expand Down Expand Up @@ -195,7 +213,8 @@ async def send_device_message(
)

log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation.send_device_messages(destination)
if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)
32 changes: 1 addition & 31 deletions synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,8 @@
# limitations under the License.

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
db_conn, "device_inbox", "stream_id"
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token(),
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
self._device_inbox_id_gen.get_current_token(),
)

def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
row.entity, token
)
else:
self._device_federation_outbox_stream_cache.entity_has_changed(
row.entity, token
)
return super().process_replication_rows(stream_name, instance_name, token, rows)
pass
9 changes: 9 additions & 0 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
EventsStream,
FederationStream,
Stream,
ToDeviceStream,
TypingStream,
)

Expand Down Expand Up @@ -115,6 +116,14 @@ def __init__(self, hs):

continue

if isinstance(stream, ToDeviceStream):
# Only add ToDeviceStream as a source on instances in charge of
# sending to device messages.
if hs.get_instance_name() in hs.config.worker.writers.to_device:
self._streams_to_replicate.append(stream)

continue

if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
Expand Down
30 changes: 0 additions & 30 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,36 +189,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
prefilled_cache=presence_cache_prefill,
)

max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
entity_column="user_id",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_inbox_stream_cache = StreamChangeCache(
"DeviceInboxStreamChangeCache",
min_device_inbox_id,
prefilled_cache=device_inbox_prefill,
)
# The federation outbox and the local device inbox uses the same
# stream_id generator.
device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_federation_outbox",
entity_column="destination",
stream_column="stream_id",
max_value=max_device_inbox_id,
limit=1000,
)
self._device_federation_outbox_stream_cache = StreamChangeCache(
"DeviceFederationOutboxStreamChangeCache",
min_device_outbox_id,
prefilled_cache=device_outbox_prefill,
)

device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
Expand Down
Loading