Skip to content

Commit

Permalink
Merge pull request #183 from TaekyungHeo/refactor-path-rebase
Browse files Browse the repository at this point in the history
Refactor to Use pathlib.Path for Path-Related Variables
  • Loading branch information
srinivas212 authored Aug 28, 2024
2 parents 35d1489 + 1f5b36e commit e8a959a
Show file tree
Hide file tree
Showing 64 changed files with 627 additions and 651 deletions.
4 changes: 2 additions & 2 deletions src/cloudai/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def handle_generate_report(test_scenario: TestScenario, output_dir: Path) -> Non
output_dir (Path): The path to the output directory.
"""
logging.info("Generating report based on system and test scenario")
generator = ReportGenerator(str(output_dir))
generator = ReportGenerator(output_dir)
generator.generate_report(test_scenario)

logging.info("Report generation completed.")
Expand All @@ -274,7 +274,7 @@ def main() -> None:
system, tests, test_scenario = parser.parse(tests_dir, test_scenario_path)

if output_dir:
system.output_path = str(output_dir.absolute())
system.output_path = output_dir.absolute()
system.update()

if args.mode in ["install", "uninstall"]:
Expand Down
8 changes: 5 additions & 3 deletions src/cloudai/_core/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path

from .test import Test


Expand All @@ -24,17 +26,17 @@ class BaseJob:
Attributes
id (int): The unique identifier of the job.
test (Test): The test instance associated with this job.
output_path (str): The path where the job's output is stored.
output_path (Path): The path where the job's output is stored.
terminated_by_dependency (bool): Flag to indicate if the job was terminated due to a dependency.
"""

def __init__(self, job_id: int, test: Test, output_path: str):
def __init__(self, job_id: int, test: Test, output_path: Path):
"""
Initialize a BaseJob instance.
Args:
job_id (int): The unique identifier of the job.
output_path (str): The path where the job's output is stored.
output_path (Path): The path where the job's output is stored.
test (Test): The test instance associated with the job.
"""
self.id = job_id
Expand Down
39 changes: 20 additions & 19 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import asyncio
import logging
import os
import signal
import sys
from abc import ABC, abstractmethod
from asyncio import Task
from datetime import datetime
from pathlib import Path
from types import FrameType
from typing import Dict, List, Optional

Expand All @@ -44,7 +44,7 @@ class BaseRunner(ABC):
mode (str): The operation mode ('dry-run', 'run').
system (System): The system schema object.
test_scenario (TestScenario): The test scenario to run.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
jobs (List[BaseJob]): List to track jobs created by the runner.
test_to_job_map (Dict[Test, BaseJob]): Mapping from tests to their jobs.
Expand Down Expand Up @@ -78,21 +78,21 @@ def __init__(
self.shutting_down = False
self.register_signal_handlers()

def setup_output_directory(self, base_output_path: str) -> str:
def setup_output_directory(self, base_output_path: Path) -> Path:
"""
Set up and return the output directory path for the runner instance.
Args:
base_output_path (str): The base output directory.
base_output_path (Path): The base output directory.
Returns:
str: The path to the output directory.
Path: The path to the output directory.
"""
if not os.path.exists(base_output_path):
os.makedirs(base_output_path)
if not base_output_path.exists():
base_output_path.mkdir()
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_subpath = os.path.join(base_output_path, f"{self.test_scenario.name}_{current_time}")
os.makedirs(output_subpath)
output_subpath = base_output_path / f"{self.test_scenario.name}_{current_time}"
output_subpath.mkdir()
return output_subpath

def register_signal_handlers(self):
Expand Down Expand Up @@ -242,9 +242,9 @@ def find_dependency_free_tests(self) -> List[Test]:

return dependency_free_tests

def get_job_output_path(self, test: Test) -> str:
def get_job_output_path(self, test: Test) -> Path:
"""
Generate and ensures the existence of the output directory for a given test.
Generate and ensure the existence of the output directory for a given test.
It constructs the path based on the test's section name and current iteration, creating the directories if they
do not exist.
Expand All @@ -253,23 +253,24 @@ def get_job_output_path(self, test: Test) -> str:
test (Test): The test instance for which to generate the output directory path.
Returns:
str: The path to the job's output directory.
Path: The path to the job's output directory.
Raises:
ValueError: If the test's section name is None.
FileNotFoundError: If the base output directory does not exist.
PermissionError: If there is a permission issue creating the directories.
"""
job_output_path = ""
if not self.output_path.exists():
raise FileNotFoundError(f"Output directory {self.output_path} does not exist")

job_output_path = Path() # avoid reportPossiblyUnboundVariable from pyright

if not os.path.exists(self.output_path):
raise FileNotFoundError(f"Output directory {self.output_path} " f"does not exist")
try:
assert test.section_name is not None, "test.section_name must not be None"
test_output_path = os.path.join(self.output_path, test.section_name)
os.makedirs(test_output_path, exist_ok=True)
job_output_path = os.path.join(test_output_path, str(test.current_iteration))
os.makedirs(job_output_path, exist_ok=True)
test_output_path = self.output_path / test.section_name
test_output_path.mkdir()
job_output_path = test_output_path / str(test.current_iteration)
job_output_path.mkdir()
except PermissionError as e:
raise PermissionError(f"Cannot create directory {job_output_path}: {e}") from e

Expand Down
5 changes: 3 additions & 2 deletions src/cloudai/_core/command_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path
from typing import Dict, List

from .test_template_strategy import TestTemplateStrategy
Expand All @@ -34,7 +35,7 @@ def gen_exec_command(
cmd_args: Dict[str, str],
extra_env_vars: Dict[str, str],
extra_cmd_args: str,
output_path: str,
output_path: Path,
num_nodes: int,
nodes: List[str],
) -> str:
Expand All @@ -46,7 +47,7 @@ def gen_exec_command(
cmd_args (Dict[str, str]): Command-line arguments for the test.
extra_env_vars (Dict[str, str]): Additional environment variables.
extra_cmd_args (str): Additional command-line arguments.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
num_nodes (int): The number of nodes to be used for the test execution.
nodes (List[str]): List of nodes for test execution, optional.
Expand Down
28 changes: 13 additions & 15 deletions src/cloudai/_core/grader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import csv
import logging
import os
from pathlib import Path
from typing import Dict, List

from .test import Test
Expand All @@ -28,11 +28,11 @@ class Grader:
Class responsible for grading the performance of tests within a test scenario and generating a report.
Attributes
output_path (str): The path where the performance results are stored.
output_path (Path): The path where the performance results are stored.
logger (logging.Logger): Logger for the class, used to log messages related to the grading process.
"""

def __init__(self, output_path: str) -> None:
def __init__(self, output_path: Path) -> None:
self.output_path = output_path

def grade(self, test_scenario: TestScenario) -> str:
Expand All @@ -57,7 +57,7 @@ def grade(self, test_scenario: TestScenario) -> str:
if not section_name:
logging.warning(f"Missing section name for test {test.name}")
continue
test_output_dir = os.path.join(self.output_path, section_name)
test_output_dir = self.output_path / section_name
perfs = self._get_perfs_from_subdirs(test_output_dir, test)
avg_perf = sum(perfs) / len(perfs) if perfs else 0
test_perfs[test.name] = perfs + [avg_perf]
Expand All @@ -69,24 +69,22 @@ def grade(self, test_scenario: TestScenario) -> str:
self._save_report(report)
return report

def _get_perfs_from_subdirs(self, directory_path: str, test: Test) -> List[float]:
def _get_perfs_from_subdirs(self, directory_path: Path, test: Test) -> List[float]:
"""
Average performance values from subdirectories within a given path, according to the test's grading template.
Args:
directory_path (str): Directory path.
directory_path (Path): Directory path.
test (Test): The test to grade.
Returns:
List[float]: A list of performance values.
"""
perfs = []
for subdir in os.listdir(directory_path):
if subdir.isdigit():
subdir_path = os.path.join(directory_path, subdir)
if os.path.isdir(subdir_path):
perf = test.test_template.grade(subdir_path, test.ideal_perf)
perfs.append(perf)
for subdir in directory_path.iterdir():
if subdir.is_dir() and subdir.name.isdigit():
perf = test.test_template.grade(subdir, test.ideal_perf)
perfs.append(perf)
return perfs

def _generate_report(self, test_perfs: Dict[str, List[float]], overall_avg: float) -> str:
Expand All @@ -102,7 +100,7 @@ def _generate_report(self, test_perfs: Dict[str, List[float]], overall_avg: floa
"""
report_lines = ["Test Performance Report:"]
for test, perfs in test_perfs.items():
report_lines.append(f"{test}: Min: {min(perfs[:-1])}, " f"Max: {max(perfs[:-1])}, " f"Avg: {perfs[-1]}")
report_lines.append(f"{test}: Min: {min(perfs[:-1])}, Max: {max(perfs[:-1])}, Avg: {perfs[-1]}")
report_lines.append(f"Overall Average Performance: {overall_avg}")
return "\n".join(report_lines)

Expand All @@ -113,8 +111,8 @@ def _save_report(self, report: str) -> None:
Args:
report (str): The report to save.
"""
report_path = os.path.join(self.output_path, "performance_report.csv")
with open(report_path, "w", newline="") as file:
report_path = self.output_path / "performance_report.csv"
with report_path.open("w", newline="") as file:
writer = csv.writer(file)
for line in report.split("\n"):
writer.writerow([line])
5 changes: 3 additions & 2 deletions src/cloudai/_core/grading_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path

from .test_template_strategy import TestTemplateStrategy

Expand All @@ -23,12 +24,12 @@ class GradingStrategy(TestTemplateStrategy):
"""Abstract class for grading test performance."""

@abstractmethod
def grade(self, directory_path: str, ideal_perf: float) -> float:
def grade(self, directory_path: Path, ideal_perf: float) -> float:
"""
Grades the performance of a test.
Args:
directory_path (str): Path to the directory containing the test's output.
directory_path (Path): Path to the directory containing the test's output.
ideal_perf (float): The ideal performance value for comparison.
Returns:
Expand Down
5 changes: 3 additions & 2 deletions src/cloudai/_core/job_status_retrieval_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path

from .job_status_result import JobStatusResult

Expand All @@ -23,12 +24,12 @@ class JobStatusRetrievalStrategy:
"""Abstract class to define a strategy for retrieving job statuses from a given output directory."""

@abstractmethod
def get_job_status(self, output_path: str) -> JobStatusResult:
def get_job_status(self, output_path: Path) -> JobStatusResult:
"""
Retrieve the job status from a specified output directory.
Args:
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
Returns:
JobStatusResult: The result containing the job status and an optional error message.
Expand Down
2 changes: 1 addition & 1 deletion src/cloudai/_core/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def parse(
if not test_path.exists():
raise FileNotFoundError(f"Test path '{test_path}' not found.")

system_parser = SystemParser(str(self.system_config_path))
system_parser = SystemParser(self.system_config_path)
system = system_parser.parse()
logging.debug("Parsed system config")

Expand Down
9 changes: 5 additions & 4 deletions src/cloudai/_core/report_generation_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from abc import abstractmethod
from pathlib import Path
from typing import Optional


Expand All @@ -27,26 +28,26 @@ class ReportGenerationStrategy:
"""

@abstractmethod
def can_handle_directory(self, directory_path: str) -> bool:
def can_handle_directory(self, directory_path: Path) -> bool:
"""
Determine if the strategy can handle the directory.
Args:
directory_path (str): Path to the directory.
directory_path (Path): Path to the directory.
Returns:
bool: True if can handle, False otherwise.
"""
pass

@abstractmethod
def generate_report(self, test_name: str, directory_path: str, sol: Optional[float] = None) -> None:
def generate_report(self, test_name: str, directory_path: Path, sol: Optional[float] = None) -> None:
"""
Generate a report from the directory.
Args:
test_name (str): The name of the test.
directory_path (str): Path to the directory.
directory_path (Path): Path to the directory.
sol (Optional[float]): Speed-of-light performance for reference.
"""
pass
13 changes: 4 additions & 9 deletions src/cloudai/_core/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


from abc import ABC, abstractmethod
from pathlib import Path


class System(ABC):
Expand All @@ -25,24 +26,18 @@ class System(ABC):
Attributes
name (str): Unique name of the system.
scheduler (str): Type of scheduler used by the system, determining the specific subclass of System to be used.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
"""

def __init__(
self,
name: str,
scheduler: str,
output_path: str,
monitor_interval: int = 1,
) -> None:
def __init__(self, name: str, scheduler: str, output_path: Path, monitor_interval: int = 1) -> None:
"""
Initialize a System instance.
Args:
name (str): Name of the system.
scheduler (str): Type of scheduler used by the system.
output_path (str): Path to the output directory.
output_path (Path): Path to the output directory.
monitor_interval (int): Interval in seconds for monitoring jobs.
"""
self.name = name
Expand Down
Loading

0 comments on commit e8a959a

Please sign in to comment.