diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index f99a7aee6..173f0551a 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -1,8 +1,9 @@ import logging -from typing import Optional +from typing import Mapping, Optional from blueapi.config import ApplicationConfig from blueapi.core import BlueskyContext +from blueapi.core.event import EventStream from blueapi.messaging import StompMessagingTemplate from blueapi.messaging.base import MessagingTemplate from blueapi.worker.reworker import RunEngineWorker @@ -29,19 +30,35 @@ def __init__(self, config: Optional[ApplicationConfig] = None) -> None: def start(self) -> None: self.worker.start() - self.worker.data_events.subscribe( - lambda event, corr_id: self.message_bus.send( - "public.worker.event.data", event, None, corr_id - ) - ) - self.worker.progress_events.subscribe( - lambda event, corr_id: self.message_bus.send( - "public.worker.event.progress", event, None, corr_id - ) + self._publish_event_streams( + { + self.worker.worker_events: self.message_bus.destinations.topic( + "public.worker.event" + ), + self.worker.progress_events: self.message_bus.destinations.topic( + "public.worker.event.progress" + ), + self.worker.data_events: self.message_bus.destinations.topic( + "public.worker.event.data" + ), + } ) self.message_bus.connect() + def _publish_event_streams( + self, streams_to_destinations: Mapping[EventStream, str] + ) -> None: + for stream, destination in streams_to_destinations.items(): + self._publish_event_stream(stream, destination) + + def _publish_event_stream(self, stream: EventStream, destination: str) -> None: + stream.subscribe( + lambda event, correlation_id: self.message_bus.send( + destination, event, None, correlation_id + ) + ) + def stop(self) -> None: self.worker.stop() self.message_bus.disconnect()