Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace the synapse.app.appservice and synapse.app.user_dir worker types with more flexible config options #10616

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10616.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow a single worker to both notify appservices and update the user directory by removing the dependence on the `worker_type` worker config option.
16 changes: 14 additions & 2 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2761,11 +2761,23 @@ opentracing:
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). If not provided this defaults to the main process.
# The name of the worker that is used to run background tasks (e.g. cleaning
# up expired data). If not provided this defaults to the main process.
#
#run_background_tasks_on: worker1

# The name of the worker that is used to notify application services of new
# traffic within their configured namespace. If not provided this defaults
# to the main process.
#
#notify_appservices_from_worker: worker2

# The name of the worker that is used to update the user directory tables as
# users are created, update their room memberships as well as their profiles.
# If not provided this defaults to the main process.
#
#update_user_directory_on_worker: worker3

# A shared secret used by the replication APIs to authenticate HTTP requests
# from workers.
#
Expand Down
2 changes: 1 addition & 1 deletion synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def start(config_options):
config.no_redirect_stdio = True

# Explicitly disable background processes
config.update_user_directory = False
config.worker.should_update_user_directory = False
config.run_background_tasks = False
config.start_pushers = False
config.pusher_shard_config.instances = []
Expand Down
32 changes: 0 additions & 32 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,38 +427,6 @@ def start(config_options):
"synapse.app.user_dir",
)

if config.worker_app == "synapse.app.appservice":
if config.appservice.notify_appservices:
sys.stderr.write(
"\nThe appservices must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``notify_appservices: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the appservice to start since they will be disabled in the main config
config.appservice.notify_appservices = True
else:
# For other worker types we force this to off.
config.appservice.notify_appservices = False

if config.worker_app == "synapse.app.user_dir":
if config.server.update_user_directory:
sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``update_user_directory: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.server.update_user_directory = True
else:
# For other worker types we force this to off.
config.server.update_user_directory = False

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage

Expand Down
1 change: 0 additions & 1 deletion synapse/config/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class AppServiceConfig(Config):

def read_config(self, config, **kwargs):
self.app_service_config_files = config.get("app_service_config_files", [])
self.notify_appservices = config.get("notify_appservices", True)
self.track_appservice_user_ips = config.get("track_appservice_user_ips", False)

def generate_config_section(cls, **kwargs):
Expand Down
4 changes: 0 additions & 4 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,6 @@ def read_config(self, config, **kwargs):
self.presence_router_config,
) = load_module(presence_router_config, ("presence", "presence_router"))

# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)

# whether to enable the media repository endpoints. This should be set
# to false if the media repository is running as a separate endpoint;
# doing so ensures that we will not run cache cleanup jobs on the
Expand Down
46 changes: 42 additions & 4 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,39 @@ def read_config(self, config, **kwargs):
# be able to run on only a single instance (meaning that they don't
# depend on any in-memory state of a particular worker).
#
# No effort is made to ensure only a single instance of these tasks is
# running.
# No effort is made here to ensure only a single instance of these tasks
# are running.
background_tasks_instance = config.get("run_background_tasks_on") or "master"
self.run_background_tasks = (
self.worker_name is None and background_tasks_instance == "master"
) or self.worker_name == background_tasks_instance

# Whether this worker should notify appservices of traffic within their namespace.
#
# As a note for developers, this task is currently not shardable, and thus should
# only be handled by a single process.
#
# No effort is made here to ensure only a single instance of this task running.
notify_appservices_instance = (
config.get("notify_appservices_from_worker") or "master"
)
self.should_notify_appservices = (
self.worker_name is None and notify_appservices_instance == "master"
) or self.worker_name == notify_appservices_instance

# Whether this worker should update the user directory tables.
#
# As a note for developers, this task is currently not shardable, and thus should
# only be handled by a single process.
#
# No effort is made here to ensure only a single instance of this task running.
update_user_directory_instance = (
config.get("update_user_directory_on_worker") or "master"
)
self.should_update_user_directory = (
self.worker_name is None and update_user_directory_instance == "master"
) or self.worker_name == update_user_directory_instance

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Workers ##
Expand Down Expand Up @@ -318,11 +344,23 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs):
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). If not provided this defaults to the main process.
# The name of the worker that is used to run background tasks (e.g. cleaning
# up expired data). If not provided this defaults to the main process.
#
#run_background_tasks_on: worker1

# The name of the worker that is used to notify application services of new
# traffic within their configured namespace. If not provided this defaults
# to the main process.
#
#notify_appservices_from_worker: worker2

# The name of the worker that is used to update the user directory tables as
# users are created, update their room memberships as well as their profiles.
# If not provided this defaults to the main process.
#
#update_user_directory_on_worker: worker3

# A shared secret used by the replication APIs to authenticate HTTP requests
# from workers.
#
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(self, hs: "HomeServer"):
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
self.clock = hs.get_clock()
self.notify_appservices = hs.config.notify_appservices
self.notify_appservices = hs.config.worker.should_notify_appservices
self.event_sources = hs.get_event_sources()

self.current_max = 0
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.update_user_directory = hs.config.worker.should_update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
# The current position in the current_state_delta stream
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/v2_alpha/shared_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.user_directory_active = hs.config.update_user_directory
self.user_directory_active = hs.config.worker.should_update_user_directory

async def on_GET(self, request, user_id):

Expand Down
2 changes: 0 additions & 2 deletions tests/handlers/test_user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):

config = self.default_config()
config["update_user_directory"] = True
return self.setup_test_homeserver(config=config)

def prepare(self, reactor, clock, hs):
Expand Down Expand Up @@ -653,7 +652,6 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase):

def make_homeserver(self, reactor, clock):
config = self.default_config()
config["update_user_directory"] = True
hs = self.setup_test_homeserver(config=config)

self.config = hs.config
Expand Down
1 change: 0 additions & 1 deletion tests/rest/client/v2_alpha/test_shared_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class UserSharedRoomsTest(unittest.HomeserverTestCase):

def make_homeserver(self, reactor, clock):
config = self.default_config()
config["update_user_directory"] = True
return self.setup_test_homeserver(config=config)

def prepare(self, reactor, clock, hs):
Expand Down
3 changes: 0 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ def default_config(name, parse=False):
# We need a sane default_room_version, otherwise attempts to create
# rooms will fail.
"default_room_version": DEFAULT_ROOM_VERSION,
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
"update_user_directory": False,
"caches": {"global_factor": 1},
"listeners": [{"port": 0, "type": "http"}],
}
Expand Down