From 8296f30399c36030cba386886bf8e4c7ff7dfa37 Mon Sep 17 00:00:00 2001 From: Stanislaw Malinowski Date: Mon, 13 May 2024 09:40:50 +0100 Subject: [PATCH] rename amq client to event bus client --- src/blueapi/cli/cli.py | 20 ++++++++++--------- .../cli/{amq.py => event_bus_client.py} | 2 +- src/blueapi/cli/rest.py | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) rename src/blueapi/cli/{amq.py => event_bus_client.py} (99%) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index d8be05ad9..7e7c81710 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -9,7 +9,7 @@ from requests.exceptions import ConnectionError from blueapi import __version__ -from blueapi.cli.amq import AmqClient +from blueapi.cli.event_bus_client import EventBusClient from blueapi.config import ApplicationConfig, ConfigLoader from blueapi.core import DataEvent from blueapi.messaging import MessageContext @@ -135,7 +135,9 @@ def listen_to_events(obj: dict) -> None: """Listen to events output by blueapi""" config: ApplicationConfig = obj["config"] if config.stomp is not None: - amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) + event_bus_client = EventBusClient( + StompMessagingTemplate.autoconfigured(config.stomp) + ) else: raise RuntimeError("Message bus needs to be configured") @@ -150,8 +152,8 @@ def on_event( "Subscribing to all bluesky events from " f"{config.stomp.host}:{config.stomp.port}" ) - with amq_client: - amq_client.subscribe_to_all_events(on_event) + with event_bus_client: + event_bus_client.subscribe_to_all_events(on_event) input("Press enter to exit") @@ -181,7 +183,7 @@ def run_plan( raise RuntimeError( "Cannot run plans without Stomp configuration to track progress" ) - amq_client = AmqClient(_message_template) + event_bus_client = EventBusClient(_message_template) finished_event: deque[WorkerEvent] = deque() def store_finished_event(event: WorkerEvent) -> None: @@ -194,13 +196,13 @@ def store_finished_event(event: WorkerEvent) -> None: resp = client.create_task(task) task_id = resp.task_id - with amq_client: - amq_client.subscribe_to_topics(task_id, on_event=store_finished_event) + with event_bus_client: + event_bus_client.subscribe_to_topics(task_id, on_event=store_finished_event) updated = client.update_worker_task(WorkerTask(task_id=task_id)) - amq_client.wait_for_complete(timeout=timeout) + event_bus_client.wait_for_complete(timeout=timeout) - if amq_client.timed_out: + if event_bus_client.timed_out: logger.error(f"Plan did not complete within {timeout} seconds") return diff --git a/src/blueapi/cli/amq.py b/src/blueapi/cli/event_bus_client.py similarity index 99% rename from src/blueapi/cli/amq.py rename to src/blueapi/cli/event_bus_client.py index face01b4b..afa2e4416 100644 --- a/src/blueapi/cli/amq.py +++ b/src/blueapi/cli/event_bus_client.py @@ -18,7 +18,7 @@ def __init__(self, message: str) -> None: _Event = WorkerEvent | ProgressEvent | DataEvent -class AmqClient: +class EventBusClient: app: MessagingTemplate complete: threading.Event timed_out: bool | None diff --git a/src/blueapi/cli/rest.py b/src/blueapi/cli/rest.py index 5e363faee..0fe7abd6e 100644 --- a/src/blueapi/cli/rest.py +++ b/src/blueapi/cli/rest.py @@ -15,7 +15,7 @@ ) from blueapi.worker import Task, TrackableTask, WorkerState -from .amq import BlueskyRemoteError +from .event_bus_client import BlueskyRemoteError T = TypeVar("T")