From f0391b2ec928c534caf99d6f7f7955342a232993 Mon Sep 17 00:00:00 2001
From: "Da K. Ma"
Date: Sun, 14 Oct 2018 08:49:32 +0800
Subject: [PATCH] Supported Pod Affinity predicate.

Signed-off-by: Da K. Ma
---
 .../plugins/predicates/predicates.go | 84 +++++++++++++++++++
 test/e2e/job.go                      | 24 +++---
 test/e2e/predicates.go               | 40 ++++++++-
 test/e2e/queue.go                    |  4 +-
 test/e2e/util.go                     | 11 ++-
 5 files changed, 145 insertions(+), 18 deletions(-)

diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go
index 48a01836b..d7bdad2c9 100644
--- a/pkg/scheduler/plugins/predicates/predicates.go
+++ b/pkg/scheduler/plugins/predicates/predicates.go
@@ -21,6 +21,8 @@ import (
 
 	"github.com/golang/glog"
 
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/pkg/scheduler/cache"
 
@@ -38,7 +40,74 @@ func New(args *framework.PluginArgs) framework.Plugin {
 	}
 }
 
+type podLister struct {
+	session *framework.Session
+}
+
+func (pl *podLister) List(selector labels.Selector) ([]*v1.Pod, error) {
+	var pods []*v1.Pod
+	for _, job := range pl.session.Jobs {
+		for status, tasks := range job.TaskStatusIndex {
+			if !api.AllocatedStatus(status) {
+				continue
+			}
+
+			for _, task := range tasks {
+				if selector.Matches(labels.Set(task.Pod.Labels)) {
+					pod := task.Pod.DeepCopy()
+					pod.Spec.NodeName = task.NodeName
+					pods = append(pods, pod)
+				}
+			}
+		}
+	}
+
+	return pods, nil
+}
+
+func (pl *podLister) FilteredList(podFilter cache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
+	var pods []*v1.Pod
+	for _, job := range pl.session.Jobs {
+		for status, tasks := range job.TaskStatusIndex {
+			if !api.AllocatedStatus(status) {
+				continue
+			}
+
+			for _, task := range tasks {
+				if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) {
+					pod := task.Pod.DeepCopy()
+					pod.Spec.NodeName = task.NodeName
+					pods = append(pods, pod)
+				}
+			}
+		}
+	}
+
+	return pods, nil
+}
+
+type cachedNodeInfo struct {
+	session *framework.Session
+}
+
+func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) {
+	node, found := c.session.NodeIndex[name]
+	if !found {
+		return nil, fmt.Errorf("failed to find node <%s>", name)
+	}
+
+	return node.Node, nil
+}
+
 func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
+	pl := &podLister{
+		session: ssn,
+	}
+
+	ni := &cachedNodeInfo{
+		session: ssn,
+	}
+
 	ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error {
 		nodeInfo := cache.NewNodeInfo(node.Pods()...)
 		nodeInfo.SetNode(node.Node)
@@ -85,6 +154,21 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
 				node.Name, task.Namespace, task.Name)
 		}
 
+		// Pod Affinity/Anti-Affinity Predicate
+		podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, pl)
+		fit, _, err = podAffinityPredicate(task.Pod, nil, nodeInfo)
+		if err != nil {
+			return err
+		}
+
+		glog.V(3).Infof("Pod Affinity/Anti-Affinity predicates Task <%s/%s> on Node <%s>: fit %t, err %v",
+			task.Namespace, task.Name, node.Name, fit, err)
+
+		if !fit {
+			return fmt.Errorf("task <%s/%s> affinity/anti-affinity failed on node <%s>",
+				task.Namespace, task.Name, node.Name)
+		}
+
 		return nil
 	})
 }
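Note on the two adapters above: podLister and cachedNodeInfo exist only to satisfy the interfaces that the vendored scheduler's NewPodAffinityPredicate consumes (algorithm.PodLister and predicates.NodeInfo in the 1.12-era scheduler packages; treat the exact interface names as an assumption of this note rather than something the patch states). A minimal sketch of compile-time checks that make the contract explicit; the assertions are illustrative and not part of the patch:

package predicates // assuming the plugin package is named after its directory

import (
	"k8s.io/kubernetes/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
)

// Illustrative only: the session-backed adapters must implement the
// interfaces expected by predicates.NewPodAffinityPredicate(ni, pl).
var (
	_ algorithm.PodLister = &podLister{}      // List/FilteredList over allocated tasks
	_ predicates.NodeInfo = &cachedNodeInfo{} // GetNodeInfo backed by ssn.NodeIndex
)

Because the lister deep-copies each task's pod and fills in Spec.NodeName from the session, the upstream affinity/anti-affinity checks see pods where the scheduler has decided to place them, not only where the kubelet has already bound them.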
diff --git a/test/e2e/job.go b/test/e2e/job.go
index 31108ce1a..5db76eafb 100644
--- a/test/e2e/job.go
+++ b/test/e2e/job.go
@@ -26,7 +26,7 @@ var _ = Describe("Job E2E Test", func() {
 		context := initTestContext()
 		defer cleanupTestContext(context)
 		rep := clusterSize(context, oneCPU)
-		job := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil)
+		job := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)
 		err := waitJobReady(context, job.Name)
 		Expect(err).NotTo(HaveOccurred())
 	})
@@ -37,9 +37,9 @@ var _ = Describe("Job E2E Test", func() {
 
 		rep := clusterSize(context, oneCPU)
 
-		job1 := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil)
-		qj2 := createJob(context, "qj-2", 2, rep, "busybox", oneCPU, nil)
-		qj3 := createJob(context, "qj-3", 2, rep, "busybox", oneCPU, nil)
+		job1 := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)
+		qj2 := createJob(context, "qj-2", 2, rep, "busybox", oneCPU, nil, nil)
+		qj3 := createJob(context, "qj-3", 2, rep, "busybox", oneCPU, nil, nil)
 
 		err := waitJobReady(context, job1.Name)
 		Expect(err).NotTo(HaveOccurred())
@@ -60,7 +60,7 @@ var _ = Describe("Job E2E Test", func() {
 		err := waitReplicaSetReady(context, replicaset.Name)
 		Expect(err).NotTo(HaveOccurred())
 
-		job := createJob(context, "gang-qj", rep, rep, "busybox", oneCPU, nil)
+		job := createJob(context, "gang-qj", rep, rep, "busybox", oneCPU, nil, nil)
 		err = waitJobNotReady(context, job.Name)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -76,11 +76,11 @@ var _ = Describe("Job E2E Test", func() {
 		defer cleanupTestContext(context)
 		rep := clusterSize(context, oneCPU)
 
-		job1 := createJob(context, "gang-fq-qj1", rep, rep, "nginx", oneCPU, nil)
+		job1 := createJob(context, "gang-fq-qj1", rep, rep, "nginx", oneCPU, nil, nil)
 		err := waitJobReady(context, job1.Name)
 		Expect(err).NotTo(HaveOccurred())
 
-		job2 := createJob(context, "gang-fq-qj2", rep, rep, "nginx", oneCPU, nil)
+		job2 := createJob(context, "gang-fq-qj2", rep, rep, "nginx", oneCPU, nil, nil)
 		err = waitJobNotReady(context, job2.Name)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -95,11 +95,11 @@ var _ = Describe("Job E2E Test", func() {
 		slot := oneCPU
 		rep := clusterSize(context, slot)
 
-		job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil)
+		job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil, nil)
 		err := waitTasksReady(context, job1.Name, int(rep))
 		Expect(err).NotTo(HaveOccurred())
 
-		job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil)
+		job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil)
 		err = waitTasksReady(context, job2.Name, int(rep)/2)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -114,14 +114,14 @@ var _ = Describe("Job E2E Test", func() {
 		slot := oneCPU
 		rep := clusterSize(context, slot)
 
-		job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil)
+		job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil, nil)
 		err := waitTasksReady(context, job1.Name, int(rep))
 		Expect(err).NotTo(HaveOccurred())
 
-		job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil)
+		job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil)
 		Expect(err).NotTo(HaveOccurred())
 
-		job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil)
+		job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil, nil)
 		Expect(err).NotTo(HaveOccurred())
 
 		err = waitTasksReady(context, job1.Name, int(rep)/3)
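The job.go changes are mechanical: createJob gains a trailing pod-labels argument, and every existing call passes nil for it. A sketch of the two call shapes after this patch, written as a hypothetical helper inside the e2e package (ctx, rep, and oneCPU stand in for values the surrounding tests already have):

// Hypothetical illustration, not part of the patch.
func exampleCreateJobCalls(ctx *context, rep int32) {
	// Existing tests: no extra pod labels, so the helper only applies its
	// default queuejob.k8s.io label to the pod template.
	_ = createJob(ctx, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)

	// New-style call: extra labels are merged into the pod template, which
	// the Pod Affinity e2e case below relies on.
	_ = createJob(ctx, "pa-job", rep, rep, "nginx", oneCPU, nil,
		map[string]string{"foo": "bar"})
}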
"preemptee-qj", 1, rep, "nginx", slot, nil, nil) err := waitTasksReady(context, job1.Name, int(rep)) Expect(err).NotTo(HaveOccurred()) - job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil) + job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil) Expect(err).NotTo(HaveOccurred()) - job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil) + job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil, nil) Expect(err).NotTo(HaveOccurred()) err = waitTasksReady(context, job1.Name, int(rep)/3) diff --git a/test/e2e/predicates.go b/test/e2e/predicates.go index 0cc3912d2..25dd1f6df 100644 --- a/test/e2e/predicates.go +++ b/test/e2e/predicates.go @@ -21,6 +21,7 @@ import ( . "github.com/onsi/gomega" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm" ) @@ -51,7 +52,7 @@ var _ = Describe("Predicates E2E Test", func() { }, } - job := createJob(context, "na-job", 1, 1, "nginx", slot, affinity) + job := createJob(context, "na-job", 1, 1, "nginx", slot, affinity, nil) err := waitJobReady(context, job.Name) Expect(err).NotTo(HaveOccurred()) @@ -68,7 +69,7 @@ var _ = Describe("Predicates E2E Test", func() { nn := clusterNodeNumber(context) containers := createContainers("nginx", oneCPU, 28080) - job := createJobWithOptions(context, "kube-batch", "qj-1", int32(nn), int32(nn*2), nil, containers) + job := createJobWithOptions(context, "kube-batch", "qj-1", int32(nn), int32(nn*2), nil, nil, containers) err := waitTasksReady(context, job.Name, nn) Expect(err).NotTo(HaveOccurred()) @@ -76,4 +77,39 @@ var _ = Describe("Predicates E2E Test", func() { err = waitTasksNotReady(context, job.Name, nn) Expect(err).NotTo(HaveOccurred()) }) + + It("Pod Affinity", func() { + context := initTestContext() + defer cleanupTestContext(context) + + slot := oneCPU + _, rep := computeNode(context, oneCPU) + Expect(rep).NotTo(Equal(0)) + + labels := map[string]string{"foo": "bar"} + + affinity := &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + } + + job := createJob(context, "pa-job", rep, rep, "nginx", slot, affinity, labels) + err := waitJobReady(context, job.Name) + Expect(err).NotTo(HaveOccurred()) + + pods := getPodOfJob(context, "pa-job") + // All pods should be scheduled to the same node. 
diff --git a/test/e2e/queue.go b/test/e2e/queue.go
index 97c2cb592..5bde1022c 100644
--- a/test/e2e/queue.go
+++ b/test/e2e/queue.go
@@ -35,7 +35,7 @@ var _ = Describe("Predicates E2E Test", func() {
 		slot := oneCPU
 		rep := clusterSize(context, slot)
 
-		createJob(context, jobName1, 1, rep, "nginx", slot, nil)
+		createJob(context, jobName1, 1, rep, "nginx", slot, nil, nil)
 		err := waitJobReady(context, jobName1)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -48,7 +48,7 @@ var _ = Describe("Predicates E2E Test", func() {
 			Expect(err).NotTo(HaveOccurred())
 		}
 
-		createJob(context, jobName2, 1, rep, "nginx", slot, nil)
+		createJob(context, jobName2, 1, rep, "nginx", slot, nil, nil)
 		err = waitTasksReady(context, jobName2, expected)
 		Expect(err).NotTo(HaveOccurred())
 
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 9fc53f3b9..20225b357 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -221,9 +221,10 @@ func createJob(
 	img string,
 	req v1.ResourceList,
 	affinity *v1.Affinity,
+	labels map[string]string,
 ) *batchv1.Job {
 	containers := createContainers(img, req, 0)
-	return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, containers)
+	return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, labels, containers)
 }
 
 func createContainers(img string, req v1.ResourceList, hostport int32) []v1.Container {
@@ -253,11 +254,17 @@ func createJobWithOptions(context *context,
 	name string,
 	min, rep int32,
 	affinity *v1.Affinity,
+	labels map[string]string,
 	containers []v1.Container,
 ) *batchv1.Job {
 	queueJobName := "queuejob.k8s.io"
 	jns, jn, jq := splictJobName(context, name)
 
+	podLabels := map[string]string{queueJobName: jn}
+	for k, v := range labels {
+		podLabels[k] = v
+	}
+
 	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      jn,
@@ -268,7 +275,7 @@ func createJobWithOptions(context *context,
 			Completions: &rep,
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
-					Labels:      map[string]string{queueJobName: jn},
+					Labels:      podLabels,
 					Annotations: map[string]string{arbv1.GroupNameAnnotationKey: jn},
 				},
 				Spec: v1.PodSpec{
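The util.go change is the piece the new test depends on: createJobWithOptions now layers the caller's labels on top of the default queuejob.k8s.io label instead of replacing it, so the affinity term's selector matches the job's own pods while the group annotation keeps working. A standalone sketch of that merge, independent of the e2e package (names are illustrative):

package main

import "fmt"

// mergePodLabels mirrors the merge done in createJobWithOptions: the default
// job label is set first, then caller-supplied labels are layered on top.
func mergePodLabels(jobName string, labels map[string]string) map[string]string {
	podLabels := map[string]string{"queuejob.k8s.io": jobName}
	for k, v := range labels {
		podLabels[k] = v
	}
	return podLabels
}

func main() {
	// For the Pod Affinity e2e case this yields both labels on every pod:
	// map[foo:bar queuejob.k8s.io:pa-job]
	fmt.Println(mergePodLabels("pa-job", map[string]string{"foo": "bar"}))
}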