Skip to content

Commit

Permalink
Supported Pod Affinity predicate.
Browse files Browse the repository at this point in the history
Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Oct 14, 2018
1 parent 5ae939f commit 52725c1
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 18 deletions.
84 changes: 84 additions & 0 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/cache"

Expand All @@ -38,7 +40,74 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

// podLister lists pods out of a scheduling session; its List and FilteredList
// methods walk session.Jobs to collect them.
type podLister struct {
// session is the scheduling session whose jobs are scanned for pods.
session *framework.Session
}

// List returns copies of the session's pods whose labels match selector.
//
// Only tasks indexed under a status accepted by api.AllocatedStatus are
// considered. Every returned pod is a deep copy whose Spec.NodeName is set to
// the task's NodeName, so the pod objects held by the session are left
// untouched. The returned error is always nil.
func (pl *podLister) List(selector labels.Selector) ([]*v1.Pod, error) {
	var matched []*v1.Pod

	for _, job := range pl.session.Jobs {
		for status, tasks := range job.TaskStatusIndex {
			if !api.AllocatedStatus(status) {
				continue
			}

			for _, ti := range tasks {
				if !selector.Matches(labels.Set(ti.Pod.Labels)) {
					continue
				}

				// Hand out a copy carrying the task's node assignment.
				clone := ti.Pod.DeepCopy()
				clone.Spec.NodeName = ti.NodeName
				matched = append(matched, clone)
			}
		}
	}

	return matched, nil
}

// FilteredList returns copies of the session's pods that pass podFilter and
// whose labels match selector.
//
// Only tasks indexed under a status accepted by api.AllocatedStatus are
// considered. podFilter is evaluated first; selector is consulted only for
// pods the filter accepts. Every returned pod is a deep copy whose
// Spec.NodeName is set to the task's NodeName. The returned error is always
// nil.
func (pl *podLister) FilteredList(podFilter cache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
	var matched []*v1.Pod

	for _, job := range pl.session.Jobs {
		for status, tasks := range job.TaskStatusIndex {
			if !api.AllocatedStatus(status) {
				continue
			}

			for _, ti := range tasks {
				if !podFilter(ti.Pod) {
					continue
				}
				if !selector.Matches(labels.Set(ti.Pod.Labels)) {
					continue
				}

				// Hand out a copy carrying the task's node assignment.
				clone := ti.Pod.DeepCopy()
				clone.Spec.NodeName = ti.NodeName
				matched = append(matched, clone)
			}
		}
	}

	return matched, nil
}

// cachedNodeInfo resolves node names to nodes using a scheduling session; its
// GetNodeInfo method looks names up in session.NodeIndex.
type cachedNodeInfo struct {
// session is the scheduling session whose NodeIndex is consulted.
session *framework.Session
}

// GetNodeInfo looks name up in the session's NodeIndex and returns the Node
// field of the matching entry. It returns an error when the index has no entry
// for name.
func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) {
	if entry, ok := c.session.NodeIndex[name]; ok {
		return entry.Node, nil
	}

	return nil, fmt.Errorf("failed to find node <%s>", name)
}

func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
pl := &podLister{
session: ssn,
}

ni := &cachedNodeInfo{
session: ssn,
}

ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo := cache.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)
Expand Down Expand Up @@ -85,6 +154,21 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
node.Name, task.Namespace, task.Name)
}

// Pod Affinity/Anti-Affinity Predicate: ask the upstream predicate whether
// the task's pod may be placed on this node, using the session-backed node
// lookup (ni) and pod lister (pl).
podAffinityPredicate := predicates.NewPodAffinityPredicate(ni, pl)
fit, _, err = podAffinityPredicate(task.Pod, nil, nodeInfo)
if err != nil {
	return err
}

glog.V(3).Infof("Pod Affinity/Anti-Affinity predicates Task <%s/%s> on Node <%s>: fit %t, err %v",
	task.Namespace, task.Name, node.Name, fit, err)

if !fit {
	// Arguments follow the verbs in the format string: task namespace and
	// name first, then the node name.
	return fmt.Errorf("task <%s/%s> affinity/anti-affinity failed on node <%s>",
		task.Namespace, task.Name, node.Name)
}

return nil
})
}
Expand Down
24 changes: 12 additions & 12 deletions test/e2e/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var _ = Describe("Job E2E Test", func() {
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)
job := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil)
job := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)
err := waitJobReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -37,9 +37,9 @@ var _ = Describe("Job E2E Test", func() {

rep := clusterSize(context, oneCPU)

job1 := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil)
qj2 := createJob(context, "qj-2", 2, rep, "busybox", oneCPU, nil)
qj3 := createJob(context, "qj-3", 2, rep, "busybox", oneCPU, nil)
job1 := createJob(context, "qj-1", 2, rep, "busybox", oneCPU, nil, nil)
qj2 := createJob(context, "qj-2", 2, rep, "busybox", oneCPU, nil, nil)
qj3 := createJob(context, "qj-3", 2, rep, "busybox", oneCPU, nil, nil)

err := waitJobReady(context, job1.Name)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -60,7 +60,7 @@ var _ = Describe("Job E2E Test", func() {
err := waitReplicaSetReady(context, replicaset.Name)
Expect(err).NotTo(HaveOccurred())

job := createJob(context, "gang-qj", rep, rep, "busybox", oneCPU, nil)
job := createJob(context, "gang-qj", rep, rep, "busybox", oneCPU, nil, nil)
err = waitJobNotReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -76,11 +76,11 @@ var _ = Describe("Job E2E Test", func() {
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)

job1 := createJob(context, "gang-fq-qj1", rep, rep, "nginx", oneCPU, nil)
job1 := createJob(context, "gang-fq-qj1", rep, rep, "nginx", oneCPU, nil, nil)
err := waitJobReady(context, job1.Name)
Expect(err).NotTo(HaveOccurred())

job2 := createJob(context, "gang-fq-qj2", rep, rep, "nginx", oneCPU, nil)
job2 := createJob(context, "gang-fq-qj2", rep, rep, "nginx", oneCPU, nil, nil)
err = waitJobNotReady(context, job2.Name)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -95,11 +95,11 @@ var _ = Describe("Job E2E Test", func() {
slot := oneCPU
rep := clusterSize(context, slot)

job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil)
job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil, nil)
err := waitTasksReady(context, job1.Name, int(rep))
Expect(err).NotTo(HaveOccurred())

job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil)
job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil)
err = waitTasksReady(context, job2.Name, int(rep)/2)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -114,14 +114,14 @@ var _ = Describe("Job E2E Test", func() {
slot := oneCPU
rep := clusterSize(context, slot)

job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil)
job1 := createJob(context, "preemptee-qj", 1, rep, "nginx", slot, nil, nil)
err := waitTasksReady(context, job1.Name, int(rep))
Expect(err).NotTo(HaveOccurred())

job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil)
job2 := createJob(context, "preemptor-qj", 1, rep, "nginx", slot, nil, nil)
Expect(err).NotTo(HaveOccurred())

job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil)
job3 := createJob(context, "preemptor-qj2", 1, rep, "nginx", slot, nil, nil)
Expect(err).NotTo(HaveOccurred())

err = waitTasksReady(context, job1.Name, int(rep)/3)
Expand Down
40 changes: 38 additions & 2 deletions test/e2e/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
. "github.com/onsi/gomega"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
)

Expand Down Expand Up @@ -51,7 +52,7 @@ var _ = Describe("Predicates E2E Test", func() {
},
}

job := createJob(context, "na-job", 1, 1, "nginx", slot, affinity)
job := createJob(context, "na-job", 1, 1, "nginx", slot, affinity, nil)
err := waitJobReady(context, job.Name)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -68,12 +69,47 @@ var _ = Describe("Predicates E2E Test", func() {
nn := clusterNodeNumber(context)

containers := createContainers("nginx", oneCPU, 28080)
job := createJobWithOptions(context, "kube-batch", "qj-1", int32(nn), int32(nn*2), nil, containers)
job := createJobWithOptions(context, "kube-batch", "qj-1", int32(nn), int32(nn*2), nil, nil, containers)

err := waitTasksReady(context, job.Name, nn)
Expect(err).NotTo(HaveOccurred())

err = waitTasksNotReady(context, job.Name, nn)
Expect(err).NotTo(HaveOccurred())
})

// Pod Affinity: every pod of the job carries the label that its own required
// pod-affinity term selects on, with the hostname as topology key, so all of
// the job's pods are expected to be scheduled onto one node.
It("Pod Affinity", func() {
	context := initTestContext()
	defer cleanupTestContext(context)

	slot := oneCPU
	_, rep := computeNode(context, oneCPU)
	// rep is handed to createJob as an int32 count. Equal(0) compares with
	// reflect.DeepEqual against an untyped int and so could never match;
	// BeZero checks against the zero value of rep's own type.
	Expect(rep).NotTo(BeZero())

	labels := map[string]string{"foo": "bar"}

	affinity := &v1.Affinity{
		PodAffinity: &v1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
				{
					LabelSelector: &metav1.LabelSelector{
						MatchLabels: labels,
					},
					TopologyKey: "kubernetes.io/hostname",
				},
			},
		},
	}

	job := createJob(context, "pa-job", rep, rep, "nginx", slot, affinity, labels)
	err := waitJobReady(context, job.Name)
	Expect(err).NotTo(HaveOccurred())

	pods := getPodOfJob(context, "pa-job")
	// Fail with a clear assertion instead of an index-out-of-range panic.
	Expect(pods).NotTo(BeEmpty())

	// All pods should be scheduled to the same node.
	nodeName := pods[0].Spec.NodeName
	for _, pod := range pods {
		Expect(pod.Spec.NodeName).To(Equal(nodeName))
	}
})
})
4 changes: 2 additions & 2 deletions test/e2e/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var _ = Describe("Predicates E2E Test", func() {
slot := oneCPU
rep := clusterSize(context, slot)

createJob(context, jobName1, 1, rep, "nginx", slot, nil)
createJob(context, jobName1, 1, rep, "nginx", slot, nil, nil)
err := waitJobReady(context, jobName1)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -48,7 +48,7 @@ var _ = Describe("Predicates E2E Test", func() {
Expect(err).NotTo(HaveOccurred())
}

createJob(context, jobName2, 1, rep, "nginx", slot, nil)
createJob(context, jobName2, 1, rep, "nginx", slot, nil, nil)
err = waitTasksReady(context, jobName2, expected)
Expect(err).NotTo(HaveOccurred())

Expand Down
11 changes: 9 additions & 2 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ func createJob(
img string,
req v1.ResourceList,
affinity *v1.Affinity,
labels map[string]string,
) *batchv1.Job {
containers := createContainers(img, req, 0)
return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, containers)
return createJobWithOptions(context, "kube-batch", name, min, rep, affinity, labels, containers)
}

func createContainers(img string, req v1.ResourceList, hostport int32) []v1.Container {
Expand Down Expand Up @@ -253,11 +254,17 @@ func createJobWithOptions(context *context,
name string,
min, rep int32,
affinity *v1.Affinity,
labels map[string]string,
containers []v1.Container,
) *batchv1.Job {
queueJobName := "queuejob.k8s.io"
jns, jn, jq := splictJobName(context, name)

podLabels := map[string]string{queueJobName: jn}
for k, v := range labels {
podLabels[k] = v
}

job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jn,
Expand All @@ -268,7 +275,7 @@ func createJobWithOptions(context *context,
Completions: &rep,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{queueJobName: jn},
Labels: podLabels,
Annotations: map[string]string{arbv1.GroupNameAnnotationKey: jn},
},
Spec: v1.PodSpec{
Expand Down

0 comments on commit 52725c1

Please sign in to comment.