Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to Use pathlib.Path for Path-Related Variables #177

Closed
[author] wants to merge 3 commits into [base branch] from [head branch]
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cloudai/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def handle_generate_report(test_scenario: TestScenario, output_dir: Path) -> Non
output_dir (Path): The path to the output directory.
"""
logging.info("Generating report based on system and test scenario")
generator = ReportGenerator(str(output_dir))
generator = ReportGenerator(output_dir)
generator.generate_report(test_scenario)

logging.info("Report generation completed.")
Expand All @@ -274,7 +274,7 @@ def main() -> None:
system, tests, test_scenario = parser.parse(tests_dir, test_scenario_path)

if output_dir:
system.output_path = str(output_dir.absolute())
system.output_path = Path(output_dir.absolute())
system.update()

if args.mode in ["install", "uninstall"]:
Expand Down
103 changes: 92 additions & 11 deletions src/cloudai/_core/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,121 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Union

from .system import System
from .test import Test


class BaseJob:
class BaseJob(ABC):
"""
Base class for representing a job created by executing a test.

Attributes
id (int): The unique identifier of the job.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
system (System): The system in which the job is running.
test (Test): The test instance associated with this job.
output_path (str): The path where the job's output is stored.
output_path (Path): The path where the job's output is stored.
terminated_by_dependency (bool): Flag to indicate if the job was terminated due to a dependency.
"""

def __init__(self, job_id: int, test: Test, output_path: str):
def __init__(self, mode: str, system: System, test: Test, output_path: Path):
"""
Initialize a BaseJob instance.

Args:
job_id (int): The unique identifier of the job.
output_path (str): The path where the job's output is stored.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
system (System): The system in which the job is running.
test (Test): The test instance associated with the job.
output_path (Path): The path where the job's output is stored.
"""
self.id = job_id
self.mode = mode
self.system = system
self.test = test
self.output_path = output_path
self.terminated_by_dependency = False

def increment_iteration(self):
def get_mode(self) -> str:
"""
Retrieve the mode of the job.

Returns
str: The mode of the job.
"""
return self.mode

def get_system(self) -> System:
"""
Get the system in which the job is running.

Returns
System: The system associated with the job.
"""
return self.system

def get_test(self) -> Test:
"""
Get the test instance associated with this job.

Returns
Test: The test instance associated with the job.
"""
return self.test

def get_output_path(self) -> Path:
"""
Retrieve the path where the job's output is stored.

Returns
Path: The path to the job's output directory.
"""
Increment the iteration count of the associated test.
return self.output_path

This method should be called when the job completes an iteration and is ready to proceed to the next one.
def is_terminated_by_dependency(self) -> bool:
"""
Check if the job was terminated due to a dependency.

Returns
bool: True if the job was terminated by a dependency, False otherwise.
"""
return self.terminated_by_dependency

@abstractmethod
def get_id(self) -> Union[str, int]:
"""
Abstract method to retrieve the unique identifier of the job.

Returns
Union[str, int]: The unique identifier of the job.
"""
pass

def is_running(self) -> bool:
"""
Check if the specified job is currently running.

Returns
bool: True if the job is running, False otherwise.
"""
if self.mode == "dry-run":
return True
return self.system.is_job_running(self)

def is_completed(self) -> bool:
"""
Check if a job is completed.

Returns
bool: True if the job is completed, False otherwise.
"""
if self.mode == "dry-run":
return True
return self.system.is_job_completed(self)

def increment_iteration(self):
"""Increment the iteration count of the associated test."""
self.test.current_iteration += 1

def __repr__(self) -> str:
Expand All @@ -57,4 +138,4 @@ def __repr__(self) -> str:
Returns
str: String representation of the job.
"""
return f"BaseJob(id={self.id}, test={self.test.name}, terminated_by_dependency={self.terminated_by_dependency})"
return f"BaseJob(mode={self.mode}, system={self.system.name}, test={self.test.name})"
93 changes: 29 additions & 64 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import asyncio
import logging
import os
import signal
import sys
from abc import ABC, abstractmethod
from asyncio import Task
from datetime import datetime
from pathlib import Path
from types import FrameType
from typing import Dict, List, Optional

Expand All @@ -37,14 +37,14 @@ class BaseRunner(ABC):
"""
Abstract base class for a Runner that manages test execution.

This class provides a framework for executing tests within a given test
scenario, handling dependencies and execution order.
This class provides a framework for executing tests within a given test scenario, handling dependencies and
execution order.

Attributes
mode (str): The operation mode ('dry-run', 'run').
system (System): The system schema object.
test_scenario (TestScenario): The test scenario to run.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
jobs (List[BaseJob]): List to track jobs created by the runner.
test_to_job_map (Dict[Test, BaseJob]): Mapping from tests to their jobs.
Expand Down Expand Up @@ -78,21 +78,21 @@ def __init__(
self.shutting_down = False
self.register_signal_handlers()

def setup_output_directory(self, base_output_path: str) -> str:
def setup_output_directory(self, base_output_path: Path) -> Path:
"""
Set up and return the output directory path for the runner instance.

Args:
base_output_path (str): The base output directory.
base_output_path (Path): The base output directory.

Returns:
str: The path to the output directory.
Path: The path to the output directory.
"""
if not os.path.exists(base_output_path):
os.makedirs(base_output_path)
if not base_output_path.exists():
base_output_path.mkdir()
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_subpath = os.path.join(base_output_path, f"{self.test_scenario.name}_{current_time}")
os.makedirs(output_subpath)
output_subpath = base_output_path / f"{self.test_scenario.name}_{current_time}"
output_subpath.mkdir()
return output_subpath

def register_signal_handlers(self):
Expand Down Expand Up @@ -136,7 +136,7 @@ async def shutdown(self):
return
logging.info("Terminating all jobs...")
for job in self.jobs:
self.kill_job(job)
self.system.kill(job)
logging.info("All jobs have been killed.")

sys.exit(0)
Expand Down Expand Up @@ -210,7 +210,7 @@ async def check_start_post_init_dependencies(self):
items = list(self.test_to_job_map.items())

for test, job in items:
if self.is_job_running(job):
if job.is_running():
await self.check_and_schedule_start_post_init_dependent_tests(test)

async def check_and_schedule_start_post_init_dependent_tests(self, started_test: Test):
Expand Down Expand Up @@ -242,9 +242,9 @@ def find_dependency_free_tests(self) -> List[Test]:

return dependency_free_tests

def get_job_output_path(self, test: Test) -> str:
def get_job_output_path(self, test: Test) -> Path:
"""
Generate and ensures the existence of the output directory for a given test.
Generate and ensure the existence of the output directory for a given test.

It constructs the path based on the test's section name and current iteration, creating the directories if they
do not exist.
Expand All @@ -253,23 +253,24 @@ def get_job_output_path(self, test: Test) -> str:
test (Test): The test instance for which to generate the output directory path.

Returns:
str: The path to the job's output directory.
Path: The path to the job's output directory.

Raises:
ValueError: If the test's section name is None.
FileNotFoundError: If the base output directory does not exist.
PermissionError: If there is a permission issue creating the directories.
"""
job_output_path = ""
if not self.output_path.exists():
raise FileNotFoundError(f"Output directory {self.output_path} does not exist")

job_output_path = None # Initialize the variable

if not os.path.exists(self.output_path):
raise FileNotFoundError(f"Output directory {self.output_path} " f"does not exist")
try:
assert test.section_name is not None, "test.section_name must not be None"
test_output_path = os.path.join(self.output_path, test.section_name)
os.makedirs(test_output_path, exist_ok=True)
job_output_path = os.path.join(test_output_path, str(test.current_iteration))
os.makedirs(job_output_path, exist_ok=True)
test_output_path = self.output_path / test.section_name
test_output_path.mkdir()
job_output_path = test_output_path / str(test.current_iteration)
job_output_path.mkdir()
except PermissionError as e:
raise PermissionError(f"Cannot create directory {job_output_path}: {e}") from e

Expand All @@ -285,7 +286,7 @@ async def monitor_jobs(self) -> int:
successful_jobs_count = 0

for job in list(self.jobs):
if self.is_job_completed(job):
if job.is_completed():
if self.mode == "dry-run":
successful_jobs_count += 1
await self.handle_job_completion(job)
Expand All @@ -297,7 +298,7 @@ async def monitor_jobs(self) -> int:
await self.handle_job_completion(job)
else:
error_message = (
f"Job {job.id} for test {job.test.section_name} failed: "
f"Job {job.get_id()} for test {job.test.section_name} failed: "
f"{job_status_result.error_message}"
)
logging.error(error_message)
Expand All @@ -307,7 +308,7 @@ async def monitor_jobs(self) -> int:
job_status_result = self.get_job_status(job)
if not job_status_result.is_successful:
error_message = (
f"Job {job.id} for test {job.test.section_name} failed: "
f"Job {job.get_id()} for test {job.test.section_name} failed: "
f"{job_status_result.error_message}"
)
logging.error(error_message)
Expand Down Expand Up @@ -346,32 +347,6 @@ async def handle_job_completion(self, completed_job: BaseJob):
else:
await self.handle_dependencies(completed_job)

@abstractmethod
def is_job_running(self, job: BaseJob) -> bool:
"""
Check if a job is currently running.

Args:
job (BaseJob): The job to check.

Returns:
bool: True if the job is running, False otherwise.
"""
pass

@abstractmethod
def is_job_completed(self, job: BaseJob) -> bool:
"""
Determine if a job is completed.

Args:
job (BaseJob): The job to be checked.

Returns:
bool: True if the job is completed, False otherwise.
"""
pass

async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]:
"""
Handle the start_post_comp and end_post_comp dependencies for a completed job.
Expand Down Expand Up @@ -402,16 +377,6 @@ async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]:

return tasks

@abstractmethod
def kill_job(self, job: BaseJob):
"""
Kill a specific job.

Args:
job (BaseJob): The job to be killed.
"""
pass

async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
"""
Schedule termination of a Standalone job after a specified delay.
Expand All @@ -420,7 +385,7 @@ async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
job (BaseJob): The job to be terminated.
delay (int): Delay in seconds after which the job should be terminated.
"""
logging.info(f"Scheduling termination of job {job.id} after {delay} seconds.")
logging.info(f"Scheduling termination of job {job.get_id()} after {delay} seconds.")
await asyncio.sleep(delay)
job.terminated_by_dependency = True
self.kill_job(job)
self.system.kill(job)
5 changes: 3 additions & 2 deletions src/cloudai/_core/command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path
from typing import Dict, List

from .test_template_strategy import TestTemplateStrategy
Expand All @@ -34,7 +35,7 @@ def gen_exec_command(
cmd_args: Dict[str, str],
extra_env_vars: Dict[str, str],
extra_cmd_args: str,
output_path: str,
output_path: Path,
num_nodes: int,
nodes: List[str],
) -> str:
Expand All @@ -46,7 +47,7 @@ def gen_exec_command(
cmd_args (Dict[str, str]): Command-line arguments for the test.
extra_env_vars (Dict[str, str]): Additional environment variables.
extra_cmd_args (str): Additional command-line arguments.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
num_nodes (int): The number of nodes to be used for the test execution.
nodes (List[str]): List of nodes for test execution, optional.

Expand Down
Loading