Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redefine Job, Runner, and System Responsibilities; Refactor Classes #176

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions src/cloudai/_core/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# limitations under the License.

from pathlib import Path
from typing import Union

from .system import System
from .test import Test


Expand All @@ -24,32 +26,55 @@ class BaseJob:
Base class for representing a job created by executing a test.

Attributes
id (int): The unique identifier of the job.
id (Union[str, int]): The unique identifier of the job.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
TaekyungHeo marked this conversation as resolved.
Show resolved Hide resolved
system (System): The system in which the job is running.
TaekyungHeo marked this conversation as resolved.
Show resolved Hide resolved
test (Test): The test instance associated with this job.
output_path (Path): The path where the job's output is stored.
terminated_by_dependency (bool): Flag to indicate if the job was terminated due to a dependency.
"""

def __init__(self, job_id: int, test: Test, output_path: Path):
def __init__(self, mode: str, system: System, test: Test, output_path: Path):
"""
Initialize a BaseJob instance.

Args:
job_id (int): The unique identifier of the job.
output_path (Path): The path where the job's output is stored.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
system (System): The system in which the job is running.
test (Test): The test instance associated with the job.
output_path (Path): The path where the job's output is stored.
"""
self.id = job_id
self.test = test
self.output_path = output_path
self.terminated_by_dependency = False
self.id: Union[str, int] = 0
self.mode: str = mode
self.system: System = system
self.test: Test = test
self.output_path: Path = output_path
self.terminated_by_dependency: bool = False

def increment_iteration(self):
def is_running(self) -> bool:
"""
Check if the specified job is currently running.

Returns
bool: True if the job is running, False otherwise.
"""
Increment the iteration count of the associated test.
if self.mode == "dry-run":
return True
return self.system.is_job_running(self)

This method should be called when the job completes an iteration and is ready to proceed to the next one.
def is_completed(self) -> bool:
"""
Check if a job is completed.

Returns
bool: True if the job is completed, False otherwise.
"""
if self.mode == "dry-run":
return True
return self.system.is_job_completed(self)

def increment_iteration(self):
"""Increment the iteration count of the associated test."""
self.test.current_iteration += 1

def __repr__(self) -> str:
Expand All @@ -59,4 +84,4 @@ def __repr__(self) -> str:
Returns
str: String representation of the job.
"""
return f"BaseJob(id={self.id}, test={self.test.name}, terminated_by_dependency={self.terminated_by_dependency})"
return f"BaseJob(id={self.id}, mode={self.mode}, system={self.system.name}, test={self.test.name})"
48 changes: 6 additions & 42 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class BaseRunner(ABC):
"""
Abstract base class for a Runner that manages test execution.

This class provides a framework for executing tests within a given test
scenario, handling dependencies and execution order.
This class provides a framework for executing tests within a given test scenario, handling dependencies and
execution order.

Attributes
mode (str): The operation mode ('dry-run', 'run').
Expand Down Expand Up @@ -136,7 +136,7 @@ async def shutdown(self):
return
logging.info("Terminating all jobs...")
for job in self.jobs:
self.kill_job(job)
self.system.kill(job)
TaekyungHeo marked this conversation as resolved.
Show resolved Hide resolved
logging.info("All jobs have been killed.")

sys.exit(0)
Expand Down Expand Up @@ -210,7 +210,7 @@ async def check_start_post_init_dependencies(self):
items = list(self.test_to_job_map.items())

for test, job in items:
if self.is_job_running(job):
if job.is_running():
await self.check_and_schedule_start_post_init_dependent_tests(test)

async def check_and_schedule_start_post_init_dependent_tests(self, started_test: Test):
Expand Down Expand Up @@ -286,7 +286,7 @@ async def monitor_jobs(self) -> int:
successful_jobs_count = 0

for job in list(self.jobs):
if self.is_job_completed(job):
if job.is_completed():
if self.mode == "dry-run":
successful_jobs_count += 1
await self.handle_job_completion(job)
Expand Down Expand Up @@ -347,32 +347,6 @@ async def handle_job_completion(self, completed_job: BaseJob):
else:
await self.handle_dependencies(completed_job)

@abstractmethod
def is_job_running(self, job: BaseJob) -> bool:
"""
Check if a job is currently running.

Args:
job (BaseJob): The job to check.

Returns:
bool: True if the job is running, False otherwise.
"""
pass

@abstractmethod
def is_job_completed(self, job: BaseJob) -> bool:
"""
Determine if a job is completed.

Args:
job (BaseJob): The job to be checked.

Returns:
bool: True if the job is completed, False otherwise.
"""
pass

async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]:
"""
Handle the start_post_comp and end_post_comp dependencies for a completed job.
Expand Down Expand Up @@ -403,16 +377,6 @@ async def handle_dependencies(self, completed_job: BaseJob) -> List[Task]:

return tasks

@abstractmethod
def kill_job(self, job: BaseJob):
"""
Kill a specific job.

Args:
job (BaseJob): The job to be killed.
"""
pass

async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
"""
Schedule termination of a Standalone job after a specified delay.
Expand All @@ -424,4 +388,4 @@ async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
logging.info(f"Scheduling termination of job {job.id} after {delay} seconds.")
await asyncio.sleep(delay)
job.terminated_by_dependency = True
self.kill_job(job)
self.system.kill(job)
78 changes: 77 additions & 1 deletion src/cloudai/_core/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
# limitations under the License.


import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .base_job import BaseJob


class System(ABC):
Expand Down Expand Up @@ -59,4 +64,75 @@ def __repr__(self) -> str:

@abstractmethod
def update(self) -> None:
raise NotImplementedError("Subclasses must implement this method.")
"""
Update the system's state.

Raises
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"System update method is not implemented. All subclasses of the System class must implement the "
"'update' method to ensure the system's state can be refreshed as needed."
)
logging.error(error_message)
raise NotImplementedError(error_message)

@abstractmethod
def is_job_running(self, job: "BaseJob") -> bool:
TaekyungHeo marked this conversation as resolved.
Show resolved Hide resolved
"""
Check if a given job is currently running.

Args:
job (BaseJob): The job to check.

Returns:
bool: True if the job is running, False otherwise.

Raises:
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"Job running status check method is not implemented. All subclasses of the System class must implement the"
" 'is_job_running' method to determine whether a job is currently active."
)
logging.error(error_message)
raise NotImplementedError(error_message)

@abstractmethod
def is_job_completed(self, job: "BaseJob") -> bool:
"""
Check if a given job is completed.

Args:
job (BaseJob): The job to check.

Returns:
bool: True if the job is completed, False otherwise.

Raises:
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"Job completion status check method is not implemented. All subclasses of the System class must implement "
"the 'is_job_completed' method to determine whether a job has finished execution."
)
logging.error(error_message)
raise NotImplementedError(error_message)

@abstractmethod
def kill(self, job: "BaseJob") -> None:
"""
Terminate a given job.

Args:
job (BaseJob): The job to be terminated.

Raises:
NotImplementedError: Raised if the method is not implemented in a subclass.
"""
error_message = (
"Job termination method is not implemented. All subclasses of the System class must implement the 'kill' "
"method to terminate a job that is currently running."
)
logging.error(error_message)
raise NotImplementedError(error_message)
23 changes: 21 additions & 2 deletions src/cloudai/_core/test_scenario_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,30 @@ def _create_section_test(self, section: str, test_info: Dict[str, Any]) -> Test:
f"reference from the test scenario schema."
)

test = copy.deepcopy(self.test_mapping[test_name])
test.test_template = self.test_mapping[test_name].test_template
original_test = self.test_mapping[test_name]

test = Test(
name=original_test.name,
description=original_test.description,
test_template=original_test.test_template,
env_vars=copy.deepcopy(original_test.env_vars),
cmd_args=copy.deepcopy(original_test.cmd_args),
extra_env_vars=copy.deepcopy(original_test.extra_env_vars),
extra_cmd_args=original_test.extra_cmd_args,
dependencies=copy.deepcopy(original_test.dependencies),
iterations=original_test.iterations,
num_nodes=original_test.num_nodes,
nodes=original_test.nodes,
sol=original_test.sol,
weight=original_test.weight,
ideal_perf=original_test.ideal_perf,
time_limit=original_test.time_limit,
)

test.section_name = section
test.num_nodes = int(test_info.get("num_nodes", 1))
test.nodes = test_info.get("nodes", [])

return test

def _parse_dependencies_for_test(
Expand Down
25 changes: 22 additions & 3 deletions src/cloudai/runner/slurm/slurm_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from cloudai import BaseJob
from pathlib import Path
from typing import Union

from cloudai import BaseJob, System, Test


class SlurmJob(BaseJob):
"""Represents a job in a Slurm environment."""
"""
A job class for execution on a Slurm system.

Attributes
id (Union[str, int]): The unique identifier of the job.
"""

def __init__(self, mode: str, system: System, test: Test, job_id: Union[str, int], output_path: Path):
BaseJob.__init__(self, mode, system, test, output_path)
self.id = job_id

def __repr__(self) -> str:
"""
Return a string representation of the SlurmJob instance.

pass
Returns
str: String representation of the job.
"""
return f"SlurmJob(id={self.id}, test={self.test.name})"
Loading