Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Clean up ShardedWorkerHandlingConfig (#9466)
Browse files Browse the repository at this point in the history
* Split ShardedWorkerHandlingConfig

This is so that we have a type level understanding of when it is safe to
call `get_instance(..)` (as opposed to `should_handle(..)`).

* Remove special cases in ShardedWorkerHandlingConfig.

`ShardedWorkerHandlingConfig` tried to handle the various different ways
it was possible to configure federation senders and pushers. This led to
special cases that weren't hit during testing.

To fix this the handling of the different cases is moved from there and
`generic_worker` into the worker config class. This allows us to have
the logic in one place and allows the rest of the code to ignore the
different cases.
  • Loading branch information
erikjohnston authored Feb 24, 2021
1 parent 0b5c967 commit 2927921
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 63 deletions.
1 change: 1 addition & 0 deletions changelog.d/9466.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix deleting pushers when using sharded pushers.
2 changes: 2 additions & 0 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,9 @@ def start(config_options):
config.update_user_directory = False
config.run_background_tasks = False
config.start_pushers = False
config.pusher_shard_config.instances = []
config.send_federation = False
config.federation_shard_config.instances = []

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

Expand Down
32 changes: 0 additions & 32 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,22 +919,6 @@ def start(config_options):
# For other worker types we force this to off.
config.appservice.notify_appservices = False

if config.worker_app == "synapse.app.pusher":
if config.server.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``start_pushers: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.server.start_pushers = True
else:
# For other worker types we force this to off.
config.server.start_pushers = False

if config.worker_app == "synapse.app.user_dir":
if config.server.update_user_directory:
sys.stderr.write(
Expand All @@ -951,22 +935,6 @@ def start(config_options):
# For other worker types we force this to off.
config.server.update_user_directory = False

if config.worker_app == "synapse.app.federation_sender":
if config.worker.send_federation:
sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``send_federation: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.worker.send_federation = True
else:
# For other worker types we force this to off.
config.worker.send_federation = False

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

hs = GenericWorkerServer(
Expand Down
36 changes: 27 additions & 9 deletions synapse/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,22 +844,23 @@ class ShardedWorkerHandlingConfig:

def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key."""
# If multiple instances are not defined we always return true
if not self.instances or len(self.instances) == 1:
return True
# If no instances are defined we assume some other worker is handling
# this.
if not self.instances:
return False

return self.get_instance(key) == instance_name
return self._get_instance(key) == instance_name

def get_instance(self, key: str) -> str:
def _get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.
Note: For things like federation sending the config for which instance
is sending is known only to the sender instance if there is only one.
Therefore `should_handle` should be used where possible.
Note: For federation sending and pushers the config for which instance
is sending is known only to the sender instance, so we don't expose this
method by default.
"""

if not self.instances:
return "master"
raise Exception("Unknown worker")

if len(self.instances) == 1:
return self.instances[0]
Expand All @@ -876,4 +877,21 @@ def get_instance(self, key: str) -> str:
return self.instances[remainder]


@attr.s
class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
"""A version of `ShardedWorkerHandlingConfig` that is used for config
options where all instances know which instances are responsible for the
sharded work.
"""

def __attrs_post_init__(self):
# We require that `self.instances` is non-empty.
if not self.instances:
raise Exception("Got empty list of instances for shard config")

def get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key."""
return self._get_instance(key)


__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
2 changes: 2 additions & 0 deletions synapse/config/_base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...

class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
def get_instance(self, key: str) -> str: ...
5 changes: 1 addition & 4 deletions synapse/config/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._base import Config, ShardedWorkerHandlingConfig
from ._base import Config


class PushConfig(Config):
Expand All @@ -27,9 +27,6 @@ def read_config(self, config, **kwargs):
"group_unread_count_by_room", True
)

pusher_instances = config.get("pusher_instances") or []
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

# There was a a 'redact_content' setting but mistakenly read from the
# 'email'section'. Check for the flag in the 'push' section, and log,
# but do not honour it to avoid nasty surprises when people upgrade.
Expand Down
1 change: 0 additions & 1 deletion synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ def read_config(self, config, **kwargs):
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
self.start_pushers = config.get("start_pushers", True)

# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
Expand Down
93 changes: 86 additions & 7 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,28 @@

import attr

from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from ._base import (
Config,
ConfigError,
RoutableShardedWorkerHandlingConfig,
ShardedWorkerHandlingConfig,
)
from .server import ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before they can be run in a separate worker.
Please add ``send_federation: false`` to the main config
"""

_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before they can be run in a separate worker.
Please add ``start_pushers: false`` to the main config
"""


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
Expand Down Expand Up @@ -103,6 +122,7 @@ def read_config(self, config, **kwargs):
self.worker_replication_secret = config.get("worker_replication_secret", None)

self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"

self.worker_main_http_uri = config.get("worker_main_http_uri", None)

Expand All @@ -118,12 +138,41 @@ def read_config(self, config, **kwargs):
)
)

# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
# "disable" federation
self.send_federation = config.get("send_federation", True)
# Handle federation sender configuration.
#
# There are two ways of configuring which instances handle federation
# sending:
# 1. The old way where "send_federation" is set to false and running a
# `synapse.app.federation_sender` worker app.
# 2. Specifying the workers sending federation in
# `federation_sender_instances`.
#

send_federation = config.get("send_federation", True)

federation_sender_instances = config.get("federation_sender_instances")
if federation_sender_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
federation_sender_instances = []

federation_sender_instances = config.get("federation_sender_instances") or []
# If no federation sender instances are set we check if
# `send_federation` is set, which means use master
if send_federation:
federation_sender_instances = ["master"]

if self.worker_app == "synapse.app.federation_sender":
if send_federation:
# If we're running federation senders, and not using
# `federation_sender_instances`, then we should have
# explicitly set `send_federation` to false.
raise ConfigError(
_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
)

federation_sender_instances = [self.worker_name]

self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
Expand Down Expand Up @@ -164,7 +213,37 @@ def read_config(self, config, **kwargs):
"Must only specify one instance to handle `receipts` messages."
)

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
if len(self.writers.events) == 0:
raise ConfigError("Must specify at least one instance to handle `events`.")

self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)

# Handle sharded push
start_pushers = config.get("start_pushers", True)
pusher_instances = config.get("pusher_instances")
if pusher_instances is None:
# Default to an empty list, which means "another, unknown, worker is
# responsible for it".
pusher_instances = []

# If no pushers instances are set we check if `start_pushers` is
# set, which means use master
if start_pushers:
pusher_instances = ["master"]

if self.worker_app == "synapse.app.pusher":
if start_pushers:
# If we're running pushers, and not using
# `pusher_instances`, then we should have explicitly set
# `start_pushers` to false.
raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

pusher_instances = [self.instance_name]

self.start_pushers = self.instance_name in pusher_instances
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

# Whether this worker should run background tasks or not.
#
Expand Down
4 changes: 3 additions & 1 deletion synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class PusherPool:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.pusher_factory = PusherFactory(hs)
self._should_start_pushers = hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()

Expand All @@ -68,6 +67,9 @@ def __init__(self, hs: "HomeServer"):
# We shard the handling of push notifications by user ID.
self._pusher_shard_config = hs.config.push.pusher_shard_config
self._instance_name = hs.get_instance_name()
self._should_start_pushers = (
self._instance_name in self._pusher_shard_config.instances
)

# We can only delete pushers on master.
self._remove_pusher_client = None
Expand Down
7 changes: 2 additions & 5 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def __init__(
self.start_time = None # type: Optional[int]

self._instance_id = random_string(5)
self._instance_name = config.worker_name or "master"
self._instance_name = config.worker.instance_name

self.version_string = version_string

Expand Down Expand Up @@ -760,7 +760,4 @@ def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]:

def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
return self.config.send_federation and (
not self.config.worker_app
or self.config.worker_app == "synapse.app.federation_sender"
)
return self.config.send_federation
2 changes: 1 addition & 1 deletion tests/replication/tcp/streams/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _get_worker_hs_config(self) -> dict:
# enable federation sending on the worker
config = super()._get_worker_hs_config()
# TODO: make it so we don't need both of these
config["send_federation"] = True
config["send_federation"] = False
config["worker_app"] = "synapse.app.federation_sender"
return config

Expand Down
2 changes: 1 addition & 1 deletion tests/replication/test_federation_ack.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class FederationAckTestCase(HomeserverTestCase):
def default_config(self) -> dict:
config = super().default_config()
config["worker_app"] = "synapse.app.federation_sender"
config["send_federation"] = True
config["send_federation"] = False
return config

def make_homeserver(self, reactor, clock):
Expand Down
2 changes: 1 addition & 1 deletion tests/replication/test_federation_sender_shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_send_event_single_sender(self):

self.make_worker_hs(
"synapse.app.federation_sender",
{"send_federation": True},
{"send_federation": False},
federation_http_client=mock_client,
)

Expand Down
2 changes: 1 addition & 1 deletion tests/replication/test_pusher_shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_send_push_single_worker(self):

self.make_worker_hs(
"synapse.app.pusher",
{"start_pushers": True},
{"start_pushers": False},
proxied_blacklisted_http_client=http_client_mock,
)

Expand Down

0 comments on commit 2927921

Please sign in to comment.