From db5413f8da1f2d4f3e305c7c69486d24b2ddbe77 Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Fri, 10 Nov 2023 09:47:19 +0000 Subject: [PATCH] Add configuration option to disable status updates via message bus (#331) As well as data documents, blueapi produces events when the status objects monitored by the run engine are updated. These events are useful for creating progress bars and similar updates. Unfortunately it seems very easy to unintentionally make plans/devices produce a very large number of these updates. The handling of all of these results in log spam and high CPU usage. We're seeing this now on I22 and have seen similar problems before (see #111). I think the easy way to make debugging easier is to make the status update handling optional and easy to turn off via config. To that end... Changes: - Add config option to disable status events - Make the worker only hook into the run engine if this option is marked as true --- src/blueapi/config.py | 9 +++++++++ src/blueapi/service/handler.py | 5 ++++- src/blueapi/worker/reworker.py | 5 ++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 02560ea7f..c802c5187 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -46,6 +46,14 @@ class DataWritingConfig(BlueapiBaseModel): group_name: str = "example" +class WorkerEventConfig(BlueapiBaseModel): + """ + Config for event broadcasting via the message bus + """ + + broadcast_status_events: bool = True + + class EnvironmentConfig(BlueapiBaseModel): """ Config for the RunEngine environment @@ -60,6 +68,7 @@ class EnvironmentConfig(BlueapiBaseModel): Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"), ] data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig) + events: WorkerEventConfig = Field(default_factory=WorkerEventConfig) class LoggingConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index 4a006a960..b6db8be93 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -37,7 +37,10 @@ def __init__( self.context.with_config(self.config.env) - self.worker = worker or RunEngineWorker(self.context) + self.worker = worker or RunEngineWorker( + self.context, + broadcast_statuses=self.config.env.events.broadcast_status_events, + ) self.messaging_template = ( messaging_template or StompMessagingTemplate.autoconfigured(self.config.stomp) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 7444abd79..0ff3a6ec2 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -69,6 +69,7 @@ def __init__( self, ctx: BlueskyContext, start_stop_timeout: float = DEFAULT_START_STOP_TIMEOUT, + broadcast_statuses: bool = True, ) -> None: self._ctx = ctx self._start_stop_timeout = start_stop_timeout @@ -90,6 +91,7 @@ def __init__( self._stopping = Event() self._stopped = Event() self._stopped.set() + self._broadcast_statuses = broadcast_statuses def clear_task(self, task_id: str) -> str: task = self._pending_tasks.pop(task_id) @@ -197,7 +199,8 @@ def run(self) -> None: LOGGER.info("Worker starting") self._ctx.run_engine.state_hook = self._on_state_change self._ctx.run_engine.subscribe(self._on_document) - self._ctx.run_engine.waiting_hook = self._waiting_hook + if self._broadcast_statuses: + self._ctx.run_engine.waiting_hook = self._waiting_hook self._stopped.clear() self._started.set()