Skip to content

Commit

Permalink
Reflect Andrei's comments
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Maslennikov <andreyma@nvidia.com>
  • Loading branch information
TaekyungHeo and amaslenn committed Aug 29, 2024
1 parent 9841649 commit 82aeaa9
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 102 deletions.
72 changes: 9 additions & 63 deletions src/cloudai/_core/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Union

from .system import System
from .test import Test


class BaseJob(ABC):
class BaseJob:
"""
Base class for representing a job created by executing a test.
Attributes
id (Union[str, int]): The unique identifier of the job.
mode (str): The mode of the job (e.g., 'run', 'dry-run').
system (System): The system in which the job is running.
test (Test): The test instance associated with this job.
Expand All @@ -44,66 +44,12 @@ def __init__(self, mode: str, system: System, test: Test, output_path: Path):
test (Test): The test instance associated with the job.
output_path (Path): The path where the job's output is stored.
"""
self.mode = mode
self.system = system
self.test = test
self.output_path = output_path
self.terminated_by_dependency = False

def get_mode(self) -> str:
"""
Retrieve the mode of the job.
Returns
str: The mode of the job.
"""
return self.mode

def get_system(self) -> System:
"""
Get the system in which the job is running.
Returns
System: The system associated with the job.
"""
return self.system

def get_test(self) -> Test:
"""
Get the test instance associated with this job.
Returns
Test: The test instance associated with the job.
"""
return self.test

def get_output_path(self) -> Path:
"""
Retrieve the path where the job's output is stored.
Returns
Path: The path to the job's output directory.
"""
return self.output_path

def is_terminated_by_dependency(self) -> bool:
"""
Check if the job was terminated due to a dependency.
Returns
bool: True if the job was terminated by a dependency, False otherwise.
"""
return self.terminated_by_dependency

@abstractmethod
def get_id(self) -> Union[str, int]:
"""
Abstract method to retrieve the unique identifier of the job.
Returns
Union[str, int]: The unique identifier of the job.
"""
pass
self.id: Union[str, int] = 0
self.mode: str = mode
self.system: System = system
self.test: Test = test
self.output_path: Path = output_path
self.terminated_by_dependency: bool = False

def is_running(self) -> bool:
"""
Expand Down Expand Up @@ -138,4 +84,4 @@ def __repr__(self) -> str:
Returns
str: String representation of the job.
"""
return f"BaseJob(mode={self.mode}, system={self.system.name}, test={self.test.name})"
return f"BaseJob(id={self.id}, mode={self.mode}, system={self.system.name}, test={self.test.name})"
6 changes: 3 additions & 3 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ async def monitor_jobs(self) -> int:
await self.handle_job_completion(job)
else:
error_message = (
f"Job {job.get_id()} for test {job.test.section_name} failed: "
f"Job {job.id} for test {job.test.section_name} failed: "
f"{job_status_result.error_message}"
)
logging.error(error_message)
Expand All @@ -308,7 +308,7 @@ async def monitor_jobs(self) -> int:
job_status_result = self.get_job_status(job)
if not job_status_result.is_successful:
error_message = (
f"Job {job.get_id()} for test {job.test.section_name} failed: "
f"Job {job.id} for test {job.test.section_name} failed: "
f"{job_status_result.error_message}"
)
logging.error(error_message)
Expand Down Expand Up @@ -385,7 +385,7 @@ async def delayed_kill_job(self, job: BaseJob, delay: int = 0):
job (BaseJob): The job to be terminated.
delay (int): Delay in seconds after which the job should be terminated.
"""
logging.info(f"Scheduling termination of job {job.get_id()} after {delay} seconds.")
logging.info(f"Scheduling termination of job {job.id} after {delay} seconds.")
await asyncio.sleep(delay)
job.terminated_by_dependency = True
self.system.kill(job)
15 changes: 3 additions & 12 deletions src/cloudai/runner/slurm/slurm_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,9 @@ class SlurmJob(BaseJob):
id (Union[str, int]): The unique identifier of the job.
"""

def __init__(self, mode: str, system: System, test: Test, id: Union[str, int], output_path: Path):
def __init__(self, mode: str, system: System, test: Test, job_id: Union[str, int], output_path: Path):
BaseJob.__init__(self, mode, system, test, output_path)
self.id = id

def get_id(self) -> Union[str, int]:
"""
Retrieve the unique identifier of the job.
Returns
Union[str, int]: The unique identifier of the job.
"""
return self.id
self.id = job_id

def __repr__(self) -> str:
"""
Expand All @@ -48,4 +39,4 @@ def __repr__(self) -> str:
Returns
str: String representation of the job.
"""
return f"SlurmJob(id={self.get_id()}, test={self.test.name})"
return f"SlurmJob(id={self.id}, test={self.test.name})"
15 changes: 3 additions & 12 deletions src/cloudai/runner/standalone/standalone_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,9 @@ class StandaloneJob(BaseJob):
id (Union[str, int]): The unique identifier of the job.
"""

def __init__(self, mode: str, system: System, test: Test, id: Union[str, int], output_path: Path):
def __init__(self, mode: str, system: System, test: Test, job_id: Union[str, int], output_path: Path):
BaseJob.__init__(self, mode, system, test, output_path)
self.id = id

def get_id(self) -> Union[str, int]:
"""
Retrieve the unique identifier of the job.
Returns
Union[str, int]: The unique identifier of the job.
"""
return self.id
self.id = job_id

def __repr__(self) -> str:
"""
Expand All @@ -48,4 +39,4 @@ def __repr__(self) -> str:
Returns
str: String representation of the job.
"""
return f"StandaloneJob(id={self.get_id()}, test={self.test.name})"
return f"StandaloneJob(id={self.id}, test={self.test.name})"
11 changes: 5 additions & 6 deletions src/cloudai/systems/slurm/slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def is_job_running(self, job: BaseJob, retry_threshold: int = 3) -> bool:
cannot be determined after the specified number of retries.
"""
retry_count = 0
command = f"squeue -j {job.get_id()} --noheader --format=%T"
command = f"squeue -j {job.id} --noheader --format=%T"

while retry_count < retry_threshold:
logging.debug(f"Executing command to check job status: {command}")
Expand Down Expand Up @@ -207,7 +207,7 @@ def is_job_completed(self, job: BaseJob, retry_threshold: int = 3) -> bool:
"""
retry_count = 0
while retry_count < retry_threshold:
command = f"squeue -j {job.get_id()}"
command = f"squeue -j {job.id}"
logging.debug(f"Checking job status with command: {command}")
stdout, stderr = self.cmd_shell.execute(command).communicate()

Expand All @@ -221,7 +221,7 @@ def is_job_completed(self, job: BaseJob, retry_threshold: int = 3) -> bool:
logging.error(error_message)
raise RuntimeError(error_message)

return str(job.get_id()) not in stdout
return str(job.id) not in stdout

error_message = f"Failed to confirm job completion status after {retry_threshold} attempts."
logging.error(error_message)
Expand All @@ -234,9 +234,8 @@ def kill(self, job: BaseJob) -> None:
Args:
job (BaseJob): The job to be terminated.
"""
job_id = job.get_id()
assert isinstance(job_id, int)
self.scancel(job_id)
assert isinstance(job.id, int)
self.scancel(job.id)

@classmethod
def parse_node_list(cls, node_list: str) -> List[str]:
Expand Down
10 changes: 5 additions & 5 deletions src/cloudai/systems/standalone_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ def is_job_running(self, job: BaseJob) -> bool:
Returns:
bool: True if the job is running, False otherwise.
"""
command = f"ps -p {job.get_id()}"
command = f"ps -p {job.id}"
logging.debug(f"Checking job status with command: {command}")
stdout = self.cmd_shell.execute(command).communicate()[0]

# Check if the job's PID is in the ps output
is_running = str(job.get_id()) in stdout
logging.debug(f"Job {job.get_id()} running status: {is_running}")
is_running = str(job.id) in stdout
logging.debug(f"Job {job.id} running status: {is_running}")

return is_running

Expand All @@ -96,6 +96,6 @@ def kill(self, job: BaseJob) -> None:
Args:
job (BaseJob): The job to be terminated.
"""
cmd = f"kill -9 {job.get_id()}"
logging.debug(f"Executing termination command for job {job.get_id()}: {cmd}")
cmd = f"kill -9 {job.id}"
logging.debug(f"Executing termination command for job {job.id}: {cmd}")
self.cmd_shell.execute(cmd)
2 changes: 1 addition & 1 deletion tests/test_standalone_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_kill_job(mock_execute, standalone_system, standalone_job):
mock_execute.return_value = mock_process

standalone_system.kill(standalone_job)
kill_command = f"kill -9 {standalone_job.get_id()}"
kill_command = f"kill -9 {standalone_job.id}"

mock_execute.assert_called_once_with(kill_command)

Expand Down

0 comments on commit 82aeaa9

Please sign in to comment.