From 4a1a8321dde4095efd9f65fcb6868d5257e065a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Nov 2021 11:14:46 +0000 Subject: [PATCH 01/18] Store whether a BG update is oneshot or not --- synapse/storage/background_updates.py | 32 ++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b9a8ca997e61..30818aad6526 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,8 @@ import logging from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional +import attr + from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.types import Connection from synapse.types import JsonDict @@ -28,6 +30,22 @@ logger = logging.getLogger(__name__) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _BackgroundUpdateHandler: + """A handler for a given background update. + + Attributes: + callback: The function to call to make progress on the background + update. + oneshot: Wether the update is likely to happen all in one go, ignoring + the supplied target duration, e.g. index creation. This is used by + the update controller to help correctly schedule the update. + """ + + callback: Callable[[JsonDict, int], Awaitable[int]] + oneshot: bool = False + + class BackgroundUpdatePerformance: """Tracks the how long a background update is taking to update its items""" @@ -95,9 +113,7 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): self._current_background_update: Optional[str] = None self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {} - self._background_update_handlers: Dict[ - str, Callable[[JsonDict, int], Awaitable[int]] - ] = {} + self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {} self._all_done = False # Whether we're currently running updates @@ -258,7 +274,7 @@ async def _do_background_update(self, desired_duration_ms: float) -> int: update_name = self._current_background_update logger.info("Starting update batch on background update '%s'", update_name) - update_handler = self._background_update_handlers[update_name] + update_handler = self._background_update_handlers[update_name].callback performance = self._background_update_performance.get(update_name) @@ -329,7 +345,9 @@ def register_background_update_handler( update_name: The name of the update that this code handles. update_handler: The function that does the update. """ - self._background_update_handlers[update_name] = update_handler + self._background_update_handlers[update_name] = _BackgroundUpdateHandler( + update_handler + ) def register_noop_background_update(self, update_name: str) -> None: """Register a noop handler for a background update. @@ -451,7 +469,9 @@ async def updater(progress, batch_size): await self._end_background_update(update_name) return 1 - self.register_background_update_handler(update_name, updater) + self._background_update_handlers[update_name] = _BackgroundUpdateHandler( + updater, oneshot=True + ) async def _end_background_update(self, update_name: str) -> None: """Removes a completed background update task from the queue. From 957da6f56ec5d46f1e8908a9ad557e56de6c7997 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Nov 2021 11:50:01 +0000 Subject: [PATCH 02/18] Add a `BackgroundUpdateController` class. This controls how often and for how long a background update is run for. The idea is to allow the default one to be replaced by a pluggable module. --- synapse/storage/background_updates.py | 182 +++++++++++++++++++++--- tests/push/test_email.py | 10 +- tests/storage/test_background_update.py | 22 +-- tests/storage/test_event_chain.py | 4 +- tests/unittest.py | 10 +- 5 files changed, 176 insertions(+), 52 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 30818aad6526..e5ecfc1f3ea9 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -11,15 +11,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import abc import logging -from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional +from typing import ( + TYPE_CHECKING, + AsyncContextManager, + Awaitable, + Callable, + Dict, + Iterable, + Optional, +) import attr from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.types import Connection from synapse.types import JsonDict -from synapse.util import json_encoder +from synapse.util import Clock, json_encoder from . import engines @@ -46,6 +55,120 @@ class _BackgroundUpdateHandler: oneshot: bool = False +class BackgroundUpdateController(abc.ABC): + """A base class for controlling background update timings.""" + + #### + # NOTE: This is used by modules so changes must be backwards compatible or + # be announced appropriately + #### + + @abc.abstractmethod + def run_update( + self, update_name: str, database_name: str, oneshot: bool + ) -> AsyncContextManager[int]: + """Called before we do the next iteration of a background update. The + returned async context manager is immediately entered and then exited + after this iteration of the background update has finished. + + Implementations will likely want to sleep for a period of time to stop + the background update from continuously being run. + + Args: + update_name: The name of the update that is to be run + database_name: The name of the database the background update is + being run on. Really only useful if Synapse is configured with + multiple databases. + oneshot: Whether the update will complete all in one go, e.g. + index creation. In such cases the returned target duration is + ignored. + + Returns: + The target duration in milliseconds that the background update + should run for. + + Note: this is a *target*, and an iteration may take substantially + longer or shorter. + """ + ... + + @abc.abstractmethod + async def default_batch_size(self, update_name: str, database_name: str) -> int: + """The batch size to use for the first iteration of a new background + update. + """ + ... + + @abc.abstractmethod + async def min_batch_size(self, update_name: str, database_name: str) -> int: + """A lower bound on the batch size of a new background update. + + Used to ensure that progress is always made. Must be greater than 0. + """ + ... + + +class _TimeBasedBackgroundUpdateController(BackgroundUpdateController): + """The default controller which aims to spend X ms doing the background + update every Y ms. + """ + + MINIMUM_BACKGROUND_BATCH_SIZE = 100 + DEFAULT_BACKGROUND_BATCH_SIZE = 100 + + BACKGROUND_UPDATE_INTERVAL_MS = 1000 + BACKGROUND_UPDATE_DURATION_MS = 100 + + def __init__(self, clock: Clock): + self._clock = clock + + def run_update( + self, + update_name: str, + database_name: str, + oneshot: bool, + ) -> AsyncContextManager[int]: + return self + + async def default_batch_size(self, update_name: str, database_name: str) -> int: + return self.DEFAULT_BACKGROUND_BATCH_SIZE + + async def min_batch_size(self, update_name: str, database_name: str) -> int: + return self.MINIMUM_BACKGROUND_BATCH_SIZE + + async def __aenter__(self) -> int: + await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000) + return self.BACKGROUND_UPDATE_DURATION_MS + + async def __aexit__(self, *exc): + pass + + +class _ImmediateBackgroundUpdateController(BackgroundUpdateController): + """A background update controller that doesn't ever wait, effectively + running the background updates as quickly as possible""" + + def run_update( + self, + update_name: str, + database_name: str, + oneshot: bool, + ) -> AsyncContextManager[int]: + return self + + async def default_batch_size(self, update_name: str, database_name: str) -> int: + return 100 + + async def min_batch_size(self, update_name: str, database_name: str) -> int: + return 100 + + async def __aenter__(self) -> int: + return 100 + + async def __aexit__(self, *exc): + pass + + class BackgroundUpdatePerformance: """Tracks the how long a background update is taking to update its items""" @@ -100,18 +223,17 @@ class BackgroundUpdater: process and autotuning the batch size. """ - MINIMUM_BACKGROUND_BATCH_SIZE = 100 - DEFAULT_BACKGROUND_BATCH_SIZE = 100 - BACKGROUND_UPDATE_INTERVAL_MS = 1000 - BACKGROUND_UPDATE_DURATION_MS = 100 - def __init__(self, hs: "HomeServer", database: "DatabasePool"): self._clock = hs.get_clock() self.db_pool = database + self._database_name = database.name() + # if a background update is currently running, its name. self._current_background_update: Optional[str] = None + self._controller = _TimeBasedBackgroundUpdateController(self._clock) + self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {} self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {} self._all_done = False @@ -149,13 +271,8 @@ async def run_background_updates(self, sleep: bool = True) -> None: try: logger.info("Starting background schema updates") while self.enabled: - if sleep: - await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) - try: - result = await self.do_next_background_update( - self.BACKGROUND_UPDATE_DURATION_MS - ) + result = await self.do_next_background_update(sleep) except Exception: logger.exception("Error doing update") else: @@ -217,13 +334,15 @@ async def has_completed_background_update(self, update_name: str) -> bool: return not update_exists - async def do_next_background_update(self, desired_duration_ms: float) -> bool: + async def do_next_background_update(self, sleep: bool = True) -> bool: """Does some amount of work on the next queued background update Returns once some amount of work is done. Args: - desired_duration_ms: How long we want to spend updating. + sleep: Whether to limit how quickly we run background updates or + not. + Returns: True if we have finished running all the background updates, otherwise False """ @@ -266,7 +385,25 @@ def get_background_updates_txn(txn): self._current_background_update = upd["update_name"] - await self._do_background_update(desired_duration_ms) + # We have a background update to run, otherwise we would have returned + # early. + assert self._current_background_update is not None + update_info = self._background_update_handlers[self._current_background_update] + + if sleep: + controller = self._controller + else: + # If `sleep` is False then we want to run the updates as quickly as + # possible. + controller = _ImmediateBackgroundUpdateController() + + async with controller.run_update( + update_name=self._current_background_update, + database_name=self._database_name, + oneshot=update_info.oneshot, + ) as desired_duration_ms: + await self._do_background_update(desired_duration_ms) + return False async def _do_background_update(self, desired_duration_ms: float) -> int: @@ -287,9 +424,14 @@ async def _do_background_update(self, desired_duration_ms: float) -> int: if items_per_ms is not None: batch_size = int(desired_duration_ms * items_per_ms) # Clamp the batch size so that we always make progress - batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE) + batch_size = max( + batch_size, + await self._controller.min_batch_size(update_name, self._database_name), + ) else: - batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE + batch_size = await self._controller.default_batch_size( + update_name, self._database_name + ) progress_json = await self.db_pool.simple_select_one_onecol( "background_updates", @@ -308,6 +450,8 @@ async def _do_background_update(self, desired_duration_ms: float) -> int: duration_ms = time_stop - time_start + performance.update(items_updated, duration_ms) + logger.info( "Running background update %r. Processed %r items in %rms." " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)", @@ -320,8 +464,6 @@ async def _do_background_update(self, desired_duration_ms: float) -> int: batch_size, ) - performance.update(items_updated, duration_ms) - return len(self._background_update_performance) def register_background_update_handler( diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 90f800e564b4..ad7a2bf3349c 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -408,13 +408,9 @@ def test_remove_unlinked_pushers_background_job(self): self.hs.get_datastore().db_pool.updates._all_done = False # Now let's actually drive the updates to completion - while not self.get_success( - self.hs.get_datastore().db_pool.updates.has_completed_background_updates() - ): - self.get_success( - self.hs.get_datastore().db_pool.updates.do_next_background_update(100), - by=0.1, - ) + self.get_success( + self.hs.get_datastore().db_pool.updates.run_background_updates(False) + ) # Check that all pushers with unlinked addresses were deleted pushers = self.get_success( diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 0da42b5ac559..6ad783d25344 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -20,10 +20,10 @@ def prepare(self, reactor, clock, homeserver): def test_do_background_update(self): # the time we claim each update takes - duration_ms = 42 + duration_ms = 0.2 # the target runtime for each bg update - target_background_update_duration_ms = 50000 + target_background_update_duration_ms = 100 store = self.hs.get_datastore() self.get_success( @@ -48,17 +48,13 @@ async def update(progress, count): self.update_handler.side_effect = update self.update_handler.reset_mock() res = self.get_success( - self.updates.do_next_background_update( - target_background_update_duration_ms - ), - by=0.1, + self.updates.do_next_background_update(False), + by=0.01, ) self.assertFalse(res) # on the first call, we should get run with the default background update size - self.update_handler.assert_called_once_with( - {"my_key": 1}, self.updates.DEFAULT_BACKGROUND_BATCH_SIZE - ) + self.update_handler.assert_called_once_with({"my_key": 1}, 100) # second step: complete the update # we should now get run with a much bigger number of items to update @@ -74,16 +70,12 @@ async def update(progress, count): self.update_handler.side_effect = update self.update_handler.reset_mock() - result = self.get_success( - self.updates.do_next_background_update(target_background_update_duration_ms) - ) + result = self.get_success(self.updates.do_next_background_update(False)) self.assertFalse(result) self.update_handler.assert_called_once() # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = self.get_success( - self.updates.do_next_background_update(target_background_update_duration_ms) - ) + result = self.get_success(self.updates.do_next_background_update(False)) self.assertTrue(result) self.assertFalse(self.update_handler.called) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index b31c5eb5ecc6..7b7f6c349e1c 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -664,7 +664,7 @@ def test_background_update_single_large_room(self): ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 + self.store.db_pool.updates.do_next_background_update(False), by=0.1 ) # Ensure that we did actually take multiple iterations to process the @@ -723,7 +723,7 @@ def test_background_update_multiple_large_room(self): ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 + self.store.db_pool.updates.do_next_background_update(False), by=0.1 ) # Ensure that we did actually take multiple iterations to process the diff --git a/tests/unittest.py b/tests/unittest.py index a9b60b7eeb4a..a5a8ec0593e5 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -319,12 +319,7 @@ def wait_on_thread(self, deferred, timeout=10): def wait_for_background_updates(self) -> None: """Block until all background database updates have completed.""" - while not self.get_success( - self.store.db_pool.updates.has_completed_background_updates() - ): - self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 - ) + self.get_success(self.store.db_pool.updates.run_background_updates(False)) def make_homeserver(self, reactor, clock): """ @@ -484,8 +479,7 @@ def setup_test_homeserver(self, *args: Any, **kwargs: Any) -> HomeServer: async def run_bg_updates(): with LoggingContext("run_bg_updates"): - while not await stor.db_pool.updates.has_completed_background_updates(): - await stor.db_pool.updates.do_next_background_update(1) + self.get_success(stor.db_pool.updates.run_background_updates(False)) hs = setup_test_homeserver(self.addCleanup, *args, **kwargs) stor = hs.get_datastore() From 0c3ba88496a80eb6872b160422427f023053c8b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Nov 2021 10:35:52 +0000 Subject: [PATCH 03/18] Add a `register_background_update_controller` --- synapse/module_api/__init__.py | 17 +++++++++++++++++ synapse/storage/background_updates.py | 11 ++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 6e7f5238fed2..94a814ac5ac3 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -48,6 +48,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client.login import LoginResponse from synapse.storage import DataStore +from synapse.storage.background_updates import BackgroundUpdateController from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo from synapse.storage.state import StateFilter @@ -92,6 +93,7 @@ "JsonDict", "EventBase", "StateMap", + "BackgroundUpdateController", ] logger = logging.getLogger(__name__) @@ -212,6 +214,21 @@ def register_web_resource(self, path: str, resource: IResource): """ self._hs.register_module_web_resource(path, resource) + def register_background_update_controller( + self, + controller: BackgroundUpdateController, + ) -> None: + """Registers a background update controller. + + Added in v1.48.0 + + Args: + controller: The controller to use. + """ + + for db in self._hs.get_datastores().databases: + db.updates.register_update_controller(controller) + ######################################################################### # The following methods can be called by the module at any point in time. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index e5ecfc1f3ea9..e43648982e2e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -232,7 +232,9 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): # if a background update is currently running, its name. self._current_background_update: Optional[str] = None - self._controller = _TimeBasedBackgroundUpdateController(self._clock) + self._controller: BackgroundUpdateController = ( + _TimeBasedBackgroundUpdateController(self._clock) + ) self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {} self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {} @@ -245,6 +247,13 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): # enable/disable background updates via the admin API. self.enabled = True + def register_update_controller( + self, controller: BackgroundUpdateController + ) -> None: + """Register a new background update controller to use.""" + + self._controller = controller + def get_current_update(self) -> Optional[BackgroundUpdatePerformance]: """Returns the current background update, if any.""" From 0ace9f8d85d35568aab195474703cd665746f446 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Nov 2021 10:37:18 +0000 Subject: [PATCH 04/18] Expose a `sleep(..)` func on `ModuleApi` --- synapse/module_api/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 94a814ac5ac3..16ed702ea0f8 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -876,6 +876,11 @@ def looping_background_call( f, ) + async def sleep(self, seconds: float) -> None: + """Sleeps for the given number of seconds.""" + + await self._clock.sleep(seconds) + async def send_mail( self, recipient: str, From dccddf15b087027d2fa73e6cf1c8c6a1222983ee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Nov 2021 11:53:33 +0000 Subject: [PATCH 05/18] Add tests --- setup.py | 4 +- tests/storage/test_background_update.py | 77 ++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 5d602db240cc..14836131bdea 100755 --- a/setup.py +++ b/setup.py @@ -118,7 +118,9 @@ def exec_file(path_segments): # Tests assume that all optional dependencies are installed. # # parameterized_class decorator was introduced in parameterized 0.7.0 -CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0"] +# +# We use `mock` library as that backports `AsyncMock` to Python 3.6 +CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0", "mock>=4.0.0"] CONDITIONAL_REQUIREMENTS["dev"] = ( CONDITIONAL_REQUIREMENTS["lint"] diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 6ad783d25344..d9611b829527 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -1,6 +1,11 @@ -from unittest.mock import Mock +from mock import AsyncMock, Mock -from synapse.storage.background_updates import BackgroundUpdater +from twisted.internet.defer import Deferred, ensureDeferred + +from synapse.storage.background_updates import ( + BackgroundUpdateController, + BackgroundUpdater, +) from tests import unittest @@ -79,3 +84,71 @@ async def update(progress, count): result = self.get_success(self.updates.do_next_background_update(False)) self.assertTrue(result) self.assertFalse(self.update_handler.called) + + +class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, homeserver): + self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates + # the base test class should have run the real bg updates for us + self.assertTrue( + self.get_success(self.updates.has_completed_background_updates()) + ) + + self.update_deferred = Deferred() + self.update_handler = Mock(return_value=self.update_deferred) + self.updates.register_background_update_handler( + "test_update", self.update_handler + ) + + self._controller_ctx_mgr = AsyncMock(name="_controller_ctx_mgr") + self._controller = AsyncMock(BackgroundUpdateController) + self._controller.run_update.return_value = self._controller_ctx_mgr + + self.updates.register_update_controller(self._controller) + + def test_controller(self): + store = self.hs.get_datastore() + self.get_success( + store.db_pool.simple_insert( + "background_updates", + values={"update_name": "test_update", "progress_json": "{}"}, + ) + ) + + default_batch_size = 100 + + # Set up the return values of the controller. + enter_defer = Deferred() + self._controller_ctx_mgr.__aenter__ = Mock(return_value=enter_defer) + self._controller.default_batch_size.return_value = default_batch_size + self._controller.min_batch_size.return_value = default_batch_size + + # Start the background update. + do_update_d = ensureDeferred(self.updates.do_next_background_update(True)) + + self.pump() + + # `run_update` should have been called, but the update handler won't be + # called until the `enter_defer` (returned by `__aenter__`) is resolved. + self._controller.run_update.assert_called_once_with( + update_name="test_update", + database_name="master", + oneshot=False, + ) + self.assertFalse(do_update_d.called) + self.assertFalse(self.update_deferred.called) + + # Resolving the `enter_defer` should call the update handler, which then + # blocks. + enter_defer.callback(100) + self.pump() + self.update_handler.assert_called_once_with({}, default_batch_size) + self.assertFalse(self.update_deferred.called) + self._controller_ctx_mgr.__aexit__.assert_not_awaited() + + # Resolving the update handler deferred should cause the + # `do_next_background_update` to finish and return + self.update_deferred.callback(100) + self.pump() + self._controller_ctx_mgr.__aexit__.assert_awaited() + self.get_success(do_update_d) From c7f1498e359a23b219a6675a86cad3d5eb679c65 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Nov 2021 12:00:15 +0000 Subject: [PATCH 06/18] Newsfile --- changelog.d/11306.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11306.feature diff --git a/changelog.d/11306.feature b/changelog.d/11306.feature new file mode 100644 index 000000000000..99c1cfaaf831 --- /dev/null +++ b/changelog.d/11306.feature @@ -0,0 +1 @@ +Add new plugin support for controlling background update timings. From c77bad8d5715ecd1d1c7d2460207f76f2dbb04a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Nov 2021 11:24:09 +0000 Subject: [PATCH 07/18] Convert API to use callbacks --- synapse/module_api/__init__.py | 27 ++++++++++++++++++--- synapse/storage/background_updates.py | 34 +++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 16ed702ea0f8..3d9b689eb50b 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -17,6 +17,8 @@ from typing import ( TYPE_CHECKING, Any, + AsyncContextManager, + Awaitable, Callable, Dict, Generator, @@ -48,7 +50,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client.login import LoginResponse from synapse.storage import DataStore -from synapse.storage.background_updates import BackgroundUpdateController +from synapse.storage.background_updates import _CallbackBackgroundUpdateController from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo from synapse.storage.state import StateFilter @@ -216,16 +218,35 @@ def register_web_resource(self, path: str, resource: IResource): def register_background_update_controller( self, - controller: BackgroundUpdateController, + update_handler: Callable[[str, str, bool], AsyncContextManager[int]], + default_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None, + min_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None, ) -> None: """Registers a background update controller. Added in v1.48.0 Args: - controller: The controller to use. + update_handler: Called when about to do an iteration of a background + update. Takes the `update_name`, `database_name` and `oneshot` + as arguments, where `oneshot` is a flag to indicate whether the + background update will happen in one go and may take a long time + (e.g creating indices). The return value is a async context + manager that returns the desired duration of the iteration in ms, + and will be exited when the iteration is complete. + + default_batch_size: Called to get the default batch size, i.e. the + batch size to use for the first iteration of a given background + update. Defaults to always returning 100. + + min_batch_size: The minimum batch size for each background update, + default to always returning 100. """ + controller = _CallbackBackgroundUpdateController( + update_handler, default_batch_size, min_batch_size + ) + for db in self._hs.get_datastores().databases: db.updates.register_update_controller(controller) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index e43648982e2e..82a052eb3675 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -58,11 +58,6 @@ class _BackgroundUpdateHandler: class BackgroundUpdateController(abc.ABC): """A base class for controlling background update timings.""" - #### - # NOTE: This is used by modules so changes must be backwards compatible or - # be announced appropriately - #### - @abc.abstractmethod def run_update( self, update_name: str, database_name: str, oneshot: bool @@ -108,6 +103,35 @@ async def min_batch_size(self, update_name: str, database_name: str) -> int: ... +@attr.s(auto_attribs=True) +class _CallbackBackgroundUpdateController(BackgroundUpdateController): + """A background update controller that defers to the given callbacks. + + Used to wrap callbacks from the module API. + """ + + _update_handler: Callable[[str, str, bool], AsyncContextManager[int]] + _default_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None + _min_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None + + def run_update( + self, update_name: str, database_name: str, oneshot: bool + ) -> AsyncContextManager[int]: + return self._update_handler(update_name, database_name, oneshot) + + async def default_batch_size(self, update_name: str, database_name: str) -> int: + if self._default_batch_size is None: + return 100 + + return await self._default_batch_size(update_name, database_name) + + async def min_batch_size(self, update_name: str, database_name: str) -> int: + if self._min_batch_size is None: + return 100 + + return await self._min_batch_size(update_name, database_name) + + class _TimeBasedBackgroundUpdateController(BackgroundUpdateController): """The default controller which aims to spend X ms doing the background update every Y ms. From 4a1d77e706a46cc7aaa6a9a639d7e64d7e68746e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 18 Nov 2021 18:34:36 +0100 Subject: [PATCH 08/18] Remove callback wrapping Also fix tests --- synapse/module_api/__init__.py | 42 ++--- synapse/storage/background_updates.py | 211 +++++++----------------- tests/storage/test_background_update.py | 55 +++--- tests/storage/test_user_directory.py | 5 +- 4 files changed, 116 insertions(+), 197 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index ed9ac7546a2a..b7247817b40d 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -50,7 +50,11 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client.login import LoginResponse from synapse.storage import DataStore -from synapse.storage.background_updates import _CallbackBackgroundUpdateController +from synapse.storage.background_updates import ( + DEFAULT_BATCH_SIZE_CALLBACK, + MIN_BATCH_SIZE_CALLBACK, + UPDATE_HANDLER_CALLBACK, +) from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo from synapse.storage.state import StateFilter @@ -215,39 +219,23 @@ def register_web_resource(self, path: str, resource: Resource): """ self._hs.register_module_web_resource(path, resource) - def register_background_update_controller( + def register_background_update_controller_callbacks( self, - update_handler: Callable[[str, str, bool], AsyncContextManager[int]], - default_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None, - min_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None, + update_handler: UPDATE_HANDLER_CALLBACK, + default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, + min_batch_size: Optional[MIN_BATCH_SIZE_CALLBACK] = None, ) -> None: - """Registers a background update controller. + """Registers background update controller callbacks. Added in v1.48.0 - - Args: - update_handler: Called when about to do an iteration of a background - update. Takes the `update_name`, `database_name` and `oneshot` - as arguments, where `oneshot` is a flag to indicate whether the - background update will happen in one go and may take a long time - (e.g creating indices). The return value is a async context - manager that returns the desired duration of the iteration in ms, - and will be exited when the iteration is complete. - - default_batch_size: Called to get the default batch size, i.e. the - batch size to use for the first iteration of a given background - update. Defaults to always returning 100. - - min_batch_size: The minimum batch size for each background update, - default to always returning 100. """ - controller = _CallbackBackgroundUpdateController( - update_handler, default_batch_size, min_batch_size - ) - for db in self._hs.get_datastores().databases: - db.updates.register_update_controller(controller) + db.updates.register_update_controller_callbacks( + update_handler=update_handler, + default_batch_size=default_batch_size, + min_batch_size=min_batch_size, + ) ######################################################################### # The following methods can be called by the module at any point in time. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 82a052eb3675..1f7700a4f748 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import abc import logging from typing import ( TYPE_CHECKING, @@ -39,6 +38,11 @@ logger = logging.getLogger(__name__) +UPDATE_HANDLER_CALLBACK = Callable[[str, str, bool], AsyncContextManager[int]] +DEFAULT_BATCH_SIZE_CALLBACK = Callable[[str, str], Awaitable[int]] +MIN_BATCH_SIZE_CALLBACK = Callable[[str, str], Awaitable[int]] + + @attr.s(slots=True, frozen=True, auto_attribs=True) class _BackgroundUpdateHandler: """A handler for a given background update. @@ -55,141 +59,21 @@ class _BackgroundUpdateHandler: oneshot: bool = False -class BackgroundUpdateController(abc.ABC): - """A base class for controlling background update timings.""" - - @abc.abstractmethod - def run_update( - self, update_name: str, database_name: str, oneshot: bool - ) -> AsyncContextManager[int]: - """Called before we do the next iteration of a background update. The - returned async context manager is immediately entered and then exited - after this iteration of the background update has finished. - - Implementations will likely want to sleep for a period of time to stop - the background update from continuously being run. - - Args: - update_name: The name of the update that is to be run - database_name: The name of the database the background update is - being run on. Really only useful if Synapse is configured with - multiple databases. - oneshot: Whether the update will complete all in one go, e.g. - index creation. In such cases the returned target duration is - ignored. - - Returns: - The target duration in milliseconds that the background update - should run for. - - Note: this is a *target*, and an iteration may take substantially - longer or shorter. - """ - ... - - @abc.abstractmethod - async def default_batch_size(self, update_name: str, database_name: str) -> int: - """The batch size to use for the first iteration of a new background - update. - """ - ... - - @abc.abstractmethod - async def min_batch_size(self, update_name: str, database_name: str) -> int: - """A lower bound on the batch size of a new background update. - - Used to ensure that progress is always made. Must be greater than 0. - """ - ... - - -@attr.s(auto_attribs=True) -class _CallbackBackgroundUpdateController(BackgroundUpdateController): - """A background update controller that defers to the given callbacks. - - Used to wrap callbacks from the module API. - """ - - _update_handler: Callable[[str, str, bool], AsyncContextManager[int]] - _default_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None - _min_batch_size: Optional[Callable[[str, str], Awaitable[int]]] = None - - def run_update( - self, update_name: str, database_name: str, oneshot: bool - ) -> AsyncContextManager[int]: - return self._update_handler(update_name, database_name, oneshot) - - async def default_batch_size(self, update_name: str, database_name: str) -> int: - if self._default_batch_size is None: - return 100 - - return await self._default_batch_size(update_name, database_name) - - async def min_batch_size(self, update_name: str, database_name: str) -> int: - if self._min_batch_size is None: - return 100 - - return await self._min_batch_size(update_name, database_name) - - -class _TimeBasedBackgroundUpdateController(BackgroundUpdateController): - """The default controller which aims to spend X ms doing the background - update every Y ms. - """ - - MINIMUM_BACKGROUND_BATCH_SIZE = 100 - DEFAULT_BACKGROUND_BATCH_SIZE = 100 - +class _BackgroundUpdateContextManager: BACKGROUND_UPDATE_INTERVAL_MS = 1000 BACKGROUND_UPDATE_DURATION_MS = 100 - def __init__(self, clock: Clock): + def __init__(self, sleep: bool, clock: Clock): + self._sleep = sleep self._clock = clock - def run_update( - self, - update_name: str, - database_name: str, - oneshot: bool, - ) -> AsyncContextManager[int]: - return self - - async def default_batch_size(self, update_name: str, database_name: str) -> int: - return self.DEFAULT_BACKGROUND_BATCH_SIZE - - async def min_batch_size(self, update_name: str, database_name: str) -> int: - return self.MINIMUM_BACKGROUND_BATCH_SIZE - async def __aenter__(self) -> int: - await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000) - return self.BACKGROUND_UPDATE_DURATION_MS + if self._sleep: + await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000) - async def __aexit__(self, *exc): - pass - - -class _ImmediateBackgroundUpdateController(BackgroundUpdateController): - """A background update controller that doesn't ever wait, effectively - running the background updates as quickly as possible""" - - def run_update( - self, - update_name: str, - database_name: str, - oneshot: bool, - ) -> AsyncContextManager[int]: - return self - - async def default_batch_size(self, update_name: str, database_name: str) -> int: - return 100 - - async def min_batch_size(self, update_name: str, database_name: str) -> int: - return 100 - - async def __aenter__(self) -> int: - return 100 + return self.BACKGROUND_UPDATE_DURATION_MS - async def __aexit__(self, *exc): + async def __aexit__(self, *exc) -> None: pass @@ -256,9 +140,9 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): # if a background update is currently running, its name. self._current_background_update: Optional[str] = None - self._controller: BackgroundUpdateController = ( - _TimeBasedBackgroundUpdateController(self._clock) - ) + self._update_handler_callback: Optional[UPDATE_HANDLER_CALLBACK] = None + self._default_batch_size_callback: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None + self._min_batch_size_callback: Optional[MIN_BATCH_SIZE_CALLBACK] = None self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {} self._background_update_handlers: Dict[str, _BackgroundUpdateHandler] = {} @@ -271,12 +155,53 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): # enable/disable background updates via the admin API. self.enabled = True - def register_update_controller( - self, controller: BackgroundUpdateController + def register_update_controller_callbacks( + self, + update_handler: UPDATE_HANDLER_CALLBACK, + default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, + min_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, ) -> None: - """Register a new background update controller to use.""" + if self._update_handler_callback is not None: + logger.warning( + "More than one module tried to register callbacks for controlling" + " background updates. Only the callbacks registered by the first module" + " (in order of appearance in Synapse's configuration file) that tried to" + " do so will be called." + ) - self._controller = controller + return + + self._update_handler_callback = update_handler + + if default_batch_size is not None: + self._default_batch_size_callback = default_batch_size + + if min_batch_size is not None: + self._min_batch_size_callback = min_batch_size + + def _get_context_manager_for_update( + self, + sleep: bool, + update_name: str, + database_name: str, + oneshot: bool, + ) -> AsyncContextManager[int]: + if self._update_handler_callback is not None: + return self._update_handler_callback(update_name, database_name, oneshot) + + return _BackgroundUpdateContextManager(sleep, self._clock) + + async def _default_batch_size(self, update_name: str, database_name: str) -> int: + if self._default_batch_size_callback is not None: + return await self._default_batch_size_callback(update_name, database_name) + + return 100 + + async def _min_batch_size(self, update_name: str, database_name: str) -> int: + if self._min_batch_size_callback is not None: + return await self._min_batch_size_callback(update_name, database_name) + + return 100 def get_current_update(self) -> Optional[BackgroundUpdatePerformance]: """Returns the current background update, if any.""" @@ -423,14 +348,8 @@ def get_background_updates_txn(txn): assert self._current_background_update is not None update_info = self._background_update_handlers[self._current_background_update] - if sleep: - controller = self._controller - else: - # If `sleep` is False then we want to run the updates as quickly as - # possible. - controller = _ImmediateBackgroundUpdateController() - - async with controller.run_update( + async with self._get_context_manager_for_update( + sleep=sleep, update_name=self._current_background_update, database_name=self._database_name, oneshot=update_info.oneshot, @@ -459,12 +378,10 @@ async def _do_background_update(self, desired_duration_ms: float) -> int: # Clamp the batch size so that we always make progress batch_size = max( batch_size, - await self._controller.min_batch_size(update_name, self._database_name), + await self._min_batch_size(update_name, self._database_name), ) else: - batch_size = await self._controller.default_batch_size( - update_name, self._database_name - ) + batch_size = await self._default_batch_size(update_name, self._database_name) progress_json = await self.db_pool.simple_select_one_onecol( "background_updates", diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index d9611b829527..1bc3e1713cbb 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -1,13 +1,13 @@ +from typing import AsyncContextManager + from mock import AsyncMock, Mock from twisted.internet.defer import Deferred, ensureDeferred -from synapse.storage.background_updates import ( - BackgroundUpdateController, - BackgroundUpdater, -) +from synapse.storage.background_updates import BackgroundUpdater from tests import unittest +from tests.test_utils import make_awaitable class BackgroundUpdateTestCase(unittest.HomeserverTestCase): @@ -100,11 +100,28 @@ def prepare(self, reactor, clock, homeserver): "test_update", self.update_handler ) - self._controller_ctx_mgr = AsyncMock(name="_controller_ctx_mgr") - self._controller = AsyncMock(BackgroundUpdateController) - self._controller.run_update.return_value = self._controller_ctx_mgr - - self.updates.register_update_controller(self._controller) + # Mock out the AsyncContextManager + self._update_ctx_manager = Mock(spec=["__aenter__", "__aexit__"]) + self._update_ctx_manager.__aenter__ = Mock( + return_value=make_awaitable(None), + ) + self._update_ctx_manager.__aexit__ = Mock(return_value=make_awaitable(None)) + + # Mock out the `update_handler` callback + self._update_handler_callback = Mock(return_value=self._update_ctx_manager) + + # Define a default batch size value that's not the same as the internal default + # value (100). + self._default_batch_size = 500 + + # Register the callbacks with more mocks + self.hs.get_module_api().register_background_update_controller_callbacks( + update_handler=self._update_handler_callback, + min_batch_size=Mock(return_value=make_awaitable(self._default_batch_size)), + default_batch_size=Mock( + return_value=make_awaitable(self._default_batch_size), + ), + ) def test_controller(self): store = self.hs.get_datastore() @@ -115,13 +132,9 @@ def test_controller(self): ) ) - default_batch_size = 100 - - # Set up the return values of the controller. + # Set the return value for the context manager. enter_defer = Deferred() - self._controller_ctx_mgr.__aenter__ = Mock(return_value=enter_defer) - self._controller.default_batch_size.return_value = default_batch_size - self._controller.min_batch_size.return_value = default_batch_size + self._update_ctx_manager.__aenter__ = Mock(return_value=enter_defer) # Start the background update. do_update_d = ensureDeferred(self.updates.do_next_background_update(True)) @@ -130,10 +143,8 @@ def test_controller(self): # `run_update` should have been called, but the update handler won't be # called until the `enter_defer` (returned by `__aenter__`) is resolved. - self._controller.run_update.assert_called_once_with( - update_name="test_update", - database_name="master", - oneshot=False, + self._update_handler_callback.assert_called_once_with( + "test_update", "master", False, ) self.assertFalse(do_update_d.called) self.assertFalse(self.update_deferred.called) @@ -142,13 +153,13 @@ def test_controller(self): # blocks. enter_defer.callback(100) self.pump() - self.update_handler.assert_called_once_with({}, default_batch_size) + self.update_handler.assert_called_once_with({}, self._default_batch_size) self.assertFalse(self.update_deferred.called) - self._controller_ctx_mgr.__aexit__.assert_not_awaited() + self._update_ctx_manager.__aexit__.assert_not_called() # Resolving the update handler deferred should cause the # `do_next_background_update` to finish and return self.update_deferred.callback(100) self.pump() - self._controller_ctx_mgr.__aexit__.assert_awaited() + self._update_ctx_manager.__aexit__.assert_called() self.get_success(do_update_d) diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py index 37cf7bb232f9..7f5b28aed8c4 100644 --- a/tests/storage/test_user_directory.py +++ b/tests/storage/test_user_directory.py @@ -23,6 +23,7 @@ from synapse.rest.client import login, register, room from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.background_updates import _BackgroundUpdateHandler from synapse.storage.roommember import ProfileInfo from synapse.util import Clock @@ -391,7 +392,9 @@ async def mocked_process_users(*args: Any, **kwargs: Any) -> int: with mock.patch.dict( self.store.db_pool.updates._background_update_handlers, - populate_user_directory_process_users=mocked_process_users, + populate_user_directory_process_users=_BackgroundUpdateHandler( + mocked_process_users, + ), ): self._purge_and_rebuild_user_dir() From 31a4897da97169e26a1b7e104d5bdbf0fd9541ad Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 18 Nov 2021 18:54:32 +0100 Subject: [PATCH 09/18] Lint and docstrings --- synapse/module_api/__init__.py | 2 -- synapse/storage/background_updates.py | 37 +++++++++++++++++++++++-- tests/storage/test_background_update.py | 8 +++--- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index b7247817b40d..e3bf3892613d 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -17,8 +17,6 @@ from typing import ( TYPE_CHECKING, Any, - AsyncContextManager, - Awaitable, Callable, Dict, Generator, diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 1f7700a4f748..1483b452cf3f 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -161,6 +161,7 @@ def register_update_controller_callbacks( default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, min_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, ) -> None: + """Register callbacks from a module for each hook.""" if self._update_handler_callback is not None: logger.warning( "More than one module tried to register callbacks for controlling" @@ -186,18 +187,48 @@ def _get_context_manager_for_update( database_name: str, oneshot: bool, ) -> AsyncContextManager[int]: - if self._update_handler_callback is not None: + """Get a context manager to run a background update with. + + If a module has registered a `update_handler` callback, and we can sleep between + updates, use the context manager it returns. + + Otherwise, returns a context manager that will return a default value, optionally + sleeping if needed. + + Args: + sleep: Whether we can sleep between updates. + update_name: The name of the update. + database_name: The name of the database the update is being run on. + oneshot: Whether the update will complete all in one go, e.g. index creation. + In such cases the returned target duration is ignored. + + Returns: + The target duration in milliseconds that the background update should run for. + + Note: this is a *target*, and an iteration may take substantially longer or + shorter. + """ + if self._update_handler_callback is not None and sleep: + # TODO: not sure skipping the module callback if sleep is False is the right + # thing to do return self._update_handler_callback(update_name, database_name, oneshot) return _BackgroundUpdateContextManager(sleep, self._clock) async def _default_batch_size(self, update_name: str, database_name: str) -> int: + """The batch size to use for the first iteration of a new background + update. + """ if self._default_batch_size_callback is not None: return await self._default_batch_size_callback(update_name, database_name) return 100 async def _min_batch_size(self, update_name: str, database_name: str) -> int: + """A lower bound on the batch size of a new background update. + + Used to ensure that progress is always made. Must be greater than 0. + """ if self._min_batch_size_callback is not None: return await self._min_batch_size_callback(update_name, database_name) @@ -381,7 +412,9 @@ async def _do_background_update(self, desired_duration_ms: float) -> int: await self._min_batch_size(update_name, self._database_name), ) else: - batch_size = await self._default_batch_size(update_name, self._database_name) + batch_size = await self._default_batch_size( + update_name, self._database_name + ) progress_json = await self.db_pool.simple_select_one_onecol( "background_updates", diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 1bc3e1713cbb..22a5ec1cd74d 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -1,6 +1,4 @@ -from typing import AsyncContextManager - -from mock import AsyncMock, Mock +from mock import Mock from twisted.internet.defer import Deferred, ensureDeferred @@ -144,7 +142,9 @@ def test_controller(self): # `run_update` should have been called, but the update handler won't be # called until the `enter_defer` (returned by `__aenter__`) is resolved. self._update_handler_callback.assert_called_once_with( - "test_update", "master", False, + "test_update", + "master", + False, ) self.assertFalse(do_update_d.called) self.assertFalse(self.update_deferred.called) From d89cadd13edc91b8f206df44eeea2c66da269261 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 18 Nov 2021 19:01:04 +0100 Subject: [PATCH 10/18] Don't ignore module callbacks if we don't want to sleep --- synapse/storage/background_updates.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 1483b452cf3f..bfad9ebd894f 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -189,8 +189,8 @@ def _get_context_manager_for_update( ) -> AsyncContextManager[int]: """Get a context manager to run a background update with. - If a module has registered a `update_handler` callback, and we can sleep between - updates, use the context manager it returns. + If a module has registered a `update_handler` callback, use the context manager + it returns. Otherwise, returns a context manager that will return a default value, optionally sleeping if needed. @@ -208,9 +208,7 @@ def _get_context_manager_for_update( Note: this is a *target*, and an iteration may take substantially longer or shorter. """ - if self._update_handler_callback is not None and sleep: - # TODO: not sure skipping the module callback if sleep is False is the right - # thing to do + if self._update_handler_callback is not None: return self._update_handler_callback(update_name, database_name, oneshot) return _BackgroundUpdateContextManager(sleep, self._clock) From df863fb831268d5b172ee2a57f0a1896aba6301a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 19 Nov 2021 11:07:07 +0100 Subject: [PATCH 11/18] Rename update handler to avoid name clashes --- synapse/module_api/__init__.py | 38 ++++++++++++------------- synapse/storage/background_updates.py | 14 ++++----- tests/storage/test_background_update.py | 6 ++-- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index e3bf3892613d..296ec6a4832a 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -51,7 +51,7 @@ from synapse.storage.background_updates import ( DEFAULT_BATCH_SIZE_CALLBACK, MIN_BATCH_SIZE_CALLBACK, - UPDATE_HANDLER_CALLBACK, + ON_UPDATE_CALLBACK, ) from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.roommember import ProfileInfo @@ -201,6 +201,24 @@ def register_password_auth_provider_callbacks(self): """ return self._password_auth_provider.register_password_auth_provider_callbacks + def register_background_update_controller_callbacks( + self, + on_update: ON_UPDATE_CALLBACK, + default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, + min_batch_size: Optional[MIN_BATCH_SIZE_CALLBACK] = None, + ) -> None: + """Registers background update controller callbacks. + + Added in Synapse v1.48.0. + """ + + for db in self._hs.get_datastores().databases: + db.updates.register_update_controller_callbacks( + on_update=on_update, + default_batch_size=default_batch_size, + min_batch_size=min_batch_size, + ) + def register_web_resource(self, path: str, resource: Resource): """Registers a web resource to be served at the given path. @@ -217,24 +235,6 @@ def register_web_resource(self, path: str, resource: Resource): """ self._hs.register_module_web_resource(path, resource) - def register_background_update_controller_callbacks( - self, - update_handler: UPDATE_HANDLER_CALLBACK, - default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, - min_batch_size: Optional[MIN_BATCH_SIZE_CALLBACK] = None, - ) -> None: - """Registers background update controller callbacks. - - Added in v1.48.0 - """ - - for db in self._hs.get_datastores().databases: - db.updates.register_update_controller_callbacks( - update_handler=update_handler, - default_batch_size=default_batch_size, - min_batch_size=min_batch_size, - ) - ######################################################################### # The following methods can be called by the module at any point in time. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index bfad9ebd894f..482eaa81dec0 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -38,7 +38,7 @@ logger = logging.getLogger(__name__) -UPDATE_HANDLER_CALLBACK = Callable[[str, str, bool], AsyncContextManager[int]] +ON_UPDATE_CALLBACK = Callable[[str, str, bool], AsyncContextManager[int]] DEFAULT_BATCH_SIZE_CALLBACK = Callable[[str, str], Awaitable[int]] MIN_BATCH_SIZE_CALLBACK = Callable[[str, str], Awaitable[int]] @@ -140,7 +140,7 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): # if a background update is currently running, its name. self._current_background_update: Optional[str] = None - self._update_handler_callback: Optional[UPDATE_HANDLER_CALLBACK] = None + self._on_update_callback: Optional[ON_UPDATE_CALLBACK] = None self._default_batch_size_callback: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None self._min_batch_size_callback: Optional[MIN_BATCH_SIZE_CALLBACK] = None @@ -157,12 +157,12 @@ def __init__(self, hs: "HomeServer", database: "DatabasePool"): def register_update_controller_callbacks( self, - update_handler: UPDATE_HANDLER_CALLBACK, + on_update: ON_UPDATE_CALLBACK, default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, min_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None, ) -> None: """Register callbacks from a module for each hook.""" - if self._update_handler_callback is not None: + if self._on_update_callback is not None: logger.warning( "More than one module tried to register callbacks for controlling" " background updates. Only the callbacks registered by the first module" @@ -172,7 +172,7 @@ def register_update_controller_callbacks( return - self._update_handler_callback = update_handler + self._on_update_callback = on_update if default_batch_size is not None: self._default_batch_size_callback = default_batch_size @@ -208,8 +208,8 @@ def _get_context_manager_for_update( Note: this is a *target*, and an iteration may take substantially longer or shorter. """ - if self._update_handler_callback is not None: - return self._update_handler_callback(update_name, database_name, oneshot) + if self._on_update_callback is not None: + return self._on_update_callback(update_name, database_name, oneshot) return _BackgroundUpdateContextManager(sleep, self._clock) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 22a5ec1cd74d..e6acefdab941 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -106,7 +106,7 @@ def prepare(self, reactor, clock, homeserver): self._update_ctx_manager.__aexit__ = Mock(return_value=make_awaitable(None)) # Mock out the `update_handler` callback - self._update_handler_callback = Mock(return_value=self._update_ctx_manager) + self._on_update = Mock(return_value=self._update_ctx_manager) # Define a default batch size value that's not the same as the internal default # value (100). @@ -114,7 +114,7 @@ def prepare(self, reactor, clock, homeserver): # Register the callbacks with more mocks self.hs.get_module_api().register_background_update_controller_callbacks( - update_handler=self._update_handler_callback, + on_update=self._on_update, min_batch_size=Mock(return_value=make_awaitable(self._default_batch_size)), default_batch_size=Mock( return_value=make_awaitable(self._default_batch_size), @@ -141,7 +141,7 @@ def test_controller(self): # `run_update` should have been called, but the update handler won't be # called until the `enter_defer` (returned by `__aenter__`) is resolved. - self._update_handler_callback.assert_called_once_with( + self._on_update.assert_called_once_with( "test_update", "master", False, From 66aae9247caed5b8da4a8c7ef598207ac48dd2bb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 19 Nov 2021 13:26:22 +0100 Subject: [PATCH 12/18] Add docs --- .../background_update_controller_callbacks.md | 68 +++++++++++++++++++ docs/modules/writing_a_module.md | 12 ++-- 2 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 docs/modules/background_update_controller_callbacks.md diff --git a/docs/modules/background_update_controller_callbacks.md b/docs/modules/background_update_controller_callbacks.md new file mode 100644 index 000000000000..d97f577961a6 --- /dev/null +++ b/docs/modules/background_update_controller_callbacks.md @@ -0,0 +1,68 @@ +# Background update controller callbacks + +Background update controller callbacks allow module developers to control (e.g. rate-limit) +how database background updates are run. A database background update is an operation +Synapse runs on its database in the background after it starts. It's usually used to run +database operations that would take too long if they were run at the same time as schema +updates (which are run on startup) and delay Synapse's startup too much: populating a +table with a big amount of data, adding an index on a big table, etc. + +Background update controller callbacks can be registered using the module API's +`register_background_update_controller_callbacks` method. Only the first module (in order +of appearance in Synapse's configuration file) calling this method can register background +update controller callbacks, subsequent calls are ignored. + +The available background update controller callbacks are: + +### `on_update` + +_First introduced in Synapse v1.48.0_ + +```python +def on_update(update_name: str, database_name: str, one_shot: bool) -> AsyncContextManager[int] +``` + +Called when about to do an iteration of a background update. The module is given the name +of the update, the name of the database, and a flag to indicate whether the background +update will happen in one go and may take a long time (e.g. creating indices). If this last +argument is set to `False`, the update will be run in batches. + +The module must return an async context manager that returns the desired duration of the +iteration, in milliseconds, and will be exited when the iteration completes. Note that the +duration returned by the context manager is a target, and an iteration may take +substantially longer or shorter. If the `one_shot` flag is set to `True`, the duration +returned is ignored. + +__Note__: Unlike most module callbacks in Synapse, this one is _synchronous_. This is +because asynchronous operations are expected to be run by the async context manager. + +This callback is required when registering any other background update controller callback. + +### `default_batch_size` + +_First introduced in Synapse v1.48.0_ + +```python +async def default_batch_size(update_name: str, database_name: str) -> int +``` + +Called before the first iteration of a background update, with the name of the update and +of the database. The module must return the number of elements to process in this first +iteration. + +If this callback is not defined, Synapse will use a default value of 100. + +### `min_batch_size` + +_First introduced in Synapse v1.48.0_ + +```python +async def min_batch_size(update_name: str, database_name: str) -> int +``` + +Called before running a new batch for a background update, with the name of the update and +of the database. The module must return the minimum number of elements to process in this +iteration. This number must be greater than 0, and is used to ensure that progress is +always made. + +If this callback is not defined, Synapse will use a default value of 100. diff --git a/docs/modules/writing_a_module.md b/docs/modules/writing_a_module.md index 7764e066926b..e7c0ffad58bf 100644 --- a/docs/modules/writing_a_module.md +++ b/docs/modules/writing_a_module.md @@ -71,15 +71,15 @@ Modules **must** register their web resources in their `__init__` method. ## Registering a callback Modules can use Synapse's module API to register callbacks. Callbacks are functions that -Synapse will call when performing specific actions. Callbacks must be asynchronous, and -are split in categories. A single module may implement callbacks from multiple categories, -and is under no obligation to implement all callbacks from the categories it registers -callbacks for. +Synapse will call when performing specific actions. Callbacks must be asynchronous (unless +specified otherwise), and are split in categories. A single module may implement callbacks +from multiple categories, and is under no obligation to implement all callbacks from the +categories it registers callbacks for. Modules can register callbacks using one of the module API's `register_[...]_callbacks` methods. The callback functions are passed to these methods as keyword arguments, with -the callback name as the argument name and the function as its value. This is demonstrated -in the example below. A `register_[...]_callbacks` method exists for each category. +the callback name as the argument name and the function as its value. A +`register_[...]_callbacks` method exists for each category. Callbacks for each category can be found on their respective page of the [Synapse documentation website](https://matrix-org.github.io/synapse). \ No newline at end of file From f5d551a7eaf9e481d2e47b92cd8f35206b7efe03 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 19 Nov 2021 14:13:29 +0100 Subject: [PATCH 13/18] Fixup changelog --- changelog.d/11306.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/11306.feature b/changelog.d/11306.feature index 99c1cfaaf831..aba32920159b 100644 --- a/changelog.d/11306.feature +++ b/changelog.d/11306.feature @@ -1 +1 @@ -Add new plugin support for controlling background update timings. +Add plugin support for controlling database background updates. From 82e880e0dbaaaa3c251f5e87aad3ef71384a1bf7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 19 Nov 2021 14:35:29 +0100 Subject: [PATCH 14/18] Let more time for the update to complete --- tests/rest/admin/test_background_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py index 78c48db552de..5b6c1725eeae 100644 --- a/tests/rest/admin/test_background_updates.py +++ b/tests/rest/admin/test_background_updates.py @@ -75,7 +75,7 @@ def test_status_bg_update(self): self._register_bg_update() self.store.db_pool.updates.start_doing_background_updates() - self.reactor.pump([1.0, 1.0]) + self.reactor.pump([1.0, 1.0, 1.0]) channel = self.make_request( "GET", From 26a61b4d640fe57466fc32eac6900aaaaf7fa8e5 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 23 Nov 2021 11:14:08 +0000 Subject: [PATCH 15/18] Fix test --- tests/unittest.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index f478f99be182..eea0903f0574 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -333,10 +333,15 @@ def wait_on_thread(self, deferred, timeout=10): def wait_for_background_updates(self) -> None: """Block until all background database updates have completed. - Note that callers must ensure that's a store property created on the + Note that callers must ensure there's a store property created on the testcase. """ - self.get_success(self.store.db_pool.updates.run_background_updates(False)) + while not self.get_success( + self.store.db_pool.updates.has_completed_background_updates() + ): + self.get_success( + self.store.db_pool.updates.do_next_background_update(False), by=0.1 + ) def make_homeserver(self, reactor, clock): """ From 1a99abe56be93438b871879bd9e64693cb51a488 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 23 Nov 2021 14:12:25 +0000 Subject: [PATCH 16/18] Allow modules to run a function in a thread --- synapse/module_api/__init__.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index b63156286cff..19a27ebd89c8 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -24,6 +24,7 @@ List, Optional, Tuple, + TypeVar, Union, ) @@ -81,7 +82,11 @@ ) from synapse.http.servlet import parse_json_object_from_request from synapse.http.site import SynapseRequest -from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.context import ( + defer_to_thread, + make_deferred_yieldable, + run_in_background +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client.login import LoginResponse from synapse.storage import DataStore @@ -109,6 +114,8 @@ from synapse.app.generic_worker import GenericWorkerSlavedStore from synapse.server import HomeServer +TV = TypeVar("TV") + """ This package defines the 'stable' API which can be used by extension modules which are loaded into Synapse. @@ -320,7 +327,7 @@ def register_background_update_controller_callbacks( ) -> None: """Registers background update controller callbacks. - Added in Synapse v1.48.0. + Added in Synapse v1.49.0. """ for db in self._hs.get_datastores().databases: @@ -1152,6 +1159,26 @@ async def get_room_state( return {key: state_events[event_id] for key, event_id in state_ids.items()} + async def defer_to_thread( + self, + f: Callable[[...], TV], + *args: Any, + **kwargs: Any, + ) -> TV: + """Runs the given function in a separate thread from Synapse's thread pool. + + Added in Synapse v1.49.0. + + Args: + f: The function to run. + args: The function's arguments. + kwargs: The function's keyword arguments. + + Returns: + The return value of the function once ran in a thread. + """ + return await defer_to_thread(self._hs.get_reactor(), f, *args, **kwargs) + class PublicRoomListManager: """Contains methods for adding to, removing from and querying whether a room From 08bb9b1e421356e53c9c8340afe9fef097b07f11 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 23 Nov 2021 14:46:02 +0000 Subject: [PATCH 17/18] Lint --- synapse/module_api/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 19a27ebd89c8..af94040b60d8 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -85,7 +85,7 @@ from synapse.logging.context import ( defer_to_thread, make_deferred_yieldable, - run_in_background + run_in_background, ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client.login import LoginResponse @@ -1161,7 +1161,7 @@ async def get_room_state( async def defer_to_thread( self, - f: Callable[[...], TV], + f: Callable[..., TV], *args: Any, **kwargs: Any, ) -> TV: From 589d8ea43ce6ec230af829badce2877aac7218c5 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 29 Nov 2021 16:16:52 +0000 Subject: [PATCH 18/18] Incorporate review comments --- .../background_update_controller_callbacks.md | 27 ++++++++++--------- tests/push/test_email.py | 5 ++-- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/docs/modules/background_update_controller_callbacks.md b/docs/modules/background_update_controller_callbacks.md index d97f577961a6..b3e7c259f4ae 100644 --- a/docs/modules/background_update_controller_callbacks.md +++ b/docs/modules/background_update_controller_callbacks.md @@ -5,7 +5,8 @@ how database background updates are run. A database background update is an oper Synapse runs on its database in the background after it starts. It's usually used to run database operations that would take too long if they were run at the same time as schema updates (which are run on startup) and delay Synapse's startup too much: populating a -table with a big amount of data, adding an index on a big table, etc. +table with a big amount of data, adding an index on a big table, deleting superfluous data, +etc. Background update controller callbacks can be registered using the module API's `register_background_update_controller_callbacks` method. Only the first module (in order @@ -16,7 +17,7 @@ The available background update controller callbacks are: ### `on_update` -_First introduced in Synapse v1.48.0_ +_First introduced in Synapse v1.49.0_ ```python def on_update(update_name: str, database_name: str, one_shot: bool) -> AsyncContextManager[int] @@ -27,11 +28,13 @@ of the update, the name of the database, and a flag to indicate whether the back update will happen in one go and may take a long time (e.g. creating indices). If this last argument is set to `False`, the update will be run in batches. -The module must return an async context manager that returns the desired duration of the -iteration, in milliseconds, and will be exited when the iteration completes. Note that the -duration returned by the context manager is a target, and an iteration may take -substantially longer or shorter. If the `one_shot` flag is set to `True`, the duration -returned is ignored. +The module must return an async context manager. It will be entered before Synapse runs a +background update; this should return the desired duration of the iteration, in +milliseconds. + +The context manager will be exited when the iteration completes. Note that the duration +returned by the context manager is a target, and an iteration may take substantially longer +or shorter. If the `one_shot` flag is set to `True`, the duration returned is ignored. __Note__: Unlike most module callbacks in Synapse, this one is _synchronous_. This is because asynchronous operations are expected to be run by the async context manager. @@ -40,7 +43,7 @@ This callback is required when registering any other background update controlle ### `default_batch_size` -_First introduced in Synapse v1.48.0_ +_First introduced in Synapse v1.49.0_ ```python async def default_batch_size(update_name: str, database_name: str) -> int @@ -54,15 +57,15 @@ If this callback is not defined, Synapse will use a default value of 100. ### `min_batch_size` -_First introduced in Synapse v1.48.0_ +_First introduced in Synapse v1.49.0_ ```python async def min_batch_size(update_name: str, database_name: str) -> int ``` Called before running a new batch for a background update, with the name of the update and -of the database. The module must return the minimum number of elements to process in this -iteration. This number must be greater than 0, and is used to ensure that progress is -always made. +of the database. The module must return an integer representing the minimum number of +elements to process in this iteration. This number must be at least 1, and is used to +ensure that progress is always made. If this callback is not defined, Synapse will use a default value of 100. diff --git a/tests/push/test_email.py b/tests/push/test_email.py index ad7a2bf3349c..f8cba7b64584 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -128,6 +128,7 @@ def prepare(self, reactor, clock, hs): ) self.auth_handler = hs.get_auth_handler() + self.store = hs.get_datastore() def test_need_validated_email(self): """Test that we can only add an email pusher if the user has validated @@ -408,9 +409,7 @@ def test_remove_unlinked_pushers_background_job(self): self.hs.get_datastore().db_pool.updates._all_done = False # Now let's actually drive the updates to completion - self.get_success( - self.hs.get_datastore().db_pool.updates.run_background_updates(False) - ) + self.wait_for_background_updates() # Check that all pushers with unlinked addresses were deleted pushers = self.get_success(