From aca65079a7d2a9dd273260ade8679b32adcd8fb5 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 23 Apr 2024 16:23:08 +0100 Subject: [PATCH 01/12] made stomp config optional --- src/blueapi/cli/cli.py | 9 ++++-- src/blueapi/messaging/stomptemplate.py | 4 ++- src/blueapi/service/handler.py | 38 ++++++++++++++------------ tests/messaging/test_stomptemplate.py | 5 +++- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 8372f8006..37cc58254 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -134,7 +134,9 @@ def get_devices(obj: dict) -> None: def listen_to_events(obj: dict) -> None: """Listen to events output by blueapi""" config: ApplicationConfig = obj["config"] - amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) + _message_template = StompMessagingTemplate.autoconfigured(config.stomp) + if _message_template is not None: + amq_client = AmqClient(_message_template) def on_event( context: MessageContext, @@ -172,8 +174,9 @@ def run_plan( client: BlueapiRestClient = obj["rest_client"] logger = logging.getLogger(__name__) - - amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) + _message_template = StompMessagingTemplate.autoconfigured(config.stomp) + if _message_template is not None: + amq_client = AmqClient(_message_template) finished_event: deque[WorkerEvent] = deque() def store_finished_event(event: WorkerEvent) -> None: diff --git a/src/blueapi/messaging/stomptemplate.py b/src/blueapi/messaging/stomptemplate.py index bf107c36b..8704f9f3e 100644 --- a/src/blueapi/messaging/stomptemplate.py +++ b/src/blueapi/messaging/stomptemplate.py @@ -101,7 +101,9 @@ def __init__( self._subscriptions = {} @classmethod - def autoconfigured(cls, config: StompConfig) -> MessagingTemplate: + def autoconfigured(cls, config: StompConfig | None) -> MessagingTemplate | None: + if config is None: + return None return cls( stomp.Connection( [(config.host, config.port)], diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index f38dcf434..6dd9643e4 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -27,7 +27,7 @@ class Handler(BlueskyHandler): _context: BlueskyContext _worker: Worker _config: ApplicationConfig - _messaging_template: MessagingTemplate + _messaging_template: MessagingTemplate | None _initialized: bool = False def __init__( @@ -54,17 +54,19 @@ def __init__( def start(self) -> None: self._worker.start() - event_topic = self._messaging_template.destinations.topic("public.worker.event") - - self._publish_event_streams( - { - self._worker.worker_events: event_topic, - self._worker.progress_events: event_topic, - self._worker.data_events: event_topic, - } - ) + if self._messaging_template is not None: + event_topic = self._messaging_template.destinations.topic( + "public.worker.event" + ) + self._publish_event_streams( + { + self._worker.worker_events: event_topic, + self._worker.progress_events: event_topic, + self._worker.data_events: event_topic, + } + ) - self._messaging_template.connect() + self._messaging_template.connect() self._initialized = True def _publish_event_streams( @@ -74,16 +76,18 @@ def _publish_event_streams( self._publish_event_stream(stream, destination) def _publish_event_stream(self, stream: EventStream, destination: str) -> None: - stream.subscribe( - lambda event, correlation_id: self._messaging_template.send( - destination, event, None, correlation_id - ) - ) + def forward_message(event, correlation_id): + self._messaging_template.send(destination, event, None, correlation_id) + + stream.subscribe(forward_message) def stop(self) -> None: self._initialized = False self._worker.stop() - if self._messaging_template.is_connected(): + if ( + self._messaging_template is not None + and self._messaging_template.is_connected() + ): self._messaging_template.disconnect() @property diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py index 0e7f8afdf..138768a7d 100644 --- a/tests/messaging/test_stomptemplate.py +++ b/tests/messaging/test_stomptemplate.py @@ -28,13 +28,16 @@ def test_stomp_configs(self) -> Iterable[StompConfig]: @pytest.fixture(params=StompTestingSettings().test_stomp_configs()) def disconnected_template(request: pytest.FixtureRequest) -> MessagingTemplate: stomp_config = request.param - return StompMessagingTemplate.autoconfigured(stomp_config) + template = StompMessagingTemplate.autoconfigured(stomp_config) + assert template is not None + return template @pytest.fixture(params=StompTestingSettings().test_stomp_configs()) def template(request: pytest.FixtureRequest) -> Iterable[MessagingTemplate]: stomp_config = request.param template = StompMessagingTemplate.autoconfigured(stomp_config) + assert template is not None template.connect() yield template template.disconnect() From 48db30d9e66835578e8671fbf9de88180215cf04 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Wed, 24 Apr 2024 13:20:47 +0100 Subject: [PATCH 02/12] changed the default stomp configuration --- src/blueapi/cli/cli.py | 9 +++++---- src/blueapi/config.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 37cc58254..c4518c875 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -145,10 +145,11 @@ def on_event( converted = json.dumps(event.dict(), indent=2) print(converted) - print( - "Subscribing to all bluesky events from " - f"{config.stomp.host}:{config.stomp.port}" - ) + if config.stomp is not None: + print( + "Subscribing to all bluesky events from " + f"{config.stomp.host}:{config.stomp.port}" + ) with amq_client: amq_client.subscribe_to_all_events(on_event) input("Press enter to exit") diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 3fafdc8c6..d4376fb4e 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -98,7 +98,7 @@ class ApplicationConfig(BlueapiBaseModel): config tree. """ - stomp: StompConfig = Field(default_factory=StompConfig) + stomp: StompConfig | None = None env: EnvironmentConfig = Field(default_factory=EnvironmentConfig) logging: LoggingConfig = Field(default_factory=LoggingConfig) api: RestConfig = Field(default_factory=RestConfig) From 2f62a1636aa36d4cd33da2bbcf16699432b69de8 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Wed, 24 Apr 2024 15:18:34 +0100 Subject: [PATCH 03/12] added test for none check --- tests/messaging/test_stomptemplate.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py index 138768a7d..4a842a756 100644 --- a/tests/messaging/test_stomptemplate.py +++ b/tests/messaging/test_stomptemplate.py @@ -12,6 +12,7 @@ from blueapi.config import StompConfig from blueapi.messaging import MessageContext, MessagingTemplate, StompMessagingTemplate +from blueapi.service.handler import setup_handler, teardown_handler _TIMEOUT: float = 10.0 _COUNT = itertools.count() @@ -221,3 +222,14 @@ def server(ctx: MessageContext, message: str) -> None: template.send(reply_queue, "ack", correlation_id=ctx.correlation_id) template.subscribe(destination, server) + + +def test_autoconfigured_none_is_none(): + template = StompMessagingTemplate.autoconfigured(None) + assert template is None + + +def test_messaging_template_none_is_none(): + handler = setup_handler(None) + assert handler is None + teardown_handler() From ab39421ec1b72c7c6e1813cd48d5b1f3ab517252 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Wed, 24 Apr 2024 16:44:24 +0100 Subject: [PATCH 04/12] added tests for cli listen to events --- src/blueapi/cli/cli.py | 10 +++++++--- tests/example_yaml/valid_stomp_config.yaml | 6 ++++++ tests/test_cli.py | 19 +++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 tests/example_yaml/valid_stomp_config.yaml diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index c4518c875..878a2652f 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -133,6 +133,7 @@ def get_devices(obj: dict) -> None: @click.pass_obj def listen_to_events(obj: dict) -> None: """Listen to events output by blueapi""" + logger = logging.getLogger(__name__) config: ApplicationConfig = obj["config"] _message_template = StompMessagingTemplate.autoconfigured(config.stomp) if _message_template is not None: @@ -150,9 +151,12 @@ def on_event( "Subscribing to all bluesky events from " f"{config.stomp.host}:{config.stomp.port}" ) - with amq_client: - amq_client.subscribe_to_all_events(on_event) - input("Press enter to exit") + with amq_client: + amq_client.subscribe_to_all_events(on_event) + input("Press enter to exit") + else: + logger.error("Stomp configuration not found") + raise Exception("Stomp configuration not found ") @controller.command(name="run") diff --git a/tests/example_yaml/valid_stomp_config.yaml b/tests/example_yaml/valid_stomp_config.yaml new file mode 100644 index 000000000..08b32463a --- /dev/null +++ b/tests/example_yaml/valid_stomp_config.yaml @@ -0,0 +1,6 @@ +stomp: + auth: + username: admin + passcode: admin + host: localhost + port: 61613 \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index eb42d0673..eef3f6c17 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -169,3 +169,22 @@ def test_config_passed_down_to_command_children( "params": {"time": 5}, } } + + +def test_invalid_stomp_config_for_listner(runner: CliRunner): + result = runner.invoke(main, ["controller", "listen"]) + assert result.exit_code == 1 + + +def test_valid_stomp_config_for_listner(runner: CliRunner): + with patch("uvicorn.run", side_effect=None): + result = runner.invoke( + main, + [ + "-c", + "tests/example_yaml/valid_stomp_config.yaml", + "controller", + "listen", + ], + ) + assert result.exit_code == 1 From 12596f011f33f136f7dea7929809fb8bb3fa768b Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 25 Apr 2024 13:08:13 +0100 Subject: [PATCH 05/12] updated tests for cli --- src/blueapi/cli/amq.py | 19 +++++++++++------- src/blueapi/cli/cli.py | 18 ++++++++--------- tests/example_yaml/rest_config.yaml | 6 ++++++ tests/messaging/test_stomptemplate.py | 9 +++++---- tests/test_cli.py | 28 ++++++++++++++++----------- 5 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/blueapi/cli/amq.py b/src/blueapi/cli/amq.py index face01b4b..6c05ff64d 100644 --- a/src/blueapi/cli/amq.py +++ b/src/blueapi/cli/amq.py @@ -23,16 +23,20 @@ class AmqClient: complete: threading.Event timed_out: bool | None - def __init__(self, app: MessagingTemplate) -> None: + def __init__(self, app: MessagingTemplate | None) -> None: + if app is None: + raise RuntimeError("Message bus needs to be configured") self.app = app self.complete = threading.Event() self.timed_out = None def __enter__(self) -> None: - self.app.connect() + if self.app is not None: + self.app.connect() def __exit__(self, exc_type, exc_value, exc_traceback) -> None: - self.app.disconnect() + if self.app is not None: + self.app.disconnect() def subscribe_to_topics( self, @@ -65,10 +69,11 @@ def subscribe_to_all_events( self, on_event: Callable[[MessageContext, _Event], None], ) -> None: - self.app.subscribe( - self.app.destinations.topic("public.worker.event"), - on_event, - ) + if self.app is not None: + self.app.subscribe( + self.app.destinations.topic("public.worker.event"), + on_event, + ) def wait_for_complete(self, timeout: float | None = None) -> None: self.timed_out = not self.complete.wait(timeout=timeout) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 878a2652f..bd9b2b4c2 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -133,11 +133,8 @@ def get_devices(obj: dict) -> None: @click.pass_obj def listen_to_events(obj: dict) -> None: """Listen to events output by blueapi""" - logger = logging.getLogger(__name__) config: ApplicationConfig = obj["config"] - _message_template = StompMessagingTemplate.autoconfigured(config.stomp) - if _message_template is not None: - amq_client = AmqClient(_message_template) + amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) def on_event( context: MessageContext, @@ -151,12 +148,9 @@ def on_event( "Subscribing to all bluesky events from " f"{config.stomp.host}:{config.stomp.port}" ) - with amq_client: - amq_client.subscribe_to_all_events(on_event) - input("Press enter to exit") - else: - logger.error("Stomp configuration not found") - raise Exception("Stomp configuration not found ") + with amq_client: + amq_client.subscribe_to_all_events(on_event) + input("Press enter to exit") @controller.command(name="run") @@ -182,6 +176,10 @@ def run_plan( _message_template = StompMessagingTemplate.autoconfigured(config.stomp) if _message_template is not None: amq_client = AmqClient(_message_template) + else: + raise RuntimeError( + "Message bus needs to be configured to get done message after run" + ) finished_event: deque[WorkerEvent] = deque() def store_finished_event(event: WorkerEvent) -> None: diff --git a/tests/example_yaml/rest_config.yaml b/tests/example_yaml/rest_config.yaml index 51a4714b1..82c484a3d 100644 --- a/tests/example_yaml/rest_config.yaml +++ b/tests/example_yaml/rest_config.yaml @@ -1,3 +1,9 @@ api: host: a.fake.host port: 12345 +stomp: + auth: + username: admin + passcode: admin + host: localhost + port: 61613 \ No newline at end of file diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py index 4a842a756..c613e14ed 100644 --- a/tests/messaging/test_stomptemplate.py +++ b/tests/messaging/test_stomptemplate.py @@ -12,7 +12,7 @@ from blueapi.config import StompConfig from blueapi.messaging import MessageContext, MessagingTemplate, StompMessagingTemplate -from blueapi.service.handler import setup_handler, teardown_handler +from blueapi.service.handler import get_handler, setup_handler, teardown_handler _TIMEOUT: float = 10.0 _COUNT = itertools.count() @@ -229,7 +229,8 @@ def test_autoconfigured_none_is_none(): assert template is None -def test_messaging_template_none_is_none(): - handler = setup_handler(None) - assert handler is None +def test_messaging_template_can_be_set_with_none(): + setup_handler(None) teardown_handler() + with pytest.raises(ValueError): + get_handler() diff --git a/tests/test_cli.py b/tests/test_cli.py index eef3f6c17..a59d1a8b1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -174,17 +174,23 @@ def test_config_passed_down_to_command_children( def test_invalid_stomp_config_for_listner(runner: CliRunner): result = runner.invoke(main, ["controller", "listen"]) assert result.exit_code == 1 + assert type(result.exception) is RuntimeError def test_valid_stomp_config_for_listner(runner: CliRunner): - with patch("uvicorn.run", side_effect=None): - result = runner.invoke( - main, - [ - "-c", - "tests/example_yaml/valid_stomp_config.yaml", - "controller", - "listen", - ], - ) - assert result.exit_code == 1 + result = runner.invoke( + main, + [ + "-c", + "tests/example_yaml/valid_stomp_config.yaml", + "controller", + "listen", + ], + input="\n", + ) + assert result.exit_code == 0 + + +def test_invalid_condition_for_run(runner: CliRunner): + result = runner.invoke(main, ["controller", "run", "sleep", '{"time": 5}']) + assert type(result.exception) is RuntimeError From 9e63e2a4d2bc8608cdac5174d0dfc628aae2fb12 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 25 Apr 2024 13:24:29 +0100 Subject: [PATCH 06/12] added stomp mark to pytest --- tests/test_cli.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index a59d1a8b1..43a943fea 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -171,13 +171,12 @@ def test_config_passed_down_to_command_children( } -def test_invalid_stomp_config_for_listner(runner: CliRunner): +def test_invalid_stomp_config_for_listener(runner: CliRunner): result = runner.invoke(main, ["controller", "listen"]) - assert result.exit_code == 1 assert type(result.exception) is RuntimeError - -def test_valid_stomp_config_for_listner(runner: CliRunner): +@pytest.mark.stomp +def test_valid_stomp_config_for_listener(runner: CliRunner): result = runner.invoke( main, [ From 17bde02312fd16e7f440604dfef85304e10f55ec Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 25 Apr 2024 13:26:34 +0100 Subject: [PATCH 07/12] corrected lint --- tests/test_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cli.py b/tests/test_cli.py index 43a943fea..68f7990de 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -175,6 +175,7 @@ def test_invalid_stomp_config_for_listener(runner: CliRunner): result = runner.invoke(main, ["controller", "listen"]) assert type(result.exception) is RuntimeError + @pytest.mark.stomp def test_valid_stomp_config_for_listener(runner: CliRunner): result = runner.invoke( From 2eca5fbdbf71212bc425e05025829c249b121727 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 25 Apr 2024 15:33:06 +0100 Subject: [PATCH 08/12] code review changes --- src/blueapi/cli/cli.py | 2 +- tests/example_yaml/rest_config.yaml | 2 +- tests/example_yaml/valid_stomp_config.yaml | 2 +- tests/messaging/test_stomptemplate.py | 2 +- tests/test_cli.py | 18 +++++++++++++++++- 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index bd9b2b4c2..bc7455791 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -178,7 +178,7 @@ def run_plan( amq_client = AmqClient(_message_template) else: raise RuntimeError( - "Message bus needs to be configured to get done message after run" + "Cannot run plans without Stomp configuration to track progress" ) finished_event: deque[WorkerEvent] = deque() diff --git a/tests/example_yaml/rest_config.yaml b/tests/example_yaml/rest_config.yaml index 82c484a3d..c1843f3c9 100644 --- a/tests/example_yaml/rest_config.yaml +++ b/tests/example_yaml/rest_config.yaml @@ -6,4 +6,4 @@ stomp: username: admin passcode: admin host: localhost - port: 61613 \ No newline at end of file + port: 61613 diff --git a/tests/example_yaml/valid_stomp_config.yaml b/tests/example_yaml/valid_stomp_config.yaml index 08b32463a..1ba074172 100644 --- a/tests/example_yaml/valid_stomp_config.yaml +++ b/tests/example_yaml/valid_stomp_config.yaml @@ -3,4 +3,4 @@ stomp: username: admin passcode: admin host: localhost - port: 61613 \ No newline at end of file + port: 61613 diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py index c613e14ed..a85318910 100644 --- a/tests/messaging/test_stomptemplate.py +++ b/tests/messaging/test_stomptemplate.py @@ -224,7 +224,7 @@ def server(ctx: MessageContext, message: str) -> None: template.subscribe(destination, server) -def test_autoconfigured_none_is_none(): +def test_no_default_configuration(): template = StompMessagingTemplate.autoconfigured(None) assert template is None diff --git a/tests/test_cli.py b/tests/test_cli.py index 68f7990de..aabf09bff 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -173,7 +173,23 @@ def test_config_passed_down_to_command_children( def test_invalid_stomp_config_for_listener(runner: CliRunner): result = runner.invoke(main, ["controller", "listen"]) - assert type(result.exception) is RuntimeError + assert ( + isinstance(result.exception, RuntimeError) + and str(result.exception) == "Message bus needs to be configured" + ) + + +def test_cannot_run_plans_without_stomp_config(runner: CliRunner): + with patch("uvicorn.run", side_effect=None): + result = runner.invoke(main, ["serve"]) + assert result.exit_code == 0 + + result = runner.invoke(main, ["controller", "run", "sleep", '{"time": 5}']) + assert ( + isinstance(result.exception, RuntimeError) + and str(result.exception) + == "Cannot run plans without Stomp configuration to track progress" + ) @pytest.mark.stomp From 729765a3480e966ef574475c7643cc23e7beed01 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 25 Apr 2024 15:58:36 +0100 Subject: [PATCH 09/12] removed auth and type check tidy up --- src/blueapi/cli/amq.py | 15 ++++++--------- tests/example_yaml/rest_config.yaml | 3 --- tests/example_yaml/valid_stomp_config.yaml | 3 --- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/src/blueapi/cli/amq.py b/src/blueapi/cli/amq.py index 6c05ff64d..df163672f 100644 --- a/src/blueapi/cli/amq.py +++ b/src/blueapi/cli/amq.py @@ -31,12 +31,10 @@ def __init__(self, app: MessagingTemplate | None) -> None: self.timed_out = None def __enter__(self) -> None: - if self.app is not None: - self.app.connect() + self.app.connect() def __exit__(self, exc_type, exc_value, exc_traceback) -> None: - if self.app is not None: - self.app.disconnect() + self.app.disconnect() def subscribe_to_topics( self, @@ -69,11 +67,10 @@ def subscribe_to_all_events( self, on_event: Callable[[MessageContext, _Event], None], ) -> None: - if self.app is not None: - self.app.subscribe( - self.app.destinations.topic("public.worker.event"), - on_event, - ) + self.app.subscribe( + self.app.destinations.topic("public.worker.event"), + on_event, + ) def wait_for_complete(self, timeout: float | None = None) -> None: self.timed_out = not self.complete.wait(timeout=timeout) diff --git a/tests/example_yaml/rest_config.yaml b/tests/example_yaml/rest_config.yaml index c1843f3c9..b6ea43a5e 100644 --- a/tests/example_yaml/rest_config.yaml +++ b/tests/example_yaml/rest_config.yaml @@ -2,8 +2,5 @@ api: host: a.fake.host port: 12345 stomp: - auth: - username: admin - passcode: admin host: localhost port: 61613 diff --git a/tests/example_yaml/valid_stomp_config.yaml b/tests/example_yaml/valid_stomp_config.yaml index 1ba074172..b4004a7d3 100644 --- a/tests/example_yaml/valid_stomp_config.yaml +++ b/tests/example_yaml/valid_stomp_config.yaml @@ -1,6 +1,3 @@ stomp: - auth: - username: admin - passcode: admin host: localhost port: 61613 From 12f77b5cb998cdccbce4a74ffd825b50e2d6ec2c Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 26 Apr 2024 11:24:47 +0100 Subject: [PATCH 10/12] added code review changes --- src/blueapi/cli/amq.py | 4 +--- src/blueapi/cli/cli.py | 20 +++++++++++--------- src/blueapi/messaging/stomptemplate.py | 4 +--- src/blueapi/service/handler.py | 11 +++++++---- tests/messaging/test_stomptemplate.py | 5 ----- tests/test_cli.py | 4 ---- 6 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/blueapi/cli/amq.py b/src/blueapi/cli/amq.py index df163672f..face01b4b 100644 --- a/src/blueapi/cli/amq.py +++ b/src/blueapi/cli/amq.py @@ -23,9 +23,7 @@ class AmqClient: complete: threading.Event timed_out: bool | None - def __init__(self, app: MessagingTemplate | None) -> None: - if app is None: - raise RuntimeError("Message bus needs to be configured") + def __init__(self, app: MessagingTemplate) -> None: self.app = app self.complete = threading.Event() self.timed_out = None diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index bc7455791..d8be05ad9 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -134,7 +134,10 @@ def get_devices(obj: dict) -> None: def listen_to_events(obj: dict) -> None: """Listen to events output by blueapi""" config: ApplicationConfig = obj["config"] - amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) + if config.stomp is not None: + amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) + else: + raise RuntimeError("Message bus needs to be configured") def on_event( context: MessageContext, @@ -143,11 +146,10 @@ def on_event( converted = json.dumps(event.dict(), indent=2) print(converted) - if config.stomp is not None: - print( - "Subscribing to all bluesky events from " - f"{config.stomp.host}:{config.stomp.port}" - ) + print( + "Subscribing to all bluesky events from " + f"{config.stomp.host}:{config.stomp.port}" + ) with amq_client: amq_client.subscribe_to_all_events(on_event) input("Press enter to exit") @@ -173,13 +175,13 @@ def run_plan( client: BlueapiRestClient = obj["rest_client"] logger = logging.getLogger(__name__) - _message_template = StompMessagingTemplate.autoconfigured(config.stomp) - if _message_template is not None: - amq_client = AmqClient(_message_template) + if config.stomp is not None: + _message_template = StompMessagingTemplate.autoconfigured(config.stomp) else: raise RuntimeError( "Cannot run plans without Stomp configuration to track progress" ) + amq_client = AmqClient(_message_template) finished_event: deque[WorkerEvent] = deque() def store_finished_event(event: WorkerEvent) -> None: diff --git a/src/blueapi/messaging/stomptemplate.py b/src/blueapi/messaging/stomptemplate.py index 8704f9f3e..bf107c36b 100644 --- a/src/blueapi/messaging/stomptemplate.py +++ b/src/blueapi/messaging/stomptemplate.py @@ -101,9 +101,7 @@ def __init__( self._subscriptions = {} @classmethod - def autoconfigured(cls, config: StompConfig | None) -> MessagingTemplate | None: - if config is None: - return None + def autoconfigured(cls, config: StompConfig) -> MessagingTemplate: return cls( stomp.Connection( [(config.host, config.port)], diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index 6dd9643e4..d13f6d55e 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -46,10 +46,13 @@ def __init__( self._context, broadcast_statuses=self._config.env.events.broadcast_status_events, ) - self._messaging_template = ( - messaging_template - or StompMessagingTemplate.autoconfigured(self._config.stomp) - ) + if self._config.stomp is None: + self._messaging_template = messaging_template + else: + self._messaging_template = ( + messaging_template + or StompMessagingTemplate.autoconfigured(self._config.stomp) + ) def start(self) -> None: self._worker.start() diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py index a85318910..66e0f8b03 100644 --- a/tests/messaging/test_stomptemplate.py +++ b/tests/messaging/test_stomptemplate.py @@ -224,11 +224,6 @@ def server(ctx: MessageContext, message: str) -> None: template.subscribe(destination, server) -def test_no_default_configuration(): - template = StompMessagingTemplate.autoconfigured(None) - assert template is None - - def test_messaging_template_can_be_set_with_none(): setup_handler(None) teardown_handler() diff --git a/tests/test_cli.py b/tests/test_cli.py index aabf09bff..67a25d544 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -180,10 +180,6 @@ def test_invalid_stomp_config_for_listener(runner: CliRunner): def test_cannot_run_plans_without_stomp_config(runner: CliRunner): - with patch("uvicorn.run", side_effect=None): - result = runner.invoke(main, ["serve"]) - assert result.exit_code == 0 - result = runner.invoke(main, ["controller", "run", "sleep", '{"time": 5}']) assert ( isinstance(result.exception, RuntimeError) From f4f656ec8c17d346d0071a35f41b9b08129bda4f Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 26 Apr 2024 11:44:44 +0100 Subject: [PATCH 11/12] added type check for forward message function --- src/blueapi/service/handler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index d13f6d55e..aa8bc391f 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -79,8 +79,9 @@ def _publish_event_streams( self._publish_event_stream(stream, destination) def _publish_event_stream(self, stream: EventStream, destination: str) -> None: - def forward_message(event, correlation_id): - self._messaging_template.send(destination, event, None, correlation_id) + def forward_message(event: Any, correlation_id: str | None) -> None: + if self._messaging_template is not None: + self._messaging_template.send(destination, event, None, correlation_id) stream.subscribe(forward_message) From 40abc55d45fac91b2e312a6dd70dddafb93d7017 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 26 Apr 2024 11:56:19 +0100 Subject: [PATCH 12/12] added missed import --- src/blueapi/service/handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index aa8bc391f..0280c5d3d 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -1,5 +1,6 @@ import logging from collections.abc import Mapping +from typing import Any from blueapi.config import ApplicationConfig from blueapi.core import BlueskyContext