From 3a76eb400b70e353ab17749259418c246b73d9de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:07:38 +0100 Subject: [PATCH 01/13] Only pull out old tasks --- synapse/util/task_scheduler.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index b7de201bdeda..6466d6f5693e 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -291,16 +291,14 @@ async def _launch_scheduled_tasks(self) -> None: async def _clean_scheduled_tasks(self) -> None: """Clean old complete or failed jobs to avoid clutter the DB.""" + now = self._clock.time_msec() for task in await self._store.get_scheduled_tasks( - statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] + statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE], + max_timestamp=now - TaskScheduler.KEEP_TASKS_FOR_MS, ): # FAILED and COMPLETE tasks should never be running assert task.id not in self._running_tasks - if ( - self._clock.time_msec() - > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS - ): - await self._store.delete_scheduled_task(task.id) + await self._store.delete_scheduled_task(task.id) async def _launch_task(self, task: ScheduledTask) -> None: """Launch a scheduled task now. From adccd7a5716b906a9b4ad4806e1ae83a450cdacf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:11:19 +0100 Subject: [PATCH 02/13] Add index to 'timestamp' --- .../main/delta/82/02_scheduled_tasks_index.sql | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql diff --git a/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql b/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql new file mode 100644 index 000000000000..6b9027513961 --- /dev/null +++ b/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql @@ -0,0 +1,16 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX IF NOT EXISTS scheduled_tasks_timestamp ON scheduled_tasks(timestamp); From 908010211bc6d8cff42e2756bedca8aba06c859b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:17:13 +0100 Subject: [PATCH 03/13] Limit number of tasks we fetch --- synapse/storage/databases/main/task_scheduler.py | 6 ++++++ synapse/util/task_scheduler.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 9ab120eea9ca..5c5372a8259d 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -53,6 +53,7 @@ async def get_scheduled_tasks( resource_id: Optional[str] = None, statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, + limit: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. @@ -62,6 +63,7 @@ async def get_scheduled_tasks( statuses: Limit the returned tasks to the specific statuses max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one + limit: Only return `limit` number of rows if set. Returns: a list of `ScheduledTask`, ordered by increasing timestamps """ @@ -94,6 +96,10 @@ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: sql = sql + " ORDER BY timestamp" + if limit is not None: + sql += " LIMIT ?" + args.append(limit) + txn.execute(sql, args) return self.db_pool.cursor_to_dict(txn) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 6466d6f5693e..7d8c43be92f7 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -234,6 +234,7 @@ async def get_tasks( resource_id: Optional[str] = None, statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, + limit: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of tasks. Returns all the tasks if no args is provided. @@ -247,6 +248,7 @@ async def get_tasks( statuses: Limit the returned tasks to the specific statuses max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one + limit: Only return `limit` number of rows if set. Returns A list of `ScheduledTask`, ordered by increasing timestamps @@ -256,6 +258,7 @@ async def get_tasks( resource_id=resource_id, statuses=statuses, max_timestamp=max_timestamp, + limit=limit, ) async def delete_task(self, id: str) -> None: @@ -280,10 +283,14 @@ async def _handle_scheduled_tasks(self) -> None: async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" - for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): + for task in await self.get_tasks( + statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS + ): await self._launch_task(task) for task in await self.get_tasks( - statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() + statuses=[TaskStatus.SCHEDULED], + max_timestamp=self._clock.time_msec(), + limit=self.MAX_CONCURRENT_RUNNING_TASKS, ): await self._launch_task(task) From a0c02ad18414d3c81e8f904d3661061a142fbc18 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:17:57 +0100 Subject: [PATCH 04/13] Fast path when we're at capacity --- synapse/util/task_scheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 7d8c43be92f7..b0d3f83df7fc 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -283,6 +283,10 @@ async def _handle_scheduled_tasks(self) -> None: async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" + # Don't bother trying to launch new tasks if we're already at capacity. + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: + return + for task in await self.get_tasks( statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS ): From f7b28da006453a596378cc36552c66341d756f02 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:46:18 +0100 Subject: [PATCH 05/13] Split launch and cleanup tasks --- synapse/util/task_scheduler.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index b0d3f83df7fc..5897cc3b6ec6 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -20,7 +20,10 @@ from twisted.python.failure import Failure from synapse.logging.context import nested_logging_context -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.stringutils import random_string @@ -70,6 +73,8 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn + # How often to clean up old tasks. + CLEANUP_INTERVAL_MS = 30 * 60 * 1000 # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time @@ -94,10 +99,12 @@ def __init__(self, hs: "HomeServer"): if self._run_background_tasks: self._clock.looping_call( - run_as_background_process, + self._launch_scheduled_tasks, + TaskScheduler.SCHEDULE_INTERVAL_MS, + ) + self._clock.looping_call( + self._clean_scheduled_tasks, TaskScheduler.SCHEDULE_INTERVAL_MS, - "handle_scheduled_tasks", - self._handle_scheduled_tasks, ) def register_action( @@ -276,11 +283,7 @@ async def delete_task(self, id: str) -> None: raise Exception(f"Task {id} is currently ACTIVE and can't be deleted") await self._store.delete_scheduled_task(id) - async def _handle_scheduled_tasks(self) -> None: - """Main loop taking care of launching tasks and cleaning up old ones.""" - await self._launch_scheduled_tasks() - await self._clean_scheduled_tasks() - + @wrap_as_background_process("launch_scheduled_tasks") async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" # Don't bother trying to launch new tasks if we're already at capacity. @@ -300,6 +303,7 @@ async def _launch_scheduled_tasks(self) -> None: running_tasks_gauge.set(len(self._running_tasks)) + @wrap_as_background_process("clean_scheduled_tasks") async def _clean_scheduled_tasks(self) -> None: """Clean old complete or failed jobs to avoid clutter the DB.""" now = self._clock.time_msec() From f3a576a17f6feb86f40d7525baff67c1670a9fd9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:19:22 +0100 Subject: [PATCH 06/13] Try launching a new task immediately when a task finishes --- synapse/util/task_scheduler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 5897cc3b6ec6..bdd1c4d94353 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -352,6 +352,9 @@ async def wrapper() -> None: ) self._running_tasks.remove(task.id) + # Try launch a new task since we've finished with this one. + self._clock.call_later(1, self._launch_scheduled_tasks) + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return From 1976defdb06950e795c5336da4ec6566edaed1ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:22:52 +0100 Subject: [PATCH 07/13] Make replication handling more efficient By making replication handler function non-async we avoid launching a background task for each and every received command. --- synapse/replication/tcp/handler.py | 6 ++---- synapse/util/task_scheduler.py | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 5642666411cc..b668bb5da1de 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -672,14 +672,12 @@ def on_LOCK_RELEASED( cmd.instance_name, cmd.lock_name, cmd.lock_key ) - async def on_NEW_ACTIVE_TASK( + def on_NEW_ACTIVE_TASK( self, conn: IReplicationConnection, cmd: NewActiveTaskCommand ) -> None: """Called when get a new NEW_ACTIVE_TASK command.""" if self._task_scheduler: - task = await self._task_scheduler.get_task(cmd.data) - if task: - await self._task_scheduler._launch_task(task) + self._task_scheduler.launch_task_by_id(cmd.data) def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index bdd1c4d94353..b73e9c14e906 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -283,6 +283,20 @@ async def delete_task(self, id: str) -> None: raise Exception(f"Task {id} is currently ACTIVE and can't be deleted") await self._store.delete_scheduled_task(id) + def launch_task_by_id(self, id: str) -> None: + """Try launching the task with the given ID.""" + # Don't bother trying to launch new tasks if we're already at capacity. + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: + return + + run_as_background_process("launch_task_by_id", self._launch_task_by_id, id) + + async def _launch_task_by_id(self, id: str) -> None: + """Helper async function for `launch_task_by_id`.""" + task = await self.get_task(id) + if task: + await self._launch_task(task) + @wrap_as_background_process("launch_scheduled_tasks") async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" From 7f485e279d137baf126ff240460ee7160cc96d2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:25:16 +0100 Subject: [PATCH 08/13] Prefix background process desc --- synapse/util/task_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index b73e9c14e906..7b688e3d2c4e 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -385,4 +385,4 @@ async def wrapper() -> None: self._running_tasks.add(task.id) await self.update_task(task.id, status=TaskStatus.ACTIVE) - run_as_background_process(task.action, wrapper) + run_as_background_process(f"task-{task.action}", wrapper) From f677fca30e4010c55ab8b1cb5537d82dcddafba2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 13:26:03 +0100 Subject: [PATCH 09/13] Newsfile --- changelog.d/16313.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16313.misc diff --git a/changelog.d/16313.misc b/changelog.d/16313.misc new file mode 100644 index 000000000000..4f266c1fb029 --- /dev/null +++ b/changelog.d/16313.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. From 2283b3f916880cdf0219b2abea2f96e23826ca35 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2023 15:14:15 +0100 Subject: [PATCH 10/13] Make sure we only have one concurrent `_launch_scheduled_tasks` at a time. --- synapse/util/task_scheduler.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 7b688e3d2c4e..0a5b5c6e1e43 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -97,6 +97,9 @@ def __init__(self, hs: "HomeServer"): ] = {} self._run_background_tasks = hs.config.worker.run_background_tasks + # Flag to make sure we only try and launch new tasks once at a time. + self._launching_new_tasks = False + if self._run_background_tasks: self._clock.looping_call( self._launch_scheduled_tasks, @@ -304,18 +307,26 @@ async def _launch_scheduled_tasks(self) -> None: if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return - for task in await self.get_tasks( - statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS - ): - await self._launch_task(task) - for task in await self.get_tasks( - statuses=[TaskStatus.SCHEDULED], - max_timestamp=self._clock.time_msec(), - limit=self.MAX_CONCURRENT_RUNNING_TASKS, - ): - await self._launch_task(task) + if self._launching_new_tasks: + return + + self._launching_new_tasks = True + + try: + for task in await self.get_tasks( + statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS + ): + await self._launch_task(task) + for task in await self.get_tasks( + statuses=[TaskStatus.SCHEDULED], + max_timestamp=self._clock.time_msec(), + limit=self.MAX_CONCURRENT_RUNNING_TASKS, + ): + await self._launch_task(task) - running_tasks_gauge.set(len(self._running_tasks)) + running_tasks_gauge.set(len(self._running_tasks)) + finally: + self._launching_new_tasks = False @wrap_as_background_process("clean_scheduled_tasks") async def _clean_scheduled_tasks(self) -> None: From 1b9378317a68c4c446964e291d930e468e5e03e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2023 15:16:44 +0100 Subject: [PATCH 11/13] Use a LaterGauge --- synapse/util/task_scheduler.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 0a5b5c6e1e43..32978ac1d006 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -20,6 +20,7 @@ from twisted.python.failure import Failure from synapse.logging.context import nested_logging_context +from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -33,12 +34,6 @@ logger = logging.getLogger(__name__) -running_tasks_gauge = Gauge( - "synapse_scheduler_running_tasks", - "The number of concurrent running tasks handled by the TaskScheduler", -) - - class TaskScheduler: """ This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` @@ -110,6 +105,13 @@ def __init__(self, hs: "HomeServer"): TaskScheduler.SCHEDULE_INTERVAL_MS, ) + LaterGauge( + "synapse_scheduler_running_tasks", + "The number of concurrent running tasks handled by the TaskScheduler", + labels=None, + caller=lambda: len(self._running_tasks), + ) + def register_action( self, function: Callable[ @@ -324,7 +326,6 @@ async def _launch_scheduled_tasks(self) -> None: ): await self._launch_task(task) - running_tasks_gauge.set(len(self._running_tasks)) finally: self._launching_new_tasks = False From 3696c0532a00413feef197804ebf0b68ffdc8154 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2023 15:26:46 +0100 Subject: [PATCH 12/13] Lint --- synapse/util/task_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 32978ac1d006..437275154a62 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -15,7 +15,6 @@ import logging from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple -from prometheus_client import Gauge from twisted.python.failure import Failure From 7f801bc884f918d5a3622f520ac0c07c839063ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2023 15:34:53 +0100 Subject: [PATCH 13/13] LINT --- synapse/util/task_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 437275154a62..caf13b3474be 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -15,7 +15,6 @@ import logging from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple - from twisted.python.failure import Failure from synapse.logging.context import nested_logging_context