Skip to content

Commit

Permalink
Merge pull request #3807 from bood/pg-completed-pod-error-case
Browse files Browse the repository at this point in the history
fix: mark PodGroup completed when pod fails
  • Loading branch information
volcano-sh-bot authored Dec 19, 2024
2 parents dca4160 + 9ac1c8c commit b0c1a56
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 4 deletions.
10 changes: 10 additions & 0 deletions pkg/scheduler/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ func AllocatedStatus(status TaskStatus) bool {
}
}

// CompletedStatus checks whether the tasks are completed (regardless of failure or success)
func CompletedStatus(status TaskStatus) bool {
switch status {
case Failed, Succeeded:
return true
default:
return false
}
}

// MergeErrors is used to merge multiple errors into single error
func MergeErrors(errs ...error) error {
msg := "errors: "
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,16 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
} else {
allocated := 0
for status, tasks := range jobInfo.TaskStatusIndex {
if api.AllocatedStatus(status) || status == api.Succeeded {
if api.AllocatedStatus(status) || api.CompletedStatus(status) {
allocated += len(tasks)
}
}

// If there're enough allocated resource, it's running
if int32(allocated) >= jobInfo.PodGroup.Spec.MinMember {
status.Phase = scheduling.PodGroupRunning
// If all allocated tasks is succeeded, it's completed
if len(jobInfo.TaskStatusIndex[api.Succeeded]) == allocated {
// If all allocated tasks is succeeded or failed, it's completed
if len(jobInfo.TaskStatusIndex[api.Succeeded])+len(jobInfo.TaskStatusIndex[api.Failed]) == allocated {
status.Phase = scheduling.PodGroupCompleted
}
} else if jobInfo.PodGroup.Status.Phase != scheduling.PodGroupInqueue {
Expand Down
23 changes: 23 additions & 0 deletions test/e2e/schedulingbase/job_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,4 +594,27 @@ var _ = Describe("Job E2E Test", func() {
Expect(pgPhase).To(Equal(vcscheduling.PodGroupCompleted), "podGroup Phase is %s, should be %s",
ctx.Namespace, vcscheduling.PodGroupCompleted)
})

It("PodGroup's Phase should be Completed when Job fails", func() {
ctx := e2eutil.InitTestContext(e2eutil.Options{})
defer e2eutil.CleanupTestContext(ctx)

jb := e2eutil.CreateFailK8sJob(ctx, "job1", e2eutil.DefaultNginxImage, e2eutil.OneCPU)
err := e2eutil.Waitk8sJobCompleted(ctx, jb.Name)
Expect(err).NotTo(HaveOccurred())

var pgPhase vcscheduling.PodGroupPhase
wait.Poll(time.Second, time.Second*30, func() (bool, error) {
pgs, err := ctx.Vcclient.SchedulingV1beta1().PodGroups(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to list podGroups in namespace %s", ctx.Namespace)
Expect(len(pgs.Items)).To(Equal(1), "this test need a clean cluster")
pgPhase = pgs.Items[0].Status.Phase
if pgPhase != vcscheduling.PodGroupRunning {
return true, nil
}
return false, nil
})
Expect(pgPhase).To(Equal(vcscheduling.PodGroupCompleted), "podGroup Phase is %s, should be %s",
ctx.Namespace, vcscheduling.PodGroupCompleted)
})
})
49 changes: 48 additions & 1 deletion test/e2e/util/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,53 @@ func CreateSampleK8sJob(ctx *TestContext, name string, img string, req v1.Resour
return jb
}

// CreateFailK8sJob creates a new k8s job that fails
func CreateFailK8sJob(ctx *TestContext, name string, img string, req v1.ResourceList) *batchv1.Job {
k8sjobname := "job.k8s.io"
defaultTrue := true
// no retries to save time
defaultBackoffLimit := int32(0)
j := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
k8sjobname: name,
},
},
ManualSelector: &defaultTrue,
BackoffLimit: &defaultBackoffLimit,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{k8sjobname: name},
},
Spec: v1.PodSpec{
SchedulerName: "volcano",
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Image: img,
Name: name,
Command: []string{"/bin/sh", "-c", "sleep 10 && exit 1"},
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Requests: req,
},
},
},
},
},
},
}

jb, err := ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Create(context.TODO(), j, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to create k8sjob %s", name)

return jb
}

func k8sjobCompleted(ctx *TestContext, name string) wait.ConditionFunc {
return func() (bool, error) {
jb, err := ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expand All @@ -173,7 +220,7 @@ func k8sjobCompleted(ctx *TestContext, name string) wait.ConditionFunc {
if !labelSelector.Matches(labels.Set(pod.Labels)) {
continue
}
if pod.Status.Phase == v1.PodSucceeded {
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
return true, nil
}
}
Expand Down

0 comments on commit b0c1a56

Please sign in to comment.