diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go
index c004ca1dd..b47aa026a 100644
--- a/pkg/scheduler/cache/cache.go
+++ b/pkg/scheduler/cache/cache.go
@@ -475,7 +475,8 @@ func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo {
 		}

 		if _, found := queues[value.Queue]; !found {
-			glog.V(3).Infof("The Queue of Job <%v> does not exist, ignore it.", value.UID)
+			glog.V(3).Infof("The Queue <%v> of Job <%v> does not exist, ignore it.",
+				value.Queue, value.UID)
 			continue
 		}

diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go
index 943d22bcc..5d54736c7 100644
--- a/pkg/scheduler/cache/event_handlers.go
+++ b/pkg/scheduler/cache/event_handlers.go
@@ -387,6 +387,11 @@ func (sc *SchedulerCache) AddPodGroup(obj interface{}) {
 	sc.Mutex.Lock()
 	defer sc.Mutex.Unlock()

+	// If namespace is used as queue, the `.spec.Queue` of PodGroup is ignored.
+	if sc.namespaceAsQueue {
+		ss.Spec.Queue = ""
+	}
+
 	glog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec)
 	err := sc.setPodGroup(ss)
 	if err != nil {
diff --git a/test/e2e/job.go b/test/e2e/job.go
index 028ca6801..5cd65e33b 100644
--- a/test/e2e/job.go
+++ b/test/e2e/job.go
@@ -28,8 +28,7 @@ var _ = Describe("Job E2E Test", func() {
 		rep := clusterSize(context, oneCPU)

 		_, pg := createJobEx(context, &jobSpec{
-			name:      "qj-1",
-			namespace: "test",
+			name: "qj-1",
 			tasks: []taskSpec{
 				{
 					img: "busybox",
@@ -51,7 +50,6 @@ var _ = Describe("Job E2E Test", func() {
 		rep := clusterSize(context, oneCPU)

 		job := &jobSpec{
-			namespace: "test",
 			tasks: []taskSpec{
 				{
 					img: "busybox",
@@ -151,7 +149,6 @@ var _ = Describe("Job E2E Test", func() {
 		rep := clusterSize(context, slot)

 		job := &jobSpec{
-			namespace: "test",
 			tasks: []taskSpec{
 				{
 					img: "nginx",
@@ -184,7 +181,6 @@ var _ = Describe("Job E2E Test", func() {
 		rep := clusterSize(context, slot)

 		job := &jobSpec{
-			namespace: "test",
 			tasks: []taskSpec{
 				{
 					img: "nginx",
@@ -226,8 +222,7 @@ var _ = Describe("Job E2E Test", func() {
 		rep := clusterSize(context, slot)

 		job := &jobSpec{
-			name:      "test",
-			namespace: "test",
+			name: "test",
 			tasks: []taskSpec{
 				{
 					img: "nginx",
diff --git a/test/e2e/predicates.go b/test/e2e/predicates.go
index ff5a180c6..458f0ffc5 100644
--- a/test/e2e/predicates.go
+++ b/test/e2e/predicates.go
@@ -53,8 +53,7 @@ var _ = Describe("Predicates E2E Test", func() {
 		}

 		job := &jobSpec{
-			name:      "na-job",
-			namespace: "test",
+			name: "na-job",
 			tasks: []taskSpec{
 				{
 					img: "nginx",
@@ -83,8 +82,7 @@ var _ = Describe("Predicates E2E Test", func() {
 		nn := clusterNodeNumber(context)

 		job := &jobSpec{
-			name:      "hp-job",
-			namespace: "test",
+			name: "hp-job",
 			tasks: []taskSpec{
 				{
 					img: "nginx",
@@ -129,8 +127,7 @@ var _ = Describe("Predicates E2E Test", func() {
 		}

 		job := &jobSpec{
-			name:      "pa-job",
-			namespace: "test",
+			name: "pa-job",
 			tasks: []taskSpec{
 				{
 					img: "nginx",
diff --git a/test/e2e/queue.go b/test/e2e/queue.go
index 5bde1022c..17691305c 100644
--- a/test/e2e/queue.go
+++ b/test/e2e/queue.go
@@ -24,19 +24,28 @@ import (
 )

 var _ = Describe("Predicates E2E Test", func() {
-
 	It("Reclaim", func() {
 		context := initTestContext()
 		defer cleanupTestContext(context)

-		jobName1 := "n1/qj-1"
-		jobName2 := "n2/qj-2"
-
 		slot := oneCPU
 		rep := clusterSize(context, slot)

-		createJob(context, jobName1, 1, rep, "nginx", slot, nil, nil)
-		err := waitJobReady(context, jobName1)
+		job := &jobSpec{
+			tasks: []taskSpec{
+				{
+					img: "nginx",
+					req: slot,
+					min: 1,
+					rep: rep,
+				},
+			},
+		}
+
+		job.name = "q1-qj-1"
+		job.queue = "q1"
+		_, pg1 := createJobEx(context, job)
+		err := waitPodGroupReady(context, pg1)
 		Expect(err).NotTo(HaveOccurred())

 		expected := int(rep) / 2
@@ -48,11 +57,13 @@ var _ = Describe("Predicates E2E Test", func() {
 			Expect(err).NotTo(HaveOccurred())
 		}

-		createJob(context, jobName2, 1, rep, "nginx", slot, nil, nil)
-		err = waitTasksReady(context, jobName2, expected)
+		job.name = "q2-qj-2"
+		job.queue = "q2"
+		_, pg2 := createJobEx(context, job)
+		err = waitTasksReadyEx(context, pg2, expected)
 		Expect(err).NotTo(HaveOccurred())

-		err = waitTasksReady(context, jobName1, expected)
+		err = waitTasksReadyEx(context, pg1, expected)
 		Expect(err).NotTo(HaveOccurred())
 	})

diff --git a/test/e2e/util.go b/test/e2e/util.go
index 998bbd8e0..61bea8588 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -21,7 +21,6 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
-	"strings"
 	"time"

 	. "github.com/onsi/gomega"
@@ -65,25 +64,17 @@ type context struct {
 	kubeclient *kubernetes.Clientset
 	karclient  *versioned.Clientset

-	namespaces []string
+	namespace string
 	queues     []string

 	enableNamespaceAsQueue bool
 }

-func splictJobName(cxt *context, jn string) (string, string, string) {
-	nss := strings.Split(jn, "/")
-	if len(nss) == 1 {
-		return cxt.namespaces[0], nss[0], cxt.namespaces[0]
-	}
-	if !cxt.enableNamespaceAsQueue {
-		return cxt.namespaces[0], nss[1], nss[0]
-	}
-	return nss[0], nss[1], nss[0]
-}
-
 func initTestContext() *context {
 	enableNamespaceAsQueue, _ := strconv.ParseBool(os.Getenv("ENABLE_NAMESPACES_AS_QUEUE"))

-	cxt := &context{}
+	cxt := &context{
+		namespace: "test",
+		queues:    []string{"q1", "q2"},
+	}

 	home := homeDir()
 	Expect(home).NotTo(Equal(""))
@@ -94,36 +85,16 @@ func initTestContext() *context {
 	cxt.karclient = versioned.NewForConfigOrDie(config)
 	cxt.kubeclient = kubernetes.NewForConfigOrDie(config)

-	if enableNamespaceAsQueue {
-		cxt.namespaces = []string{"test", "n1", "n2"}
-	} else {
-		cxt.namespaces = []string{"test"}
-		cxt.queues = []string{"test", "n1", "n2"}
-	}
 	cxt.enableNamespaceAsQueue = enableNamespaceAsQueue

-	for _, ns := range cxt.namespaces {
-		_, err = cxt.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      ns,
-				Namespace: ns,
-			},
-		})
-		Expect(err).NotTo(HaveOccurred())
-	}
-
-	for _, q := range cxt.queues {
-		_, err = cxt.karclient.Scheduling().Queues().Create(&arbv1.Queue{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: q,
-			},
-			Spec: arbv1.QueueSpec{
-				Weight: 1,
-			},
-		})
+	_, err = cxt.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: cxt.namespace,
+		},
+	})
+	Expect(err).NotTo(HaveOccurred())

-		Expect(err).NotTo(HaveOccurred())
-	}
+	createQueues(cxt)

 	_, err = cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Create(&schedv1.PriorityClass{
 		ObjectMeta: metav1.ObjectMeta{
@@ -148,11 +119,9 @@ func initTestContext() *context {

 func namespaceNotExist(ctx *context) wait.ConditionFunc {
 	return func() (bool, error) {
-		for _, ns := range ctx.namespaces {
-			_, err := ctx.kubeclient.CoreV1().Namespaces().Get(ns, metav1.GetOptions{})
-			if !(err != nil && errors.IsNotFound(err)) {
-				return false, err
-			}
+		_, err := ctx.kubeclient.CoreV1().Namespaces().Get(ctx.namespace, metav1.GetOptions{})
+		if !(err != nil && errors.IsNotFound(err)) {
+			return false, err
 		}
 		return true, nil
 	}
@@ -161,7 +130,13 @@ func namespaceNotExist(ctx *context) wait.ConditionFunc {
 func queueNotExist(ctx *context) wait.ConditionFunc {
 	return func() (bool, error) {
 		for _, q := range ctx.queues {
-			_, err := ctx.karclient.Scheduling().Queues().Get(q, metav1.GetOptions{})
+			var err error
+			if ctx.enableNamespaceAsQueue {
+				_, err = ctx.kubeclient.CoreV1().Namespaces().Get(q, metav1.GetOptions{})
+			} else {
+				_, err = ctx.karclient.Scheduling().Queues().Get(q, metav1.GetOptions{})
+			}
+
 			if !(err != nil && errors.IsNotFound(err)) {
 				return false, err
 			}
@@ -174,21 +149,14 @@ func queueNotExist(ctx *context) wait.ConditionFunc {
 func cleanupTestContext(cxt *context) {
 	foreground := metav1.DeletePropagationForeground

-	for _, ns := range cxt.namespaces {
-		err := cxt.kubeclient.CoreV1().Namespaces().Delete(ns, &metav1.DeleteOptions{
-			PropagationPolicy: &foreground,
-		})
-		Expect(err).NotTo(HaveOccurred())
-	}
+	err := cxt.kubeclient.CoreV1().Namespaces().Delete(cxt.namespace, &metav1.DeleteOptions{
+		PropagationPolicy: &foreground,
+	})
+	Expect(err).NotTo(HaveOccurred())

-	for _, q := range cxt.queues {
-		err := cxt.karclient.Scheduling().Queues().Delete(q, &metav1.DeleteOptions{
-			PropagationPolicy: &foreground,
-		})
-		Expect(err).NotTo(HaveOccurred())
-	}
+	deleteQueues(cxt)

-	err := cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Delete(masterPriority, &metav1.DeleteOptions{
+	err = cxt.kubeclient.SchedulingV1beta1().PriorityClasses().Delete(masterPriority, &metav1.DeleteOptions{
 		PropagationPolicy: &foreground,
 	})
 	Expect(err).NotTo(HaveOccurred())
@@ -207,6 +175,72 @@ func cleanupTestContext(cxt *context) {
 	Expect(err).NotTo(HaveOccurred())
 }

+func createQueues(cxt *context) {
+	var err error
+
+	for _, q := range cxt.queues {
+		if cxt.enableNamespaceAsQueue {
+			_, err = cxt.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: q,
+				},
+			})
+		} else {
+			_, err = cxt.karclient.Scheduling().Queues().Create(&arbv1.Queue{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: q,
+				},
+				Spec: arbv1.QueueSpec{
+					Weight: 1,
+				},
+			})
+		}
+
+		Expect(err).NotTo(HaveOccurred())
+	}
+
+	if !cxt.enableNamespaceAsQueue {
+		_, err := cxt.karclient.Scheduling().Queues().Create(&arbv1.Queue{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: cxt.namespace,
+			},
+			Spec: arbv1.QueueSpec{
+				Weight: 1,
+			},
+		})
+
+		Expect(err).NotTo(HaveOccurred())
+	}
+}
+
+func deleteQueues(cxt *context) {
+	foreground := metav1.DeletePropagationForeground
+
+	for _, q := range cxt.queues {
+		var err error
+
+		if cxt.enableNamespaceAsQueue {
+			err = cxt.kubeclient.CoreV1().Namespaces().Delete(q, &metav1.DeleteOptions{
+				PropagationPolicy: &foreground,
+			})
+		} else {
+			err = cxt.karclient.Scheduling().Queues().Delete(q, &metav1.DeleteOptions{
+				PropagationPolicy: &foreground,
+			})
+		}
+
+		Expect(err).NotTo(HaveOccurred())
+	}
+
+	if !cxt.enableNamespaceAsQueue {
+		err := cxt.karclient.Scheduling().Queues().Delete(cxt.namespace, &metav1.DeleteOptions{
+			PropagationPolicy: &foreground,
+		})
+
+		Expect(err).NotTo(HaveOccurred())
+	}
+}
+
 type taskSpec struct {
 	min, rep int32
 	img      string
@@ -223,16 +257,32 @@ type jobSpec struct {
 	tasks []taskSpec
 }

+func getNS(context *context, job *jobSpec) string {
+	if len(job.namespace) != 0 {
+		return job.namespace
+	}
+
+	if context.enableNamespaceAsQueue {
+		if len(job.queue) != 0 {
+			return job.queue
+		}
+	}
+
+	return context.namespace
+}
+
 func createJobEx(context *context, job *jobSpec) ([]*batchv1.Job, *arbv1.PodGroup) {
 	var jobs []*batchv1.Job
 	var podgroup *arbv1.PodGroup
 	var min int32

+	ns := getNS(context, job)
+
 	for i, task := range job.tasks {
 		job := &batchv1.Job{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      fmt.Sprintf("%s-%d", job.name, i),
-				Namespace: job.namespace,
+				Namespace: ns,
 			},
 			Spec: batchv1.JobSpec{
 				Parallelism: &task.rep,
@@ -262,7 +312,7 @@ func createJobEx(context *context, job *jobSpec) ([]*batchv1.Job, *arbv1.PodGrou
 	pg := &arbv1.PodGroup{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      job.name,
-			Namespace: job.namespace,
+			Namespace: ns,
 		},
 		Spec: arbv1.PodGroupSpec{
 			MinMember: min,
@@ -347,19 +397,6 @@ func waitPodGroupUnschedulable(ctx *context, pg *arbv1.PodGroup) error {
 	return wait.Poll(10*time.Second, oneMinute, podGroupUnschedulable(ctx, pg, now))
 }

-func createJob(
-	context *context,
-	name string,
-	min, rep int32,
-	img string,
-	req v1.ResourceList,
-	affinity *v1.Affinity,
-	labels map[string]string,
-) *batchv1.Job {
-	containers := createContainers(img, req, 0)
-	return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, labels, containers)
-}
-
 func createContainers(img string, req v1.ResourceList, hostport int32) []v1.Container {
 	container := v1.Container{
 		Image: img,
@@ -382,71 +419,12 @@ func createContainers(img string, req v1.ResourceList, hostport int32) []v1.Cont
 	return []v1.Container{container}
 }

-func createJobWithOptions(context *context,
-	scheduler string,
-	name string,
-	min, rep int32,
-	affinity *v1.Affinity,
-	labels map[string]string,
-	containers []v1.Container,
-) *batchv1.Job {
-	queueJobName := "queuejob.k8s.io"
-	jns, jn, jq := splictJobName(context, name)
-
-	podLabels := map[string]string{queueJobName: jn}
-	for k, v := range labels {
-		podLabels[k] = v
-	}
-
-	job := &batchv1.Job{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      jn,
-			Namespace: jns,
-		},
-		Spec: batchv1.JobSpec{
-			Parallelism: &rep,
-			Completions: &rep,
-			Template: v1.PodTemplateSpec{
-				ObjectMeta: metav1.ObjectMeta{
-					Labels:      podLabels,
-					Annotations: map[string]string{arbv1.GroupNameAnnotationKey: jn},
-				},
-				Spec: v1.PodSpec{
-					SchedulerName: scheduler,
-					RestartPolicy: v1.RestartPolicyNever,
-					Containers:    containers,
-					Affinity:      affinity,
-				},
-			},
-		},
-	}
-
-	pg := &arbv1.PodGroup{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      jn,
-			Namespace: jns,
-		},
-		Spec: arbv1.PodGroupSpec{
-			MinMember: min,
-			Queue:     jq,
-		},
-	}
-
-	job, err := context.kubeclient.BatchV1().Jobs(job.Namespace).Create(job)
-	Expect(err).NotTo(HaveOccurred())
-
-	pg, err = context.karclient.Scheduling().PodGroups(job.Namespace).Create(pg)
-	Expect(err).NotTo(HaveOccurred())
-
-	return job
-}
-
 func createReplicaSet(context *context, name string, rep int32, img string, req v1.ResourceList) *appv1.ReplicaSet {
 	deploymentName := "deployment.k8s.io"
 	deployment := &appv1.ReplicaSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
-			Namespace: context.namespaces[0],
+			Namespace: context.namespace,
 		},
 		Spec: appv1.ReplicaSetSpec{
 			Replicas: &rep,
@@ -476,7 +454,7 @@ func createReplicaSet(context *context, name string, rep int32, img string, req
 		},
 	}

-	deployment, err := context.kubeclient.AppsV1().ReplicaSets(context.namespaces[0]).Create(deployment)
+	deployment, err := context.kubeclient.AppsV1().ReplicaSets(context.namespace).Create(deployment)
 	Expect(err).NotTo(HaveOccurred())

 	return deployment
@@ -484,116 +462,17 @@ func createReplicaSet(context *context, name string, rep int32, img string, req

 func deleteReplicaSet(ctx *context, name string) error {
 	foreground := metav1.DeletePropagationForeground
-	return ctx.kubeclient.AppsV1().ReplicaSets(ctx.namespaces[0]).Delete(name, &metav1.DeleteOptions{
+	return ctx.kubeclient.AppsV1().ReplicaSets(ctx.namespace).Delete(name, &metav1.DeleteOptions{
 		PropagationPolicy: &foreground,
 	})
 }

-func taskReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc {
-	jns, jn, _ := splictJobName(ctx, jobName)
-
-	return func() (bool, error) {
-		queueJob, err := ctx.kubeclient.BatchV1().Jobs(jns).Get(jn, metav1.GetOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		pods, err := ctx.kubeclient.CoreV1().Pods(jns).List(metav1.ListOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		pg, err := ctx.karclient.Scheduling().PodGroups(jns).Get(jn, metav1.GetOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		readyTaskNum := 0
-		for _, pod := range pods.Items {
-			labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)
-			if !labelSelector.Matches(labels.Set(pod.Labels)) ||
-				!metav1.IsControlledBy(&pod, queueJob) {
-				continue
-			}
-			if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
-				readyTaskNum++
-			}
-		}
-
-		if taskNum < 0 {
-			taskNum = int(pg.Spec.MinMember)
-		}
-
-		return taskNum <= readyTaskNum, nil
-	}
-}
-
-func taskNotReady(ctx *context, jobName string, taskNum int) wait.ConditionFunc {
-	jns, jn, _ := splictJobName(ctx, jobName)
-
-	return func() (bool, error) {
-		queueJob, err := ctx.kubeclient.BatchV1().Jobs(jns).Get(jn, metav1.GetOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		pods, err := ctx.kubeclient.CoreV1().Pods(jns).List(metav1.ListOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		notReadyTaskNum := 0
-		for _, pod := range pods.Items {
-			labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)
-			if !labelSelector.Matches(labels.Set(pod.Labels)) ||
-				!metav1.IsControlledBy(&pod, queueJob) {
-				continue
-			}
-			if pod.Status.Phase == v1.PodPending {
-				notReadyTaskNum++
-			}
-		}
-
-		return taskNum <= notReadyTaskNum, nil
-	}
-}
-
-func waitJobReady(ctx *context, name string) error {
-	return wait.Poll(100*time.Millisecond, oneMinute, taskReady(ctx, name, -1))
-}
-
-func waitTasksReady(ctx *context, name string, taskNum int) error {
-	return wait.Poll(100*time.Millisecond, oneMinute, taskReady(ctx, name, taskNum))
-}
-
-func waitTasksNotReady(ctx *context, name string, taskNum int) error {
-	return wait.Poll(100*time.Millisecond, oneMinute, taskNotReady(ctx, name, taskNum))
-}
-
-func jobNotReady(ctx *context, jobName string) wait.ConditionFunc {
-	return func() (bool, error) {
-		queueJob, err := ctx.kubeclient.BatchV1().Jobs(ctx.namespaces[0]).Get(jobName, metav1.GetOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespaces[0]).List(metav1.ListOptions{})
-		Expect(err).NotTo(HaveOccurred())
-
-		pendingTaskNum := int32(0)
-		for _, pod := range pods.Items {
-			labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)
-			if !labelSelector.Matches(labels.Set(pod.Labels)) ||
-				!metav1.IsControlledBy(&pod, queueJob) {
-				continue
-			}
-			if pod.Status.Phase == v1.PodPending && len(pod.Spec.NodeName) == 0 {
-				pendingTaskNum++
-			}
-		}
-
-		return pendingTaskNum == *queueJob.Spec.Parallelism, nil
-	}
-}
-
-func waitJobNotReady(ctx *context, name string) error {
-	return wait.Poll(10*time.Second, oneMinute, jobNotReady(ctx, name))
-}
-
 func replicaSetReady(ctx *context, name string) wait.ConditionFunc {
 	return func() (bool, error) {
-		deployment, err := ctx.kubeclient.ExtensionsV1beta1().ReplicaSets(ctx.namespaces[0]).Get(name, metav1.GetOptions{})
+		deployment, err := ctx.kubeclient.ExtensionsV1beta1().ReplicaSets(ctx.namespace).Get(name, metav1.GetOptions{})
 		Expect(err).NotTo(HaveOccurred())

-		pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespaces[0]).List(metav1.ListOptions{})
+		pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(metav1.ListOptions{})
 		Expect(err).NotTo(HaveOccurred())

 		labelSelector := labels.SelectorFromSet(deployment.Spec.Selector.MatchLabels)
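
For reference, a minimal sketch of how an e2e spec can target one of the pre-created queues with the reworked helpers. This is illustrative only: the spec description and the job name "queue-demo" are made up, while jobSpec, taskSpec, createJobEx, waitPodGroupReady, clusterSize, oneCPU, and the q1/q2 queues created by createQueues all come from the changes above.

	It("schedules a job into a named queue", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		// getNS() resolves the job namespace: an explicit .namespace wins, then
		// .queue when namespaces are used as queues, otherwise the shared "test"
		// namespace set up by initTestContext().
		job := &jobSpec{
			name:  "queue-demo", // hypothetical name for this sketch
			queue: "q1",
			tasks: []taskSpec{
				{
					img: "nginx",
					req: slot,
					min: 1,
					rep: rep,
				},
			},
		}

		_, pg := createJobEx(context, job)
		err := waitPodGroupReady(context, pg)
		Expect(err).NotTo(HaveOccurred())
	})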