diff --git a/src/cloudai/__init__.py b/src/cloudai/__init__.py index 1c9d63a5..f702fb47 100644 --- a/src/cloudai/__init__.py +++ b/src/cloudai/__init__.py @@ -26,6 +26,7 @@ from ._core.job_id_retrieval_strategy import JobIdRetrievalStrategy from ._core.job_status_result import JobStatusResult from ._core.job_status_retrieval_strategy import JobStatusRetrievalStrategy +from ._core.json_gen_strategy import JsonGenStrategy from ._core.parser import Parser from ._core.registry import Registry from ._core.report_generation_strategy import ReportGenerationStrategy @@ -43,6 +44,7 @@ from .parser.system_parser.slurm_system_parser import SlurmSystemParser from .parser.system_parser.standalone_system_parser import StandaloneSystemParser from .report_generator import ReportGenerator +from .runner.kubernetes.kubernetes_runner import KubernetesRunner from .runner.slurm.slurm_runner import SlurmRunner from .runner.standalone.standalone_runner import StandaloneRunner from .schema.test_template.chakra_replay.grading_strategy import ChakraReplayGradingStrategy @@ -92,6 +94,7 @@ Registry().add_system_parser("kubernetes", KubernetesSystemParser) Registry().add_runner("slurm", SlurmRunner) +Registry().add_runner("kubernetes", KubernetesRunner) Registry().add_runner("standalone", StandaloneRunner) Registry().add_strategy(InstallStrategy, [SlurmSystem], [NcclTest], NcclTestSlurmInstallStrategy) @@ -156,6 +159,7 @@ "BaseRunner", "BaseSystemParser", "CommandGenStrategy", + "JsonGenStrategy", "Grader", "GradingStrategy", "Installer", diff --git a/src/cloudai/_core/json_gen_strategy.py b/src/cloudai/_core/json_gen_strategy.py new file mode 100644 index 00000000..42e3f68a --- /dev/null +++ b/src/cloudai/_core/json_gen_strategy.py @@ -0,0 +1,59 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import abstractmethod +from pathlib import Path +from typing import Any, Dict, List + +from .test_template_strategy import TestTemplateStrategy + + +class JsonGenStrategy(TestTemplateStrategy): + """ + Abstract base class for generating Kubernetes job specifications based on system and test parameters. + + It specifies how to generate JSON job specifications based on system and test parameters. + """ + + @abstractmethod + def gen_json( + self, + env_vars: Dict[str, str], + cmd_args: Dict[str, str], + extra_env_vars: Dict[str, str], + extra_cmd_args: str, + output_path: Path, + job_name: str, + num_nodes: int, + nodes: List[str], + ) -> Dict[Any, Any]: + """ + Generate the Kubernetes job specification based on the given parameters. + + Args: + env_vars (Dict[str, str]): Environment variables for the job. + cmd_args (Dict[str, str]): Command-line arguments for the job. + extra_env_vars (Dict[str, str]): Additional environment variables. + extra_cmd_args (str): Additional command-line arguments. + output_path (Path): Path to the output directory. + job_name (str): The name of the job. + num_nodes (int): The number of nodes to be used for job execution. + nodes (List[str]): List of nodes for job execution, optional. + + Returns: + Dict[Any, Any]: The generated Kubernetes job specification in JSON format. + """ + pass diff --git a/src/cloudai/_core/test.py b/src/cloudai/_core/test.py index 51925fa2..06e43e19 100644 --- a/src/cloudai/_core/test.py +++ b/src/cloudai/_core/test.py @@ -16,7 +16,7 @@ import sys from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from .job_status_result import JobStatusResult from .test_template import TestTemplate @@ -150,6 +150,31 @@ def gen_exec_command(self, output_path: Path) -> str: self.nodes, ) + def gen_json(self, output_path: Path, job_name: str) -> Dict[Any, Any]: + """ + Generate the JSON string for the Kubernetes job specification for this specific test. + + Args: + output_path (Path): Path to the output directory. + job_name (str): The name of the job. + + Returns: + Dict[Any, Any]: A dictionary representing the Kubernetes job specification. + """ + if self.time_limit is not None: + self.cmd_args["time_limit"] = self.time_limit + + return self.test_template.gen_json( + self.env_vars, + self.cmd_args, + self.extra_env_vars, + self.extra_cmd_args, + output_path, + job_name, + self.num_nodes, + self.nodes, + ) + def get_job_id(self, stdout: str, stderr: str) -> Optional[int]: """ Retrieve the job ID using the test template's method. diff --git a/src/cloudai/_core/test_template.py b/src/cloudai/_core/test_template.py index 06d3f1ac..72833715 100644 --- a/src/cloudai/_core/test_template.py +++ b/src/cloudai/_core/test_template.py @@ -24,6 +24,7 @@ from .job_id_retrieval_strategy import JobIdRetrievalStrategy from .job_status_result import JobStatusResult from .job_status_retrieval_strategy import JobStatusRetrievalStrategy +from .json_gen_strategy import JsonGenStrategy from .report_generation_strategy import ReportGenerationStrategy from .system import System @@ -42,6 +43,7 @@ class TestTemplate: logger (logging.Logger): Logger for the test template. install_strategy (InstallStrategy): Strategy for installing test prerequisites. command_gen_strategy (CommandGenStrategy): Strategy for generating execution commands. + json_gen_strategy (JsonGenStrategy): Strategy for generating json string. job_id_retrieval_strategy (JobIdRetrievalStrategy): Strategy for retrieving job IDs. report_generation_strategy (ReportGenerationStrategy): Strategy for generating reports. grading_strategy (GradingStrategy): Strategy for grading performance based on test outcomes. @@ -72,6 +74,7 @@ def __init__( self.cmd_args = cmd_args self.install_strategy: Optional[InstallStrategy] = None self.command_gen_strategy: Optional[CommandGenStrategy] = None + self.json_gen_strategy: Optional[JsonGenStrategy] = None self.job_id_retrieval_strategy: Optional[JobIdRetrievalStrategy] = None self.job_status_retrieval_strategy: Optional[JobStatusRetrievalStrategy] = None self.report_generation_strategy: Optional[ReportGenerationStrategy] = None @@ -166,6 +169,51 @@ def gen_exec_command( nodes, ) + def gen_json( + self, + env_vars: Dict[str, str], + cmd_args: Dict[str, str], + extra_env_vars: Dict[str, str], + extra_cmd_args: str, + output_path: Path, + job_name: str, + num_nodes: int, + nodes: List[str], + ) -> Dict[Any, Any]: + """ + Generate a JSON string representing the Kubernetes job specification for this test using this template. + + Args: + env_vars (Dict[str, str]): Environment variables for the test. + cmd_args (Dict[str, str]): Command-line arguments for the test. + extra_env_vars (Dict[str, str]): Extra environment variables. + extra_cmd_args (str): Extra command-line arguments. + output_path (Path): Path to the output directory. + job_name (str): The name of the job. + num_nodes (int): The number of nodes to be used for the test execution. + nodes (List[str]): A list of nodes where the test will be executed. + + Returns: + Dict[Any, Any]: A dictionary representing the Kubernetes job specification. + """ + if not nodes: + nodes = [] + if self.json_gen_strategy is None: + raise ValueError( + "json_gen_strategy is missing. Ensure the strategy is registered in the Registry " + "by calling the appropriate registration function for the system type." + ) + return self.json_gen_strategy.gen_json( + env_vars, + cmd_args, + extra_env_vars, + extra_cmd_args, + output_path, + job_name, + num_nodes, + nodes, + ) + def get_job_id(self, stdout: str, stderr: str) -> Optional[int]: """ Retrieve the job ID from the execution output using the job ID retrieval strategy. diff --git a/src/cloudai/_core/test_template_parser.py b/src/cloudai/_core/test_template_parser.py index ed4481ea..d470449f 100644 --- a/src/cloudai/_core/test_template_parser.py +++ b/src/cloudai/_core/test_template_parser.py @@ -24,6 +24,7 @@ from .install_strategy import InstallStrategy from .job_id_retrieval_strategy import JobIdRetrievalStrategy from .job_status_retrieval_strategy import JobStatusRetrievalStrategy +from .json_gen_strategy import JsonGenStrategy from .registry import Registry from .report_generation_strategy import ReportGenerationStrategy from .system import System @@ -127,6 +128,10 @@ def _parse_data(self, data: Dict[str, Any]) -> TestTemplate: CommandGenStrategy, self._fetch_strategy(CommandGenStrategy, type(obj.system), type(obj), env_vars, cmd_args), ) + obj.json_gen_strategy = cast( + JsonGenStrategy, + self._fetch_strategy(JsonGenStrategy, type(obj.system), type(obj), env_vars, cmd_args), + ) obj.job_id_retrieval_strategy = cast( JobIdRetrievalStrategy, self._fetch_strategy(JobIdRetrievalStrategy, type(obj.system), type(obj), env_vars, cmd_args), diff --git a/src/cloudai/runner/kubernetes/__init__.py b/src/cloudai/runner/kubernetes/__init__.py new file mode 100644 index 00000000..23bfc249 --- /dev/null +++ b/src/cloudai/runner/kubernetes/__init__.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/cloudai/runner/kubernetes/kubernetes_runner.py b/src/cloudai/runner/kubernetes/kubernetes_runner.py new file mode 100644 index 00000000..3f7c381c --- /dev/null +++ b/src/cloudai/runner/kubernetes/kubernetes_runner.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import cast + +from cloudai import BaseJob, BaseRunner, Test +from cloudai.systems import KubernetesSystem + +from .kubernetes_job import KubernetesJob + + +class KubernetesRunner(BaseRunner): + """Implementation of the Runner for a system using Kubernetes.""" + + def _submit_test(self, test: Test) -> KubernetesJob: + """ + Submit a test for execution on Kubernetes and return a KubernetesJob object. + + Args: + test (Test): The test to be executed. + + Returns: + KubernetesJob: A KubernetesJob object containing job details. + """ + logging.info(f"Running test: {test.section_name}") + job_output_path = self.get_job_output_path(test) + job_name = test.section_name.replace(".", "-").lower() + job_spec = test.gen_json(job_output_path, job_name) + job_kind = job_spec.get("kind", "").lower() + logging.info(f"Generated JSON string for test {test.section_name}: {job_spec}") + job_namespace = "" + + if self.mode == "run": + k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system) + job_name, job_namespace = k8s_system.create_job(job_spec) + + return KubernetesJob(self.mode, self.system, test, job_namespace, job_name, job_kind, job_output_path) + + def kill_job(self, job: BaseJob) -> None: + """ + Terminate a Kubernetes job. + + Args: + job (BaseJob): The job to be terminated, casted to KubernetesJob. + """ + k8s_system = cast(KubernetesSystem, self.system) + k_job = cast(KubernetesJob, job) + k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system) + k8s_system.delete_job(k_job.get_namespace(), k_job.get_name(), k_job.get_kind()) diff --git a/tests/test_init.py b/tests/test_init.py index 88d5323b..a49c7d21 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -78,7 +78,8 @@ def test_runners(): runners = Registry().runners_map.keys() assert "standalone" in runners assert "slurm" in runners - assert len(runners) == 2 + assert "kubernetes" in runners + assert len(runners) == 3 @pytest.mark.parametrize(