Skip to content

Commit

Permalink
Merge pull request #176 from TaekyungHeo/refactor-job
Browse files Browse the repository at this point in the history
Redefine Job, Runner, and System Responsibilities; Refactor Classes
  • Loading branch information
TaekyungHeo authored Aug 29, 2024
2 parents 9481911 + 82aeaa9 commit e0b3418
Show file tree
Hide file tree
Showing 12 changed files with 471 additions and 359 deletions.
49 changes: 37 additions & 12 deletions src/cloudai/_core/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# limitations under the License.

from pathlib import Path
from typing import Union

from .system import System
from .test import Test


Expand All @@ -24,32 +26,55 @@ class BaseJob:
Base class for representing a job created by executing a test.
Attributes
id (int): The unique identifier of the job.
id (Union[str, int]): The unique identifier of the job.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
system (System): The system in which the job is running.
test (Test): The test instance associated with this job.
output_path (Path): The path where the job's output is stored.
terminated_by_dependency (bool): Flag to indicate if the job was terminated due to a dependency.
"""

def __init__(self, job_id: int, test: Test, output_path: Path):
def __init__(self, mode: str, system: System, test: Test, output_path: Path):
"""
Initialize a BaseJob instance.
Args:
job_id (int): The unique identifier of the job.
output_path (Path): The path where the job's output is stored.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
system (System): The system in which the job is running.
test (Test): The test instance associated with the job.
output_path (Path): The path where the job's output is stored.
"""
self.id = job_id
self.test = test
self.output_path = output_path
self.terminated_by_dependency = False
self.id: Union[str, int] = 0
self.mode: str = mode
self.system: System = system
self.test: Test = test
self.output_path: Path = output_path
self.terminated_by_dependency: bool = False

def increment_iteration(self):
def is_running(self) -> bool:
"""
Check if the specified job is currently running.
Returns
bool: True if the job is running, False otherwise.
"""
Increment the iteration count of the associated test.
if self.mode == "dry-run":
return True
return self.system.is_job_running(self)

This method should be called when the job completes an iteration and is ready to proceed to the next one.
def is_completed(self) -> bool:
"""
Check if a job is completed.
Returns
bool: True if the job is completed, False otherwise.
"""
if self.mode == "dry-run":
return True
return self.system.is_job_completed(self)

def increment_iteration(self):
"""Increment the iteration count of the associated test."""
self.test.current_iteration += 1

def __repr__(self) -> str:
Expand All @@ -59,4 +84,4 @@ def __repr__(self) -> str:
Returns
str: String representation of the job.
"""
return f"BaseJob(id={self.id}, test={self.test.name}, terminated_by_dependency={self.terminated_by_dependency})"
return f"BaseJob(id={self.id}, mode={self.mode}, system={self.system.name}, test={self.test.name})"
48 changes: 6 additions & 42 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class BaseRunner(ABC):
"""
Abstract base class for a Runner that manages test execution.
This class provides a framework for executing tests within a given test
scenario, handling dependencies and execution order.
This class provides a framework for executing tests within a given test scenario, handling dependencies and
execution order.
Attributes
mode (str): The operation mode ('dry-run', 'run').
Expand Down Expand Up @@ -136,7 +136,7 @@ async def shutdown(self):
return
logging.info("Terminating all jobs...")
for job in self.jobs:
self.kill_job(job)
self.system.kill(job)
logging.info("All jobs have been killed.")

sys.exit(0)
Expand Down Expand Up @@ -210,7 +210,7 @@ async def check_start_post_init_dependencies(self):
items = list(self.test_to_job_map.items())

for test, job in items:
if self.is_job_running(job):
if job.is_running():
await self.check_and_schedule_start_post_init_dependent_tests(test)

async def check_and_schedule_start_post_init_dependent_tests(self, started_test: Test):
Expand Down Expand Up @@ -286,7 +286,7 @@ async def monitor_jobs(self) -> int:
successful_jobs_count = 0

for job in list(self.jobs):
if self.is_job_completed(job):
if job.is_completed():
if self.mode == "dry-run":
successful_jobs_count += 1
await self.handle_job_completion(job)
Expand Down Expand Up @@ -347,32 +347,6 @@ async def handle_job_completion(self, completed_job: BaseJob):
else:
await self.handle_dependencies(completed_job)

@abstractmethod
def is_job_running(self, job: BaseJob) -> bool:
"""
Check if a job is currently running.
Args:
job (BaseJob): The job to check.
Returns:
bool: True if the job is running, False otherwise.
"""
pass

@abstractmethod
def is_job_completed(self, job: BaseJob) -> bool:
"""
Determine if a job is completed.
Args:
job (BaseJob): The job to be checked.
Returns:
bool: True if the job is completed, False otherwise.
"""
pass

async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]:
"""
Handle the start_post_comp and end_post_comp dependencies for a completed job.
Expand Down Expand Up @@ -403,16 +377,6 @@ async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]:

return tasks

@abstractmethod
def kill_job(self, job: BaseJob):
"""
Kill a specific job.
Args:
job (BaseJob): The job to be killed.
"""
pass

async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
"""
Schedule termination of a Standalone job after a specified delay.
Expand All @@ -424,4 +388,4 @@ async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
logging.info(f"Scheduling termination of job {job.id} after {delay} seconds.")
await asyncio.sleep(delay)
job.terminated_by_dependency = True
self.kill_job(job)
self.system.kill(job)
78 changes: 77 additions & 1 deletion src/cloudai/_core/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
# limitations under the License.


import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .base_job import BaseJob


class System(ABC):
Expand Down Expand Up @@ -59,4 +64,75 @@ def __repr__(self) -> str:

@abstractmethod
def update(self) -> None:
raise NotImplementedError("Subclasses must implement this method.")
"""
Update the system's state.
Raises
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"System update method is not implemented. All subclasses of the System class must implement the "
"'update' method to ensure the system's state can be refreshed as needed."
)
logging.error(error_message)
raise NotImplementedError(error_message)

@abstractmethod
def is_job_running(self, job: "BaseJob") -> bool:
"""
Check if a given job is currently running.
Args:
job (BaseJob): The job to check.
Returns:
bool: True if the job is running, False otherwise.
Raises:
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"Job running status check method is not implemented. All subclasses of the System class must implement the"
" 'is_job_running' method to determine whether a job is currently active."
)
logging.error(error_message)
raise NotImplementedError(error_message)

@abstractmethod
def is_job_completed(self, job: "BaseJob") -> bool:
"""
Check if a given job is completed.
Args:
job (BaseJob): The job to check.
Returns:
bool: True if the job is completed, False otherwise.
Raises:
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"Job completion status check method is not implemented. All subclasses of the System class must implement "
"the 'is_job_completed' method to determine whether a job has finished execution."
)
logging.error(error_message)
raise NotImplementedError(error_message)

@abstractmethod
def kill(self, job: "BaseJob") -> None:
"""
Terminate a given job.
Args:
job (BaseJob): The job to be terminated.
Raises:
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"Job termination method is not implemented. All subclasses of the System class must implement the 'kill' "
"method to terminate a job that is currently running."
)
logging.error(error_message)
raise NotImplementedError(error_message)
23 changes: 21 additions & 2 deletions src/cloudai/_core/test_scenario_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,30 @@ def _create_section_test(self, section: str, test_info: Dict[str, Any]) -> Test:
f"reference from the test scenario schema."
)

test = copy.deepcopy(self.test_mapping[test_name])
test.test_template = self.test_mapping[test_name].test_template
original_test = self.test_mapping[test_name]

test = Test(
name=original_test.name,
description=original_test.description,
test_template=original_test.test_template,
env_vars=copy.deepcopy(original_test.env_vars),
cmd_args=copy.deepcopy(original_test.cmd_args),
extra_env_vars=copy.deepcopy(original_test.extra_env_vars),
extra_cmd_args=original_test.extra_cmd_args,
dependencies=copy.deepcopy(original_test.dependencies),
iterations=original_test.iterations,
num_nodes=original_test.num_nodes,
nodes=original_test.nodes,
sol=original_test.sol,
weight=original_test.weight,
ideal_perf=original_test.ideal_perf,
time_limit=original_test.time_limit,
)

test.section_name = section
test.num_nodes = int(test_info.get("num_nodes", 1))
test.nodes = test_info.get("nodes", [])

return test

def _parse_dependencies_for_test(
Expand Down
25 changes: 22 additions & 3 deletions src/cloudai/runner/slurm/slurm_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from cloudai import BaseJob
from pathlib import Path
from typing import Union

from cloudai import BaseJob, System, Test


class SlurmJob(BaseJob):
"""Represents a job in a Slurm environment."""
"""
A job class for execution on a Slurm system.
Attributes
id (Union[str, int]): The unique identifier of the job.
"""

def __init__(self, mode: str, system: System, test: Test, job_id: Union[str, int], output_path: Path):
BaseJob.__init__(self, mode, system, test, output_path)
self.id = job_id

def __repr__(self) -> str:
"""
Return a string representation of the SlurmJob instance.
pass
Returns
str: String representation of the job.
"""
return f"SlurmJob(id={self.id}, test={self.test.name})"
Loading

0 comments on commit e0b3418

Please sign in to comment.