diff --git a/src/safeds_runner/server/main.py b/src/safeds_runner/server/main.py index 4266feb..888c4a7 100644 --- a/src/safeds_runner/server/main.py +++ b/src/safeds_runner/server/main.py @@ -2,6 +2,7 @@ import json import logging +import sys import flask.app import flask_sock @@ -100,6 +101,10 @@ def ws_main(ws: simple_websocket.Server, pipeline_manager: PipelineManager) -> N ws.close(message=error_short) return match received_object.type: + case "shutdown": + logging.debug("Requested shutdown...") + pipeline_manager.shutdown() + sys.exit(0) case "program": program_data, invalid_message = messages.validate_program_message_data(received_object.data) if program_data is None: diff --git a/src/safeds_runner/server/messages.py b/src/safeds_runner/server/messages.py index 68056e6..0d037de 100644 --- a/src/safeds_runner/server/messages.py +++ b/src/safeds_runner/server/messages.py @@ -13,6 +13,7 @@ message_type_placeholder_value = "placeholder_value" message_type_runtime_error = "runtime_error" message_type_runtime_progress = "runtime_progress" +message_type_shutdown = "shutdown" message_types = [ message_type_program, @@ -21,6 +22,7 @@ message_type_placeholder_value, message_type_runtime_error, message_type_runtime_progress, + message_type_shutdown, ] diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index 11743b5..7a52700 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -145,6 +145,14 @@ def get_placeholder(self, execution_id: str, placeholder_name: str) -> tuple[str value = self._placeholder_map[execution_id][placeholder_name] return _get_placeholder_type(value), value + def shutdown(self) -> None: + """ + Shut down the multiprocessing manager to end the used subprocess. + + This should only be called if this PipelineManager is not intended to be reused again. + """ + self._multiprocessing_manager.shutdown() + class PipelineProcess: """A process that executes a Safe-DS pipeline.""" diff --git a/tests/safeds_runner/server/test_websocket_mock.py b/tests/safeds_runner/server/test_websocket_mock.py index 5796c7f..315821a 100644 --- a/tests/safeds_runner/server/test_websocket_mock.py +++ b/tests/safeds_runner/server/test_websocket_mock.py @@ -1,4 +1,5 @@ import json +import multiprocessing import os import sys import threading @@ -15,6 +16,7 @@ message_type_runtime_error, message_type_runtime_progress, ) +from safeds_runner.server.pipeline_manager import PipelineManager class MockWebsocketConnection: @@ -366,3 +368,34 @@ def test_should_successfully_execute_simple_flow(messages: list[str], expected_r mock_connection.wait_for_messages(1) query_result_invalid = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert query_result_invalid == expected_response + + +@pytest.mark.skipif( + sys.platform.startswith("win") and os.getenv("COVERAGE_RCFILE") is not None, + reason=( + "skipping multiprocessing tests on windows if coverage is enabled, as pytest " + "causes Manager to hang, when using multiprocessing coverage" + ), +) +@pytest.mark.parametrize( + argnames="messages", + argvalues=[ + [ + json.dumps({"type": "shutdown", "id": "", "data": ""}), + ], + ], + ids=["shutdown_message"], +) +def test_should_shut_itself_down(messages: list[str]) -> None: + process = multiprocessing.Process(target=helper_should_shut_itself_down_run_in_subprocess, args=(messages,)) + process.start() + process.join(30) + assert process.exitcode == 0 + + +def helper_should_shut_itself_down_run_in_subprocess(sub_messages: list[str]) -> None: + mock_connection = MockWebsocketConnection(sub_messages) + ws_main(mock_connection, PipelineManager()) + + +helper_should_shut_itself_down_run_in_subprocess.__test__ = False # type: ignore[attr-defined]