diff --git a/src/cloudai/_core/base_job.py b/src/cloudai/_core/base_job.py index 53f4a1ab..344170be 100644 --- a/src/cloudai/_core/base_job.py +++ b/src/cloudai/_core/base_job.py @@ -15,7 +15,9 @@ # limitations under the License. from pathlib import Path +from typing import Union +from .system import System from .test import Test @@ -24,32 +26,55 @@ class BaseJob: Base class for representing a job created by executing a test. Attributes - id (int): The unique identifier of the job. + id (Union[str, int]): The unique identifier of the job. + mode (str): The mode of the job (e.g., 'run', 'dry-run'). + system (System): The system in which the job is running. test (Test): The test instance associated with this job. output_path (Path): The path where the job's output is stored. terminated_by_dependency (bool): Flag to indicate if the job was terminated due to a dependency. """ - def __init__(self, job_id: int, test: Test, output_path: Path): + def __init__(self, mode: str, system: System, test: Test, output_path: Path): """ Initialize a BaseJob instance. Args: - job_id (int): The unique identifier of the job. - output_path (Path): The path where the job's output is stored. + mode (str): The mode of the job (e.g., 'run', 'dry-run'). + system (System): The system in which the job is running. test (Test): The test instance associated with the job. + output_path (Path): The path where the job's output is stored. """ - self.id = job_id - self.test = test - self.output_path = output_path - self.terminated_by_dependency = False + self.id: Union[str, int] = 0 + self.mode: str = mode + self.system: System = system + self.test: Test = test + self.output_path: Path = output_path + self.terminated_by_dependency: bool = False - def increment_iteration(self): + def is_running(self) -> bool: + """ + Check if the specified job is currently running. + + Returns + bool: True if the job is running, False otherwise. """ - Increment the iteration count of the associated test. + if self.mode == "dry-run": + return True + return self.system.is_job_running(self) - This method should be called when the job completes an iteration and is ready to proceed to the next one. + def is_completed(self) -> bool: """ + Check if a job is completed. + + Returns + bool: True if the job is completed, False otherwise. + """ + if self.mode == "dry-run": + return True + return self.system.is_job_completed(self) + + def increment_iteration(self): + """Increment the iteration count of the associated test.""" self.test.current_iteration += 1 def __repr__(self) -> str: @@ -59,4 +84,4 @@ def __repr__(self) -> str: Returns str: String representation of the job. """ - return f"BaseJob(id={self.id}, test={self.test.name}, terminated_by_dependency={self.terminated_by_dependency})" + return f"BaseJob(id={self.id}, mode={self.mode}, system={self.system.name}, test={self.test.name})" diff --git a/src/cloudai/_core/base_runner.py b/src/cloudai/_core/base_runner.py index d1f8e93c..a02eef76 100644 --- a/src/cloudai/_core/base_runner.py +++ b/src/cloudai/_core/base_runner.py @@ -37,8 +37,8 @@ class BaseRunner(ABC): """ Abstract base class for a Runner that manages test execution. - This class provides a framework for executing tests within a given test - scenario, handling dependencies and execution order. + This class provides a framework for executing tests within a given test scenario, handling dependencies and + execution order. Attributes mode (str): The operation mode ('dry-run', 'run'). 
@@ -136,7 +136,7 @@ async def shutdown(self): return logging.info("Terminating all jobs...") for job in self.jobs: - self.kill_job(job) + self.system.kill(job) logging.info("All jobs have been killed.") sys.exit(0) @@ -210,7 +210,7 @@ async def check_start_post_init_dependencies(self): items = list(self.test_to_job_map.items()) for test, job in items: - if self.is_job_running(job): + if job.is_running(): await self.check_and_schedule_start_post_init_dependent_tests(test) async def check_and_schedule_start_post_init_dependent_tests(self, started_test: Test): @@ -286,7 +286,7 @@ async def monitor_jobs(self) -> int: successful_jobs_count = 0 for job in list(self.jobs): - if self.is_job_completed(job): + if job.is_completed(): if self.mode == "dry-run": successful_jobs_count += 1 await self.handle_job_completion(job) @@ -347,32 +347,6 @@ async def handle_job_completion(self, completed_job: BaseJob): else: await self.handle_dependencies(completed_job) - @abstractmethod - def is_job_running(self, job: BaseJob) -> bool: - """ - Check if a job is currently running. - - Args: - job (BaseJob): The job to check. - - Returns: - bool: True if the job is running, False otherwise. - """ - pass - - @abstractmethod - def is_job_completed(self, job: BaseJob) -> bool: - """ - Determine if a job is completed. - - Args: - job (BaseJob): The job to be checked. - - Returns: - bool: True if the job is completed, False otherwise. - """ - pass - async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]: """ Handle the start_post_comp and end_post_comp dependencies for a completed job. @@ -403,16 +377,6 @@ async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]: return tasks - @abstractmethod - def kill_job(self, job: BaseJob): - """ - Kill a specific job. - - Args: - job (BaseJob): The job to be killed. - """ - pass - async def delayed_kill_job(self, job: BaseJob, delay: int = 0): """ Schedule termination of a Standalone job after a specified delay. @@ -424,4 +388,4 @@ async def delayed_kill_job(self, job: BaseJob, delay: int = 0): logging.info(f"Scheduling termination of job {job.id} after {delay} seconds.") await asyncio.sleep(delay) job.terminated_by_dependency = True - self.kill_job(job) + self.system.kill(job) diff --git a/src/cloudai/_core/system.py b/src/cloudai/_core/system.py index 73484021..c12883e6 100644 --- a/src/cloudai/_core/system.py +++ b/src/cloudai/_core/system.py @@ -15,8 +15,13 @@ # limitations under the License. +import logging from abc import ABC, abstractmethod from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .base_job import BaseJob class System(ABC): @@ -59,4 +64,75 @@ def __repr__(self) -> str: @abstractmethod def update(self) -> None: - raise NotImplementedError("Subclasses must implement this method.") + """ + Update the system's state. + + Raises + NotImplementedError: Raised if the method is not implemented in a subclass. + """ + error_message = ( + "System update method is not implemented. All subclasses of the System class must implement the " + "'update' method to ensure the system's state can be refreshed as needed." + ) + logging.error(error_message) + raise NotImplementedError(error_message) + + @abstractmethod + def is_job_running(self, job: "BaseJob") -> bool: + """ + Check if a given job is currently running. + + Args: + job (BaseJob): The job to check. + + Returns: + bool: True if the job is running, False otherwise. 
+ + Raises: + NotImplementedError: Raised if the method is not implemented in a subclass. + """ + error_message = ( + "Job running status check method is not implemented. All subclasses of the System class must implement the" + " 'is_job_running' method to determine whether a job is currently active." + ) + logging.error(error_message) + raise NotImplementedError(error_message) + + @abstractmethod + def is_job_completed(self, job: "BaseJob") -> bool: + """ + Check if a given job is completed. + + Args: + job (BaseJob): The job to check. + + Returns: + bool: True if the job is completed, False otherwise. + + Raises: + NotImplementedError: Raised if the method is not implemented in a subclass. + """ + error_message = ( + "Job completion status check method is not implemented. All subclasses of the System class must implement " + "the 'is_job_completed' method to determine whether a job has finished execution." + ) + logging.error(error_message) + raise NotImplementedError(error_message) + + @abstractmethod + def kill(self, job: "BaseJob") -> None: + """ + Terminate a given job. + + Args: + job (BaseJob): The job to be terminated. + + Raises: + NotImplementedError: Raised if the method is not implemented in a subclass. + """ + error_message = ( + "Job termination method is not implemented. All subclasses of the System class must implement the 'kill' " + "method to terminate a job that is currently running." + ) + logging.error(error_message) + raise NotImplementedError(error_message) diff --git a/src/cloudai/_core/test_scenario_parser.py b/src/cloudai/_core/test_scenario_parser.py index ba00119a..9d20e3a3 100644 --- a/src/cloudai/_core/test_scenario_parser.py +++ b/src/cloudai/_core/test_scenario_parser.py @@ -129,11 +129,30 @@ def _create_section_test(self, section: str, test_info: Dict[str, Any]) -> Test: f"reference from the test scenario schema." ) - test = copy.deepcopy(self.test_mapping[test_name]) - test.test_template = self.test_mapping[test_name].test_template + original_test = self.test_mapping[test_name] + + test = Test( + name=original_test.name, + description=original_test.description, + test_template=original_test.test_template, + env_vars=copy.deepcopy(original_test.env_vars), + cmd_args=copy.deepcopy(original_test.cmd_args), + extra_env_vars=copy.deepcopy(original_test.extra_env_vars), + extra_cmd_args=original_test.extra_cmd_args, + dependencies=copy.deepcopy(original_test.dependencies), + iterations=original_test.iterations, + num_nodes=original_test.num_nodes, + nodes=original_test.nodes, + sol=original_test.sol, + weight=original_test.weight, + ideal_perf=original_test.ideal_perf, + time_limit=original_test.time_limit, + ) + test.section_name = section test.num_nodes = int(test_info.get("num_nodes", 1)) test.nodes = test_info.get("nodes", []) + return test def _parse_dependencies_for_test( diff --git a/src/cloudai/runner/slurm/slurm_job.py b/src/cloudai/runner/slurm/slurm_job.py index 171ee1af..8c6eeee6 100644 --- a/src/cloudai/runner/slurm/slurm_job.py +++ b/src/cloudai/runner/slurm/slurm_job.py @@ -14,10 +14,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cloudai import BaseJob +from pathlib import Path +from typing import Union + +from cloudai import BaseJob, System, Test class SlurmJob(BaseJob): - """Represents a job in a Slurm environment.""" + """ + A job class for execution on a Slurm system. + + Attributes + id (Union[str, int]): The unique identifier of the job. 
+ """ + + def __init__(self, mode: str, system: System, test: Test, job_id: Union[str, int], output_path: Path): + BaseJob.__init__(self, mode, system, test, output_path) + self.id = job_id + + def __repr__(self) -> str: + """ + Return a string representation of the SlurmJob instance. - pass + Returns + str: String representation of the job. + """ + return f"SlurmJob(id={self.id}, test={self.test.name})" diff --git a/src/cloudai/runner/slurm/slurm_runner.py b/src/cloudai/runner/slurm/slurm_runner.py index 0f4e74f7..4ee2822c 100644 --- a/src/cloudai/runner/slurm/slurm_runner.py +++ b/src/cloudai/runner/slurm/slurm_runner.py @@ -15,10 +15,9 @@ # limitations under the License. import logging -from typing import cast +from pathlib import Path -from cloudai import BaseJob, BaseRunner, JobIdRetrievalError, System, Test, TestScenario -from cloudai.systems import SlurmSystem +from cloudai import BaseRunner, JobIdRetrievalError, System, Test, TestScenario from cloudai.util import CommandShell from .slurm_job import SlurmJob @@ -28,14 +27,8 @@ class SlurmRunner(BaseRunner): """ Implementation of the Runner for a system using Slurm. - This class is responsible for executing and managing tests in a Slurm environment. It extends the BaseRunner class, - implementing the abstract methods to work with Slurm jobs. - Attributes - slurm_system (SlurmSystem): This attribute is a casted version of the `system` attribute to `SlurmSystem` type, - ensuring that Slurm-specific properties and methods are accessible. cmd_shell (CommandShell): An instance of CommandShell for executing system commands. - Inherits all other attributes from the BaseRunner class. """ def __init__(self, mode: str, system: System, test_scenario: TestScenario) -> None: @@ -44,11 +37,10 @@ def __init__(self, mode: str, system: System, test_scenario: TestScenario) -> No Args: mode (str): The operation mode ('dry-run', 'run'). - system (System): The system configuration. + system (System): The system object. test_scenario (TestScenario): The test scenario to run. """ super().__init__(mode, system, test_scenario) - self.slurm_system: SlurmSystem = cast(SlurmSystem, system) self.cmd_shell = CommandShell() def _submit_test(self, test: Test) -> SlurmJob: @@ -77,43 +69,4 @@ def _submit_test(self, test: Test) -> SlurmJob: stderr=stderr, message="Failed to retrieve job ID from command output.", ) - return SlurmJob(job_id, test, job_output_path) - - def is_job_running(self, job: BaseJob) -> bool: - """ - Check if the specified job is currently running. - - Args: - job (BaseJob): The job to check. - - Returns: - bool: True if the job is running, False otherwise. - """ - if self.mode == "dry-run": - return True - return self.slurm_system.is_job_running(job.id) - - def is_job_completed(self, job: BaseJob) -> bool: - """ - Check if a Slurm job is completed. - - Args: - job (BaseJob): The job to check. - - Returns: - bool: True if the job is completed, False otherwise. - """ - if self.mode == "dry-run": - return True - s_job = cast(SlurmJob, job) - return self.slurm_system.is_job_completed(s_job.id) - - def kill_job(self, job: BaseJob) -> None: - """ - Terminate a Slurm job. - - Args: - job (BaseJob): The job to be terminated. 
- """ - s_job = cast(SlurmJob, job) - self.slurm_system.scancel(s_job.id) + return SlurmJob(self.mode, self.system, test, job_id, Path(job_output_path)) diff --git a/src/cloudai/runner/standalone/standalone_job.py b/src/cloudai/runner/standalone/standalone_job.py index f3f54bda..ef020017 100644 --- a/src/cloudai/runner/standalone/standalone_job.py +++ b/src/cloudai/runner/standalone/standalone_job.py @@ -14,10 +14,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cloudai import BaseJob +from pathlib import Path +from typing import Union + +from cloudai import BaseJob, System, Test class StandaloneJob(BaseJob): - """Represents a job in a Standalone environment.""" + """ + A job class for standalone execution. + + Attributes + id (Union[str, int]): The unique identifier of the job. + """ + + def __init__(self, mode: str, system: System, test: Test, job_id: Union[str, int], output_path: Path): + BaseJob.__init__(self, mode, system, test, output_path) + self.id = job_id + + def __repr__(self) -> str: + """ + Return a string representation of the StandaloneJob instance. - pass + Returns + str: String representation of the job. + """ + return f"StandaloneJob(id={self.id}, test={self.test.name})" diff --git a/src/cloudai/runner/standalone/standalone_runner.py b/src/cloudai/runner/standalone/standalone_runner.py index 943ff42c..5c4ca57a 100644 --- a/src/cloudai/runner/standalone/standalone_runner.py +++ b/src/cloudai/runner/standalone/standalone_runner.py @@ -15,9 +15,9 @@ # limitations under the License. import logging -from typing import cast +from pathlib import Path -from cloudai import BaseJob, BaseRunner, JobIdRetrievalError, System, Test, TestScenario +from cloudai import BaseRunner, JobIdRetrievalError, System, Test, TestScenario from cloudai.util import CommandShell from .standalone_job import StandaloneJob @@ -27,26 +27,17 @@ class StandaloneRunner(BaseRunner): """ Implementation of the Runner for a system using Standalone. - This class is responsible for executing and managing tests in a standalone environment. It extends the BaseRunner - class, implementing the abstract methods to work with standalone jobs. - Attributes cmd_shell (CommandShell): An instance of CommandShell for executing system commands. - Inherits all other attributes from the BaseRunner class. """ - def __init__( - self, - mode: str, - system: System, - test_scenario: TestScenario, - ): + def __init__(self, mode: str, system: System, test_scenario: TestScenario) -> None: """ - Initialize the StandaloneRunner with a system object, test scenario, and monitor interval. + Initialize the StandaloneRunner. Args: mode (str): The operation mode ('run', 'dry-run'). - system (System): The system configuration. + system (System): The system object. test_scenario (TestScenario): The test scenario to run. """ super().__init__(mode, system, test_scenario) @@ -78,47 +69,4 @@ def _submit_test(self, test: Test) -> StandaloneJob: stderr="", message="Failed to retrieve job ID from command output.", ) - return StandaloneJob(job_id, test, job_output_path) - - def is_job_running(self, job: BaseJob) -> bool: - """ - Check if the specified job is currently running. - - Args: - job (BaseJob): The job to check. - - Returns: - bool: True if the job is running, False otherwise. - """ - return True - - def is_job_completed(self, job: BaseJob) -> bool: - """ - Check if a standalone job is completed. - - Args: - job (StandaloneJob): The job to check. 
- - Returns: - bool: True if the job is completed, False otherwise. - """ - if self.mode == "dry-run": - return True - - s_job = cast(StandaloneJob, job) - command = f"ps -p {s_job.id}" - logging.debug(f"Checking job status with command: {command}") - stdout = self.cmd_shell.execute(command).communicate()[0] - return str(s_job.id) not in stdout - - def kill_job(self, job: BaseJob): - """ - Terminate a standalone job. - - Args: - job (StandaloneJob): The job to be terminated. - """ - s_job = cast(StandaloneJob, job) - cmd = f"kill -9 {s_job.id}" - logging.info(f"Executing termination command for job {s_job.id}: {cmd}") - self.cmd_shell.execute(cmd) + return StandaloneJob(self.mode, self.system, test, job_id, Path(job_output_path)) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index a5fd202b..3e91996f 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -20,7 +20,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -from cloudai import System +from cloudai import BaseJob, System from cloudai.util import CommandShell from .slurm_node import SlurmNode, SlurmNodeState @@ -28,7 +28,7 @@ class SlurmSystem(System): """ - Represents a Slurm system, encapsulating the system's configuration. + Represents a Slurm system. Attributes name (str): The name of the Slurm system. @@ -135,6 +135,108 @@ def update(self) -> None: """ self.update_node_states() + def is_job_running(self, job: BaseJob, retry_threshold: int = 3) -> bool: + """ + Determine if a specified Slurm job is currently running by checking its presence and state in the job queue. + + This method queries the Slurm job queue using 'squeue' to identify if the job with the specified ID is running. + It handles transient network or system errors by retrying the query a limited number of times. + + Args: + job (BaseJob): The job to check. + retry_threshold (int): Maximum number of retry attempts for the query in case of transient errors. + + Returns: + bool: True if the job is currently running, False otherwise. + + Raises: + RuntimeError: If an error occurs that prevents determination of the job's running status, or if the status + cannot be determined after the specified number of retries. + """ + retry_count = 0 + command = f"squeue -j {job.id} --noheader --format=%T" + + while retry_count < retry_threshold: + logging.debug(f"Executing command to check job status: {command}") + stdout, stderr = self.cmd_shell.execute(command).communicate() + + if "Socket timed out" in stderr or "slurm_load_jobs error" in stderr: + retry_count += 1 + logging.warning( + f"An error occurred while querying the job status. Retrying... ({retry_count}/{retry_threshold}). " + "CloudAI uses Slurm commands by default to check the job status. The Slurm daemon can become " + "overloaded and unresponsive, causing this error message. CloudAI retries the command multiple " + f"times, with a maximum of {retry_threshold} attempts. Please ensure that the Slurm daemon is " + "running and responsive." + ) + continue + + if stderr: + error_message = f"Error checking job status: {stderr}" + logging.error(error_message) + raise RuntimeError(error_message) + + job_state = stdout.strip() + if job_state == "RUNNING": + return True + + break + + if retry_count == retry_threshold: + error_message = f"Failed to confirm job running status after {retry_threshold} attempts." 
+ logging.error(error_message) + raise RuntimeError(error_message) + + return False + + def is_job_completed(self, job: BaseJob, retry_threshold: int = 3) -> bool: + """ + Check if a Slurm job is completed by querying its status. + + Retries the query a specified number of times if certain errors are encountered. + + Args: + job (BaseJob): The job to check. + retry_threshold (int): Maximum number of retries for transient errors. + + Returns: + bool: True if the job is completed, False otherwise. + + Raises: + RuntimeError: If unable to determine job status after retries, or if a non-retryable error is encountered. + """ + retry_count = 0 + while retry_count < retry_threshold: + command = f"squeue -j {job.id}" + logging.debug(f"Checking job status with command: {command}") + stdout, stderr = self.cmd_shell.execute(command).communicate() + + if "Socket timed out" in stderr: + retry_count += 1 + logging.warning(f"Retrying job status check (attempt {retry_count}/{retry_threshold})") + continue + + if stderr: + error_message = f"Error checking job status: {stderr}" + logging.error(error_message) + raise RuntimeError(error_message) + + return str(job.id) not in stdout + + error_message = f"Failed to confirm job completion status after {retry_threshold} attempts." + logging.error(error_message) + raise RuntimeError(error_message) + + def kill(self, job: BaseJob) -> None: + """ + Terminate a Slurm job. + + Args: + job (BaseJob): The job to be terminated. + """ + assert isinstance(job.id, int) + self.scancel(job.id) + @classmethod def parse_node_list(cls, node_list: str) -> List[str]: """ @@ -421,96 +523,6 @@ def is_node_in_system(self, node_name: str) -> bool: """ return any(any(node.name == node_name for node in nodes) for nodes in self.partitions.values()) - def is_job_running(self, job_id: int, retry_threshold: int = 3) -> bool: - """ - Determine if a specified Slurm job is currently running by checking its presence and state in the job queue. - - This method queries the Slurm job queue using 'squeue' to identify if the - job with the specified ID is running. It handles transient network or - system errors by retrying the query a limited number of times. - - Args: - job_id (int): The ID of the job to check. - retry_threshold (int): The maximum number of retry attempts for the - query in case of transient errors. - - Returns: - bool: True if the job is currently running (i.e., listed in the job - queue with a running state), False otherwise. - - Raises: - RuntimeError: If an error occurs that prevents determination of the - job's running status, or if the status cannot be - determined after the specified number of retries. - """ - retry_count = 0 - command = f"squeue -j {job_id} --noheader --format=%T" - - while retry_count < retry_threshold: - logging.debug(f"Executing command to check job status: {command}") - stdout, stderr = self.cmd_shell.execute(command).communicate() - - if "Socket timed out" in stderr or "slurm_load_jobs error" in stderr: - retry_count += 1 - logging.warning( - f"An error occurred while querying the job status. Retrying... ({retry_count}/{retry_threshold}). " - "CloudAI uses Slurm commands by default to check the job status. The Slurm daemon can become " - "overloaded and unresponsive, causing this error message. CloudAI retries the command multiple " - f"times, with a maximum of {retry_threshold} attempts. There is no action required from the user " - "for this warning. Please ensure that the Slurm daemon is running and responsive." 
- ) - continue - - if stderr: - raise RuntimeError(f"Error checking job status: {stderr}") - - job_state = stdout.strip() - # If the job is listed with a "RUNNING" state, it's considered active - if job_state == "RUNNING": - return True - - # No need for further retries if we got a clear answer - break - - if retry_count == retry_threshold: - raise RuntimeError("Failed to confirm job running status after " f"{retry_threshold} attempts.") - - # Job is not active if not "RUNNING" or not found - return False - - def is_job_completed(self, job_id: int, retry_threshold: int = 3) -> bool: - """ - Check if a Slurm job is completed by querying its status. - - Retries the query a specified number of times if certain errors are encountered. - - Args: - job_id (int): The ID of the job to check. - retry_threshold (int): Maximum number of retries for transient errors. - - Returns: - bool: True if the job is completed, False otherwise. - - Raises: - RuntimeError: If unable to determine job status after retries, or if a non-retryable error is encountered. - """ - retry_count = 0 - while retry_count < retry_threshold: - command = f"squeue -j {job_id}" - logging.debug(f"Checking job status with command: {command}") - stdout, stderr = self.cmd_shell.execute(command).communicate() - if "Socket timed out" in stderr: - retry_count += 1 - logging.warning(f"Retrying job status check (attempt {retry_count}/" f"{retry_threshold})") - continue - - if stderr: - raise RuntimeError(f"Error checking job status: {stderr}") - - return str(job_id) not in stdout - - raise RuntimeError("Failed to confirm job completion status after " f"{retry_threshold} attempts.") - def scancel(self, job_id: int) -> None: """ Terminates a specified Slurm job by sending a cancellation command. diff --git a/src/cloudai/systems/standalone_system.py b/src/cloudai/systems/standalone_system.py index 609cf9b0..1e0e84f8 100644 --- a/src/cloudai/systems/standalone_system.py +++ b/src/cloudai/systems/standalone_system.py @@ -14,21 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging from pathlib import Path -from cloudai import System +from cloudai import BaseJob, System +from cloudai.util import CommandShell class StandaloneSystem(System): """ - Class representing a Standalone system. - - This class is used for systems that execute commands directly without - a job scheduler. + Represents a standalone system without a job scheduler. Attributes - name (str): Name of the standalone system. - output_path (str): Path to the output directory. + cmd_shell (CommandShell): An instance of CommandShell for executing system commands. """ def __init__(self, name: str, output_path: Path) -> None: @@ -40,15 +38,64 @@ def __init__(self, name: str, output_path: Path) -> None: output_path (Path): Path to the output directory. """ super().__init__(name, "standalone", output_path) + self.cmd_shell = CommandShell() def __repr__(self) -> str: """ Provide a string representation of the StandaloneSystem instance. Returns - str: String representation of the standalone system. + str: String representation of the standalone system including its name and scheduler type. """ - return f"StandaloneSystem(name={self.name}, " f"scheduler={self.scheduler})" + return f"StandaloneSystem(name={self.name}, scheduler={self.scheduler})" def update(self) -> None: + """ + Update the standalone system's state. + + This method is not typically used in standalone systems but is required for interface consistency. 
+ """ pass + + def is_job_running(self, job: BaseJob) -> bool: + """ + Check if a given standalone job is currently running. + + Args: + job (BaseJob): The job to check. + + Returns: + bool: True if the job is running, False otherwise. + """ + command = f"ps -p {job.id}" + logging.debug(f"Checking job status with command: {command}") + stdout = self.cmd_shell.execute(command).communicate()[0] + + # Check if the job's PID is in the ps output + is_running = str(job.id) in stdout + logging.debug(f"Job {job.id} running status: {is_running}") + + return is_running + + def is_job_completed(self, job: BaseJob) -> bool: + """ + Check if a given standalone job is completed. + + Args: + job (BaseJob): The job to check. + + Returns: + bool: True if the job is completed, False otherwise. + """ + return not self.is_job_running(job) + + def kill(self, job: BaseJob) -> None: + """ + Terminate a standalone job. + + Args: + job (BaseJob): The job to be terminated. + """ + cmd = f"kill -9 {job.id}" + logging.debug(f"Executing termination command for job {job.id}: {cmd}") + self.cmd_shell.execute(cmd) diff --git a/tests/test_base_runner.py b/tests/test_base_runner.py deleted file mode 100644 index 1d8ef2fc..00000000 --- a/tests/test_base_runner.py +++ /dev/null @@ -1,86 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from datetime import datetime -from unittest.mock import MagicMock, patch - -import pytest -from cloudai import BaseJob, BaseRunner, System, TestScenario - - -class MockRunner(BaseRunner): - def _submit_test(self, test): - job_id = 1 - output_path = self.get_job_output_path(test) - return BaseJob(job_id, test, output_path) - - def is_job_running(self, job): - return False - - def is_job_completed(self, job): - return True - - def kill_job(self, job): - pass - - -@pytest.fixture -def mock_datetime_now(): - with patch("cloudai._core.base_runner.datetime") as mock_datetime: - mock_datetime.now.return_value = datetime(2024, 1, 1, 12, 0, 0) - mock_datetime.strftime = datetime.strftime - yield mock_datetime - - -def test_setup_output_directory(mock_datetime_now, tmp_path): - scenario_name = "test_scenario" - base_output_path = tmp_path / "base_output_path" - expected_time_str = "2024-01-01_12-00-00" - expected_path = base_output_path / f"{scenario_name}_{expected_time_str}" - - # Mock TestScenario and System - mock_test_scenario = MagicMock(spec=TestScenario) - mock_test_scenario.name = scenario_name - mock_system = MagicMock(spec=System) - mock_system.output_path = base_output_path - mock_system.monitor_interval = 5 - - runner = MockRunner("run", mock_system, mock_test_scenario) - - assert base_output_path.exists() - assert expected_path.exists() - assert runner.output_path == expected_path - - -def test_setup_output_directory_existing_base_path(mock_datetime_now, tmp_path): - scenario_name = "test_scenario" - base_output_path = tmp_path / "base_output_path" - expected_time_str = "2024-01-01_12-00-00" - expected_path = base_output_path / f"{scenario_name}_{expected_time_str}" - - base_output_path.mkdir() - - # Mock TestScenario and System - mock_test_scenario = MagicMock(spec=TestScenario) - mock_test_scenario.name = scenario_name - mock_system = MagicMock(spec=System) - mock_system.output_path = base_output_path - mock_system.monitor_interval = 5 - - runner = MockRunner("run", mock_system, mock_test_scenario) - - assert expected_path.exists() - assert runner.output_path == expected_path diff --git a/tests/test_standalone_system.py b/tests/test_standalone_system.py new file mode 100644 index 00000000..8524407f --- /dev/null +++ b/tests/test_standalone_system.py @@ -0,0 +1,116 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from cloudai.runner.standalone.standalone_job import StandaloneJob +from cloudai.systems.standalone_system import StandaloneSystem + + +@pytest.fixture +def standalone_system(): + """ + Fixture to create a StandaloneSystem instance for testing. + + Returns: + StandaloneSystem: A new instance of StandaloneSystem for testing. 
+ """ + return StandaloneSystem("StandaloneTestSystem", Path("/fake/output/path")) + + +@pytest.fixture +def mock_test(): + """ + Fixture to create a mock Test instance for testing. + + Returns: + MagicMock: A mocked Test instance. + """ + return MagicMock(name="MockTest") + + +@pytest.fixture +def standalone_job(standalone_system, mock_test): + """ + Fixture to create a StandaloneJob instance for testing. + + Args: + standalone_system (StandaloneSystem): The system where the job will be executed. + mock_test (Test): The mock test instance associated with the job. + + Returns: + StandaloneJob: A new instance of StandaloneJob for testing. + """ + return StandaloneJob("run", standalone_system, mock_test, 12345, Path("/fake/output/path")) + + +@pytest.mark.parametrize( + "ps_output, expected_result", + [ + ("12345\n", True), # Job is running, PID is in ps output + ("", False), # Job is not running, ps output is empty + ], +) +@patch("cloudai.util.CommandShell.execute") +def test_is_job_running(mock_execute, standalone_system, standalone_job, ps_output, expected_result): + """ + Test if a job is running using a mocked CommandShell. + + Args: + mock_execute (MagicMock): Mocked CommandShell execute method. + standalone_system (StandaloneSystem): Instance of the system under test. + standalone_job (StandaloneJob): Job instance to check. + ps_output (str): Mocked output of the ps command. + expected_result (bool): Expected result for the job running status. + """ + mock_process = MagicMock() + mock_process.communicate.return_value = (ps_output, "") + mock_execute.return_value = mock_process + + assert standalone_system.is_job_running(standalone_job) == expected_result + + +@patch("cloudai.util.CommandShell.execute") +def test_kill_job(mock_execute, standalone_system, standalone_job): + """ + Test if a job can be killed using a mocked CommandShell. + + Args: + mock_execute (MagicMock): Mocked CommandShell execute method. + standalone_system (StandaloneSystem): Instance of the system under test. + standalone_job (StandaloneJob): Job instance to kill. + """ + mock_process = MagicMock() + mock_execute.return_value = mock_process + + standalone_system.kill(standalone_job) + kill_command = f"kill -9 {standalone_job.id}" + + mock_execute.assert_called_once_with(kill_command) + + +def test_repr(standalone_system): + """ + Test the string representation of StandaloneSystem. + + Args: + standalone_system (StandaloneSystem): Instance of the system under test. + """ + expected_repr = f"StandaloneSystem(name={standalone_system.name}, scheduler=standalone)" + assert repr(standalone_system) == expected_repr
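The net effect of the hunks above is that every scheduler-specific job query now lives behind the System interface: BaseRunner calls job.is_running(), job.is_completed(), and system.kill(job), and BaseJob forwards those calls to its owning System. A new backend therefore only needs to subclass System and supply update, is_job_running, is_job_completed, and kill. The sketch below illustrates that contract; LocalProcessSystem, the PID probing via os.kill, and the reuse of the "standalone" scheduler label are illustrative assumptions, not code from this change.

import os
import signal
from pathlib import Path

from cloudai import BaseJob, System


class LocalProcessSystem(System):
    """Hypothetical System subclass that tracks jobs by their OS process ID."""

    def __init__(self, name: str, output_path: Path) -> None:
        # "standalone" is reused as the scheduler label only because the diff
        # already shows it as a valid value; use whatever your System expects.
        super().__init__(name, "standalone", output_path)

    def update(self) -> None:
        # Plain OS processes carry no cached state worth refreshing.
        pass

    def is_job_running(self, job: BaseJob) -> bool:
        try:
            os.kill(int(job.id), 0)  # signal 0 only probes for existence
        except (OSError, ValueError):
            return False
        return True

    def is_job_completed(self, job: BaseJob) -> bool:
        return not self.is_job_running(job)

    def kill(self, job: BaseJob) -> None:
        try:
            os.kill(int(job.id), signal.SIGKILL)
        except (OSError, ValueError):
            pass  # process already gone, or the id was never a PID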
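A quick delegation check in the same mocked style as the new tests/test_standalone_system.py; the MagicMock test object and the fake paths are stand-ins, and LocalProcessSystem is the hypothetical class sketched above:

from pathlib import Path
from unittest.mock import MagicMock

from cloudai import BaseJob

system = LocalProcessSystem("local", Path("/fake/output/path"))
job = BaseJob("dry-run", system, MagicMock(name="MockTest"), Path("/fake/output/path"))

# In dry-run mode BaseJob answers without consulting the system at all.
assert job.is_running() and job.is_completed()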