diff --git a/test/e2e/predicates.go b/test/e2e/predicates.go
index b94cab41c5..64f12784de 100644
--- a/test/e2e/predicates.go
+++ b/test/e2e/predicates.go
@@ -19,6 +19,9 @@ package e2e
 import (
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 var _ = Describe("Predicates E2E Test", func() {
@@ -51,4 +54,257 @@ var _ = Describe("Predicates E2E Test", func() {
 		Expect(err).NotTo(HaveOccurred())
 	})
 
+	It("NodeAffinity", func() {
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		slot := oneCPU
+		nodeName, rep := computeNode(context, oneCPU)
+		Expect(rep).NotTo(Equal(0))
+
+		affinity := &v1.Affinity{
+			NodeAffinity: &v1.NodeAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+					NodeSelectorTerms: []v1.NodeSelectorTerm{
+						{
+							MatchFields: []v1.NodeSelectorRequirement{
+								{
+									Key:      nodeFieldSelectorKeyNodeName,
+									Operator: v1.NodeSelectorOpIn,
+									Values:   []string{nodeName},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		spec := &jobSpec{
+			name: "na-job",
+			tasks: []taskSpec{
+				{
+					img:      "nginx",
+					req:      slot,
+					min:      1,
+					rep:      rep,
+					affinity: affinity,
+				},
+			},
+		}
+
+		job := createJob(context, spec)
+		err := waitTasksReady(context, job, int(rep))
+		Expect(err).NotTo(HaveOccurred())
+
+		pods := getTasksOfJob(context, job)
+		for _, pod := range pods {
+			Expect(pod.Spec.NodeName).To(Equal(nodeName))
+		}
+	})
+
+	It("Pod Affinity", func() {
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		slot := oneCPU
+		_, rep := computeNode(context, oneCPU)
+		Expect(rep).NotTo(Equal(0))
+
+		labels := map[string]string{"foo": "bar"}
+
+		affinity := &v1.Affinity{
+			PodAffinity: &v1.PodAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
+					{
+						LabelSelector: &metav1.LabelSelector{
+							MatchLabels: labels,
+						},
+						TopologyKey: "kubernetes.io/hostname",
+					},
+				},
+			},
+		}
+
+		spec := &jobSpec{
+			name: "pa-job",
+			tasks: []taskSpec{
+				{
+					img:      "nginx",
+					req:      slot,
+					min:      rep,
+					rep:      rep,
+					affinity: affinity,
+					labels:   labels,
+				},
+			},
+		}
+
+		job := createJob(context, spec)
+		err := waitTasksReady(context, job, int(rep))
+		Expect(err).NotTo(HaveOccurred())
+
+		pods := getTasksOfJob(context, job)
+		// All pods should be scheduled to the same node.
+		nodeName := pods[0].Spec.NodeName
+		for _, pod := range pods {
+			Expect(pod.Spec.NodeName).To(Equal(nodeName))
+		}
+	})
+
+	It("Pod Anti-Affinity", func() {
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		slot := oneCPU
+
+		labels := map[string]string{"foo": "bar"}
+
+		affinity := &v1.Affinity{
+			PodAntiAffinity: &v1.PodAntiAffinity{
+				RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
+					{
+						LabelSelector: &metav1.LabelSelector{
+							MatchLabels: labels,
+						},
+						TopologyKey: "kubernetes.io/hostname",
+					},
+				},
+			},
+		}
+
+		spec := &jobSpec{
+			name: "pa-job",
+			tasks: []taskSpec{
+				{
+					img:      "nginx",
+					req:      slot,
+					min:      2,
+					rep:      2,
+					affinity: affinity,
+					labels:   labels,
+				},
+			},
+		}
+
+		job := createJob(context, spec)
+		err := waitTasksReady(context, job, 2)
+		Expect(err).NotTo(HaveOccurred())
+
+		pods := getTasksOfJob(context, job)
+		// All pods should be scheduled to different nodes.
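+		// (Required anti-affinity matches the job's own "foo: bar" labels on
+		// the hostname topology key, so no two replicas can share a node.)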
+		nodeName := pods[0].Spec.NodeName
+
+		for index, pod := range pods {
+			if index != 0 {
+				Expect(pod.Spec.NodeName).NotTo(Equal(nodeName))
+			}
+		}
+	})
+
+	It("Taints", func() {
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		taints := []v1.Taint{
+			{
+				Key:    "test-taint-key",
+				Value:  "test-taint-val",
+				Effect: v1.TaintEffectNoSchedule,
+			},
+		}
+		defer removeTaintsFromAllNodes(context, taints)
+
+		err := taintAllNodes(context, taints)
+		Expect(err).NotTo(HaveOccurred())
+
+		spec := &jobSpec{
+			name: "tt-job",
+			tasks: []taskSpec{
+				{
+					img: "nginx",
+					req: oneCPU,
+					min: 1,
+					rep: 1,
+				},
+			},
+		}
+
+		job := createJob(context, spec)
+		err = waitTasksPending(context, job, 1)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = removeTaintsFromAllNodes(context, taints)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = waitTasksReady(context, job, 1)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	It("Taints and Tolerations", func() {
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		taints := []v1.Taint{
+			{
+				Key:    "test-taint-key",
+				Value:  "test-taint-val",
+				Effect: v1.TaintEffectNoSchedule,
+			},
+		}
+		defer removeTaintsFromAllNodes(context, taints)
+
+		tolerations := []v1.Toleration{
+			{
+				Key:      "test-taint-key",
+				Value:    "test-taint-val",
+				Operator: v1.TolerationOpEqual,
+				Effect:   v1.TaintEffectNoSchedule,
+			},
+		}
+
+		err := taintAllNodes(context, taints)
+		Expect(err).NotTo(HaveOccurred())
+
+		spec1 := &jobSpec{
+			name: "tt-job",
+			tasks: []taskSpec{
+				{
+					img:         "nginx",
+					req:         oneCPU,
+					min:         1,
+					rep:         1,
+					tolerations: tolerations,
+				},
+			},
+		}
+
+		spec2 := &jobSpec{
+			name: "tt-job-no-toleration",
+			tasks: []taskSpec{
+				{
+					img: "nginx",
+					req: oneCPU,
+					min: 1,
+					rep: 1,
+				},
+			},
+		}
+
+		job1 := createJob(context, spec1)
+		err = waitTasksReady(context, job1, 1)
+		Expect(err).NotTo(HaveOccurred())
+
+		job2 := createJob(context, spec2)
+		err = waitTasksPending(context, job2, 1)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = removeTaintsFromAllNodes(context, taints)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = waitTasksReady(context, job2, 1)
+		Expect(err).NotTo(HaveOccurred())
+	})
 })
diff --git a/test/e2e/util.go b/test/e2e/util.go
index caa6c4f3c2..e90342fba7 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
+	api "k8s.io/kubernetes/pkg/apis/core"
 
 	kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1"
 	kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
@@ -55,12 +56,13 @@ var (
 )
 
 const (
-	timeOutMessage      = "timed out waiting for the condition"
-	workerPriority      = "worker-pri"
-	masterPriority      = "master-pri"
-	defaultNginxImage   = "nginx:1.14"
-	defaultBusyBoxImage = "busybox:1.24"
-	defaultMPIImage     = "volcanosh/example-mpi:0.0.1"
+	timeOutMessage               = "timed out waiting for the condition"
+	workerPriority               = "worker-pri"
+	masterPriority               = "master-pri"
+	defaultNginxImage            = "nginx:1.14"
+	nodeFieldSelectorKeyNodeName = api.ObjectNameField
+	defaultBusyBoxImage          = "busybox:1.24"
+	defaultMPIImage              = "volcanosh/example-mpi:0.0.1"
 
 	defaultNamespace = "test"
 	defaultQueue1    = "q1"
@@ -268,6 +270,7 @@ type taskSpec struct {
 	labels                map[string]string
 	policies              []vkv1.LifecyclePolicy
 	restartPolicy         v1.RestartPolicy
+	tolerations           []v1.Toleration
 	defaultGracefulPeriod *int64
 }
 
@@ -342,6 +345,7 @@ func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) {
 					RestartPolicy: restartPolicy,
 					Containers:    createContainers(task.img, task.command, task.workingDir, task.req, task.limit, task.hostport),
 					Affinity:      task.affinity,
+					Tolerations:   task.tolerations,
 				},
 			},
 		}
@@ -944,7 +948,9 @@ func getTasksOfJob(ctx *context, job *vkv1.Job) []*v1.Pod {
 		if !metav1.IsControlledBy(&pod, job) {
 			continue
 		}
-		tasks = append(tasks, &pod)
+		// DeepCopy: &pod would alias the reused loop variable across iterations.
+		duplicatePod := pod.DeepCopy()
+		tasks = append(tasks, duplicatePod)
 	}
 
 	return tasks
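
Note on the getTasksOfJob change: before Go 1.22, a for-range loop reuses a
single loop variable, so appending &pod stores the same pointer on every
iteration and every slice entry ends up aliasing the last pod in the list.
The DeepCopy above avoids that. A minimal, self-contained sketch of the bug
and the copy-based fix (hypothetical names, not part of the patch):

	package main

	import "fmt"

	type pod struct{ name string }

	func main() {
		items := []pod{{"a"}, {"b"}, {"c"}}

		// Buggy (pre-Go 1.22): p is one variable reused per iteration, so
		// every element of aliased points at the same address.
		var aliased []*pod
		for _, p := range items {
			aliased = append(aliased, &p)
		}

		// Fixed: copy into a fresh variable first; getTasksOfJob makes the
		// copy with pod.DeepCopy() instead.
		var copied []*pod
		for _, p := range items {
			q := p
			copied = append(copied, &q)
		}

		fmt.Println(aliased[0].name, aliased[1].name, aliased[2].name) // "c c c" before Go 1.22
		fmt.Println(copied[0].name, copied[1].name, copied[2].name)    // "a b c"
	}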