Skip to content

Commit

Permalink
Add Kubernetes runner
Browse files Browse the repository at this point in the history
Co-authored-by: Peng Wang <pengwang@nvidia.com>
  • Loading branch information
TaekyungHeo and wpeng102 committed Aug 29, 2024
1 parent a9effd6 commit e8c1139
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/cloudai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ._core.job_id_retrieval_strategy import JobIdRetrievalStrategy
from ._core.job_status_result import JobStatusResult
from ._core.job_status_retrieval_strategy import JobStatusRetrievalStrategy
from ._core.json_gen_strategy import JsonGenStrategy
from ._core.parser import Parser
from ._core.registry import Registry
from ._core.report_generation_strategy import ReportGenerationStrategy
Expand All @@ -43,6 +44,7 @@
from .parser.system_parser.slurm_system_parser import SlurmSystemParser
from .parser.system_parser.standalone_system_parser import StandaloneSystemParser
from .report_generator import ReportGenerator
from .runner.kubernetes.kubernetes_runner import KubernetesRunner
from .runner.slurm.slurm_runner import SlurmRunner
from .runner.standalone.standalone_runner import StandaloneRunner
from .schema.test_template.chakra_replay.grading_strategy import ChakraReplayGradingStrategy
Expand Down Expand Up @@ -92,6 +94,7 @@
Registry().add_system_parser("kubernetes", KubernetesSystemParser)

Registry().add_runner("slurm", SlurmRunner)
Registry().add_runner("kubernetes", KubernetesRunner)
Registry().add_runner("standalone", StandaloneRunner)

Registry().add_strategy(InstallStrategy, [SlurmSystem], [NcclTest], NcclTestSlurmInstallStrategy)
Expand Down Expand Up @@ -156,6 +159,7 @@
"BaseRunner",
"BaseSystemParser",
"CommandGenStrategy",
"JsonGenStrategy",
"Grader",
"GradingStrategy",
"Installer",
Expand Down
59 changes: 59 additions & 0 deletions src/cloudai/_core/json_gen_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from pathlib import Path
from typing import Any, Dict, List

from .test_template_strategy import TestTemplateStrategy


class JsonGenStrategy(TestTemplateStrategy):
"""
Abstract base class for generating Kubernetes job specifications based on system and test parameters.
It specifies how to generate JSON job specifications based on system and test parameters.
"""

@abstractmethod
def gen_json(
self,
env_vars: Dict[str, str],
cmd_args: Dict[str, str],
extra_env_vars: Dict[str, str],
extra_cmd_args: str,
output_path: Path,
job_name: str,
num_nodes: int,
nodes: List[str],
) -> Dict[Any, Any]:
"""
Generate the Kubernetes job specification based on the given parameters.
Args:
env_vars (Dict[str, str]): Environment variables for the job.
cmd_args (Dict[str, str]): Command-line arguments for the job.
extra_env_vars (Dict[str, str]): Additional environment variables.
extra_cmd_args (str): Additional command-line arguments.
output_path (Path): Path to the output directory.
job_name (str): The name of the job.
num_nodes (int): The number of nodes to be used for job execution.
nodes (List[str]): List of nodes for job execution, optional.
Returns:
Dict[Any, Any]: The generated Kubernetes job specification in JSON format.
"""
pass
27 changes: 26 additions & 1 deletion src/cloudai/_core/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import sys
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from .job_status_result import JobStatusResult
from .test_template import TestTemplate
Expand Down Expand Up @@ -150,6 +150,31 @@ def gen_exec_command(self, output_path: Path) -> str:
self.nodes,
)

def gen_json(self, output_path: Path, job_name: str) -> Dict[Any, Any]:
"""
Generate the JSON string for the Kubernetes job specification for this specific test.
Args:
output_path (Path): Path to the output directory.
job_name (str): The name of the job.
Returns:
Dict[Any, Any]: A dictionary representing the Kubernetes job specification.
"""
if self.time_limit is not None:
self.cmd_args["time_limit"] = self.time_limit

return self.test_template.gen_json(
self.env_vars,
self.cmd_args,
self.extra_env_vars,
self.extra_cmd_args,
output_path,
job_name,
self.num_nodes,
self.nodes,
)

def get_job_id(self, stdout: str, stderr: str) -> Optional[int]:
"""
Retrieve the job ID using the test template's method.
Expand Down
48 changes: 48 additions & 0 deletions src/cloudai/_core/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .job_id_retrieval_strategy import JobIdRetrievalStrategy
from .job_status_result import JobStatusResult
from .job_status_retrieval_strategy import JobStatusRetrievalStrategy
from .json_gen_strategy import JsonGenStrategy
from .report_generation_strategy import ReportGenerationStrategy
from .system import System

Expand All @@ -42,6 +43,7 @@ class TestTemplate:
logger (logging.Logger): Logger for the test template.
install_strategy (InstallStrategy): Strategy for installing test prerequisites.
command_gen_strategy (CommandGenStrategy): Strategy for generating execution commands.
json_gen_strategy (JsonGenStrategy): Strategy for generating json string.
job_id_retrieval_strategy (JobIdRetrievalStrategy): Strategy for retrieving job IDs.
report_generation_strategy (ReportGenerationStrategy): Strategy for generating reports.
grading_strategy (GradingStrategy): Strategy for grading performance based on test outcomes.
Expand Down Expand Up @@ -72,6 +74,7 @@ def __init__(
self.cmd_args = cmd_args
self.install_strategy: Optional[InstallStrategy] = None
self.command_gen_strategy: Optional[CommandGenStrategy] = None
self.json_gen_strategy: Optional[JsonGenStrategy] = None
self.job_id_retrieval_strategy: Optional[JobIdRetrievalStrategy] = None
self.job_status_retrieval_strategy: Optional[JobStatusRetrievalStrategy] = None
self.report_generation_strategy: Optional[ReportGenerationStrategy] = None
Expand Down Expand Up @@ -166,6 +169,51 @@ def gen_exec_command(
nodes,
)

def gen_json(
self,
env_vars: Dict[str, str],
cmd_args: Dict[str, str],
extra_env_vars: Dict[str, str],
extra_cmd_args: str,
output_path: Path,
job_name: str,
num_nodes: int,
nodes: List[str],
) -> Dict[Any, Any]:
"""
Generate a JSON string representing the Kubernetes job specification for this test using this template.
Args:
env_vars (Dict[str, str]): Environment variables for the test.
cmd_args (Dict[str, str]): Command-line arguments for the test.
extra_env_vars (Dict[str, str]): Extra environment variables.
extra_cmd_args (str): Extra command-line arguments.
output_path (Path): Path to the output directory.
job_name (str): The name of the job.
num_nodes (int): The number of nodes to be used for the test execution.
nodes (List[str]): A list of nodes where the test will be executed.
Returns:
Dict[Any, Any]: A dictionary representing the Kubernetes job specification.
"""
if not nodes:
nodes = []
if self.json_gen_strategy is None:
raise ValueError(
"json_gen_strategy is missing. Ensure the strategy is registered in the Registry "
"by calling the appropriate registration function for the system type."
)
return self.json_gen_strategy.gen_json(
env_vars,
cmd_args,
extra_env_vars,
extra_cmd_args,
output_path,
job_name,
num_nodes,
nodes,
)

def get_job_id(self, stdout: str, stderr: str) -> Optional[int]:
"""
Retrieve the job ID from the execution output using the job ID retrieval strategy.
Expand Down
5 changes: 5 additions & 0 deletions src/cloudai/_core/test_template_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .install_strategy import InstallStrategy
from .job_id_retrieval_strategy import JobIdRetrievalStrategy
from .job_status_retrieval_strategy import JobStatusRetrievalStrategy
from .json_gen_strategy import JsonGenStrategy
from .registry import Registry
from .report_generation_strategy import ReportGenerationStrategy
from .system import System
Expand Down Expand Up @@ -127,6 +128,10 @@ def _parse_data(self, data: Dict[str, Any]) -> TestTemplate:
CommandGenStrategy,
self._fetch_strategy(CommandGenStrategy, type(obj.system), type(obj), env_vars, cmd_args),
)
obj.json_gen_strategy = cast(
JsonGenStrategy,
self._fetch_strategy(JsonGenStrategy, type(obj.system), type(obj), env_vars, cmd_args),
)
obj.job_id_retrieval_strategy = cast(
JobIdRetrievalStrategy,
self._fetch_strategy(JobIdRetrievalStrategy, type(obj.system), type(obj), env_vars, cmd_args),
Expand Down
15 changes: 15 additions & 0 deletions src/cloudai/runner/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
63 changes: 63 additions & 0 deletions src/cloudai/runner/kubernetes/kubernetes_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import cast

from cloudai import BaseJob, BaseRunner, Test
from cloudai.systems import KubernetesSystem

from .kubernetes_job import KubernetesJob


class KubernetesRunner(BaseRunner):
"""Implementation of the Runner for a system using Kubernetes."""

def _submit_test(self, test: Test) -> KubernetesJob:
"""
Submit a test for execution on Kubernetes and return a KubernetesJob object.
Args:
test (Test): The test to be executed.
Returns:
KubernetesJob: A KubernetesJob object containing job details.
"""
logging.info(f"Running test: {test.section_name}")
job_output_path = self.get_job_output_path(test)
job_name = test.section_name.replace(".", "-").lower()
job_spec = test.gen_json(job_output_path, job_name)
job_kind = job_spec.get("kind", "").lower()
logging.info(f"Generated JSON string for test {test.section_name}: {job_spec}")
job_namespace = ""

if self.mode == "run":
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)
job_name, job_namespace = k8s_system.create_job(job_spec)

return KubernetesJob(self.mode, self.system, test, job_namespace, job_name, job_kind, job_output_path)

def kill_job(self, job: BaseJob) -> None:
"""
Terminate a Kubernetes job.
Args:
job (BaseJob): The job to be terminated, casted to KubernetesJob.
"""
k8s_system = cast(KubernetesSystem, self.system)
k_job = cast(KubernetesJob, job)
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)
k8s_system.delete_job(k_job.get_namespace(), k_job.get_name(), k_job.get_kind())
3 changes: 2 additions & 1 deletion tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def test_runners():
runners = Registry().runners_map.keys()
assert "standalone" in runners
assert "slurm" in runners
assert len(runners) == 2
assert "kubernetes" in runners
assert len(runners) == 3


@pytest.mark.parametrize(
Expand Down

0 comments on commit e8c1139

Please sign in to comment.