Skip to content

Commit

Permalink
Fix job active check in the gretel k8s agent
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 174eac80ae57a7482b217cb6700b26810f29b35a
  • Loading branch information
misberner authored and mckornfield committed Aug 31, 2023
1 parent 0769919 commit 79c3533
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
19 changes: 14 additions & 5 deletions src/gretel_client/agents/drivers/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,24 @@ def _delete_kubernetes_job(self, job: Optional[client.V1Job]):
) from ex

def _is_job_active(self, job: client.V1Job) -> bool:
    """Report whether the given Kubernetes job should still be treated as active.

    The job is re-read from the batch API in the worker namespace, and the
    decision is based on its terminal conditions rather than on the
    ``active`` pod counter.

    Args:
        job: The job to look up; only ``job.metadata.name`` is read.

    Returns:
        ``False`` if the API returns no job, or if the job carries a
        ``Complete`` or ``Failed`` condition whose status is true.
        ``True`` otherwise, including when the job has no status yet.
    """
    job_resp: Optional[client.V1Job] = self._batch_api.read_namespaced_job(
        job.metadata.name, namespace=self._gretel_worker_namespace
    )
    if not job_resp:
        return False
    status: Optional[client.V1JobStatus] = job_resp.status
    # If a job doesn't have a status, this can only happen because it doesn't have
    # a status *yet*, and a job about to launch should be treated as active.
    if not status:
        return True
    # Only the job conditions provide an authoritative answer whether or not the job
    # has terminated. A job otherwise can have some failed attempts and no active
    # attempts in spite of still being active, e.g., in between retries.
    #
    # The Kubernetes API documents a condition's status as one of the strings
    # "True", "False" or "Unknown". Every non-empty string is truthy, so the value
    # is compared explicitly instead of being used as a boolean. Going through
    # ``str()`` keeps real booleans working as well.
    return not any(
        cond.type in ("Complete", "Failed")
        and str(cond.status).strip().lower() == "true"
        for cond in (status.conditions or [])
    )


class KubernetesDriverDaemon:
Expand Down
41 changes: 39 additions & 2 deletions tests/gretel_client/test_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
V1ConfigMapVolumeSource,
V1Container,
V1Job,
V1JobCondition,
V1JobSpec,
V1JobStatus,
V1LocalObjectReference,
Expand Down Expand Up @@ -394,8 +395,22 @@ def test_cpu_count_set_properly(self):

def test_is_job_active_true_then_false(self):
    """Check that ``active`` flips from True to False once the job completes.

    The stubbed batch API returns two responses in order: a job whose
    ``Complete`` condition is false, then one whose ``Complete`` condition
    is true. Exactly one response is queued per ``active`` call, so each
    assertion sees the response intended for it.
    """
    self.batch_api.read_namespaced_job.side_effect = [
        # First read: one active attempt, not complete yet.
        V1Job(
            status=V1JobStatus(
                active=1,
                conditions=[
                    V1JobCondition(type="Complete", status=False),
                ],
            )
        ),
        # Second read: no active attempts and the job is complete.
        V1Job(
            status=V1JobStatus(
                active=0,
                conditions=[
                    V1JobCondition(type="Complete", status=True),
                ],
            )
        ),
    ]
    self.assertTrue(self.driver.active(self.k8s_job))
    self.assertFalse(self.driver.active(self.k8s_job))
Expand All @@ -414,6 +429,28 @@ def test_is_job_active_true_then_empty_status(self):
V1Job(status=V1JobStatus()),
]
self.assertTrue(self.driver.active(self.k8s_job))
self.assertTrue(self.driver.active(self.k8s_job))

def test_is_job_active_true_then_failed(self):
    """Check that ``active`` is True until the ``Failed`` condition is true.

    Two job responses are queued on the stubbed batch API, and ``active``
    is called once for each.
    """
    # First read: neither terminal condition is true.
    not_terminated = V1JobStatus(
        conditions=[
            V1JobCondition(type="Complete", status=False),
            V1JobCondition(type="Failed", status=False),
        ],
    )
    # Second read: the ``Failed`` condition has become true.
    has_failed = V1JobStatus(
        conditions=[
            V1JobCondition(type="Complete", status=False),
            V1JobCondition(type="Failed", status=True),
        ],
    )
    self.batch_api.read_namespaced_job.side_effect = [
        V1Job(status=not_terminated),
        V1Job(status=has_failed),
    ]

    self.assertTrue(self.driver.active(self.k8s_job))
    self.assertFalse(self.driver.active(self.k8s_job))

def test_delete_job_successful_clean(self):
Expand Down

0 comments on commit 79c3533

Please sign in to comment.