[Core] Avoid PENDING job to be set to FAILED and speed up job scheduling (#4264)

* fix race condition for setting job status to FAILED during INIT

* Fix

* fix

* format

* Add smoke tests

* revert pending submit

* remove update entirely for the job schedule step

* wait for job 32 to finish

* fix smoke

* move and rename

* Add comment

* minor
Michaelvll authored Nov 7, 2024
1 parent 654ed4a commit fe2ce9a
Showing 4 changed files with 71 additions and 36 deletions.
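
For readers skimming the diff below, the race being fixed is: a job row is created in the INIT state before `ray job submit` actually runs, and a concurrent status refresh could conclude the submission had died and mark the job FAILED. The following standalone sketch (the helper name and the grace-period value are illustrative, not SkyPilot's actual API) shows the kind of grace-period check that `update_job_status` now applies to INIT jobs:

import enum
import time

import psutil  # assumed available, as it is in sky/skylet/job_lib.py

_PENDING_SUBMIT_GRACE_PERIOD = 60  # seconds; illustrative value


class JobStatus(enum.Enum):
    INIT = 'INIT'
    PENDING = 'PENDING'
    FAILED = 'FAILED'


def resolve_init_status(job_submitted_at: float, query_time: float) -> JobStatus:
    """Decide what to do with a job whose recorded status is still INIT."""
    if (job_submitted_at >= psutil.boot_time() and
            job_submitted_at >= query_time - _PENDING_SUBMIT_GRACE_PERIOD):
        # The job id is reserved but the submission may still be in flight:
        # keep the job in INIT instead of declaring it FAILED.
        return JobStatus.INIT
    # The job has sat in INIT past the grace window (or predates the last
    # reboot), so the submission process most likely died: mark it FAILED.
    return JobStatus.FAILED


if __name__ == '__main__':
    now = time.time()
    print(resolve_init_status(now - 5, now))     # INIT: within the grace period
    print(resolve_init_status(now - 3600, now))  # FAILED: stale INIT job

Only an INIT job that predates the current boot or has sat unsubmitted past the grace window is declared FAILED; anything newer is left alone until the submission catches up.
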
6 changes: 3 additions & 3 deletions examples/multi_echo.py
@@ -31,13 +31,13 @@ def run(cluster: Optional[str] = None, cloud: Optional[str] = None):

# Submit multiple tasks in parallel to trigger queueing behaviors.
def _exec(i):
task = sky.Task(run=f'echo {i}; sleep 5')
resources = sky.Resources(accelerators={'T4': 0.5})
task = sky.Task(run=f'echo {i}; sleep 60')
resources = sky.Resources(accelerators={'T4': 0.05})
task.set_resources(resources)
sky.exec(task, cluster_name=cluster, detach_run=True)

with pool.ThreadPool(8) as p:
list(p.imap(_exec, range(32)))
list(p.imap(_exec, range(150)))


if __name__ == '__main__':
86 changes: 55 additions & 31 deletions sky/skylet/job_lib.py
@@ -181,14 +181,19 @@ def _run_job(self, job_id: int, run_cmd: str):
subprocess.Popen(run_cmd, shell=True, stdout=subprocess.DEVNULL)

def schedule_step(self, force_update_jobs: bool = False) -> None:
jobs = self._get_jobs()
if len(jobs) > 0 or force_update_jobs:
if force_update_jobs:
update_status()
pending_jobs = self._get_pending_jobs()
# TODO(zhwu, mraheja): One optimization is to allow more than one job to
# stay in the pending state after `ray job submit`, so that a large
# number of jobs can be scheduled faster.
for job_id, run_cmd, submit, created_time in jobs:
for job_id, run_cmd, submit, created_time in pending_jobs:
with filelock.FileLock(_get_lock_path(job_id)):
# We don't have to refresh the job status before checking, as
# the job status will only be stale in rare cases: when the ray job
# crashes, or when the job stays in the INIT state for a long time.
# In those cases, the periodic JobSchedulerEvent will update the job
# status every 300 seconds.
status = get_status_no_lock(job_id)
if (status not in _PRE_RESOURCE_STATUSES or
created_time < psutil.boot_time()):
@@ -202,7 +207,7 @@ def schedule_step(self, force_update_jobs: bool = False) -> None:
self._run_job(job_id, run_cmd)
return

def _get_jobs(self) -> List[Tuple[int, str, int, int]]:
def _get_pending_jobs(self) -> List[Tuple[int, str, int, int]]:
"""Returns the metadata for jobs in the pending jobs table
The information contains job_id, run command, submit time,
@@ -214,7 +219,7 @@ def _get_jobs(self) -> List[Tuple[int, str, int, int]]:
class FIFOScheduler(JobScheduler):
"""First in first out job scheduler"""

def _get_jobs(self) -> List[Tuple[int, str, int, int]]:
def _get_pending_jobs(self) -> List[Tuple[int, str, int, int]]:
return list(
_CURSOR.execute('SELECT * FROM pending_jobs ORDER BY job_id'))

@@ -534,41 +539,62 @@ def update_job_status(job_ids: List[int],
This function should only be run on the remote instance with ray>=2.4.0.
"""
echo = logger.info if not silent else logger.debug
if len(job_ids) == 0:
return []

# TODO: if too slow, directly query against redis.
ray_job_ids = [make_ray_job_id(job_id) for job_id in job_ids]

job_client = _create_ray_job_submission_client()

# In ray 2.4.0, job_client.list_jobs returns a list of JobDetails,
# which contains the job status (str) and submission_id (str).
ray_job_query_time = time.time()
job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs()

job_details = {}
ray_job_ids_set = set(ray_job_ids)
for job_detail in job_detail_lists:
if job_detail.submission_id in ray_job_ids_set:
job_details[job_detail.submission_id] = job_detail

statuses = []
for job_id, ray_job_id in zip(job_ids, ray_job_ids):
# Per-job status lock is required because between the job status
# query and the job status update, the job status in the database
# can be modified by the generated ray program.
with filelock.FileLock(_get_lock_path(job_id)):
status = None
if ray_job_id in job_details:
ray_status = job_details[ray_job_id].status
status = _RAY_TO_JOB_STATUS_MAP[ray_status]
job_record = _get_jobs_by_ids([job_id])[0]
original_status = job_record['status']
job_submitted_at = job_record['submitted_at']

ray_job_query_time = time.time()
if original_status == JobStatus.INIT:
if (job_submitted_at >= psutil.boot_time() and job_submitted_at
>= ray_job_query_time - _PENDING_SUBMIT_GRACE_PERIOD):
# The job id is reserved, but the job is not submitted yet.
# We should keep it in INIT.
status = JobStatus.INIT
else:
# We always submit the job immediately after the job id is
# allocated, i.e. INIT -> PENDING, so if a job stays in INIT
# for too long, the job submission process was likely killed
# before the job was submitted, and we should set the job to
# FAILED. Note that if the ray job indicates the job is
# running, the status will be changed to PENDING below.
echo(f'INIT job {job_id} is stale, setting to FAILED')
status = JobStatus.FAILED

try:
# Querying the status within the lock is safer than querying it
# outside, as it avoids the race condition where the job table is
# updated right after the ray job status query.
# Also, getting per-job status is faster than querying all jobs
# when there is a significant number of finished jobs.
# Reference: getting 124 finished jobs takes 0.038s, while
# querying a single job takes 0.006s and 10 jobs take 0.066s.
# TODO: if too slow, directly query against redis.
ray_job_status = job_client.get_job_status(ray_job_id)
status = _RAY_TO_JOB_STATUS_MAP[ray_job_status.value]
except RuntimeError:
# Job not found.
pass

pending_job = _get_pending_job(job_id)
if pending_job is not None:
if pending_job['created_time'] < psutil.boot_time():
logger.info(f'Job {job_id} is stale, setting to FAILED: '
f'created_time={pending_job["created_time"]}, '
f'boot_time={psutil.boot_time()}')
echo(f'Job {job_id} is stale, setting to FAILED: '
f'created_time={pending_job["created_time"]}, '
f'boot_time={psutil.boot_time()}')
# The job is stale as it was created before the instance
# was booted, e.g. the instance has been rebooted.
status = JobStatus.FAILED
@@ -583,22 +609,20 @@ def update_job_status(job_ids: List[int],
# as stale.
status = JobStatus.PENDING

original_status = get_status_no_lock(job_id)
assert original_status is not None, (job_id, status)
if status is None:
status = original_status
if (original_status is not None and
not original_status.is_terminal()):
logger.info(f'Ray job status for job {job_id} is None, '
'setting it to FAILED.')
echo(f'Ray job status for job {job_id} is None, '
'setting it to FAILED.')
# The job may be stale when the instance is restarted
# (the ray redis is volatile). We need to reset the
# status of the task to FAILED if its original status
# is RUNNING or PENDING.
status = JobStatus.FAILED
_set_status_no_lock(job_id, status)
if not silent:
logger.info(f'Updated job {job_id} status to {status}')
echo(f'Updated job {job_id} status to {status}')
else:
# Taking max of the status is necessary because:
# 1. It avoids a race condition, where the original status has
@@ -611,10 +635,10 @@
# DB) would already have that value. So we take the max here to
# keep it at the later status.
status = max(status, original_status)
assert status is not None, (job_id, status, original_status)
if status != original_status: # Prevents redundant update.
_set_status_no_lock(job_id, status)
if not silent:
logger.info(f'Updated job {job_id} status to {status}')
echo(f'Updated job {job_id} status to {status}')
statuses.append(status)
return statuses

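
The largest change above reworks update_job_status to query ray for each job individually while holding that job's file lock, instead of listing all jobs up front. Below is a simplified sketch of that locking pattern; the function names, lock path, and callback signatures are illustrative rather than SkyPilot's real interfaces, and it assumes the filelock package is installed:

import pathlib
from typing import Callable, Optional

import filelock  # assumed installed, as it is in sky/skylet/job_lib.py


def _lock_path(job_id: int) -> str:
    # Illustrative lock location; SkyPilot keeps its own per-job lock files.
    lock_dir = pathlib.Path('/tmp/job_locks')
    lock_dir.mkdir(parents=True, exist_ok=True)
    return str(lock_dir / f'.{job_id}.lock')


def update_one_job(job_id: int,
                   query_ray_status: Callable[[int], Optional[str]],
                   read_db_status: Callable[[int], str],
                   write_db_status: Callable[[int, str], None]) -> str:
    """Update a single job's status while holding that job's lock."""
    with filelock.FileLock(_lock_path(job_id)):
        # Querying inside the lock avoids the race where the job table is
        # modified (e.g. by the job's own generated ray program) right after
        # the ray status query but before the write below.
        ray_status = query_ray_status(job_id)
        original = read_db_status(job_id)
        new_status = ray_status if ray_status is not None else original
        if new_status != original:  # Prevents a redundant update.
            write_db_status(job_id, new_status)
        return new_status

Taking the lock before the ray query makes the status read, the ray query, and the write behave as one atomic step with respect to the job's own updates, which the earlier list-then-update flow did not guarantee.
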
13 changes: 12 additions & 1 deletion tests/test_smoke.py
@@ -1813,7 +1813,18 @@ def test_multi_echo(generic_cloud: str):
'multi_echo',
[
f'python examples/multi_echo.py {name} {generic_cloud}',
'sleep 120',
f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true',
'sleep 10',
f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true',
'sleep 30',
f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true',
'sleep 30',
# Make sure that our job scheduler is fast enough to have at least
# 10 RUNNING jobs in parallel.
f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "RUNNING" | wc -l | awk \'{{if ($1 < 10) exit 1}}\'',
'sleep 30',
f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep "FAILED" && exit 1 || true',
f'until sky logs {name} 32 --status; do echo "Waiting for job 32 to finish..."; sleep 1; done',
] +
# Ensure jobs succeeded.
[f'sky logs {name} {i + 1} --status' for i in range(32)] +
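
The added smoke-test steps are shell one-liners around `sky queue` and `sky logs --status`. For reference, a rough Python equivalent of those checks (assuming the `sky` CLI is on PATH and `name` is an existing cluster; a zero exit code from `sky logs ... --status` is treated as "job succeeded", as the test itself does):

import subprocess
import time


def queue_output(name: str) -> str:
    # 'sky queue <cluster>' prints the job table for that cluster.
    return subprocess.run(['sky', 'queue', name], capture_output=True,
                          text=True, check=True).stdout


def wait_for_job(name: str, job_id: int, timeout: int = 600) -> None:
    # Mirrors the test's 'until sky logs ... --status' polling loop.
    deadline = time.time() + timeout
    while time.time() < deadline:
        proc = subprocess.run(['sky', 'logs', name, str(job_id), '--status'],
                              check=False)
        if proc.returncode == 0:
            return
        time.sleep(1)
    raise TimeoutError(f'Job {job_id} did not finish within {timeout}s')


def assert_scheduler_healthy(name: str) -> None:
    out = queue_output(name)
    assert 'FAILED' not in out, 'no job should be marked FAILED'
    running = sum('RUNNING' in line for line in out.splitlines())
    assert running >= 10, f'expected >= 10 RUNNING jobs, got {running}'
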
2 changes: 1 addition & 1 deletion tests/test_yamls/pipeline_aws.yaml
@@ -5,7 +5,7 @@ name: a

resources:
cloud: aws
region: us-west-2
region: us-east-2
cpus: 2+

run: |
