Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Supported Pod Affinity/Anti-Affinity Predicate. #418

Merged
merged 1 commit into from
Oct 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/cache"

Expand All @@ -38,7 +40,74 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

// podLister lists pods out of a scheduling session. Its List and
// FilteredList methods walk the session's jobs, and an instance is passed to
// predicates.NewPodAffinityPredicate when the session opens.
type podLister struct {
// session is the scheduling session whose Jobs are scanned for pods.
session *framework.Session
}

// List collects the pods of all tasks in the session whose status satisfies
// api.AllocatedStatus and whose pod labels match selector.
//
// Every returned pod is a deep copy with Spec.NodeName set to the NodeName
// recorded on the task, so the session's own pod objects are not modified.
// The returned error is always nil.
func (pl *podLister) List(selector labels.Selector) ([]*v1.Pod, error) {
	var matched []*v1.Pod

	for _, job := range pl.session.Jobs {
		for status, tasks := range job.TaskStatusIndex {
			// Tasks outside an allocated status contribute no pods.
			if !api.AllocatedStatus(status) {
				continue
			}

			for _, task := range tasks {
				podLabels := labels.Set(task.Pod.Labels)
				if !selector.Matches(podLabels) {
					continue
				}

				// Return a copy stamped with the node recorded on the task.
				podCopy := task.Pod.DeepCopy()
				podCopy.Spec.NodeName = task.NodeName
				matched = append(matched, podCopy)
			}
		}
	}

	return matched, nil
}

func (pl *podLister) FilteredList(podFilter cache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
var pods []*v1.Pod
for _, job := range pl.session.Jobs {
k82cn marked this conversation as resolved.
Show resolved Hide resolved
for status, tasks := range job.TaskStatusIndex {
if !api.AllocatedStatus(status) {
continue
}

for _, task := range tasks {
if podFilter(task.Pod) && selector.Matches(labels.Set(task.Pod.Labels)) {
pod := task.Pod.DeepCopy()
pod.Spec.NodeName = task.NodeName
pods = append(pods, pod)
}
}
}
}

return pods, nil
}

// cachedNodeInfo resolves node names to nodes through the NodeIndex of a
// scheduling session. An instance is passed to
// predicates.NewPodAffinityPredicate when the session opens.
type cachedNodeInfo struct {
// session is the scheduling session whose NodeIndex is consulted.
session *framework.Session
}

// GetNodeInfo looks name up in the session's NodeIndex and returns the Node
// field of the entry stored there. It returns an error when the index holds
// no entry for name.
func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) {
	if entry, ok := c.session.NodeIndex[name]; ok {
		return entry.Node, nil
	}

	return nil, fmt.Errorf("failed to find node <%s>", name)
}

func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
pl := &podLister{
session: ssn,
}

ni := &cachedNodeInfo{
session: ssn,
}

ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo := cache.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)
Expand Down Expand Up @@ -85,6 +154,21 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
node.Name, task.Namespace, task.Name)
}

// Pod Affinity/Anti-Affinity Predicate.
// The predicate is built from the session-backed node lookup (ni) and pod
// lister (pl). Its second return value is discarded; only fit and err are used.
podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, pl)
fit, _, err = podAffinityPredicate(task.Pod, nil, nodeInfo)
if err != nil {
	return err
}

glog.V(3).Infof("Pod Affinity/Anti-Affinity predicates Task <%s/%s> on Node <%s>: fit %t, err %v",
	task.Namespace, task.Name, node.Name, fit, err)

if !fit {
	// The arguments follow the order of the verbs: task namespace/name
	// first, then the node. The previous order reported the node name as the
	// namespace and the task name as the node.
	return fmt.Errorf("task <%s/%s> affinity/anti-affinity failed on node <%s>",
		task.Namespace, task.Name, node.Name)
}

return nil
})
}
Expand Down
24 changes: 12 additions & 12 deletions test/e2e/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var _ = Describe("Job E2E Test", func() {
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)
job := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil)
job := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)
err := waitJobReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -37,9 +37,9 @@ var _ = Describe("Job E2E Test", func() {

rep := clusterSize(context, oneCPU)

job1 := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil)
qj2 := createJob(context, "qj-2", 2, rep, "busybox", oneCPU, nil)
qj3 := createJob(context, "qj-3", 2, rep, "busybox", oneCPU, nil)
job1 := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)
qj2 := createJob(context, "qj-2", 2, rep, "busybox", oneCPU, nil, nil)
qj3 := createJob(context, "qj-3", 2, rep, "busybox", oneCPU, nil, nil)

err := waitJobReady(context, job1.Name)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -60,7 +60,7 @@ var _ = Describe("Job E2E Test", func() {
err := waitReplicaSetReady(context, replicaset.Name)
Expect(err).NotTo(HaveOccurred())

job := createJob(context, "gang-qj", rep, rep, "busybox", oneCPU, nil)
job := createJob(context, "gang-qj", rep, rep, "busybox", oneCPU, nil, nil)
err = waitJobNotReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -76,11 +76,11 @@ var _ = Describe("Job E2E Test", func() {
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)

job1 := createJob(context, "gang-fq-qj1", rep, rep, "nginx", oneCPU, nil)
job1 := createJob(context, "gang-fq-qj1", rep, rep, "nginx", oneCPU, nil, nil)
err := waitJobReady(context, job1.Name)
Expect(err).NotTo(HaveOccurred())

job2 := createJob(context, "gang-fq-qj2", rep, rep, "nginx", oneCPU, nil)
job2 := createJob(context, "gang-fq-qj2", rep, rep, "nginx", oneCPU, nil, nil)
err = waitJobNotReady(context, job2.Name)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -95,11 +95,11 @@ var _ = Describe("Job E2E Test", func() {
slot := oneCPU
rep := clusterSize(context, slot)

job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil)
job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil, nil)
err := waitTasksReady(context, job1.Name, int(rep))
Expect(err).NotTo(HaveOccurred())

job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil)
job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil)
err = waitTasksReady(context, job2.Name, int(rep)/2)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -114,14 +114,14 @@ var _ = Describe("Job E2E Test", func() {
slot := oneCPU
rep := clusterSize(context, slot)

job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil)
job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil, nil)
err := waitTasksReady(context, job1.Name, int(rep))
Expect(err).NotTo(HaveOccurred())

job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil)
job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil)
Expect(err).NotTo(HaveOccurred())

job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil)
job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil, nil)
Expect(err).NotTo(HaveOccurred())

err = waitTasksReady(context, job1.Name, int(rep)/3)
Expand Down
40 changes: 38 additions & 2 deletions test/e2e/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
. "github.com/onsi/gomega"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
)

Expand Down Expand Up @@ -51,7 +52,7 @@ var _ = Describe("Predicates E2E Test", func() {
},
}

job := createJob(context, "na-job", 1, 1, "nginx", slot, affinity)
job := createJob(context, "na-job", 1, 1, "nginx", slot, affinity, nil)
err := waitJobReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -68,12 +69,47 @@ var _ = Describe("Predicates E2E Test", func() {
nn := clusterNodeNumber(context)

containers := createContainers("nginx", oneCPU, 28080)
job := createJobWithOptions(context, "kube-batch", "qj-1", int32(nn), int32(nn*2), nil, containers)
job := createJobWithOptions(context, "kube-batch", "qj-1", int32(nn), int32(nn*2), nil, nil, containers)

err := waitTasksReady(context, job.Name, nn)
Expect(err).NotTo(HaveOccurred())

err = waitTasksNotReady(context, job.Name, nn)
Expect(err).NotTo(HaveOccurred())
})

// Pod Affinity: creates a job whose pods carry the label foo=bar and require
// affinity, keyed on hostname, to pods with that same label, then checks that
// every pod of the job reports the same node.
It("Pod Affinity", func() {
	context := initTestContext()
	defer cleanupTestContext(context)

	slot := oneCPU
	_, rep := computeNode(context, slot)
	// BeZero compares against the zero value of rep's own type. Equal(0) is
	// type-strict and never matches a non-int rep, which made the original
	// check pass unconditionally.
	Expect(rep).NotTo(BeZero())

	labels := map[string]string{"foo": "bar"}

	affinity := &v1.Affinity{
		PodAffinity: &v1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
				{
					LabelSelector: &metav1.LabelSelector{
						MatchLabels: labels,
					},
					TopologyKey: "kubernetes.io/hostname",
				},
			},
		},
	}

	job := createJob(context, "pa-job", rep, rep, "nginx", slot, affinity, labels)
	err := waitJobReady(context, job.Name)
	Expect(err).NotTo(HaveOccurred())

	pods := getPodOfJob(context, "pa-job")
	// Fail with a clear assertion instead of panicking on pods[0] when no
	// pods are returned.
	Expect(pods).NotTo(BeEmpty())

	// All pods should be scheduled to the same node.
	nodeName := pods[0].Spec.NodeName
	for _, pod := range pods {
		Expect(pod.Spec.NodeName).To(Equal(nodeName))
	}
})
})
4 changes: 2 additions & 2 deletions test/e2e/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var _ = Describe("Predicates E2E Test", func() {
slot := oneCPU
rep := clusterSize(context, slot)

createJob(context, jobName1, 1, rep, "nginx", slot, nil)
createJob(context, jobName1, 1, rep, "nginx", slot, nil, nil)
err := waitJobReady(context, jobName1)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -48,7 +48,7 @@ var _ = Describe("Predicates E2E Test", func() {
Expect(err).NotTo(HaveOccurred())
}

createJob(context, jobName2, 1, rep, "nginx", slot, nil)
createJob(context, jobName2, 1, rep, "nginx", slot, nil, nil)
err = waitTasksReady(context, jobName2, expected)
Expect(err).NotTo(HaveOccurred())

Expand Down
11 changes: 9 additions & 2 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ func createJob(
img string,
req v1.ResourceList,
affinity *v1.Affinity,
labels map[string]string,
) *batchv1.Job {
containers := createContainers(img, req, 0)
return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, containers)
return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, labels, containers)
}

func createContainers(img string, req v1.ResourceList, hostport int32) []v1.Container {
Expand Down Expand Up @@ -253,11 +254,17 @@ func createJobWithOptions(context *context,
name string,
min, rep int32,
affinity *v1.Affinity,
labels map[string]string,
containers []v1.Container,
) *batchv1.Job {
queueJobName := "queuejob.k8s.io"
jns, jn, jq := splictJobName(context, name)

podLabels := map[string]string{queueJobName: jn}
for k, v := range labels {
podLabels[k] = v
}

job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jn,
Expand All @@ -268,7 +275,7 @@ func createJobWithOptions(context *context,
Completions: &rep,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{queueJobName: jn},
Labels: podLabels,
Annotations: map[string]string{arbv1.GroupNameAnnotationKey: jn},
},
Spec: v1.PodSpec{
Expand Down