From 9ac1c8c4fcdd33f5764b20aecbba551b70cca76e Mon Sep 17 00:00:00 2001
From: bood
Date: Mon, 9 Dec 2024 09:51:31 +0800
Subject: [PATCH] fix: mark PodGroup completed when pod fails

jobStatus only counted Succeeded tasks as finished when deciding the
PodGroup phase. Add api.CompletedStatus (Failed or Succeeded) and use it
so that Failed tasks are counted as well, and mark the PodGroup
Completed once every counted task is Succeeded or Failed.

Add an e2e case with a k8s Job whose pod exits non-zero, and let
k8sjobCompleted treat a Failed pod as finished.

Signed-off-by: bood
---
 pkg/scheduler/api/helpers.go              | 10 +++++
 pkg/scheduler/framework/session.go        |  6 +--
 test/e2e/schedulingbase/job_scheduling.go | 23 +++++++++++
 test/e2e/util/deployment.go               | 49 ++++++++++++++++++++++-
 4 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/pkg/scheduler/api/helpers.go b/pkg/scheduler/api/helpers.go
index 00ccb0f43a..bc76ff01ac 100644
--- a/pkg/scheduler/api/helpers.go
+++ b/pkg/scheduler/api/helpers.go
@@ -80,6 +80,16 @@ func AllocatedStatus(status TaskStatus) bool {
 	}
 }
 
+// CompletedStatus reports whether status is terminal, i.e. the task finished as either Failed or Succeeded.
+func CompletedStatus(status TaskStatus) bool {
+	switch status {
+	case Failed, Succeeded:
+		return true
+	default:
+		return false
+	}
+}
+
 // MergeErrors is used to merge multiple errors into single error
 func MergeErrors(errs ...error) error {
 	msg := "errors: "
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index e28a08345e..dd5ef2b275 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -333,7 +333,7 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
 	} else {
 		allocated := 0
 		for status, tasks := range jobInfo.TaskStatusIndex {
-			if api.AllocatedStatus(status) || status == api.Succeeded {
+			if api.AllocatedStatus(status) || api.CompletedStatus(status) {
 				allocated += len(tasks)
 			}
 		}
@@ -341,8 +341,8 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) scheduling.PodGroupStatus {
 		// If there're enough allocated resource, it's running
 		if int32(allocated) >= jobInfo.PodGroup.Spec.MinMember {
 			status.Phase = scheduling.PodGroupRunning
-			// If all allocated tasks is succeeded, it's completed
-			if len(jobInfo.TaskStatusIndex[api.Succeeded]) == allocated {
+			// If all allocated tasks are succeeded or failed, it's completed
+			if len(jobInfo.TaskStatusIndex[api.Succeeded])+len(jobInfo.TaskStatusIndex[api.Failed]) == allocated {
 				status.Phase = scheduling.PodGroupCompleted
 			}
 		} else if jobInfo.PodGroup.Status.Phase != scheduling.PodGroupInqueue {
diff --git a/test/e2e/schedulingbase/job_scheduling.go b/test/e2e/schedulingbase/job_scheduling.go
index 5cb1ecb07f..64a860ea37 100644
--- a/test/e2e/schedulingbase/job_scheduling.go
+++ b/test/e2e/schedulingbase/job_scheduling.go
@@ -594,4 +594,27 @@ var _ = Describe("Job E2E Test", func() {
 		Expect(pgPhase).To(Equal(vcscheduling.PodGroupCompleted), "podGroup Phase is %s, should be %s",
 			ctx.Namespace, vcscheduling.PodGroupCompleted)
 	})
+
+	It("PodGroup's Phase should be Completed when Job fails", func() {
+		ctx := e2eutil.InitTestContext(e2eutil.Options{})
+		defer e2eutil.CleanupTestContext(ctx)
+
+		jb := e2eutil.CreateFailK8sJob(ctx, "job1", e2eutil.DefaultNginxImage, e2eutil.OneCPU)
+		err := e2eutil.Waitk8sJobCompleted(ctx, jb.Name)
+		Expect(err).NotTo(HaveOccurred())
+
+		var pgPhase vcscheduling.PodGroupPhase
+		wait.Poll(time.Second, time.Second*30, func() (bool, error) {
+			pgs, err := ctx.Vcclient.SchedulingV1beta1().PodGroups(ctx.Namespace).List(context.TODO(), metav1.ListOptions{})
+			Expect(err).NotTo(HaveOccurred(), "failed to list podGroups in namespace %s", ctx.Namespace)
+			Expect(len(pgs.Items)).To(Equal(1), "this test need a clean cluster")
+			pgPhase = pgs.Items[0].Status.Phase
+			if pgPhase != vcscheduling.PodGroupRunning {
+				return true, nil
+			}
+			return false, nil
+		})
+		Expect(pgPhase).To(Equal(vcscheduling.PodGroupCompleted), "podGroup Phase is %s, should be %s",
+			pgPhase, vcscheduling.PodGroupCompleted)
+	})
 })
diff --git a/test/e2e/util/deployment.go b/test/e2e/util/deployment.go
index dc05282c81..914773d951 100644
--- a/test/e2e/util/deployment.go
+++ b/test/e2e/util/deployment.go
@@ -159,6 +159,53 @@ func CreateSampleK8sJob(ctx *TestContext, name string, img string, req v1.Resour
 	return jb
 }
 
+// CreateFailK8sJob creates a k8s Job for the volcano scheduler whose container sleeps 10s and then exits 1.
+func CreateFailK8sJob(ctx *TestContext, name string, img string, req v1.ResourceList) *batchv1.Job {
+	k8sjobname := "job.k8s.io"
+	defaultTrue := true
+	// no retries to save time
+	defaultBackoffLimit := int32(0)
+	j := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: batchv1.JobSpec{
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					k8sjobname: name,
+				},
+			},
+			ManualSelector: &defaultTrue,
+			BackoffLimit:   &defaultBackoffLimit,
+			Template: v1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{k8sjobname: name},
+				},
+				Spec: v1.PodSpec{
+					SchedulerName: "volcano",
+					RestartPolicy: v1.RestartPolicyNever,
+					Containers: []v1.Container{
+						{
+							Image:           img,
+							Name:            name,
+							Command:         []string{"/bin/sh", "-c", "sleep 10 && exit 1"},
+							ImagePullPolicy: v1.PullIfNotPresent,
+							Resources: v1.ResourceRequirements{
+								Requests: req,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	jb, err := ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Create(context.TODO(), j, metav1.CreateOptions{})
+	Expect(err).NotTo(HaveOccurred(), "failed to create k8sjob %s", name)
+
+	return jb
+}
+
 func k8sjobCompleted(ctx *TestContext, name string) wait.ConditionFunc {
 	return func() (bool, error) {
 		jb, err := ctx.Kubeclient.BatchV1().Jobs(ctx.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
@@ -173,7 +220,7 @@ func k8sjobCompleted(ctx *TestContext, name string) wait.ConditionFunc {
 			if !labelSelector.Matches(labels.Set(pod.Labels)) {
 				continue
 			}
-			if pod.Status.Phase == v1.PodSucceeded {
+			if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
 				return true, nil
 			}
 		}