From 11c5592bd4fe16e817cfa75cd513d77d1c965de1 Mon Sep 17 00:00:00 2001 From: Andrey Maslennikov Date: Mon, 16 Sep 2024 10:15:44 +0200 Subject: [PATCH] Use Pydantic to verify System schemas (#158) * Move Parser to a higher level * Dirty implementation of Pydantic models for Systems * Remove System Parsers * Simplify code a bit * Enable 'groups' parsing * Handle group name the same way as partition name * Make ruff happy * Add missing module * Fixes * Add pydantic to requirements * Fix tests * Fix tests * Update test, still fails * Make it work * Add missing file * Test all systems * Add mode for verifying system TOMLs * Fixes * Make ruff happy * Extend testing * Fixes * Update README * Address review comments --- README.md | 10 + conf/common/system/example_slurm_cluster.toml | 48 ++-- pyproject.toml | 1 + requirements.txt | 1 + src/cloudai/__init__.py | 19 +- src/cloudai/__main__.py | 29 ++ src/cloudai/_core/registry.py | 63 +++-- src/cloudai/_core/system_parser.py | 74 ----- src/cloudai/{_core => }/parser.py | 68 +++-- src/cloudai/parser/system_parser/__init__.py | 15 - .../system_parser/kubernetes_system_parser.py | 86 ------ .../system_parser/slurm_system_parser.py | 162 ----------- .../system_parser/standalone_system_parser.py | 50 ---- .../systems/kubernetes/kubernetes_system.py | 70 ++--- src/cloudai/systems/slurm/__init__.py | 4 +- src/cloudai/systems/slurm/slurm_node.py | 20 +- src/cloudai/systems/slurm/slurm_system.py | 256 +++++++++--------- src/cloudai/systems/standalone_system.py | 36 +-- .../parser/__init__.py => tests/conftest.py | 24 +- tests/test_acceptance.py | 18 -- tests/test_base_installer.py | 19 -- tests/test_init.py | 4 +- tests/test_job_submission_error.py | 15 +- tests/test_parser.py | 67 ++++- tests/test_registry.py | 35 --- tests/test_slurm_command_gen_strategy.py | 21 +- tests/test_slurm_install_strategy.py | 17 +- tests/test_slurm_system.py | 41 +-- tests/test_slurm_system_parser.py | 115 -------- tests/test_standalone_system.py | 17 +- tests/test_toml_files.py | 16 ++ 31 files changed, 483 insertions(+), 938 deletions(-) delete mode 100644 src/cloudai/_core/system_parser.py rename src/cloudai/{_core => }/parser.py (63%) delete mode 100644 src/cloudai/parser/system_parser/__init__.py delete mode 100644 src/cloudai/parser/system_parser/kubernetes_system_parser.py delete mode 100644 src/cloudai/parser/system_parser/slurm_system_parser.py delete mode 100644 src/cloudai/parser/system_parser/standalone_system_parser.py rename src/cloudai/parser/__init__.py => tests/conftest.py (57%) delete mode 100644 tests/test_slurm_system_parser.py diff --git a/README.md b/README.md index 2b4d6024..2be34ada 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,16 @@ cloudai\ --tests-dir conf/common/test ``` +Verify if system configs are valid: +```bash +cloudai\ + --mode verify-systems\ + --tests-dir conf/common/test\ + --test-templates-dir conf/common/test_template\ + --system-config conf/common/system +``` +`--system-config` can be a file or a directory to verify all configs in the directory. + ## Contributing Feel free to contribute to the CloudAI project. Your contributions are highly appreciated. 
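For reference, the new `verify-systems` mode is a thin wrapper around `Parser.parse_system`, which is introduced further down in this patch. A minimal sketch of the equivalent programmatic check, assuming CloudAI is installed and the config directory exists:

```python
from pathlib import Path

from cloudai import Parser

# Walk a directory of system TOMLs and validate each one, mirroring
# what `cloudai --mode verify-systems` does under the hood.
root = Path("conf/common/system")
for system_toml in sorted(root.glob("*.toml")):
    try:
        system = Parser.parse_system(system_toml)
        print(f"OK: {system_toml} -> {type(system).__name__}")
    except ValueError as err:
        print(f"FAILED: {system_toml}: {err}")
```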
diff --git a/conf/common/system/example_slurm_cluster.toml b/conf/common/system/example_slurm_cluster.toml index 93b3fbc5..ddf2f210 100644 --- a/conf/common/system/example_slurm_cluster.toml +++ b/conf/common/system/example_slurm_cluster.toml @@ -25,31 +25,29 @@ mpi = "pmix" gpus_per_node = 8 ntasks_per_node = 8 -[partitions] - [partitions.partition_1] - name = "partition_1" - nodes = ["node-[001-100]"] - - [partitions.partition_2] - name = "partition_2" - nodes = ["node-[101-200]"] - - [partitions.partition_1.groups] - [partitions.partition_1.groups.group_1] - name = "group_1" - nodes = ["node-[001-025]"] - - [partitions.partition_1.groups.group_2] - name = "group_2" - nodes = ["node-[026-050]"] - - [partitions.partition_1.groups.group_3] - name = "group_3" - nodes = ["node-[051-075]"] - - [partitions.partition_1.groups.group_4] - name = "group_4" - nodes = ["node-[076-100]"] +[[partitions]] +name = "partition_1" +nodes = ["node-[001-100]"] + + [[partitions.groups]] + name = "group_1" + nodes = ["node-[001-025]"] + + [[partitions.groups]] + name = "group_2" + nodes = ["node-[026-050]"] + + [[partitions.groups]] + name = "group_3" + nodes = ["node-[051-075]"] + + [[partitions.groups]] + name = "group_4" + nodes = ["node-[076-100]"] + +[[partitions]] +name = "partition_2" +nodes = ["node-[101-200]"] [global_env_vars] # NCCL Specific Configurations diff --git a/pyproject.toml b/pyproject.toml index 4d9d2d91..ce1ce69a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "tbparse==0.0.8", "toml==0.10.2", "kubernetes==30.1.0", + "pydantic==2.8.2", ] [build-system] diff --git a/requirements.txt b/requirements.txt index 49603d95..ddaf06e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ pandas==2.2.1 tbparse==0.0.8 toml==0.10.2 kubernetes==30.1.0 +pydantic==2.8.2 diff --git a/src/cloudai/__init__.py b/src/cloudai/__init__.py index 19852423..67e3e5e8 100644 --- a/src/cloudai/__init__.py +++ b/src/cloudai/__init__.py @@ -27,22 +27,22 @@ from ._core.job_status_result import JobStatusResult from ._core.job_status_retrieval_strategy import JobStatusRetrievalStrategy from ._core.json_gen_strategy import JsonGenStrategy -from ._core.parser import Parser from ._core.registry import Registry from ._core.report_generation_strategy import ReportGenerationStrategy from ._core.runner import Runner from ._core.system import System from ._core.test import Test +from ._core.test_parser import TestParser from ._core.test_scenario import TestRun, TestScenario +from ._core.test_scenario_parser import TestScenarioParser from ._core.test_template import TestTemplate +from ._core.test_template_parser import TestTemplateParser from ._core.test_template_strategy import TestTemplateStrategy from .installer.installer import Installer from .installer.kubernetes_installer import KubernetesInstaller from .installer.slurm_installer import SlurmInstaller from .installer.standalone_installer import StandaloneInstaller -from .parser.system_parser.kubernetes_system_parser import KubernetesSystemParser -from .parser.system_parser.slurm_system_parser import SlurmSystemParser -from .parser.system_parser.standalone_system_parser import StandaloneSystemParser +from .parser import Parser from .report_generator import ReportGenerator from .runner.kubernetes.kubernetes_runner import KubernetesRunner from .runner.slurm.slurm_runner import SlurmRunner @@ -93,10 +93,6 @@ from .systems.slurm.slurm_system import SlurmSystem from .systems.standalone_system import StandaloneSystem 
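The config change above is what makes the Pydantic migration possible: `[[partitions]]` array-of-tables load as a plain list of dicts, which maps directly onto a `List[...]` model field. A small self-contained sketch, using simplified stand-ins for the real `SlurmPartition`/`SlurmGroup` models (the classes here are illustrative, not the CloudAI ones):

```python
from typing import List

import toml
from pydantic import BaseModel, ConfigDict


class Group(BaseModel):
    model_config = ConfigDict(extra="forbid")
    name: str
    nodes: List[str]


class Partition(BaseModel):
    model_config = ConfigDict(extra="forbid")
    name: str
    nodes: List[str]
    groups: List[Group] = []


class Config(BaseModel):
    partitions: List[Partition]


raw = toml.loads("""
[[partitions]]
name = "partition_1"
nodes = ["node-[001-100]"]

  [[partitions.groups]]
  name = "group_1"
  nodes = ["node-[001-025]"]

[[partitions]]
name = "partition_2"
nodes = ["node-[101-200]"]
""")

# `[[partitions]]` parses to a list of dicts; Pydantic turns each dict into a model instance.
cfg = Config(**raw)
print([p.name for p in cfg.partitions])  # ['partition_1', 'partition_2']
print(cfg.partitions[0].groups[0].name)  # 'group_1'
```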
-Registry().add_system_parser("standalone", StandaloneSystemParser) -Registry().add_system_parser("slurm", SlurmSystemParser) -Registry().add_system_parser("kubernetes", KubernetesSystemParser) - Registry().add_runner("slurm", SlurmRunner) Registry().add_runner("kubernetes", KubernetesRunner) Registry().add_runner("standalone", StandaloneRunner) @@ -165,6 +161,10 @@ Registry().add_installer("standalone", StandaloneInstaller) Registry().add_installer("kubernetes", KubernetesInstaller) +Registry().add_system("slurm", SlurmSystem) +Registry().add_system("standalone", StandaloneSystem) +Registry().add_system("kubernetes", KubernetesSystem) + __all__ = [ "BaseInstaller", "BaseJob", @@ -189,4 +189,7 @@ "TestScenario", "TestTemplate", "TestTemplateStrategy", + "TestParser", + "TestScenarioParser", + "TestTemplateParser", ] diff --git a/src/cloudai/__main__.py b/src/cloudai/__main__.py index fcce2238..b4782939 100644 --- a/src/cloudai/__main__.py +++ b/src/cloudai/__main__.py @@ -88,6 +88,7 @@ def parse_arguments() -> argparse.Namespace: "run", "generate-report", "uninstall", + "verify-systems", ], help=( "Operating mode: 'install' to install test templates, 'dry-run' " @@ -253,11 +254,39 @@ def handle_generate_report(test_scenario: TestScenario, output_dir: Path) -> Non logging.info("Report generation completed.") +def handle_verify_systems(root: Path) -> int: + if not root.exists(): + logging.error(f"Tests directory {root} does not exist.") + return 1 + + test_tomls = [root] + if root.is_dir(): + test_tomls = list(root.glob("*.toml")) + if not test_tomls: + logging.error(f"No test tomls found in {root}") + return 1 + + rc = 0 + for test_toml in test_tomls: + logging.info(f"Verifying {test_toml}...") + try: + Parser.parse_system(test_toml) + except Exception: + rc = 1 + break + + return rc + + def main() -> None: args = parse_arguments() setup_logging(args.log_file, args.log_level) + if args.mode == "verify-systems": + rc = handle_verify_systems(Path(args.system_config)) + exit(rc) + system_config_path = Path(args.system_config) test_templates_dir = Path(args.test_templates_dir) tests_dir = Path(args.tests_dir) diff --git a/src/cloudai/_core/registry.py b/src/cloudai/_core/registry.py index 1de7c1f9..35ecd0b9 100644 --- a/src/cloudai/_core/registry.py +++ b/src/cloudai/_core/registry.py @@ -18,7 +18,6 @@ from .base_installer import BaseInstaller from .base_runner import BaseRunner -from .base_system_parser import BaseSystemParser from .grading_strategy import GradingStrategy from .job_id_retrieval_strategy import JobIdRetrievalStrategy from .job_status_retrieval_strategy import JobStatusRetrievalStrategy @@ -42,7 +41,6 @@ def __new__(cls, name, bases, dct): class Registry(metaclass=Singleton): """Registry for implementations mappings.""" - system_parsers_map: Dict[str, Type[BaseSystemParser]] = {} runners_map: Dict[str, Type[BaseRunner]] = {} strategies_map: Dict[ Tuple[ @@ -70,36 +68,7 @@ class Registry(metaclass=Singleton): ] = {} test_templates_map: Dict[str, Type[TestTemplate]] = {} installers_map: Dict[str, Type[BaseInstaller]] = {} - - def add_system_parser(self, name: str, value: Type[BaseSystemParser]) -> None: - """ - Add a new system parser implementation mapping. - - Args: - name (str): The name of the system parser. - value (Type[BaseSystemParser]): The system parser implementation. - - Raises: - ValueError: If the system parser implementation already exists. 
- """ - if name in self.system_parsers_map: - raise ValueError(f"Duplicating implementation for '{name}', use 'update()' for replacement.") - self.update_system_parser(name, value) - - def update_system_parser(self, name: str, value: Type[BaseSystemParser]) -> None: - """ - Create or replace system parser implementation mapping. - - Args: - name (str): The name of the system parser. - value (Type[BaseSystemParser]): The system parser implementation. - - Raises: - ValueError: If value is not a subclass of BaseSystemParser. - """ - if not issubclass(value, BaseSystemParser): - raise ValueError(f"Invalid system implementation for '{name}', should be subclass of 'System'.") - self.system_parsers_map[name] = value + systems_map: Dict[str, Type[System]] = {} def add_runner(self, name: str, value: Type[BaseRunner]) -> None: """ @@ -274,3 +243,33 @@ def update_installer(self, name: str, value: Type[BaseInstaller]) -> None: if not issubclass(value, BaseInstaller): raise ValueError(f"Invalid installer implementation for '{name}', should be subclass of 'BaseInstaller'.") self.installers_map[name] = value + + def add_system(self, name: str, value: Type[System]) -> None: + """ + Add a new system implementation mapping. + + Args: + name (str): The name of the system. + value (Type[System]): The system implementation. + + Raises: + ValueError: If the system implementation already exists. + """ + if name in self.systems_map: + raise ValueError(f"Duplicating implementation for '{name}', use 'update()' for replacement.") + self.update_system(name, value) + + def update_system(self, name: str, value: Type[System]) -> None: + """ + Create or replace system implementation mapping. + + Args: + name (str): The name of the system. + value (Type[System]): The system implementation. + + Raises: + ValueError: If value is not a subclass of System. + """ + if not issubclass(value, System): + raise ValueError(f"Invalid system implementation for '{name}', should be subclass of 'System'.") + self.systems_map[name] = value diff --git a/src/cloudai/_core/system_parser.py b/src/cloudai/_core/system_parser.py deleted file mode 100644 index 4b93fd8a..00000000 --- a/src/cloudai/_core/system_parser.py +++ /dev/null @@ -1,74 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path - -import toml - -from .registry import Registry -from .system import System - - -class SystemParser: - """ - Parser for parsing system configurations. - - Attributes - _parsers (Dict[str, Type[BaseSystemParser]]): A mapping from system types to their corresponding parser - classes. - file_path (Path): The file path to the system configuration file. - """ - - _parsers = {} - - def __init__(self, file_path: Path): - """ - Initialize a SystemParser instance. - - Args: - file_path (Path): The file path to the system configuration file. 
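Usage of the new `systems_map` mirrors the other registry mappings: the built-in types are registered once in `cloudai/__init__.py`, and `Parser.parse_system` later picks the class by the `scheduler` value from the TOML. A short sketch against those built-in registrations:

```python
from cloudai import Registry
from cloudai.systems import SlurmSystem

registry = Registry()  # Registry is a singleton, so the built-in mappings are already present.
print(sorted(registry.systems_map))  # ['kubernetes', 'slurm', 'standalone']

# parse_system() resolves the class from the `scheduler` key of a system TOML.
assert registry.systems_map["slurm"] is SlurmSystem

# Re-registering an existing name raises; update_system() is the explicit replacement path.
try:
    registry.add_system("slurm", SlurmSystem)
except ValueError as err:
    print(err)
```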
- """ - self.file_path: Path = file_path - - def parse(self) -> System: - """ - Parse the system configuration file, identifying the scheduler type and invoking the appropriate parser. - - Raises - FileNotFoundError: If the file path does not exist or is not a file. - KeyError: If the 'scheduler' key is missing from the configuration. - ValueError: If the 'scheduler' value is unsupported. - - Returns - System: The parsed system object. - """ - if not self.file_path.is_file(): - raise FileNotFoundError(f"The file '{self.file_path}' does not exist.") - - with self.file_path.open("r") as file: - data = toml.load(file) - scheduler = data.get("scheduler", "").lower() - registry = Registry() - if scheduler not in registry.system_parsers_map: - raise ValueError( - f"Unsupported system type '{scheduler}'. " - f"Supported types: {', '.join(registry.system_parsers_map.keys())}" - ) - parser_class = registry.system_parsers_map[scheduler] - if parser_class is None: - raise NotImplementedError(f"No parser registered for system type: {scheduler}") - parser = parser_class() - return parser.parse(data) diff --git a/src/cloudai/_core/parser.py b/src/cloudai/parser.py similarity index 63% rename from src/cloudai/_core/parser.py rename to src/cloudai/parser.py index 82dd2b80..50215cda 100644 --- a/src/cloudai/_core/parser.py +++ b/src/cloudai/parser.py @@ -18,27 +18,24 @@ from pathlib import Path from typing import List, Optional, Tuple -from .system import System -from .system_parser import SystemParser -from .test import Test -from .test_parser import TestParser -from .test_scenario import TestScenario -from .test_scenario_parser import TestScenarioParser -from .test_template import TestTemplate -from .test_template_parser import TestTemplateParser +import toml +from pydantic import ValidationError +from pydantic_core import ErrorDetails +from cloudai import ( + Registry, + System, + Test, + TestParser, + TestScenario, + TestScenarioParser, + TestTemplate, + TestTemplateParser, +) -class Parser: - """ - Main parser for parsing all types of configurations. - Attributes - system_config_path (str): The file path for system configurations. - test_template_path (str): The file path for test template configurations. - test_path (str): The file path for test configurations. - test_scenario_path (str): The file path for test scenario configurations. - logger (logging.Logger): Logger for the parser. 
- """ +class Parser: + """Main parser for parsing all types of configurations.""" def __init__(self, system_config_path: Path, test_templates_dir: Path) -> None: """ @@ -65,9 +62,7 @@ def parse( if not test_path.exists(): raise FileNotFoundError(f"Test path '{test_path}' not found.") - system_parser = SystemParser(self.system_config_path) - system = system_parser.parse() - logging.debug("Parsed system config") + system = self.parse_system(self.system_config_path) test_template_parser = TestTemplateParser(system, self.test_template_path) test_templates: List[TestTemplate] = test_template_parser.parse_all() @@ -90,3 +85,34 @@ def parse( filtered_tests = [t for t in tests if t.name in scenario_tests] return system, filtered_tests, test_scenario + + @staticmethod + def parse_system(system_config_path: Path) -> System: + registry = Registry() + with Path(system_config_path).open() as f: + logging.debug(f"Opened system config file: {system_config_path}") + data = toml.load(f) + scheduler = data.get("scheduler", "").lower() + if scheduler not in registry.systems_map: + raise ValueError( + f"Unsupported system type '{scheduler}' in {system_config_path}. " + f"Supported types: {', '.join(registry.systems_map.keys())}" + ) + + try: + system = registry.systems_map[scheduler](**data) + except ValidationError as e: + for err in e.errors(include_url=False): + err_msg = Parser.format_validation_error(err) + logging.error(err_msg) + raise ValueError("Failed to parse system definition") from e + + return system + + @staticmethod + def format_validation_error(err: ErrorDetails) -> str: + logging.error(f"Validation error: {err}") + if err["msg"] == "Field required": + return f"Field '{'.'.join(str(v) for v in err['loc'])}': {err['msg']}" + + return f"Field '{'.'.join(str(v) for v in err['loc'])}' with value '{err['input']}' is invalid: {err['msg']}" diff --git a/src/cloudai/parser/system_parser/__init__.py b/src/cloudai/parser/system_parser/__init__.py deleted file mode 100644 index 23bfc249..00000000 --- a/src/cloudai/parser/system_parser/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/cloudai/parser/system_parser/kubernetes_system_parser.py b/src/cloudai/parser/system_parser/kubernetes_system_parser.py deleted file mode 100644 index a8e36c7e..00000000 --- a/src/cloudai/parser/system_parser/kubernetes_system_parser.py +++ /dev/null @@ -1,86 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
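`format_validation_error` keeps the CLI output to one readable line per problem: the dotted field path plus either the "required" note or the offending value. A minimal sketch of what it produces for a Slurm config missing `install_path` (the other values below are placeholders):

```python
from pathlib import Path

from pydantic import ValidationError

from cloudai import Parser
from cloudai.systems.slurm import SlurmPartition, SlurmSystem

try:
    SlurmSystem(
        name="broken",
        # install_path intentionally omitted to trigger a "Field required" error.
        output_path=Path("/tmp/out"),
        default_partition="main",
        partitions=[SlurmPartition(name="main", nodes=["node-[01-04]"])],
    )
except ValidationError as e:
    for err in e.errors(include_url=False):
        print(Parser.format_validation_error(err))
        # -> Field 'install_path': Field required
```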
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path -from typing import Any, Dict - -from cloudai import BaseSystemParser -from cloudai.systems.kubernetes import KubernetesSystem - - -class KubernetesSystemParser(BaseSystemParser): - """Parser for parsing Kubernetes system configurations.""" - - def parse(self, data: Dict[str, Any]) -> KubernetesSystem: - """ - Parse the Kubernetes system configuration. - - Args: - data (Dict[str, Any]): The loaded configuration data. - - Returns: - KubernetesSystem: The parsed Kubernetes system object. - - Raises: - ValueError: If any mandatory field is missing from the data. - """ - - def get_mandatory_field(field_name: str) -> str: - value = data.get(field_name) - if not value: - raise ValueError( - f"Mandatory field '{field_name}' is missing in the Kubernetes system schema. " - f"Please ensure that '{field_name}' is present and correctly defined in your system configuration." - ) - return value - - def safe_int(value): - try: - return int(value) if value is not None else None - except ValueError: - return None - - def str_to_bool(value: Any) -> bool: - if isinstance(value, bool): - return value - return value.lower() in ("true", "1", "yes") - - # Extract and validate mandatory fields - name = get_mandatory_field("name") - install_path = Path(get_mandatory_field("install_path")).resolve() - output_path = Path(get_mandatory_field("output_path")).resolve() - default_image = get_mandatory_field("default_image") - default_namespace = get_mandatory_field("default_namespace") - - # Extract optional fields - kube_config_path = data.get("kube_config_path") - if not kube_config_path or len(kube_config_path) == 0 or kube_config_path.isspace(): - home_directory = Path.home() - kube_config_path = home_directory / ".kube" / "config" - else: - kube_config_path = Path(kube_config_path).resolve() - - global_env_vars = data.get("global_env_vars", {}) - - return KubernetesSystem( - name=name, - install_path=install_path, - output_path=output_path, - kube_config_path=kube_config_path, - default_namespace=default_namespace, - default_image=default_image, - global_env_vars=global_env_vars, - ) diff --git a/src/cloudai/parser/system_parser/slurm_system_parser.py b/src/cloudai/parser/system_parser/slurm_system_parser.py deleted file mode 100644 index 27bdd696..00000000 --- a/src/cloudai/parser/system_parser/slurm_system_parser.py +++ /dev/null @@ -1,162 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from pathlib import Path -from typing import Any, Dict, List - -from cloudai import BaseSystemParser -from cloudai.systems.slurm import SlurmNode, SlurmNodeState, SlurmSystem - - -class SlurmSystemParser(BaseSystemParser): - """Parser for parsing Slurm system configurations.""" - - def parse(self, data: Dict[str, Any]) -> SlurmSystem: # noqa: C901 - """ - Parse the Slurm system configuration. - - Args: - data (Dict[str, Any]): The loaded configuration data. - - Returns: - SlurmSystem: The parsed Slurm system object. - - Raises: - ValueError: If 'name' or 'partitions' are missing from the data or if there are node list parsing issues - or group membership conflicts. - """ - - def safe_int(value): - try: - return int(value) if value is not None else None - except ValueError: - return None - - def str_to_bool(value: Any) -> bool: - if isinstance(value, bool): - return value - return value.lower() in ("true", "1", "yes") - - name = data.get("name") - if not name: - raise ValueError("Missing mandatory field: 'name'") - - install_path = data.get("install_path") - if not install_path: - raise ValueError("Field 'install_path' is required.") - install_path = Path(install_path).absolute() - - output_path = data.get("output_path") - if not output_path: - raise ValueError("Field 'output_path' is required.") - output_path = Path(output_path).absolute() - - default_partition = data.get("default_partition") - if not default_partition: - raise ValueError("Field 'default_partition' is required.") - - partitions = data.get("partitions") - if not partitions: - raise ValueError("Missing mandatory field: 'partitions'") - - # Check if default_partition exists in partitions - partition_names = [partition_data.get("name") for partition_data in partitions.values()] - if default_partition not in partition_names: - raise ValueError(f"Default partition '{default_partition}' is not listed in partitions.") - - global_env_vars = data.get("global_env_vars", {}) - - account = data.get("account") - distribution = data.get("distribution") - - mpi = data.get("mpi", "pmix") - gpus_per_node = safe_int(data.get("gpus_per_node")) - ntasks_per_node = safe_int(data.get("ntasks_per_node")) - - cache_docker_images_locally = str_to_bool(data.get("cache_docker_images_locally", "False")) - - nodes_dict: Dict[str, SlurmNode] = {} - updated_partitions: Dict[str, List[SlurmNode]] = {} - updated_groups: Dict[str, Dict[str, List[SlurmNode]]] = {} - - for partition_data in partitions.values(): - partition_name = partition_data.get("name") - if not partition_name: - raise ValueError("Partition data does not include a 'name' field.") - - raw_nodes = partition_data.get("nodes", []) - node_names = set() - for group in raw_nodes: - node_names.update(set(SlurmSystem.parse_node_list(group))) - - if not node_names: - raise ValueError(f"No valid nodes found in partition '{partition_name}'") - - partition_nodes = [] - for node_name in node_names: - if node_name not in nodes_dict: - node = SlurmNode( - name=node_name, - partition=partition_name, - state=SlurmNodeState.UNKNOWN_STATE, - ) - nodes_dict[node_name] = node - else: - node = nodes_dict[node_name] - node.partition = partition_name - partition_nodes.append(node) - updated_partitions[partition_name] = partition_nodes - - groups = partition_data.get("groups", {}) - updated_groups[partition_name] = {} - for group_data in groups.values(): - group_name = group_data.get("name") - if not group_name: - raise ValueError("Group data does not include a 'name' field.") - - raw_nodes = 
group_data.get("nodes", []) - group_node_names = set() - for group in raw_nodes: - group_node_names.update(set(SlurmSystem.parse_node_list(group))) - - group_nodes = [] - for group_node_name in group_node_names: - if group_node_name in nodes_dict: - group_nodes.append(nodes_dict[group_node_name]) - else: - raise ValueError( - f"Node '{group_node_name}' in group '{group_name}' not found in partition " - "'{partition_name}' nodes." - ) - - updated_groups[partition_name][group_name] = group_nodes - - return SlurmSystem( - name=name, - install_path=install_path, - output_path=output_path, - default_partition=default_partition, - partitions=updated_partitions, - account=account, - distribution=distribution, - mpi=mpi, - gpus_per_node=gpus_per_node, - ntasks_per_node=ntasks_per_node, - cache_docker_images_locally=cache_docker_images_locally, - groups=updated_groups, - global_env_vars=global_env_vars, - extra_srun_args=data.get("extra_srun_args"), - ) diff --git a/src/cloudai/parser/system_parser/standalone_system_parser.py b/src/cloudai/parser/system_parser/standalone_system_parser.py deleted file mode 100644 index 4e8e4869..00000000 --- a/src/cloudai/parser/system_parser/standalone_system_parser.py +++ /dev/null @@ -1,50 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path -from typing import Any, Dict - -from cloudai import BaseSystemParser -from cloudai.systems import StandaloneSystem - - -class StandaloneSystemParser(BaseSystemParser): - """Parser for parsing Standalone system configurations.""" - - def parse(self, data: Dict[str, Any]) -> StandaloneSystem: - """ - Parse the Standalone system configuration. - - Args: - data (Dict[str, Any]): The loaded configuration data. - - Returns: - StandaloneSystem: The parsed Standalone system object. - - Raises: - ValueError: If 'name' or 'output_path' are missing from the data or if there are node list parsing issues - or group membership conflicts. 
- """ - name = data.get("name") - if not name: - raise ValueError("Missing mandatory field: 'name'") - - output_path = data.get("output_path") - if not output_path: - raise ValueError("Field 'output_path' is required.") - output_path = Path(output_path).absolute() - - return StandaloneSystem(name=name, output_path=output_path) diff --git a/src/cloudai/systems/kubernetes/kubernetes_system.py b/src/cloudai/systems/kubernetes/kubernetes_system.py index b9a8d887..fd157908 100644 --- a/src/cloudai/systems/kubernetes/kubernetes_system.py +++ b/src/cloudai/systems/kubernetes/kubernetes_system.py @@ -17,16 +17,17 @@ import logging import time from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, cast +from typing import Any, Dict, List, Tuple, cast from kubernetes import client, config from kubernetes.client import ApiException, CustomObjectsApi, V1DeleteOptions, V1Job +from pydantic import BaseModel, ConfigDict from cloudai import BaseJob, System from cloudai.runner.kubernetes.kubernetes_job import KubernetesJob -class KubernetesSystem(System): +class KubernetesSystem(BaseModel, System): """ Represents a Kubernetes system. @@ -36,34 +37,22 @@ class KubernetesSystem(System): default_image (str): Default Docker image to be used for jobs. """ - def __init__( - self, - name: str, - install_path: Path, - output_path: Path, - kube_config_path: Path, - default_namespace: str, - default_image: str, - global_env_vars: Optional[Dict[str, Any]] = None, - ) -> None: - """ - Initialize a KubernetesSystem instance. - - Args: - name (str): Name of the Kubernetes system. - install_path (Path): The installation path of CloudAI. - output_path (Path): Path to the output directory. - kube_config_path (Path): Path to the Kubernetes config file. - default_namespace (str): The default Kubernetes namespace for jobs. - default_image (str): Default Docker image to be used for jobs. - global_env_vars (Optional[Dict[str, Any]]): Dictionary containing additional configuration settings for - the system. 
- """ - super().__init__(name, "kubernetes", install_path, output_path, global_env_vars) - self.kube_config_path = kube_config_path.resolve() - self.default_namespace = default_namespace - self.default_image = default_image - + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + name: str + install_path: Path + output_path: Path + kube_config_path: Path + default_namespace: str + default_image: str + scheduler: str = "kubernetes" + global_env_vars: Dict[str, Any] = {} + _core_v1: client.CoreV1Api + _batch_v1: client.BatchV1Api + _custom_objects_api: CustomObjectsApi + + def __post_init__(self) -> None: + """Initialize the KubernetesSystem instance.""" # Load the Kubernetes configuration if not self.kube_config_path.exists(): error_message = ( @@ -76,12 +65,27 @@ def __init__( # Instantiate Kubernetes APIs logging.debug(f"Loading kube config from: {self.kube_config_path}") config.load_kube_config(config_file=str(self.kube_config_path)) - self.core_v1 = client.CoreV1Api() - self.batch_v1 = client.BatchV1Api() - self.custom_objects_api = CustomObjectsApi() logging.debug(f"{self.__class__.__name__} initialized") + @property + def core_v1(self) -> client.CoreV1Api: + if self._core_v1 is None: + self._core_v1 = client.CoreV1Api() + return self._core_v1 + + @property + def batch_v1(self) -> client.BatchV1Api: + if self._batch_v1 is None: + self._batch_v1 = client.BatchV1Api() + return self._batch_v1 + + @property + def custom_objects_api(self) -> CustomObjectsApi: + if self._custom_objects_api is None: + self._custom_objects_api = CustomObjectsApi() + return self._custom_objects_api + def __repr__(self) -> str: """ Provide a structured string representation of the system. diff --git a/src/cloudai/systems/slurm/__init__.py b/src/cloudai/systems/slurm/__init__.py index 26b4b973..6689b902 100644 --- a/src/cloudai/systems/slurm/__init__.py +++ b/src/cloudai/systems/slurm/__init__.py @@ -15,10 +15,12 @@ # limitations under the License. from .slurm_node import SlurmNode, SlurmNodeState -from .slurm_system import SlurmSystem +from .slurm_system import SlurmGroup, SlurmPartition, SlurmSystem __all__ = [ + "SlurmGroup", "SlurmNode", "SlurmNodeState", + "SlurmPartition", "SlurmSystem", ] diff --git a/src/cloudai/systems/slurm/slurm_node.py b/src/cloudai/systems/slurm/slurm_node.py index f7e9a25c..67a90cb1 100644 --- a/src/cloudai/systems/slurm/slurm_node.py +++ b/src/cloudai/systems/slurm/slurm_node.py @@ -16,6 +16,8 @@ from enum import Enum +from pydantic import BaseModel, ConfigDict + class SlurmNodeState(Enum): """ @@ -94,7 +96,7 @@ class SlurmNodeState(Enum): UNKNOWN_STATE = "UNKNOWN" -class SlurmNode: +class SlurmNode(BaseModel): """ Represents a Slurm compute node with detailed state and partition info. @@ -105,17 +107,11 @@ class SlurmNode: user (str): The name of the user currently using the node. Defaults to N/A if the node is not being used. 
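The underscore-prefixed fields above are Pydantic private attributes: they are excluded from validation and hold runtime state such as the Kubernetes API clients, which the properties then create lazily. A generic sketch of that pattern outside CloudAI, with a stand-in client class (all names here are illustrative only):

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict, PrivateAttr


class FakeClient:
    """Stand-in for an API client such as kubernetes.client.CoreV1Api."""

    def ping(self) -> str:
        return "pong"


class LazyClientModel(BaseModel):
    model_config = ConfigDict(extra="forbid")

    name: str
    # Private attributes are excluded from validation and serialization.
    _client: Optional[FakeClient] = PrivateAttr(default=None)

    @property
    def client(self) -> FakeClient:
        # Created on first access, so constructing the model stays cheap and
        # does not require a live connection.
        if self._client is None:
            self._client = FakeClient()
        return self._client


m = LazyClientModel(name="k8s")
print(m.client.ping())  # "pong"
```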
""" - def __init__( - self, - name: str, - partition: str, - state: SlurmNodeState, - user: str = "N/A", - ) -> None: - self.name = name - self.partition = partition - self.state = state - self.user = user + model_config = ConfigDict(extra="forbid") + name: str + partition: str + state: SlurmNodeState + user: str = "N/A" def allocatable(self, free_only: bool = True) -> bool: """ diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index 18a512c1..c9f256b6 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -20,13 +20,85 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple +from pydantic import BaseModel, ConfigDict + from cloudai import BaseJob, System from cloudai.util import CommandShell from .slurm_node import SlurmNode, SlurmNodeState -class SlurmSystem(System): +def parse_node_list(node_list: str) -> List[str]: + """ + Expand a list of node names (with ranges) into a flat list of individual node names, keeping leading zeroes. + + Args: + node_list (str): A list of node names, possibly including ranges. + + Returns: + List[str]: A flat list of expanded node names with preserved zeroes. + """ + node_list = node_list.strip() + nodes = [] + if not node_list: + return [] + + components = re.split(r",\s*(?![^[]*\])", node_list) + for component in components: + if "[" not in component: + nodes.append(component) + else: + header, node_number = component.split("[") + node_number = node_number.replace("]", "") + ranges = node_number.split(",") + for r in ranges: + if "-" in r: + start_node, end_node = r.split("-") + number_of_digits = len(end_node) + nodes.extend( + [f"{header}{str(i).zfill(number_of_digits)}" for i in range(int(start_node), int(end_node) + 1)] + ) + else: + nodes.append(f"{header}{r}") + + return nodes + + +class SlurmGroup(BaseModel): + """Represents a group of nodes within a partition.""" + + model_config = ConfigDict(extra="forbid") + name: str + nodes: List[str] + + +class SlurmPartition(BaseModel): + """Represents a partition within a Slurm system.""" + + model_config = ConfigDict(extra="forbid") + name: str + nodes: List[str] + groups: List[SlurmGroup] = [] + + _slurm_nodes: List[SlurmNode] = [] + + @property + def slurm_nodes(self) -> List[SlurmNode]: + if self._slurm_nodes: + return self._slurm_nodes + + node_names = set() + for nodes_list in self.nodes: + node_names.update(set(parse_node_list(nodes_list))) + + self._slurm_nodes = [ + SlurmNode(name=node_name, partition=self.name, state=SlurmNodeState.UNKNOWN_STATE) + for node_name in node_names + ] + return self._slurm_nodes + + +class SlurmSystem(BaseModel, System): """ Represents a Slurm system. @@ -47,77 +119,37 @@ class SlurmSystem(System): cmd_shell (CommandShell): An instance of CommandShell for executing system commands. """ - def __init__( - self, - name: str, - install_path: Path, - output_path: Path, - default_partition: str, - partitions: Dict[str, List[SlurmNode]], - account: Optional[str] = None, - distribution: Optional[str] = None, - mpi: Optional[str] = None, - gpus_per_node: Optional[int] = None, - ntasks_per_node: Optional[int] = None, - cache_docker_images_locally: bool = False, - groups: Optional[Dict[str, Dict[str, List[SlurmNode]]]] = None, - global_env_vars: Optional[Dict[str, Any]] = None, - extra_srun_args: Optional[str] = None, - ) -> None: - """ - Initialize a SlurmSystem instance. - - Args: - name (str): Name of the Slurm system. 
- install_path (Path): The installation path of CloudAI. - output_path (Path): Path to the output directory. - default_partition (str): Default partition. - partitions (Dict[str, List[SlurmNode]]): Partitions in the system. - account (Optional[str]): Account name for charging resources used by this job. - distribution (Optional[str]): Specifies alternate distribution methods for remote processes. - mpi (Optional[str]): Indicates the Process Management Interface (PMI) implementation to be used for - inter-process communication. - gpus_per_node (Optional[int]): Specifies the number of GPUs available per node. - ntasks_per_node (Optional[int]): Specifies the number of tasks that can run concurrently on a single node. - cache_docker_images_locally (bool): Whether to cache Docker images locally for the Slurm system. - groups (Optional[Dict[str, Dict[str, List[SlurmNode]]]]): Nested mapping of group names to lists of - SlurmNodes within partitions, defining the group composition within each partition. Defaults to an - empty dictionary if not provided. - global_env_vars (Optional[Dict[str, Any]]): Dictionary containing additional configuration settings for - the system. - extra_srun_args (Optional[str]): Additional arguments to be passed to the srun command. - """ - super().__init__(name, "slurm", install_path, output_path, global_env_vars) - self.default_partition = default_partition - self.partitions = partitions - self.account = account - self.distribution = distribution - self.mpi = mpi - self.gpus_per_node = gpus_per_node - self.ntasks_per_node = ntasks_per_node - self.cache_docker_images_locally = cache_docker_images_locally - self.groups = groups if groups is not None else {} - self.extra_srun_args = extra_srun_args - self.cmd_shell = CommandShell() - logging.debug(f"{self.__class__.__name__} initialized") - - def __repr__(self) -> str: - """ - Provide a structured string representation of the system. - - Including the system name, scheduler type, and a simplified view similar to the `sinfo` command output, - focusing on the partition, state, and nodelist. 
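Since node-range expansion is needed by the `SlurmPartition`/`SlurmGroup` models themselves, `parse_node_list` is now a module-level function rather than a `SlurmSystem` classmethod. A quick sketch of what it expands, using the bracket syntax from the example config:

```python
from cloudai.systems.slurm.slurm_system import parse_node_list

# Ranges keep their zero padding; bracketed ranges and plain names can be mixed.
print(parse_node_list("node-[001-003]"))
# ['node-001', 'node-002', 'node-003']

print(parse_node_list("node-[001-002,005],gpu-node01"))
# ['node-001', 'node-002', 'node-005', 'gpu-node01']
```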
- """ - header = f"System Name: {self.name}\nScheduler Type: {self.scheduler}" - parts = [header, "\tPARTITION STATE NODELIST"] - for partition_name, nodes in self.partitions.items(): - state_count = {} - for node in nodes: - state_count.setdefault(node.state, []).append(node.name) - for state, names in state_count.items(): - node_list_str = self.format_node_list(names) - parts.append(f"\t{partition_name:<10} {state.name:<7} {node_list_str}") - return "\n".join(parts) + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + name: str + install_path: Path + output_path: Path + default_partition: str + partitions: List[SlurmPartition] + account: Optional[str] = None + distribution: Optional[str] = None + mpi: str = "pmix" + gpus_per_node: Optional[int] = None + ntasks_per_node: Optional[int] = None + cache_docker_images_locally: bool = False + global_env_vars: Dict[str, Any] = {} + scheduler: str = "standalone" + monitor_interval: int = 1 + cmd_shell: CommandShell = CommandShell() + extra_srun_args: Optional[str] = None + + @property + def groups(self) -> Dict[str, Dict[str, List[SlurmNode]]]: + groups: Dict[str, Dict[str, List[SlurmNode]]] = {} + for part in self.partitions: + groups[part.name] = {} + for group in part.groups: + node_names = set() + for group_nodes in group.nodes: + node_names.update(set(parse_node_list(group_nodes))) + groups[part.name][group.name] = [node for node in part.slurm_nodes if node.name in node_names] + + return groups def update(self) -> None: """ @@ -231,45 +263,6 @@ def kill(self, job: BaseJob) -> None: assert isinstance(job.id, int) self.scancel(job.id) - @classmethod - def parse_node_list(cls, node_list: str) -> List[str]: - """ - Expand a list of node names (with ranges) into a flat list of individual node names, keeping leading zeroes. - - Args: - node_list (str): A list of node names, possibly including ranges. - - Returns: - List[str]: A flat list of expanded node names with preserved zeroes. - """ - node_list = node_list.strip() - nodes = [] - if not node_list: - return [] - - components = re.split(r",\s*(?![^[]*\])", node_list) - for component in components: - if "[" not in component: - nodes.append(component) - else: - header, node_number = component.split("[") - node_number = node_number.replace("]", "") - ranges = node_number.split(",") - for r in ranges: - if "-" in r: - start_node, end_node = r.split("-") - number_of_digits = len(end_node) - nodes.extend( - [ - f"{header}{str(i).zfill(number_of_digits)}" - for i in range(int(start_node), int(end_node) + 1) - ] - ) - else: - nodes.append(f"{header}{r}") - - return nodes - @classmethod def format_node_list(cls, node_names: List[str]) -> str: """ @@ -347,9 +340,27 @@ def format_range(lst: List[int], padding: int) -> List[str]: return ", ".join(formatted_ranges) + def __repr__(self) -> str: + """ + Provide a structured string representation of the system. + + Including the system name, scheduler type, and a simplified view similar to the `sinfo` command output, + focusing on the partition, state, and nodelist. 
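With the models above, `groups` is no longer stored on the system; it is derived on demand from each partition's `groups` definition and its expanded `slurm_nodes`, so group membership always stays consistent with `partitions`. A short sketch built directly from the Pydantic models (paths are placeholders):

```python
from pathlib import Path

from cloudai.systems.slurm.slurm_system import SlurmGroup, SlurmPartition, SlurmSystem

system = SlurmSystem(
    name="demo",
    install_path=Path("/fake/install"),
    output_path=Path("/fake/output"),
    default_partition="main",
    partitions=[
        SlurmPartition(
            name="main",
            nodes=["node-[001-008]"],
            groups=[
                SlurmGroup(name="group_1", nodes=["node-[001-004]"]),
                SlurmGroup(name="group_2", nodes=["node-[005-008]"]),
            ],
        ),
    ],
)

# The property re-derives group membership from the partition's SlurmNode objects.
print(len(system.groups["main"]["group_1"]))  # 4
print(sorted(n.name for n in system.groups["main"]["group_2"]))
# ['node-005', 'node-006', 'node-007', 'node-008']
```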
+ """ + header = f"System Name: {self.name}\nScheduler Type: {self.scheduler}" + parts = [header, "\tPARTITION STATE NODELIST"] + for partition in self.partitions: + state_count = {} + for node in partition.slurm_nodes: + state_count.setdefault(node.state, []).append(node.name) + for state, names in state_count.items(): + node_list_str = self.format_node_list(names) + parts.append(f"\t{partition.name:<10} {state.name:<7} {node_list_str}") + return "\n".join(parts) + def get_partition_names(self) -> List[str]: """Return a list of all partition names.""" - return list(self.partitions.keys()) + return [partition.name for partition in self.partitions] def get_partition_nodes(self, partition_name: str) -> List[SlurmNode]: """ @@ -364,9 +375,10 @@ def get_partition_nodes(self, partition_name: str) -> List[SlurmNode]: Raises: ValueError: If the partition does not exist. """ - if partition_name not in self.partitions: - raise ValueError(f"Partition '{partition_name}' not found.") - return self.partitions[partition_name] + for partition in self.partitions: + if partition.name == partition_name: + return partition.slurm_nodes + raise ValueError(f"Partition '{partition_name}' not found.") def get_partition_node_names(self, partition_name: str) -> List[str]: """ @@ -515,7 +527,7 @@ def is_node_in_system(self, node_name: str) -> bool: Returns: True if the node is part of the system, otherwise False. """ - return any(any(node.name == node_name for node in nodes) for nodes in self.partitions.values()) + return any(any(node.name == node_name for node in part.slurm_nodes) for part in self.partitions) def scancel(self, job_id: int) -> None: """ @@ -598,7 +610,7 @@ def parse_squeue_output(self, squeue_output: str) -> Dict[str, str]: node_list_part, user = parts[0], "|".join(parts[1:]) # Handle cases where multiple node groups or ranges are specified - for node in self.parse_node_list(node_list_part): + for node in parse_node_list(node_list_part): node_user_map[node] = user.strip() return node_user_map @@ -619,17 +631,17 @@ def parse_sinfo_output(self, sinfo_output: str, node_user_map: Dict[str, str]) - continue partition, _, _, _, state, nodelist = parts[:6] partition = partition.rstrip("*") - node_names = self.parse_node_list(nodelist) + node_names = parse_node_list(nodelist) # Convert state to enum, handling states with suffixes state_enum = self.convert_state_to_enum(state) for node_name in node_names: # Find the partition and node to update the state - for part_name, nodes in self.partitions.items(): - if part_name != partition: + for part in self.partitions: + if part.name != partition: continue - for node in nodes: + for node in part.slurm_nodes: if node.name == node_name: node.state = state_enum node.user = node_user_map.get(node_name, "N/A") @@ -726,7 +738,7 @@ def parse_nodes(self, nodes: List[str]) -> List[str]: else: # Handle both individual node names and ranges if self.is_node_in_system(node_spec) or "[" in node_spec: - expanded_nodes = self.parse_node_list(node_spec) + expanded_nodes = parse_node_list(node_spec) parsed_nodes += expanded_nodes else: raise ValueError(f"Node '{node_spec}' not found.") diff --git a/src/cloudai/systems/standalone_system.py b/src/cloudai/systems/standalone_system.py index 6531d4f2..151b06ba 100644 --- a/src/cloudai/systems/standalone_system.py +++ b/src/cloudai/systems/standalone_system.py @@ -17,37 +17,27 @@ import logging from pathlib import Path +from pydantic import BaseModel, ConfigDict + from cloudai import BaseJob, System -from cloudai.util import CommandShell 
+from cloudai.util.command_shell import CommandShell -class StandaloneSystem(System): +class StandaloneSystem(BaseModel, System): """ - Represents a standalone system without a job scheduler. + Class representing a Standalone system. - Attributes - cmd_shell (CommandShell): An instance of CommandShell for executing system commands. + This class is used for systems that execute commands directly without a job scheduler. """ - def __init__(self, name: str, output_path: Path) -> None: - """ - Initialize a StandaloneSystem instance. - - Args: - name (str): Name of the standalone system. - output_path (Path): Path to the output directory. - """ - super().__init__(name, "standalone", Path(), output_path, {}) - self.cmd_shell = CommandShell() - - def __repr__(self) -> str: - """ - Provide a string representation of the StandaloneSystem instance. + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) - Returns - str: String representation of the standalone system including its name and scheduler type. - """ - return f"StandaloneSystem(name={self.name}, scheduler={self.scheduler})" + name: str + install_path: Path + output_path: Path + scheduler: str = "standalone" + monitor_interval: int = 1 + cmd_shell: CommandShell = CommandShell() def update(self) -> None: """ diff --git a/src/cloudai/parser/__init__.py b/tests/conftest.py similarity index 57% rename from src/cloudai/parser/__init__.py rename to tests/conftest.py index a2ad3218..37b46099 100644 --- a/src/cloudai/parser/__init__.py +++ b/tests/conftest.py @@ -14,10 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .system_parser.slurm_system_parser import SlurmSystemParser -from .system_parser.standalone_system_parser import StandaloneSystemParser +from pathlib import Path -__all__ = [ - "StandaloneSystemParser", - "SlurmSystemParser", -] +import pytest +from cloudai.systems.slurm.slurm_system import SlurmPartition, SlurmSystem + + +@pytest.fixture +def slurm_system() -> SlurmSystem: + system = SlurmSystem( + name="test_system", + install_path=Path("/fake/path"), + output_path=Path("/fake/output"), + default_partition="main", + partitions=[ + SlurmPartition(name="main", nodes=["node-[033-064]"]), + SlurmPartition(name="backup", nodes=["node0[1-8]"]), + ], + ) + return system diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index f69895d0..39cfd973 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -24,7 +24,6 @@ from cloudai.__main__ import handle_dry_run_and_run, setup_logging from cloudai._core.base_installer import BaseInstaller from cloudai.systems import SlurmSystem -from cloudai.systems.slurm import SlurmNode, SlurmNodeState SLURM_TEST_SCENARIOS = [ {"path": Path("conf/common/test_scenario/sleep.toml"), "expected_dirs_number": 4, "log_file": "sleep_debug.log"}, @@ -69,23 +68,6 @@ def test_slurm(tmp_path: Path, scenario: Dict): assert log_file_path.exists(), f"Log file {log_file_path} was not created" -@pytest.fixture -def slurm_system() -> SlurmSystem: - nodes = [SlurmNode(name=f"node-0{i}", partition="main", state=SlurmNodeState.UNKNOWN_STATE) for i in range(33, 65)] - backup_nodes = [ - SlurmNode(name=f"node0{i}", partition="backup", state=SlurmNodeState.UNKNOWN_STATE) for i in range(1, 9) - ] - - system = SlurmSystem( - name="test_system", - install_path=Path("/fake/path"), - output_path=Path("/fake/output"), - default_partition="main", - partitions={"main": nodes, "backup": backup_nodes}, - ) - return system - - 
@pytest.fixture def test_template_success() -> TestTemplate: template = MagicMock(spec=TestTemplate) diff --git a/tests/test_base_installer.py b/tests/test_base_installer.py index 0b84d83a..ea9fb617 100644 --- a/tests/test_base_installer.py +++ b/tests/test_base_installer.py @@ -15,7 +15,6 @@ # limitations under the License. from concurrent.futures import Future -from pathlib import Path from unittest.mock import MagicMock, Mock, patch import pytest @@ -23,24 +22,6 @@ from cloudai._core.base_installer import BaseInstaller from cloudai._core.test import Test from cloudai.systems import SlurmSystem -from cloudai.systems.slurm import SlurmNode, SlurmNodeState - - -@pytest.fixture -def slurm_system() -> SlurmSystem: - nodes = [SlurmNode(name=f"node-0{i}", partition="main", state=SlurmNodeState.UNKNOWN_STATE) for i in range(33, 65)] - backup_nodes = [ - SlurmNode(name=f"node0{i}", partition="backup", state=SlurmNodeState.UNKNOWN_STATE) for i in range(1, 9) - ] - - system = SlurmSystem( - name="test_system", - install_path=Path("/fake/path"), - output_path=Path("/fake/output"), - default_partition="main", - partitions={"main": nodes, "backup": backup_nodes}, - ) - return system @pytest.fixture diff --git a/tests/test_init.py b/tests/test_init.py index 25a7b51d..07a842e6 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -70,8 +70,8 @@ from cloudai.systems.standalone_system import StandaloneSystem -def test_system_parsers(): - parsers = Registry().system_parsers_map.keys() +def test_systems(): + parsers = Registry().systems_map.keys() assert "standalone" in parsers assert "slurm" in parsers assert "kubernetes" in parsers diff --git a/tests/test_job_submission_error.py b/tests/test_job_submission_error.py index 6785c23e..e09fe984 100644 --- a/tests/test_job_submission_error.py +++ b/tests/test_job_submission_error.py @@ -15,6 +15,7 @@ # limitations under the License. import subprocess +from pathlib import Path from unittest.mock import MagicMock, Mock import pytest @@ -24,7 +25,7 @@ from cloudai._core.test_scenario import TestRun, TestScenario from cloudai.runner.slurm.slurm_runner import SlurmRunner from cloudai.systems import SlurmSystem -from cloudai.systems.slurm import SlurmNode, SlurmNodeState +from cloudai.systems.slurm.slurm_system import SlurmPartition from cloudai.util import CommandShell @@ -58,17 +59,13 @@ def get_job_id(self, stdout, stderr): @pytest.fixture -def slurm_system(tmpdir): - nodes = [ - SlurmNode(name="nodeA001", partition="main", state=SlurmNodeState.UNKNOWN_STATE), - SlurmNode(name="nodeB001", partition="main", state=SlurmNodeState.UNKNOWN_STATE), - ] +def slurm_system(tmp_path: Path): system = SlurmSystem( name="test_system", - install_path=tmpdir, - output_path=tmpdir, + install_path=tmp_path, + output_path=tmp_path, default_partition="main", - partitions={"main": nodes}, + partitions=[SlurmPartition(name="main", nodes=["nodeA001", "nodeB001"])], ) return system diff --git a/tests/test_parser.py b/tests/test_parser.py index d2c1004e..89707a3e 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -15,30 +15,31 @@ # limitations under the License. 
from pathlib import Path +from typing import cast from unittest.mock import Mock, patch import pytest from cloudai import Parser +from cloudai.systems.slurm.slurm_system import SlurmSystem +from pydantic_core import ErrorDetails class Test_Parser: @pytest.fixture() def parser(self, tmp_path: Path) -> Parser: - system = tmp_path / "system.toml" + system = Path.cwd() / "conf" / "common" / "system" / "standalone_system.toml" templates_dir = tmp_path / "templates" return Parser(system, templates_dir) def test_no_tests_dir(self, parser: Parser): - tests_dir = parser.system_config_path.parent / "tests" + tests_dir = parser.test_template_path.parent / "tests" with pytest.raises(FileNotFoundError) as exc_info: parser.parse(tests_dir, None) assert "Test path" in str(exc_info.value) - @patch("cloudai._core.system_parser.SystemParser.parse") @patch("cloudai._core.test_parser.TestParser.parse_all") - def test_no_scenario(self, test_parser: Mock, _, parser: Parser): - tests_dir = parser.system_config_path.parent / "tests" - tests_dir.mkdir() + def test_no_scenario(self, test_parser: Mock, parser: Parser): + tests_dir = parser.system_config_path.parent.parent / "test" fake_tests = [] for i in range(3): fake_tests.append(Mock()) @@ -50,12 +51,10 @@ def test_no_scenario(self, test_parser: Mock, _, parser: Parser): _, tests, _ = parser.parse(tests_dir, None) assert len(tests) == 3 - @patch("cloudai._core.system_parser.SystemParser.parse") @patch("cloudai._core.test_parser.TestParser.parse_all") @patch("cloudai._core.test_scenario_parser.TestScenarioParser.parse") - def test_scenario_filters_tests(self, test_scenario_parser: Mock, test_parser: Mock, _, parser: Parser): - tests_dir = parser.system_config_path.parent / "tests" - tests_dir.mkdir() + def test_scenario_filters_tests(self, test_scenario_parser: Mock, test_parser: Mock, parser: Parser): + tests_dir = parser.system_config_path.parent.parent / "test" fake_tests = [] for i in range(3): fake_tests.append(Mock()) @@ -67,3 +66,51 @@ def test_scenario_filters_tests(self, test_scenario_parser: Mock, test_parser: M test_scenario_parser.return_value = fake_scenario _, tests, _ = parser.parse(tests_dir, Path()) assert len(tests) == 1 + + def test_parse_system(self, parser: Parser): + parser.system_config_path = Path("conf/common/system/example_slurm_cluster.toml") + system = cast(SlurmSystem, parser.parse_system(parser.system_config_path)) + + assert len(system.partitions) == 2 + names = [partition.name for partition in system.partitions] + assert "partition_1" in names + assert "partition_2" in names + + assert len(system.groups) == 2 + assert "partition_1" in system.groups + assert "partition_2" in system.groups + + # checking number of nodes in each partition + assert len(system.partitions[0].slurm_nodes) == 100 + assert len(system.partitions[1].slurm_nodes) == 100 + + # checking groups + assert len(system.groups["partition_2"]) == 0 + assert len(system.groups["partition_1"]) == 4 + assert "group_1" in system.groups["partition_1"] + assert "group_2" in system.groups["partition_1"] + assert "group_3" in system.groups["partition_1"] + assert "group_4" in system.groups["partition_1"] + + # checking number of nodes in each group + assert len(system.groups["partition_1"]["group_1"]) == 25 + assert len(system.groups["partition_1"]["group_2"]) == 25 + assert len(system.groups["partition_1"]["group_3"]) == 25 + assert len(system.groups["partition_1"]["group_4"]) == 25 + + @pytest.mark.parametrize( + "error, expected_msg", + [ + ( + ErrorDetails(type="missing", 
loc=("field",), msg="Field required", input=None), + "Field 'field': Field required", + ), + ( + ErrorDetails(type="value_error", loc=("field", "subf"), msg="Invalid field", input="value"), + "Field 'field.subf' with value 'value' is invalid: Invalid field", + ), + ], + ) + def test_log_validation_errors_with_required_field_error(self, error: ErrorDetails, expected_msg: str): + err_msg = Parser.format_validation_error(error) + assert err_msg == expected_msg diff --git a/tests/test_registry.py b/tests/test_registry.py index f0257236..832dc21d 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -18,50 +18,15 @@ from cloudai import JobIdRetrievalStrategy, JobStatusRetrievalStrategy, ReportGenerationStrategy, System, TestTemplate from cloudai._core.base_installer import BaseInstaller from cloudai._core.base_runner import BaseRunner -from cloudai._core.base_system_parser import BaseSystemParser from cloudai._core.registry import Registry from cloudai._core.test_template_strategy import TestTemplateStrategy -class MySystemParser(BaseSystemParser): - pass - - -class AnotherSystemParser(BaseSystemParser): - pass - - @pytest.fixture def registry(): return Registry() -class TestRegistry__SystemParsersMap: - """This test verifies Registry class functionality. - - Since Registry is a Singleton, the order of cases is important. - Only covers the system_parsers_map attribute. - """ - - def test_add_system(self, registry: Registry): - registry.add_system_parser("system", MySystemParser) - assert registry.system_parsers_map["system"] == MySystemParser - - def test_add_system_duplicate(self, registry: Registry): - with pytest.raises(ValueError) as exc_info: - registry.add_system_parser("system", MySystemParser) - assert "Duplicating implementation for 'system'" in str(exc_info.value) - - def test_update_system(self, registry: Registry): - registry.update_system_parser("system", AnotherSystemParser) - assert registry.system_parsers_map["system"] == AnotherSystemParser - - def test_invalid_type(self, registry: Registry): - with pytest.raises(ValueError) as exc_info: - registry.update_system_parser("TestSystem", str) # pyright: ignore - assert "Invalid system implementation for 'TestSystem'" in str(exc_info.value) - - class MyRunner(BaseRunner): pass diff --git a/tests/test_slurm_command_gen_strategy.py b/tests/test_slurm_command_gen_strategy.py index c30f9852..df2656ff 100644 --- a/tests/test_slurm_command_gen_strategy.py +++ b/tests/test_slurm_command_gen_strategy.py @@ -25,7 +25,8 @@ NeMoLauncherSlurmCommandGenStrategy, ) from cloudai.systems import SlurmSystem -from cloudai.systems.slurm import SlurmNode, SlurmNodeState +from cloudai.systems.slurm import SlurmNodeState +from cloudai.systems.slurm.slurm_system import SlurmPartition from cloudai.systems.slurm.strategy import SlurmCommandGenStrategy @@ -36,19 +37,15 @@ def slurm_system(tmp_path: Path) -> SlurmSystem: install_path=tmp_path / "install", output_path=tmp_path / "output", default_partition="main", - extra_srun_args="", - partitions={ - "main": [ - SlurmNode(name="node1", partition="main", state=SlurmNodeState.IDLE), - SlurmNode(name="node2", partition="main", state=SlurmNodeState.IDLE), - SlurmNode(name="node3", partition="main", state=SlurmNodeState.IDLE), - SlurmNode(name="node4", partition="main", state=SlurmNodeState.IDLE), - ] - }, + partitions=[ + SlurmPartition(name="main", nodes=["node[1-4]"]), + ], mpi="fake-mpi", ) - slurm_system.install_path.mkdir() - slurm_system.output_path.mkdir() + for node in 
slurm_system.partitions[0].slurm_nodes: + node.state = SlurmNodeState.IDLE + Path(slurm_system.install_path).mkdir() + Path(slurm_system.output_path).mkdir() return slurm_system diff --git a/tests/test_slurm_install_strategy.py b/tests/test_slurm_install_strategy.py index f96b2dc7..5e41d68a 100644 --- a/tests/test_slurm_install_strategy.py +++ b/tests/test_slurm_install_strategy.py @@ -26,7 +26,8 @@ ) from cloudai.schema.test_template.ucc_test.slurm_install_strategy import UCCTestSlurmInstallStrategy from cloudai.systems import SlurmSystem -from cloudai.systems.slurm import SlurmNode, SlurmNodeState +from cloudai.systems.slurm import SlurmNodeState +from cloudai.systems.slurm.slurm_system import SlurmPartition from cloudai.systems.slurm.strategy import SlurmInstallStrategy @@ -37,15 +38,13 @@ def slurm_system(tmp_path: Path) -> SlurmSystem: install_path=tmp_path / "install", output_path=tmp_path / "output", default_partition="main", - partitions={ - "main": [ - SlurmNode(name="node1", partition="main", state=SlurmNodeState.IDLE), - SlurmNode(name="node2", partition="main", state=SlurmNodeState.IDLE), - SlurmNode(name="node3", partition="main", state=SlurmNodeState.IDLE), - SlurmNode(name="node4", partition="main", state=SlurmNodeState.IDLE), - ] - }, + partitions=[ + SlurmPartition(name="main", nodes=["node[1-4]"]), + ], + mpi="fake-mpi", ) + for node in slurm_system.partitions[0].slurm_nodes: + node.state = SlurmNodeState.IDLE Path(slurm_system.install_path).mkdir() Path(slurm_system.output_path).mkdir() return slurm_system diff --git a/tests/test_slurm_system.py b/tests/test_slurm_system.py index f802bb5b..312f4670 100644 --- a/tests/test_slurm_system.py +++ b/tests/test_slurm_system.py @@ -14,30 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from pathlib import Path from typing import List from unittest.mock import patch import pytest from cloudai.systems import SlurmSystem -from cloudai.systems.slurm import SlurmNode, SlurmNodeState - - -@pytest.fixture -def slurm_system(): - nodes = [SlurmNode(name=f"node-0{i}", partition="main", state=SlurmNodeState.UNKNOWN_STATE) for i in range(33, 65)] - backup_nodes = [ - SlurmNode(name=f"node0{i}", partition="backup", state=SlurmNodeState.UNKNOWN_STATE) for i in range(1, 9) - ] - - system = SlurmSystem( - name="test_system", - install_path=Path("/fake/path"), - output_path=Path("/fake/output"), - default_partition="main", - partitions={"main": nodes, "backup": backup_nodes}, - ) - return system +from cloudai.systems.slurm import SlurmNodeState +from cloudai.systems.slurm.slurm_system import parse_node_list def test_parse_squeue_output(slurm_system): @@ -57,7 +40,7 @@ def test_parse_squeue_output_with_node_ranges_and_root_user(slurm_system): assert user_map == expected_map, "All nodes should be mapped to 'root'" -def test_parse_sinfo_output(slurm_system): +def test_parse_sinfo_output(slurm_system: SlurmSystem) -> None: sinfo_output = """PARTITION AVAIL TIMELIMIT NODES STATE NODELIST main up 3:00:00 1 inval node-036 main up 3:00:00 5 drain node-[045-046,059,061-062] @@ -90,7 +73,10 @@ def test_parse_sinfo_output(slurm_system): inval_nodes = set(["node-036"]) drain_nodes = set(["node-045", "node-046", "node-059", "node-061", "node-062"]) resv_nodes = set(["node-034", "node-035"]) - for node in slurm_system.partitions["main"]: + + parts_by_name = {part.name: part for part in slurm_system.partitions} + + for node in parts_by_name["main"].slurm_nodes: if node.name in inval_nodes: assert node.state == SlurmNodeState.INVALID_REGISTRATION elif node.name in drain_nodes: @@ -98,9 +84,8 @@ def test_parse_sinfo_output(slurm_system): elif node.name in resv_nodes: assert node.state == SlurmNodeState.RESERVED else: - print("node :", node) assert node.state == SlurmNodeState.ALLOCATED - for node in slurm_system.partitions["backup"]: + for node in parts_by_name["backup"].slurm_nodes: assert node.state == SlurmNodeState.IDLE @@ -110,8 +95,10 @@ def test_update_node_states_with_mocked_outputs(mock_get_sinfo, mock_get_squeue, mock_get_squeue.return_value = "node-115|user1" mock_get_sinfo.return_value = "PARTITION AVAIL TIMELIMIT NODES STATE NODELIST\n" "main up infinite 1 idle node-115" + parts_by_name = {part.name: part for part in slurm_system.partitions} + slurm_system.update_node_states() - for node in slurm_system.partitions["main"]: + for node in parts_by_name["main"].slurm_nodes: if node.name == "node-115": assert node.state == SlurmNodeState.IDLE assert node.user == "user1" @@ -122,7 +109,7 @@ def test_update_node_states_with_mocked_outputs(mock_get_sinfo, mock_get_squeue, ) slurm_system.update_node_states() - for node in slurm_system.partitions["backup"]: + for node in parts_by_name["backup"].slurm_nodes: if node.name == "node01": assert node.state == SlurmNodeState.ALLOCATED assert node.user == "root" @@ -152,6 +139,6 @@ def test_update_node_states_with_mocked_outputs(mock_get_sinfo, mock_get_squeue, ), ], ) -def test_parse_node_list(node_list: str, expected_parsed_node_list: List[str], slurm_system): - parsed_node_list = slurm_system.parse_node_list(node_list) +def test_parse_node_list(node_list: str, expected_parsed_node_list: List[str]): + parsed_node_list = parse_node_list(node_list) assert parsed_node_list == expected_parsed_node_list diff --git a/tests/test_slurm_system_parser.py 
b/tests/test_slurm_system_parser.py deleted file mode 100644 index d4d274a1..00000000 --- a/tests/test_slurm_system_parser.py +++ /dev/null @@ -1,115 +0,0 @@ -# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from pathlib import Path -from typing import Any, Dict - -import pytest -from cloudai.parser.system_parser.slurm_system_parser import SlurmSystemParser -from cloudai.systems.slurm import SlurmSystem - - -@pytest.fixture -def example_data() -> Dict[str, Any]: - return { - "name": "test_system", - "install_path": "/fake/path", - "output_path": "/fake/output", - "default_partition": "main", - "partitions": { - "main": { - "name": "main", - "nodes": ["node-[033-034]"], - "groups": {"group1": {"name": "group1", "nodes": ["node-033"]}}, - }, - "backup": { - "name": "backup", - "nodes": ["node-[01-02]"], - "groups": {"group2": {"name": "group2", "nodes": ["node-01"]}}, - }, - }, - "cache_docker_images_locally": "True", - } - - -@pytest.mark.parametrize( - "mpi_value, expected_mpi", - [ - ("pmix", "pmix"), - ("pmi2", "pmi2"), - ("", "pmix"), - ], -) -def test_parse_slurm_system_parser_with_mpi(example_data, mpi_value, expected_mpi): - if mpi_value: - example_data["mpi"] = mpi_value - - parser = SlurmSystemParser() - slurm_system = parser.parse(example_data) - - assert isinstance(slurm_system, SlurmSystem) - assert slurm_system.name == "test_system" - assert slurm_system.install_path == Path("/fake/path") - assert slurm_system.output_path == Path("/fake/output") - assert slurm_system.default_partition == "main" - assert slurm_system.cache_docker_images_locally is True - assert "main" in slurm_system.partitions - assert "backup" in slurm_system.partitions - assert "group1" in slurm_system.groups["main"] - assert "group2" in slurm_system.groups["backup"] - assert slurm_system.mpi == expected_mpi - - -@pytest.mark.parametrize( - "input_value, expected_result", - [ - ("True", True), - ("False", False), - ("true", True), - ("false", False), - ("1", True), - ("0", False), - ("yes", True), - ("no", False), - (True, True), - (False, False), - ], -) -def test_str_to_bool_conversion(input_value, expected_result): - parser = SlurmSystemParser() - result = parser.parse( - { - "name": "test_system", - "install_path": "/fake/path", - "output_path": "/fake/output", - "default_partition": "main", - "partitions": { - "main": { - "name": "main", - "nodes": ["node-[033-034]"], - "groups": {"group1": {"name": "group1", "nodes": ["node-033"]}}, - }, - "backup": { - "name": "backup", - "nodes": ["node-[01-02]"], - "groups": {"group2": {"name": "group2", "nodes": ["node-01"]}}, - }, - }, - "cache_docker_images_locally": input_value, - } - ) - - assert result.cache_docker_images_locally == expected_result diff --git a/tests/test_standalone_system.py b/tests/test_standalone_system.py index 8524407f..7f1e8dbc 100644 --- 
a/tests/test_standalone_system.py +++ b/tests/test_standalone_system.py @@ -31,7 +31,11 @@ def standalone_system(): Returns: StandaloneSystem: A new instance of StandaloneSystem for testing. """ - return StandaloneSystem("StandaloneTestSystem", Path("/fake/output/path")) + return StandaloneSystem( + name="StandaloneTestSystem", + install_path=Path("/fake/install/path"), + output_path=Path("/fake/output/path"), + ) @pytest.fixture @@ -103,14 +107,3 @@ def test_kill_job(mock_execute, standalone_system, standalone_job): kill_command = f"kill -9 {standalone_job.id}" mock_execute.assert_called_once_with(kill_command) - - -def test_repr(standalone_system): - """ - Test the string representation of StandaloneSystem. - - Args: - standalone_system (StandaloneSystem): Instance of the system under test. - """ - expected_repr = f"StandaloneSystem(name={standalone_system.name}, scheduler=standalone)" - assert repr(standalone_system) == expected_repr diff --git a/tests/test_toml_files.py b/tests/test_toml_files.py index 5a755566..7726fb93 100644 --- a/tests/test_toml_files.py +++ b/tests/test_toml_files.py @@ -18,6 +18,7 @@ import pytest import toml +from cloudai import Parser TOML_FILES = list(Path("conf").glob("**/*.toml")) @@ -32,3 +33,18 @@ def test_toml_files(toml_file: Path): """ with toml_file.open("r") as f: assert toml.load(f) is not None + + +ALL_SYSTEMS = [p for p in Path("conf/").glob("**/*.toml") if "scheduler =" in p.read_text()] + + +@pytest.mark.parametrize("system_file", ALL_SYSTEMS, ids=lambda x: str(x)) +def test_systems(system_file: Path): + """ + Validate the syntax of a system configuration file. + + Args: + system_file (Path): The path to the system configuration file to validate. + """ + system = Parser(system_file, Path("conf/test_template")).parse_system(system_file) + assert system is not None
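
For context on the schema change these tests exercise, below is a minimal, hypothetical sketch (not the actual CloudAI models or `Parser.format_validation_error`) of how the new `[[partitions]]` / `[[partitions.groups]]` array-of-tables layout can be validated with Pydantic, and how `ErrorDetails` entries can be rendered in the `Field '...': ...` style asserted in `test_parser.py`. The `Group`, `Partition`, `SystemConfig`, and `format_error` names are illustrative assumptions, not part of this patch.

```python
# Illustrative sketch only -- simplified stand-ins for the CloudAI models.
from typing import List

import toml
from pydantic import BaseModel, ValidationError
from pydantic_core import ErrorDetails


class Group(BaseModel):
    name: str
    nodes: List[str]


class Partition(BaseModel):
    name: str
    nodes: List[str]
    groups: List[Group] = []  # [[partitions.groups]] parses into a list of tables


class SystemConfig(BaseModel):
    name: str
    scheduler: str
    default_partition: str
    partitions: List[Partition]


def format_error(err: ErrorDetails) -> str:
    """Format one Pydantic error roughly like the messages asserted in test_parser.py."""
    field = ".".join(str(part) for part in err["loc"])
    if err["type"] == "missing":
        return f"Field '{field}': {err['msg']}"
    return f"Field '{field}' with value '{err['input']}' is invalid: {err['msg']}"


if __name__ == "__main__":
    # Assumes the example TOML from this patch; extra keys (mpi, gpus_per_node, ...) are ignored.
    data = toml.load("conf/common/system/example_slurm_cluster.toml")
    try:
        cfg = SystemConfig.model_validate(data)
        print(f"{cfg.name}: {len(cfg.partitions)} partitions")
    except ValidationError as exc:
        for err in exc.errors():
            print(format_error(err))
```

With the example cluster config from this patch, the sketch should report two partitions; removing a required key such as `default_partition` should instead print a `Field 'default_partition': Field required` style message, mirroring the parametrized cases in `test_log_validation_errors_with_required_field_error`.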