From 446d9989598e7d9be004eb932c61bee504469f84 Mon Sep 17 00:00:00 2001 From: "wangyuqing (C)" Date: Thu, 9 May 2019 15:30:58 +0800 Subject: [PATCH] add admitPod --- .../configure.go => options/options.go} | 49 ++++- cmd/admission/app/server.go | 72 +------ cmd/admission/main.go | 23 ++- hack/e2e-admission-config.yaml | 44 ----- .../chart/volcano/templates/admission.yaml | 3 + installer/volcano-development.yaml | 3 + pkg/admission/admission_controller.go | 17 +- pkg/admission/admit_job.go | 10 +- pkg/admission/admit_job_test.go | 4 +- pkg/admission/admit_pod.go | 136 +++++++++++++ pkg/admission/admit_pod_test.go | 186 ++++++++++++++++++ pkg/admission/mutate_job.go | 3 +- pkg/admission/mutate_job_test.go | 4 +- pkg/admission/server.go | 77 ++++++++ pkg/apis/helpers/helpers.go | 18 ++ pkg/controllers/job/helpers/helpers.go | 4 +- .../job/job_controller_actions_test.go | 4 +- .../podgroup/pg_controller_handler.go | 20 +- test/e2e/admission.go | 75 +++++++ test/e2e/util.go | 1 + 20 files changed, 603 insertions(+), 150 deletions(-) rename cmd/admission/app/{configure/configure.go => options/options.go} (83%) delete mode 100644 hack/e2e-admission-config.yaml create mode 100644 pkg/admission/admit_pod.go create mode 100644 pkg/admission/admit_pod_test.go create mode 100644 pkg/admission/server.go diff --git a/cmd/admission/app/configure/configure.go b/cmd/admission/app/options/options.go similarity index 83% rename from cmd/admission/app/configure/configure.go rename to cmd/admission/app/options/options.go index 188b365e5f..bd2bfc3459 100644 --- a/cmd/admission/app/configure/configure.go +++ b/cmd/admission/app/options/options.go @@ -14,11 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package configure +package options import ( "flag" "fmt" + "github.com/golang/glog" "k8s.io/api/admissionregistration/v1beta1" @@ -29,6 +30,10 @@ import ( admissionregistrationv1beta1client "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1" ) +const ( + defaultSchedulerName = "volcano" +) + // Config admission-controller server config. type Config struct { Master string @@ -44,6 +49,7 @@ type Config struct { PrintVersion bool AdmissionServiceName string AdmissionServiceNamespace string + SchedulerName string } // NewConfig create new config @@ -73,6 +79,7 @@ func (c *Config) AddFlags() { flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit") flag.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook") flag.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service") + flag.StringVar(&c.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name") } const ( @@ -84,6 +91,10 @@ const ( ValidateHookName = "validatejob.volcano.sh" // MutateHookName Default name for webhooks in MutatingWebhookConfiguration MutateHookName = "mutatejob.volcano.sh" + // ValidatePodConfigName ValidatingWebhookPodConfiguration name format + ValidatePodConfigName = "%s-validate-pod" + // ValidatePodHookName Default name for webhooks in ValidatingWebhookPodConfiguration + ValidatePodHookName = "validatepod.volcano.sh" ) // CheckPortOrDie check valid port range @@ -177,6 +188,42 @@ func RegisterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte return err } + // Prepare validate pods + path = "/pods" + PodValidateHooks := v1beta1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: useGeneratedNameIfRequired("", + fmt.Sprintf(ValidatePodConfigName, c.AdmissionServiceName)), + }, + Webhooks: []v1beta1.Webhook{{ + Name: useGeneratedNameIfRequired("", ValidatePodHookName), + Rules: []v1beta1.RuleWithOperations{ + { + Operations: []v1beta1.OperationType{v1beta1.Create}, + Rule: v1beta1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }, + }, + ClientConfig: v1beta1.WebhookClientConfig{ + Service: &v1beta1.ServiceReference{ + Name: c.AdmissionServiceName, + Namespace: c.AdmissionServiceNamespace, + Path: &path, + }, + CABundle: cabundle, + }, + FailurePolicy: &ignorePolicy, + }}, + } + + if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(), + []v1beta1.ValidatingWebhookConfiguration{PodValidateHooks}); err != nil { + return err + } + return nil } diff --git a/cmd/admission/app/server.go b/cmd/admission/app/server.go index dcd2ae9a98..a5e4185675 100644 --- a/cmd/admission/app/server.go +++ b/cmd/admission/app/server.go @@ -18,28 +18,13 @@ package app import ( "crypto/tls" - "encoding/json" - "io/ioutil" - "net/http" "github.com/golang/glog" - "volcano.sh/volcano/pkg/client/clientset/versioned" - "k8s.io/api/admission/v1beta1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - - appConf "volcano.sh/volcano/cmd/admission/app/configure" - admissioncontroller "volcano.sh/volcano/pkg/admission" -) - -const ( - //CONTENTTYPE http content-type - CONTENTTYPE = "Content-Type" - - //APPLICATIONJSON json content - APPLICATIONJSON = "application/json" + appConf "volcano.sh/volcano/cmd/admission/app/options" + "volcano.sh/volcano/pkg/client/clientset/versioned" ) // GetClient Get a clientset with restConfig. @@ -51,8 +36,8 @@ func GetClient(restConfig *restclient.Config) *kubernetes.Clientset { return clientset } -//GetKubeBatchClient get a clientset for kubebatch -func GetKubeBatchClient(restConfig *restclient.Config) *versioned.Clientset { +// GetVolcanoClient get a clientset for volcano +func GetVolcanoClient(restConfig *restclient.Config) *versioned.Clientset { clientset, err := versioned.NewForConfig(restConfig) if err != nil { glog.Fatal(err) @@ -89,52 +74,3 @@ func ConfigTLS(config *appConf.Config, restConfig *restclient.Config) *tls.Confi glog.Fatal("tls: failed to find any tls config data") return &tls.Config{} } - -//Serve the http request -func Serve(w http.ResponseWriter, r *http.Request, admit admissioncontroller.AdmitFunc) { - var body []byte - if r.Body != nil { - if data, err := ioutil.ReadAll(r.Body); err == nil { - body = data - } - } - - // verify the content type is accurate - contentType := r.Header.Get(CONTENTTYPE) - if contentType != APPLICATIONJSON { - glog.Errorf("contentType=%s, expect application/json", contentType) - return - } - - var reviewResponse *v1beta1.AdmissionResponse - ar := v1beta1.AdmissionReview{} - deserializer := admissioncontroller.Codecs.UniversalDeserializer() - if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { - reviewResponse = admissioncontroller.ToAdmissionResponse(err) - } else { - reviewResponse = admit(ar) - } - glog.V(3).Infof("sending response: %v", reviewResponse) - - response := createResponse(reviewResponse, &ar) - resp, err := json.Marshal(response) - if err != nil { - glog.Error(err) - } - if _, err := w.Write(resp); err != nil { - glog.Error(err) - } -} - -func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview { - response := v1beta1.AdmissionReview{} - if reviewResponse != nil { - response.Response = reviewResponse - response.Response.UID = ar.Request.UID - } - // reset the Object and OldObject, they are not needed in a response. - ar.Request.Object = runtime.RawExtension{} - ar.Request.OldObject = runtime.RawExtension{} - - return response -} diff --git a/cmd/admission/main.go b/cmd/admission/main.go index fffa4d42f6..284b7f355b 100644 --- a/cmd/admission/main.go +++ b/cmd/admission/main.go @@ -17,7 +17,6 @@ package main import ( "flag" - "github.com/golang/glog" "io/ioutil" "net/http" "os" @@ -25,20 +24,22 @@ import ( "strconv" "syscall" + "github.com/golang/glog" + "k8s.io/client-go/tools/clientcmd" "volcano.sh/volcano/cmd/admission/app" - appConf "volcano.sh/volcano/cmd/admission/app/configure" + appConf "volcano.sh/volcano/cmd/admission/app/options" admissioncontroller "volcano.sh/volcano/pkg/admission" "volcano.sh/volcano/pkg/version" ) func serveJobs(w http.ResponseWriter, r *http.Request) { - app.Serve(w, r, admissioncontroller.AdmitJobs) + admissioncontroller.Serve(w, r, admissioncontroller.AdmitJobs) } func serveMutateJobs(w http.ResponseWriter, r *http.Request) { - app.Serve(w, r, admissioncontroller.MutateJobs) + admissioncontroller.Serve(w, r, admissioncontroller.MutateJobs) } func main() { @@ -63,7 +64,9 @@ func main() { glog.Fatalf("Unable to build k8s config: %v\n", err) } - admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig) + admissioncontroller.VolcanoClientSet = app.GetVolcanoClient(restConfig) + + servePods(config) caBundle, err := ioutil.ReadFile(config.CaCertFile) if err != nil { @@ -101,3 +104,13 @@ func main() { return } } + +func servePods(config *appConf.Config) { + admController := &admissioncontroller.Controller{ + VcClients: admissioncontroller.VolcanoClientSet, + SchedulerName: config.SchedulerName, + } + http.HandleFunc(admissioncontroller.AdmitPodPath, admController.ServerPods) + + return +} diff --git a/hack/e2e-admission-config.yaml b/hack/e2e-admission-config.yaml deleted file mode 100644 index 663bf1acc1..0000000000 --- a/hack/e2e-admission-config.yaml +++ /dev/null @@ -1,44 +0,0 @@ -apiVersion: admissionregistration.k8s.io/v1beta1 -kind: ValidatingWebhookConfiguration -metadata: - name: validate-volcano-job -webhooks: - - clientConfig: - caBundle: {{CA_BUNDLE}} - - # the url should agree with webhook service - url: https://{{host}}:{{hostPort}}/jobs - failurePolicy: Ignore - name: validatejob.volcano.sh - rules: - - apiGroups: - - "batch.volcano.sh" - apiVersions: - - "v1alpha1" - operations: - - CREATE - - UPDATE - resources: - - jobs ---- -apiVersion: admissionregistration.k8s.io/v1beta1 -kind: MutatingWebhookConfiguration -metadata: - name: mutate-volcano-job -webhooks: - - clientConfig: - caBundle: {{CA_BUNDLE}} - - # the url should agree with webhook service - url: https://{{host}}:{{hostPort}}/mutating-jobs - failurePolicy: Ignore - name: mutatejob.volcano.sh - rules: - - apiGroups: - - "batch.volcano.sh" - apiVersions: - - "v1alpha1" - operations: - - CREATE - resources: - - jobs diff --git a/installer/helm/chart/volcano/templates/admission.yaml b/installer/helm/chart/volcano/templates/admission.yaml index e4c4915fbb..68ada88fc9 100644 --- a/installer/helm/chart/volcano/templates/admission.yaml +++ b/installer/helm/chart/volcano/templates/admission.yaml @@ -32,6 +32,9 @@ rules: - apiGroups: [""] resources: ["services"] verbs: ["get"] + - apiGroups: ["scheduling.incubator.k8s.io", "scheduling.sigs.dev"] + resources: ["podgroups"] + verbs: ["get", "list", "watch"] --- kind: ClusterRoleBinding diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index de09ac7a41..6df69378fa 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -187,6 +187,9 @@ rules: - apiGroups: [""] resources: ["services"] verbs: ["get"] + - apiGroups: ["scheduling.incubator.k8s.io", "scheduling.sigs.dev"] + resources: ["podgroups"] + verbs: ["get", "list", "watch"] --- kind: ClusterRoleBinding diff --git a/pkg/admission/admission_controller.go b/pkg/admission/admission_controller.go index 7df0fcc504..a1c7ca517c 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/admission_controller.go @@ -31,18 +31,31 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + vcver "volcano.sh/volcano/pkg/client/clientset/versioned" ) const ( - //AdmitJobPath is the pattern for the jobs admission + // AdmitJobPath is the pattern for the jobs admission AdmitJobPath = "/jobs" - //MutateJobPath is the pattern for the mutating jobs + // MutateJobPath is the pattern for the mutating jobs MutateJobPath = "/mutating-jobs" + // AdmitPodPath is the pattern for the pods admission + AdmitPodPath = "/pods" + // CONTENTTYPE http content-type + CONTENTTYPE = "Content-Type" + // APPLICATIONJSON json content + APPLICATIONJSON = "application/json" ) //The AdmitFunc returns response type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse +// Controller the Admission Controller type +type Controller struct { + VcClients vcver.Interface + SchedulerName string +} + var scheme = runtime.NewScheme() //Codecs is for retrieving serializers for the supported wire formats diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 6435a3ede4..a6a0040106 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -21,7 +21,6 @@ import ( "strings" "github.com/golang/glog" - "volcano.sh/volcano/pkg/client/clientset/versioned" "k8s.io/api/admission/v1beta1" "k8s.io/api/core/v1" @@ -33,11 +32,12 @@ import ( k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/job/plugins" ) -// KubeBatchClientSet is volcano clientset -var KubeBatchClientSet versioned.Interface +// VolcanoClientSet is volcano clientset +var VolcanoClientSet versioned.Interface // AdmitJobs is to admit jobs and return response func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { @@ -151,9 +151,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st } // Check whether Queue already present or not - if _, err := KubeBatchClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { + if _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { // TODO: deprecate v1alpha1 - if _, err := KubeBatchClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { + if _, err := VolcanoClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { msg = msg + fmt.Sprintf(" unable to find job queue: %v", err) } } diff --git a/pkg/admission/admit_job_test.go b/pkg/admission/admit_job_test.go index da011ae303..4841f4b6be 100644 --- a/pkg/admission/admit_job_test.go +++ b/pkg/admission/admit_job_test.go @@ -957,10 +957,10 @@ func TestValidateExecution(t *testing.T) { }, } // create fake volcano clientset - KubeBatchClientSet = fakeclient.NewSimpleClientset() + VolcanoClientSet = fakeclient.NewSimpleClientset() //create default queue - _, err := KubeBatchClientSet.SchedulingV1alpha2().Queues().Create(&defaultqueue) + _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Create(&defaultqueue) if err != nil { t.Error("Queue Creation Failed") } diff --git a/pkg/admission/admit_pod.go b/pkg/admission/admit_pod.go new file mode 100644 index 0000000000..80acee018f --- /dev/null +++ b/pkg/admission/admit_pod.go @@ -0,0 +1,136 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "fmt" + "net/http" + "strings" + + "github.com/golang/glog" + + "k8s.io/api/admission/v1beta1" + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "volcano.sh/volcano/pkg/apis/helpers" + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" +) + +// ServerPods is to server pods +func (c *Controller) ServerPods(w http.ResponseWriter, r *http.Request) { + Serve(w, r, c.AdmitPods) +} + +// AdmitPods is to admit pods and return response +func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { + + glog.V(3).Infof("admitting pods -- %s", ar.Request.Operation) + + pod, err := decodePod(ar.Request.Object, ar.Request.Resource) + if err != nil { + return ToAdmissionResponse(err) + } + + var msg string + reviewResponse := v1beta1.AdmissionResponse{} + reviewResponse.Allowed = true + + switch ar.Request.Operation { + case v1beta1.Create: + msg = c.validatePod(pod, &reviewResponse) + break + default: + err := fmt.Errorf("expect operation to be 'CREATE'") + return ToAdmissionResponse(err) + } + + if !reviewResponse.Allowed { + reviewResponse.Result = &metav1.Status{Message: strings.TrimSpace(msg)} + } + return &reviewResponse +} + +func decodePod(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1.Pod, error) { + podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + raw := object.Raw + pod := v1.Pod{} + + if resource != podResource { + err := fmt.Errorf("expect resource to be %s", podResource) + return pod, err + } + + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { + return pod, err + } + glog.V(3).Infof("the pod struct is %+v", pod) + + return pod, nil +} + +// allow pods to create when +// 1. schedulerName of pod isn't volcano +// 2. pod has Podgroup whose phase isn't Pending +// 3. normal pods whose schedulerName is volcano don't have podgroup +func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionResponse) string { + if pod.Spec.SchedulerName != c.SchedulerName { + return "" + } + + pgName := "" + msg := "" + + // vc-job, SN == volcano + if pod.Annotations != nil { + pgName = pod.Annotations[v1alpha2.GroupNameAnnotationKey] + } + if pgName != "" { + if err := c.checkPGPhase(pod, pgName, true); err != nil { + msg = err.Error() + reviewResponse.Allowed = false + } + return msg + } + + // normal pod, SN == volcano + pgName = helpers.GeneratePodgroupName(&pod) + if err := c.checkPGPhase(pod, pgName, false); err != nil { + msg = err.Error() + reviewResponse.Allowed = false + } + + return msg +} + +func (c *Controller) checkPGPhase(pod v1.Pod, pgName string, isVCJob bool) error { + pg, err := c.VcClients.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) + if err != nil { + if isVCJob || (!isVCJob && !apierrors.IsNotFound(err)) { + return fmt.Errorf("Failed to get PodGroup for pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + } + return nil + } + if pg.Status.Phase != v1alpha2.PodGroupPending { + return nil + } + return fmt.Errorf("Failed to create pod <%s/%s>, because the podgroup phase is Pending", + pod.Namespace, pod.Name) +} diff --git a/pkg/admission/admit_pod_test.go b/pkg/admission/admit_pod_test.go new file mode 100644 index 0000000000..b9b42f1fb0 --- /dev/null +++ b/pkg/admission/admit_pod_test.go @@ -0,0 +1,186 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "strings" + "testing" + + "k8s.io/api/admission/v1beta1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" + vcclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake" +) + +func TestValidatePod(t *testing.T) { + + namespace := "test" + pgName := "podgroup-p1" + isController := true + + testCases := []struct { + Name string + Pod v1.Pod + ExpectErr bool + reviewResponse v1beta1.AdmissionResponse + ret string + disabledPG bool + }{ + // validate normal pod with default-scheduler + { + Name: "validate default normal pod", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "normal-pod-1", + }, + Spec: v1.PodSpec{ + SchedulerName: "default-scheduler", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: true}, + ret: "", + ExpectErr: false, + }, + // validate normal pod with volcano scheduler + { + Name: "validate volcano-scheduler normal pod", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "normal-pod-2", + OwnerReferences: []metav1.OwnerReference{ + {UID: "p1", Controller: &isController}, + }, + }, + Spec: v1.PodSpec{ + SchedulerName: "volcano", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, + ret: "Failed to create pod , because the podgroup phase is Pending", + ExpectErr: true, + }, + // validate volcano pod with volcano scheduler + { + Name: "validate volcano-scheduler volcano pod", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "volcano-pod-1", + Annotations: map[string]string{v1alpha2.GroupNameAnnotationKey: pgName}, + }, + Spec: v1.PodSpec{ + SchedulerName: "volcano", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, + ret: "Failed to create pod , because the podgroup phase is Pending", + ExpectErr: true, + }, + // validate volcano pod with volcano scheduler when get pg failed + { + Name: "validate volcano volcano pod when get pg failed", + Pod: v1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "volcano-pod-2", + Annotations: map[string]string{v1alpha2.GroupNameAnnotationKey: pgName}, + }, + Spec: v1.PodSpec{ + SchedulerName: "volcano", + }, + }, + + reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, + ret: `Failed to get PodGroup for pod : podgroups.scheduling "podgroup-p1" not found`, + ExpectErr: true, + disabledPG: true, + }, + } + + for _, testCase := range testCases { + + pg := &v1alpha2.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "podgroup-p1", + }, + Spec: v1alpha2.PodGroupSpec{ + MinMember: 1, + }, + Status: v1alpha2.PodGroupStatus{ + Phase: v1alpha2.PodGroupPending, + }, + } + + // create fake volcano clientset + VolcanoClientSet = vcclient.NewSimpleClientset() + + if !testCase.disabledPG { + _, err := VolcanoClientSet.SchedulingV1alpha2().PodGroups(namespace).Create(pg) + if err != nil { + t.Error("PG Creation Failed") + } + } + + c := Controller{ + VcClients: VolcanoClientSet, + SchedulerName: "volcano", + } + + ret := c.validatePod(testCase.Pod, &testCase.reviewResponse) + + if testCase.ExpectErr == true && ret == "" { + t.Errorf("%s: test case Expect error msg :%s, but got nil.", testCase.Name, testCase.ret) + } + if testCase.ExpectErr == true && testCase.reviewResponse.Allowed != false { + t.Errorf("%s: test case Expect Allowed as false but got true.", testCase.Name) + } + if testCase.ExpectErr == true && !strings.Contains(ret, testCase.ret) { + t.Errorf("%s: test case Expect error msg :%s, but got diff error %v", testCase.Name, testCase.ret, ret) + } + + if testCase.ExpectErr == false && ret != "" { + t.Errorf("%s: test case Expect no error, but got error %v", testCase.Name, ret) + } + if testCase.ExpectErr == false && testCase.reviewResponse.Allowed != true { + t.Errorf("%s: test case Expect Allowed as true but got false. %v", testCase.Name, testCase.reviewResponse) + } + } +} diff --git a/pkg/admission/mutate_job.go b/pkg/admission/mutate_job.go index e269c20fba..6c08115215 100644 --- a/pkg/admission/mutate_job.go +++ b/pkg/admission/mutate_job.go @@ -19,9 +19,10 @@ package admission import ( "encoding/json" "fmt" - "github.com/golang/glog" "strconv" + "github.com/golang/glog" + "k8s.io/api/admission/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/admission/mutate_job_test.go b/pkg/admission/mutate_job_test.go index f786198b96..f9ba79a9e6 100644 --- a/pkg/admission/mutate_job_test.go +++ b/pkg/admission/mutate_job_test.go @@ -17,9 +17,11 @@ limitations under the License. package admission import ( + "testing" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" ) diff --git a/pkg/admission/server.go b/pkg/admission/server.go new file mode 100644 index 0000000000..4c3fa15e88 --- /dev/null +++ b/pkg/admission/server.go @@ -0,0 +1,77 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/golang/glog" + + "k8s.io/api/admission/v1beta1" + "k8s.io/apimachinery/pkg/runtime" +) + +// Serve the http request +func Serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { + var body []byte + if r.Body != nil { + if data, err := ioutil.ReadAll(r.Body); err == nil { + body = data + } + } + + // verify the content type is accurate + contentType := r.Header.Get(CONTENTTYPE) + if contentType != APPLICATIONJSON { + glog.Errorf("contentType=%s, expect application/json", contentType) + return + } + + var reviewResponse *v1beta1.AdmissionResponse + ar := v1beta1.AdmissionReview{} + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { + reviewResponse = ToAdmissionResponse(err) + } else { + reviewResponse = admit(ar) + } + glog.V(3).Infof("sending response: %v", reviewResponse) + + response := createResponse(reviewResponse, &ar) + resp, err := json.Marshal(response) + if err != nil { + glog.Error(err) + } + if _, err := w.Write(resp); err != nil { + glog.Error(err) + } +} + +func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview { + response := v1beta1.AdmissionReview{} + if reviewResponse != nil { + response.Response = reviewResponse + response.Response.UID = ar.Request.UID + } + // reset the Object and OldObject, they are not needed in a response. + ar.Request.Object = runtime.RawExtension{} + ar.Request.OldObject = runtime.RawExtension{} + + return response +} diff --git a/pkg/apis/helpers/helpers.go b/pkg/apis/helpers/helpers.go index 21476f3854..c6c137d270 100644 --- a/pkg/apis/helpers/helpers.go +++ b/pkg/apis/helpers/helpers.go @@ -130,3 +130,21 @@ func DeleteConfigmap(job *vkv1.Job, kubeClients kubernetes.Interface, cmName str return nil } + +// GeneratePodgroupName generate podgroup name of normal pod +func GeneratePodgroupName(pod *v1.Pod) string { + pgName := vkbatchv1.PodgroupNamePrefix + + if len(pod.OwnerReferences) != 0 { + for _, ownerReference := range pod.OwnerReferences { + if ownerReference.Controller != nil && *ownerReference.Controller == true { + pgName += string(ownerReference.UID) + return pgName + } + } + } + + pgName += string(pod.UID) + + return pgName +} diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go index 905376e31c..369c66153c 100644 --- a/pkg/controllers/job/helpers/helpers.go +++ b/pkg/controllers/job/helpers/helpers.go @@ -18,10 +18,12 @@ package helpers import ( "fmt" - "k8s.io/api/core/v1" "math/rand" "strings" "time" + + "k8s.io/api/core/v1" + "volcano.sh/volcano/pkg/controllers/apis" ) diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index 58184f007e..7d592f6d57 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -18,9 +18,11 @@ package job import ( "fmt" + "testing" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" "volcano.sh/volcano/pkg/controllers/apis" diff --git a/pkg/controllers/podgroup/pg_controller_handler.go b/pkg/controllers/podgroup/pg_controller_handler.go index 7ab072e087..f74c84f4fd 100644 --- a/pkg/controllers/podgroup/pg_controller_handler.go +++ b/pkg/controllers/podgroup/pg_controller_handler.go @@ -23,7 +23,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) @@ -71,7 +70,7 @@ func (cc *Controller) updatePodAnnotations(pod *v1.Pod, pgName string) error { } func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error { - pgName := generatePodgroupName(pod) + pgName := helpers.GeneratePodgroupName(pod) if _, err := cc.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil { if !apierrors.IsNotFound(err) { @@ -101,23 +100,6 @@ func (cc *Controller) createNormalPodPGIfNotExist(pod *v1.Pod) error { return cc.updatePodAnnotations(pod, pgName) } -func generatePodgroupName(pod *v1.Pod) string { - pgName := vkbatchv1.PodgroupNamePrefix - - if len(pod.OwnerReferences) != 0 { - for _, ownerReference := range pod.OwnerReferences { - if ownerReference.Controller != nil && *ownerReference.Controller == true { - pgName += string(ownerReference.UID) - return pgName - } - } - } - - pgName += string(pod.UID) - - return pgName -} - func newPGOwnerReferences(pod *v1.Pod) []metav1.OwnerReference { if len(pod.OwnerReferences) != 0 { for _, ownerReference := range pod.OwnerReferences { diff --git a/test/e2e/admission.go b/test/e2e/admission.go index 79652940e1..a6a631935c 100644 --- a/test/e2e/admission.go +++ b/test/e2e/admission.go @@ -18,10 +18,15 @@ package e2e import ( "encoding/json" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + schedulingv1alpha2 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) var _ = Describe("Job E2E Test: Test Admission service", func() { @@ -152,4 +157,74 @@ var _ = Describe("Job E2E Test: Test Admission service", func() { }) + It("Create default-scheduler pod", func() { + podName := "pod-default-scheduler" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + + pod := &corev1.Pod{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: podName, + }, + Spec: corev1.PodSpec{ + Containers: createContainers(defaultNginxImage, "", "", oneCPU, oneCPU, 0), + }, + } + + _, err := context.kubeclient.CoreV1().Pods(namespace).Create(pod) + Expect(err).NotTo(HaveOccurred()) + + err = waitPodPhase(context, pod, []corev1.PodPhase{corev1.PodRunning}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("Can't create volcano pod when podgroup is Pending", func() { + podName := "pod-volcano" + pgName := "pending-pg" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + + pg := &schedulingv1alpha2.PodGroup{ + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: pgName, + }, + Spec: schedulingv1alpha2.PodGroupSpec{ + MinMember: 1, + MinResources: &thirtyCPU, + }, + Status: schedulingv1alpha2.PodGroupStatus{ + Phase: schedulingv1alpha2.PodGroupPending, + }, + } + + pod := &corev1.Pod{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: podName, + Annotations: map[string]string{schedulingv1alpha2.GroupNameAnnotationKey: pgName}, + }, + Spec: corev1.PodSpec{ + SchedulerName: "volcano", + Containers: createContainers(defaultNginxImage, "", "", oneCPU, oneCPU, 0), + }, + } + + _, err := context.vcclient.SchedulingV1alpha2().PodGroups(namespace).Create(pg) + Expect(err).NotTo(HaveOccurred()) + + _, err = context.kubeclient.CoreV1().Pods(namespace).Create(pod) + Expect(err.Error()).Should(ContainSubstring(`Failed to create pod , because the podgroup phase is Pending`)) + }) }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 3b771c53c6..aac064f4dd 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -52,6 +52,7 @@ var ( oneMinute = 1 * time.Minute twoMinute = 2 * time.Minute oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")} + thirtyCPU = v1.ResourceList{"cpu": resource.MustParse("30000m")} ) const (