
Commit

Refactor to use pathlib.Path for path-related variables
TaekyungHeo committed Aug 20, 2024
1 parent bc84adf commit c5fd63f
Showing 52 changed files with 456 additions and 464 deletions.
4 changes: 2 additions & 2 deletions src/cloudai/__main__.py
@@ -247,7 +247,7 @@ def handle_generate_report(test_scenario: TestScenario, output_dir: Path) -> None:
output_dir (Path): The path to the output directory.
"""
logging.info("Generating report based on system and test scenario")
generator = ReportGenerator(str(output_dir))
generator = ReportGenerator(output_dir)
generator.generate_report(test_scenario)

logging.info("Report generation completed.")
@@ -274,7 +274,7 @@ def main() -> None:
system, tests, test_scenario = parser.parse(tests_dir, test_scenario_path)

if output_dir:
system.output_path = str(output_dir.absolute())
system.output_path = Path(output_dir.absolute())
system.update()

if args.mode in ["install", "uninstall"]:
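Both hunks above follow the same pattern: where the old code converted to `str` before handing a path onward, the new code passes the `Path` object through unchanged. A minimal sketch of why this is generally safe; the consumer class here is hypothetical, not CloudAI's actual `ReportGenerator`:

```python
from pathlib import Path


class ReportConsumer:
    """Hypothetical consumer that used to take a str and now takes a Path directly."""

    def __init__(self, output_dir: Path) -> None:
        self.output_dir = output_dir

    def report_file(self) -> Path:
        # The / operator replaces os.path.join; str() is only needed at boundaries
        # that genuinely require a string (most stdlib APIs accept os.PathLike).
        return self.output_dir / "report.txt"


consumer = ReportConsumer(Path("results").absolute())  # .absolute() already returns a Path
print(consumer.report_file())
```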
41 changes: 21 additions & 20 deletions src/cloudai/_core/base_runner.py
@@ -16,12 +16,12 @@

import asyncio
import logging
import os
import signal
import sys
from abc import ABC, abstractmethod
from asyncio import Task
from datetime import datetime
from pathlib import Path
from types import FrameType
from typing import Dict, List, Optional

@@ -44,7 +44,7 @@ class BaseRunner(ABC):
mode (str): The operation mode ('dry-run', 'run').
system (System): The system schema object.
test_scenario (TestScenario): The test scenario to run.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
jobs (List[BaseJob]): List to track jobs created by the runner.
test_to_job_map (Dict[Test, BaseJob]): Mapping from tests to their jobs.
@@ -78,21 +78,21 @@ def __init__(
self.shutting_down = False
self.register_signal_handlers()

def setup_output_directory(self, base_output_path: str) -> str:
def setup_output_directory(self, base_output_path: Path) -> Path:
"""
Set up and return the output directory path for the runner instance.
Args:
base_output_path (str): The base output directory.
base_output_path (Path): The base output directory.
Returns:
str: The path to the output directory.
Path: The path to the output directory.
"""
if not os.path.exists(base_output_path):
os.makedirs(base_output_path)
if not base_output_path.exists():
base_output_path.mkdir()
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_subpath = os.path.join(base_output_path, f"{self.test_scenario.name}_{current_time}")
os.makedirs(output_subpath)
output_subpath = base_output_path / f"{self.test_scenario.name}_{current_time}"
output_subpath.mkdir()
return output_subpath

def register_signal_handlers(self):
@@ -242,9 +242,9 @@ def find_dependency_free_tests(self) -> List[Test]:

return dependency_free_tests

def get_job_output_path(self, test: Test) -> str:
def get_job_output_path(self, test: Test) -> Path:
"""
Generate and ensures the existence of the output directory for a given test.
Generate and ensure the existence of the output directory for a given test.
It constructs the path based on the test's section name and current iteration, creating the directories if they
do not exist.
@@ -253,23 +253,24 @@ def get_job_output_path(self, test: Test) -> str:
test (Test): The test instance for which to generate the output directory path.
Returns:
str: The path to the job's output directory.
Path: The path to the job's output directory.
Raises:
ValueError: If the test's section name is None.
FileNotFoundError: If the base output directory does not exist.
PermissionError: If there is a permission issue creating the directories.
"""
job_output_path = ""
if not self.output_path.exists():
raise FileNotFoundError(f"Output directory {self.output_path} does not exist")

job_output_path = None # Initialize the variable

if not os.path.exists(self.output_path):
raise FileNotFoundError(f"Output directory {self.output_path} " f"does not exist")
try:
assert test.section_name is not None, "test.section_name must not be None"
test_output_path = os.path.join(self.output_path, test.section_name)
os.makedirs(test_output_path, exist_ok=True)
job_output_path = os.path.join(test_output_path, str(test.current_iteration))
os.makedirs(job_output_path, exist_ok=True)
test_output_path = self.output_path / test.section_name
test_output_path.mkdir()
job_output_path = test_output_path / str(test.current_iteration)
job_output_path.mkdir()
except PermissionError as e:
raise PermissionError(f"Cannot create directory {job_output_path}: {e}") from e

@@ -326,7 +327,7 @@ def get_job_status(self, job: BaseJob) -> JobStatusResult:
Returns:
JobStatusResult: The result containing the job status and an optional error message.
"""
return job.test.get_job_status(str(job.output_path))
return job.test.get_job_status(job.output_path)

async def handle_job_completion(self, completed_job: BaseJob):
"""
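A detail worth noting while reading the `base_runner.py` hunks: a bare `Path.mkdir()` raises `FileExistsError` if the directory already exists and `FileNotFoundError` if a parent is missing, whereas the `os.makedirs(..., exist_ok=True)` calls it replaces tolerated both; `mkdir(parents=True, exist_ok=True)` is the closest drop-in. A standalone sketch of the timestamped-output pattern from `setup_output_directory`, written as a hypothetical helper rather than the method itself:

```python
from datetime import datetime
from pathlib import Path


def make_run_dir(base_output_path: Path, scenario_name: str) -> Path:
    """Create <base>/<scenario>_<timestamp> and return it as a Path."""
    # parents=True + exist_ok=True mirrors os.makedirs(..., exist_ok=True)
    base_output_path.mkdir(parents=True, exist_ok=True)
    current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_dir = base_output_path / f"{scenario_name}_{current_time}"
    run_dir.mkdir()  # a fresh, timestamped directory is not expected to exist yet
    return run_dir


if __name__ == "__main__":
    print(make_run_dir(Path("results"), "nccl_test"))
```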
5 changes: 3 additions & 2 deletions src/cloudai/_core/command_gen_strategy.py
@@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path
from typing import Dict, List

from .test_template_strategy import TestTemplateStrategy
@@ -34,7 +35,7 @@ def gen_exec_command(
cmd_args: Dict[str, str],
extra_env_vars: Dict[str, str],
extra_cmd_args: str,
output_path: str,
output_path: Path,
num_nodes: int,
nodes: List[str],
) -> str:
@@ -46,7 +47,7 @@ def gen_exec_command(
cmd_args (Dict[str, str]): Command-line arguments for the test.
extra_env_vars (Dict[str, str]): Additional environment variables.
extra_cmd_args (str): Additional command-line arguments.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
num_nodes (int): The number of nodes to be used for the test execution.
nodes (List[str]): List of nodes for test execution, optional.
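Only the type hint of `output_path` changes in this interface; concrete strategies can keep building command strings as before, because a `Path` renders as its string form inside an f-string. A toy implementer with a simplified, assumed parameter list (not the full abstract signature):

```python
from pathlib import Path
from typing import Dict, List


class EchoCommandGenStrategy:
    """Toy stand-in for a concrete command-generation strategy; illustration only."""

    def gen_exec_command(
        self,
        cmd_args: Dict[str, str],
        extra_cmd_args: str,
        output_path: Path,
        num_nodes: int,
        nodes: List[str],
    ) -> str:
        # A Path interpolates into an f-string as its string form, so the
        # generated command line is unchanged by the str -> Path move.
        args = " ".join(f"--{key}={value}" for key, value in cmd_args.items())
        return f"srun -N {num_nodes} echo {args} {extra_cmd_args} > {output_path / 'stdout.log'}"


strategy = EchoCommandGenStrategy()
print(strategy.gen_exec_command({"iters": "10"}, "--verbose", Path("/tmp/out"), 2, []))
```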
28 changes: 13 additions & 15 deletions src/cloudai/_core/grader.py
@@ -16,7 +16,7 @@

import csv
import logging
import os
from pathlib import Path
from typing import Dict, List

from .test import Test
@@ -28,11 +28,11 @@ class Grader:
Class responsible for grading the performance of tests within a test scenario and generating a report.
Attributes
output_path (str): The path where the performance results are stored.
output_path (Path): The path where the performance results are stored.
logger (logging.Logger): Logger for the class, used to log messages related to the grading process.
"""

def __init__(self, output_path: str) -> None:
def __init__(self, output_path: Path) -> None:
self.output_path = output_path

def grade(self, test_scenario: TestScenario) -> str:
@@ -57,7 +57,7 @@ def grade(self, test_scenario: TestScenario) -> str:
if not section_name:
logging.warning(f"Missing section name for test {test.name}")
continue
test_output_dir = os.path.join(self.output_path, section_name)
test_output_dir = self.output_path / section_name
perfs = self._get_perfs_from_subdirs(test_output_dir, test)
avg_perf = sum(perfs) / len(perfs) if perfs else 0
test_perfs[test.name] = perfs + [avg_perf]
@@ -69,24 +69,22 @@
self._save_report(report)
return report

def _get_perfs_from_subdirs(self, directory_path: str, test: Test) -> List[float]:
def _get_perfs_from_subdirs(self, directory_path: Path, test: Test) -> List[float]:
"""
Average performance values from subdirectories within a given path, according to the test's grading template.
Args:
directory_path (str): Directory path.
directory_path (Path): Directory path.
test (Test): The test to grade.
Returns:
List[float]: A list of performance values.
"""
perfs = []
for subdir in os.listdir(directory_path):
if subdir.isdigit():
subdir_path = os.path.join(directory_path, subdir)
if os.path.isdir(subdir_path):
perf = test.test_template.grade(subdir_path, test.ideal_perf)
perfs.append(perf)
for subdir in directory_path.iterdir():
if subdir.is_dir() and subdir.name.isdigit():
perf = test.test_template.grade(subdir, test.ideal_perf)
perfs.append(perf)
return perfs

def _generate_report(self, test_perfs: Dict[str, List[float]], overall_avg: float) -> str:
@@ -102,7 +100,7 @@ def _generate_report(self, test_perfs: Dict[str, List[float]], overall_avg: float) -> str:
"""
report_lines = ["Test Performance Report:"]
for test, perfs in test_perfs.items():
report_lines.append(f"{test}: Min: {min(perfs[:-1])}, " f"Max: {max(perfs[:-1])}, " f"Avg: {perfs[-1]}")
report_lines.append(f"{test}: Min: {min(perfs[:-1])}, Max: {max(perfs[:-1])}, Avg: {perfs[-1]}")
report_lines.append(f"Overall Average Performance: {overall_avg}")
return "\n".join(report_lines)

@@ -113,8 +111,8 @@ def _save_report(self, report: str) -> None:
Args:
report (str): The report to save.
"""
report_path = os.path.join(self.output_path, "performance_report.csv")
with open(report_path, "w", newline="") as file:
report_path = self.output_path / "performance_report.csv"
with report_path.open("w", newline="") as file:
writer = csv.writer(file)
for line in report.split("\n"):
writer.writerow([line])
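The `grader.py` hunks replace three distinct `os` idioms with pathlib equivalents: directory listing (`os.listdir` → `iterdir()`), joining (`os.path.join` → `/`), and file I/O (`open(path, ...)` → `Path.open(...)`). The same calls, extracted into standalone helpers for illustration (the function names are hypothetical):

```python
import csv
from pathlib import Path
from typing import List


def numeric_subdirs(directory_path: Path) -> List[Path]:
    # Replaces the os.listdir / os.path.isdir / os.path.join combination:
    # iterdir() yields Path objects, and .name gives the bare directory name.
    return sorted(p for p in directory_path.iterdir() if p.is_dir() and p.name.isdigit())


def save_report_lines(output_path: Path, report: str) -> None:
    report_path = output_path / "performance_report.csv"
    with report_path.open("w", newline="") as file:  # Path.open mirrors built-in open()
        writer = csv.writer(file)
        for line in report.split("\n"):
            writer.writerow([line])


if __name__ == "__main__":
    out = Path("results")
    out.mkdir(exist_ok=True)
    save_report_lines(out, "Test Performance Report:\nOverall Average Performance: 0.95")
    print(numeric_subdirs(out))
```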
5 changes: 3 additions & 2 deletions src/cloudai/_core/grading_strategy.py
@@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path

from .test_template_strategy import TestTemplateStrategy

@@ -23,12 +24,12 @@ class GradingStrategy(TestTemplateStrategy):
"""Abstract class for grading test performance."""

@abstractmethod
def grade(self, directory_path: str, ideal_perf: float) -> float:
def grade(self, directory_path: Path, ideal_perf: float) -> float:
"""
Grades the performance of a test.
Args:
directory_path (str): Path to the directory containing the test's output.
directory_path (Path): Path to the directory containing the test's output.
ideal_perf (float): The ideal performance value for comparison.
Returns:
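A concrete `GradingStrategy` typically reads a measured value from the test's output directory and scores it against `ideal_perf`. A self-contained sketch under that assumption; the file name `perf.txt` and the clamped scoring rule are illustrative, not part of CloudAI:

```python
from pathlib import Path


class SingleValueGradingStrategy:
    """Hypothetical grader: reads one throughput number from perf.txt and scores it against ideal_perf."""

    def grade(self, directory_path: Path, ideal_perf: float) -> float:
        perf_file = directory_path / "perf.txt"
        if not perf_file.is_file():
            return 0.0
        measured = float(perf_file.read_text().strip())
        # Clamp to [0, 1]: 1.0 means the measured value met or beat the ideal.
        return max(0.0, min(1.0, measured / ideal_perf))


example_dir = Path("example_run")
example_dir.mkdir(exist_ok=True)
(example_dir / "perf.txt").write_text("42.0")
print(SingleValueGradingStrategy().grade(example_dir, ideal_perf=50.0))  # 0.84
```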
5 changes: 3 additions & 2 deletions src/cloudai/_core/job_status_retrieval_strategy.py
@@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path

from .job_status_result import JobStatusResult

@@ -23,12 +24,12 @@ class JobStatusRetrievalStrategy:
"""Abstract class to define a strategy for retrieving job statuses from a given output directory."""

@abstractmethod
def get_job_status(self, output_path: str) -> JobStatusResult:
def get_job_status(self, output_path: Path) -> JobStatusResult:
"""
Retrieve the job status from a specified output directory.
Args:
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
Returns:
JobStatusResult: The result containing the job status and an optional error message.
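Implementations of this interface usually decide the status by inspecting files beneath `output_path`. A self-contained sketch; the result type below is a stand-in dataclass, since `JobStatusResult`'s constructor is not shown in this diff:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class SimpleStatusResult:
    """Stand-in for a status result: a success flag plus an optional error message."""

    is_successful: bool
    error_message: Optional[str] = None


class StdoutJobStatusStrategy:
    """Hypothetical strategy: the job succeeded if stdout.log exists and is non-empty."""

    def get_job_status(self, output_path: Path) -> SimpleStatusResult:
        stdout_file = output_path / "stdout.log"
        if not stdout_file.is_file():
            return SimpleStatusResult(False, f"{stdout_file} not found")
        if stdout_file.stat().st_size == 0:
            return SimpleStatusResult(False, f"{stdout_file} is empty")
        return SimpleStatusResult(True)


print(StdoutJobStatusStrategy().get_job_status(Path("/tmp/nonexistent-job")))
```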
9 changes: 5 additions & 4 deletions src/cloudai/_core/report_generation_strategy.py
@@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path
from typing import Optional


@@ -27,26 +28,26 @@ class ReportGenerationStrategy:
"""

@abstractmethod
def can_handle_directory(self, directory_path: str) -> bool:
def can_handle_directory(self, directory_path: Path) -> bool:
"""
Determine if the strategy can handle the directory.
Args:
directory_path (str): Path to the directory.
directory_path (Path): Path to the directory.
Returns:
bool: True if can handle, False otherwise.
"""
pass

@abstractmethod
def generate_report(self, test_name: str, directory_path: str, sol: Optional[float] = None) -> None:
def generate_report(self, test_name: str, directory_path: Path, sol: Optional[float] = None) -> None:
"""
Generate a report from the directory.
Args:
test_name (str): The name of the test.
directory_path (str): Path to the directory.
directory_path (Path): Path to the directory.
sol (Optional[float]): Speed-of-light performance for reference.
"""
pass
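With `directory_path` as a `Path`, a `can_handle_directory` check often reduces to a glob test. A hypothetical strategy illustrating that, plus a trivial `generate_report` that writes alongside the inputs (the file names are made up for the example):

```python
from pathlib import Path
from typing import Optional


class CsvReportStrategy:
    """Hypothetical strategy that handles directories containing CSV result files."""

    def can_handle_directory(self, directory_path: Path) -> bool:
        # glob() yields matching Paths lazily; any() is enough for a yes/no check.
        return directory_path.is_dir() and any(directory_path.glob("*.csv"))

    def generate_report(self, test_name: str, directory_path: Path, sol: Optional[float] = None) -> None:
        rows = sum(len(csv_file.read_text().splitlines()) for csv_file in directory_path.glob("*.csv"))
        report = directory_path / f"{test_name}_report.txt"
        report.write_text(f"{test_name}: {rows} data rows (SOL reference: {sol})\n")


strategy = CsvReportStrategy()
print(strategy.can_handle_directory(Path(".")))
```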
7 changes: 4 additions & 3 deletions src/cloudai/_core/system.py
@@ -17,6 +17,7 @@

import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
@@ -30,18 +31,18 @@ class System(ABC):
Attributes
name (str): Unique name of the system.
scheduler (str): Type of scheduler used by the system, determining the specific subclass of System to be used.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
"""

def __init__(self, name: str, scheduler: str, output_path: str, monitor_interval: int = 1) -> None:
def __init__(self, name: str, scheduler: str, output_path: Path, monitor_interval: int = 1) -> None:
"""
Initialize a System instance.
Args:
name (str): Name of the system.
scheduler (str): Type of scheduler used by the system.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
"""
self.name = name
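Since `System.__init__` now expects a `Path`, call sites that still hold plain strings (for example, values read from a TOML file) have to convert at the boundary. One general pattern — shown here as a sketch, not as something this commit does — is to accept `Union[str, Path]` and normalize with `Path()`, which also accepts an existing `Path` without harm:

```python
from pathlib import Path
from typing import Union


class ToySystem:
    """Illustrative only; not CloudAI's System class."""

    def __init__(self, name: str, scheduler: str, output_path: Union[str, Path], monitor_interval: int = 1) -> None:
        self.name = name
        self.scheduler = scheduler
        self.output_path = Path(output_path)  # Path(Path(...)) is harmless, so both input types work
        self.monitor_interval = monitor_interval


print(ToySystem("cluster-a", "slurm", "results").output_path)
print(ToySystem("cluster-a", "slurm", Path("results")).output_path)
```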
9 changes: 5 additions & 4 deletions src/cloudai/_core/test.py
@@ -15,6 +15,7 @@
# limitations under the License.

import sys
from pathlib import Path
from typing import Dict, List, Optional, Union

from .job_status_result import JobStatusResult
@@ -126,12 +127,12 @@ def __repr__(self) -> str:
f"nodes={self.nodes})"
)

def gen_exec_command(self, output_path: str) -> str:
def gen_exec_command(self, output_path: Path) -> str:
"""
Generate the command to run this specific test.
Args:
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
Returns:
str: The command string.
@@ -162,12 +163,12 @@ def get_job_id(self, stdout: str, stderr: str) -> Optional[int]:
"""
return self.test_template.get_job_id(stdout, stderr)

def get_job_status(self, output_path: str) -> JobStatusResult:
def get_job_status(self, output_path: Path) -> JobStatusResult:
"""
Determine the status of a job based on the outputs located in the given output directory.
Args:
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
Returns:
JobStatusResult: The result containing the job status and an optional error message.