Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Make background updates controllable via a plugin (#11306)
Browse files Browse the repository at this point in the history
Co-authored-by: Brendan Abolivier <babolivier@matrix.org>
  • Loading branch information
erikjohnston and babolivier authored Nov 29, 2021
1 parent 9d1971a commit d08ef6f
Show file tree
Hide file tree
Showing 12 changed files with 407 additions and 61 deletions.
1 change: 1 addition & 0 deletions changelog.d/11306.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add plugin support for controlling database background updates.
71 changes: 71 additions & 0 deletions docs/modules/background_update_controller_callbacks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Background update controller callbacks

Background update controller callbacks allow module developers to control (e.g. rate-limit)
how database background updates are run. A database background update is an operation
Synapse runs on its database in the background after it starts. It's usually used to run
database operations that would take too long if they were run at the same time as schema
updates (which are run on startup) and delay Synapse's startup too much: populating a
table with a big amount of data, adding an index on a big table, deleting superfluous data,
etc.

Background update controller callbacks can be registered using the module API's
`register_background_update_controller_callbacks` method. Only the first module (in order
of appearance in Synapse's configuration file) calling this method can register background
update controller callbacks, subsequent calls are ignored.

The available background update controller callbacks are:

### `on_update`

_First introduced in Synapse v1.49.0_

```python
def on_update(update_name: str, database_name: str, one_shot: bool) -> AsyncContextManager[int]
```

Called when about to do an iteration of a background update. The module is given the name
of the update, the name of the database, and a flag to indicate whether the background
update will happen in one go and may take a long time (e.g. creating indices). If this last
argument is set to `False`, the update will be run in batches.

The module must return an async context manager. It will be entered before Synapse runs a
background update; this should return the desired duration of the iteration, in
milliseconds.

The context manager will be exited when the iteration completes. Note that the duration
returned by the context manager is a target, and an iteration may take substantially longer
or shorter. If the `one_shot` flag is set to `True`, the duration returned is ignored.

__Note__: Unlike most module callbacks in Synapse, this one is _synchronous_. This is
because asynchronous operations are expected to be run by the async context manager.

This callback is required when registering any other background update controller callback.

### `default_batch_size`

_First introduced in Synapse v1.49.0_

```python
async def default_batch_size(update_name: str, database_name: str) -> int
```

Called before the first iteration of a background update, with the name of the update and
of the database. The module must return the number of elements to process in this first
iteration.

If this callback is not defined, Synapse will use a default value of 100.

### `min_batch_size`

_First introduced in Synapse v1.49.0_

```python
async def min_batch_size(update_name: str, database_name: str) -> int
```

Called before running a new batch for a background update, with the name of the update and
of the database. The module must return an integer representing the minimum number of
elements to process in this iteration. This number must be at least 1, and is used to
ensure that progress is always made.

If this callback is not defined, Synapse will use a default value of 100.
12 changes: 6 additions & 6 deletions docs/modules/writing_a_module.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ Modules **must** register their web resources in their `__init__` method.
## Registering a callback

Modules can use Synapse's module API to register callbacks. Callbacks are functions that
Synapse will call when performing specific actions. Callbacks must be asynchronous, and
are split in categories. A single module may implement callbacks from multiple categories,
and is under no obligation to implement all callbacks from the categories it registers
callbacks for.
Synapse will call when performing specific actions. Callbacks must be asynchronous (unless
specified otherwise), and are split in categories. A single module may implement callbacks
from multiple categories, and is under no obligation to implement all callbacks from the
categories it registers callbacks for.

Modules can register callbacks using one of the module API's `register_[...]_callbacks`
methods. The callback functions are passed to these methods as keyword arguments, with
the callback name as the argument name and the function as its value. This is demonstrated
in the example below. A `register_[...]_callbacks` method exists for each category.
the callback name as the argument name and the function as its value. A
`register_[...]_callbacks` method exists for each category.

Callbacks for each category can be found on their respective page of the
[Synapse documentation website](https://matrix-org.github.io/synapse).
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ def exec_file(path_segments):
# Tests assume that all optional dependencies are installed.
#
# parameterized_class decorator was introduced in parameterized 0.7.0
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0"]
#
# We use `mock` library as that backports `AsyncMock` to Python 3.6
CONDITIONAL_REQUIREMENTS["test"] = ["parameterized>=0.7.0", "mock>=4.0.0"]

CONDITIONAL_REQUIREMENTS["dev"] = (
CONDITIONAL_REQUIREMENTS["lint"]
Expand Down
54 changes: 53 additions & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,19 @@
)
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.context import (
defer_to_thread,
make_deferred_yieldable,
run_in_background,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import (
DEFAULT_BATCH_SIZE_CALLBACK,
MIN_BATCH_SIZE_CALLBACK,
ON_UPDATE_CALLBACK,
)
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo
from synapse.storage.state import StateFilter
Expand Down Expand Up @@ -311,6 +320,24 @@ def register_password_auth_provider_callbacks(
auth_checkers=auth_checkers,
)

def register_background_update_controller_callbacks(
self,
on_update: ON_UPDATE_CALLBACK,
default_batch_size: Optional[DEFAULT_BATCH_SIZE_CALLBACK] = None,
min_batch_size: Optional[MIN_BATCH_SIZE_CALLBACK] = None,
) -> None:
"""Registers background update controller callbacks.
Added in Synapse v1.49.0.
"""

for db in self._hs.get_datastores().databases:
db.updates.register_update_controller_callbacks(
on_update=on_update,
default_batch_size=default_batch_size,
min_batch_size=min_batch_size,
)

def register_web_resource(self, path: str, resource: Resource) -> None:
"""Registers a web resource to be served at the given path.
Expand Down Expand Up @@ -995,6 +1022,11 @@ def looping_background_call(
f,
)

async def sleep(self, seconds: float) -> None:
"""Sleeps for the given number of seconds."""

await self._clock.sleep(seconds)

async def send_mail(
self,
recipient: str,
Expand Down Expand Up @@ -1149,6 +1181,26 @@ async def get_room_state(

return {key: state_events[event_id] for key, event_id in state_ids.items()}

async def defer_to_thread(
self,
f: Callable[..., T],
*args: Any,
**kwargs: Any,
) -> T:
"""Runs the given function in a separate thread from Synapse's thread pool.
Added in Synapse v1.49.0.
Args:
f: The function to run.
args: The function's arguments.
kwargs: The function's keyword arguments.
Returns:
The return value of the function once ran in a thread.
"""
return await defer_to_thread(self._hs.get_reactor(), f, *args, **kwargs)


class PublicRoomListManager:
"""Contains methods for adding to, removing from and querying whether a room
Expand Down
Loading

0 comments on commit d08ef6f

Please sign in to comment.