diff --git a/cmd/admission/main.go b/cmd/admission/main.go index 37bc9e2ff4f..94e478c9176 100644 --- a/cmd/admission/main.go +++ b/cmd/admission/main.go @@ -16,7 +16,6 @@ limitations under the License. package main import ( - "io/ioutil" "net/http" "os" "os/signal" @@ -34,17 +33,13 @@ import ( "volcano.sh/volcano/cmd/admission/app" "volcano.sh/volcano/cmd/admission/app/options" - "volcano.sh/volcano/pkg/admission" + "volcano.sh/volcano/pkg/admission/router" "volcano.sh/volcano/pkg/version" -) - -func serveJobs(w http.ResponseWriter, r *http.Request) { - admission.Serve(w, r, admission.AdmitJobs) -} -func serveMutateJobs(w http.ResponseWriter, r *http.Request) { - admission.Serve(w, r, admission.MutateJobs) -} + _ "volcano.sh/volcano/pkg/admission/jobs/mutate" + _ "volcano.sh/volcano/pkg/admission/jobs/validate" + _ "volcano.sh/volcano/pkg/admission/pods" +) var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes") @@ -64,41 +59,43 @@ func main() { go wait.Until(klog.Flush, *logFlushFreq, wait.NeverStop) defer klog.Flush() - http.HandleFunc(admission.AdmitJobPath, serveJobs) - http.HandleFunc(admission.MutateJobPath, serveMutateJobs) - if err := config.CheckPortOrDie(); err != nil { klog.Fatalf("Configured port is invalid: %v", err) } - addr := ":" + strconv.Itoa(config.Port) restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig) if err != nil { klog.Fatalf("Unable to build k8s config: %v", err) } - admission.VolcanoClientSet = app.GetVolcanoClient(restConfig) - - servePods(config) - - caBundle, err := ioutil.ReadFile(config.CaCertFile) - if err != nil { - klog.Fatalf("Unable to read cacert file: %v", err) - } - - err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle) - if err != nil { - klog.Fatalf("Unable to register webhook configs: %v", err) - } + vClient := app.GetVolcanoClient(restConfig) + router.ForEachAdmission(func(service *router.AdmissionService) { + if service.Config != nil { + service.Config.VolcanoClient = vClient + service.Config.SchedulerName = config.SchedulerName + } + http.HandleFunc(service.Path, service.Handler) + }) + + // + //caBundle, err := ioutil.ReadFile(config.CaCertFile) + //if err != nil { + // klog.Fatalf("Unable to read cacert file: %v", err) + //} + //// + ////err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle) + ////if err != nil { + //// klog.Fatalf("Unable to register webhook configs: %v", err) + ////} + webhookServeError := make(chan struct{}) stopChannel := make(chan os.Signal) signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT) server := &http.Server{ - Addr: addr, + Addr: ":" + strconv.Itoa(config.Port), TLSConfig: app.ConfigTLS(config, restConfig), } - webhookServeError := make(chan struct{}) go func() { err = server.ListenAndServeTLS("", "") if err != nil && err != http.ErrServerClosed { @@ -117,13 +114,3 @@ func main() { return } } - -func servePods(config *options.Config) { - admController := &admission.Controller{ - VcClients: admission.VolcanoClientSet, - SchedulerName: config.SchedulerName, - } - http.HandleFunc(admission.AdmitPodPath, admController.ServerPods) - - return -} diff --git a/hack/.golint_failures b/hack/.golint_failures index 6a566edd03c..b8ea2a469b6 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -1,3 +1,6 @@ +volcano.sh/volcano/pkg/admission/jobs/mutate +volcano.sh/volcano/pkg/admission/router +volcano.sh/volcano/pkg/admission/schema volcano.sh/volcano/pkg/apis/scheduling volcano.sh/volcano/pkg/apis/scheduling/v1alpha1 volcano.sh/volcano/pkg/apis/scheduling/v1alpha2 diff --git a/pkg/admission/mutate_job.go b/pkg/admission/jobs/mutate/mutate_job.go similarity index 84% rename from pkg/admission/mutate_job.go rename to pkg/admission/jobs/mutate/mutate_job.go index 8fba26f7e0d..f2d6473b91f 100644 --- a/pkg/admission/mutate_job.go +++ b/pkg/admission/jobs/mutate/mutate_job.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package mutate import ( "encoding/json" @@ -25,6 +25,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" ) @@ -33,6 +36,11 @@ const ( DefaultQueue = "default" ) +var Service = &router.AdmissionService{ + Path: "/mutating-jobs", + Func: MutateJobs, +} + type patchOperation struct { Op string `json:"op"` Path string `json:"path"` @@ -43,9 +51,9 @@ type patchOperation struct { func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { klog.V(3).Infof("mutating jobs") - job, err := DecodeJob(ar.Request.Object, ar.Request.Resource) + job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } reviewResponse := v1beta1.AdmissionResponse{} @@ -58,7 +66,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { break default: err = fmt.Errorf("expect operation to be 'CREATE' ") - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } if err != nil { @@ -73,7 +81,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { return &reviewResponse } -func createPatch(job v1alpha1.Job) ([]byte, error) { +func createPatch(job *v1alpha1.Job) ([]byte, error) { var patch []patchOperation pathQueue := patchDefaultQueue(job) if pathQueue != nil { @@ -86,7 +94,7 @@ func createPatch(job v1alpha1.Job) ([]byte, error) { return json.Marshal(patch) } -func patchDefaultQueue(job v1alpha1.Job) *patchOperation { +func patchDefaultQueue(job *v1alpha1.Job) *patchOperation { //Add default queue if not specified. if job.Spec.Queue == "" { return &patchOperation{Op: "add", Path: "/spec/queue", Value: DefaultQueue} diff --git a/pkg/admission/mutate_job_test.go b/pkg/admission/jobs/mutate/mutate_job_test.go similarity index 99% rename from pkg/admission/mutate_job_test.go rename to pkg/admission/jobs/mutate/mutate_job_test.go index f9ba79a9e6d..e6fa8820779 100644 --- a/pkg/admission/mutate_job_test.go +++ b/pkg/admission/jobs/mutate/mutate_job_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package mutate import ( "testing" diff --git a/pkg/admission/admit_job.go b/pkg/admission/jobs/validate/admit_job.go similarity index 83% rename from pkg/admission/admit_job.go rename to pkg/admission/jobs/validate/admit_job.go index a34e8a3bf36..19c2a47f2b7 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/jobs/validate/admit_job.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package validate import ( "fmt" @@ -30,23 +30,33 @@ import ( k8scorev1 "k8s.io/kubernetes/pkg/apis/core/v1" k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/job/plugins" ) -// VolcanoClientSet is volcano clientset -// TODO: make it as package local var. -var VolcanoClientSet vcclientset.Interface +func init() { + router.RegisterAdmission(service) +} + +var service = &router.AdmissionService{ + Path: "/jobs", + Func: AdmitJobs, + + Config: config, +} + +var config = &router.AdmissionServiceConfig{} // AdmitJobs is to admit jobs and return response func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { - klog.V(3).Infof("admitting jobs -- %s", ar.Request.Operation) - job, err := DecodeJob(ar.Request.Object, ar.Request.Resource) + job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } var msg string reviewResponse := v1beta1.AdmissionResponse{} @@ -57,14 +67,14 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { msg = validateJob(job, &reviewResponse) break case v1beta1.Update: - _, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource) + _, err := schema.DecodeJob(ar.Request.OldObject, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } break default: err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'") - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } if !reviewResponse.Allowed { @@ -73,8 +83,7 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { return &reviewResponse } -func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string { - +func validateJob(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string { var msg string taskNames := map[string]string{} var totalReplicas int32 @@ -151,9 +160,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st } // Check whether Queue already present or not - if _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { + if _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { // TODO: deprecate v1alpha1 - if _, err := VolcanoClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { + if _, err := config.VolcanoClient.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { msg = msg + fmt.Sprintf(" unable to find job queue: %v", err) } } @@ -165,7 +174,7 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st return msg } -func validateTaskTemplate(task v1alpha1.TaskSpec, job v1alpha1.Job, index int) string { +func validateTaskTemplate(task v1alpha1.TaskSpec, job *v1alpha1.Job, index int) string { var v1PodTemplate v1.PodTemplate v1PodTemplate.Template = *task.Template.DeepCopy() k8scorev1.SetObjectDefaults_PodTemplate(&v1PodTemplate) diff --git a/pkg/admission/admit_job_test.go b/pkg/admission/jobs/validate/admit_job_test.go similarity index 99% rename from pkg/admission/admit_job_test.go rename to pkg/admission/jobs/validate/admit_job_test.go index f1e82439250..fdba61d2e1d 100644 --- a/pkg/admission/admit_job_test.go +++ b/pkg/admission/jobs/validate/admit_job_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package validate import ( "strings" @@ -1046,15 +1046,15 @@ func TestValidateExecution(t *testing.T) { }, } // create fake volcano clientset - VolcanoClientSet = fakeclient.NewSimpleClientset() + config.VolcanoClient = fakeclient.NewSimpleClientset() //create default queue - _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Create(&defaultqueue) + _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Create(&defaultqueue) if err != nil { t.Error("Queue Creation Failed") } - ret := validateJob(testCase.Job, &testCase.reviewResponse) + ret := validateJob(&testCase.Job, &testCase.reviewResponse) //fmt.Printf("test-case name:%s, ret:%v testCase.reviewResponse:%v \n", testCase.Name, ret,testCase.reviewResponse) if testCase.ExpectErr == true && ret == "" { t.Errorf("Expect error msg :%s, but got nil.", testCase.ret) diff --git a/pkg/admission/admission_controller.go b/pkg/admission/jobs/validate/util.go similarity index 71% rename from pkg/admission/admission_controller.go rename to pkg/admission/jobs/validate/util.go index c7f9794607c..3c986bb555f 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/jobs/validate/util.go @@ -14,55 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package validate import ( "fmt" "github.com/hashicorp/go-multierror" - "k8s.io/api/admission/v1beta1" - admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/klog" "k8s.io/kubernetes/pkg/apis/core/validation" batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" ) -const ( - // AdmitJobPath is the pattern for the jobs admission - AdmitJobPath = "/jobs" - // MutateJobPath is the pattern for the mutating jobs - MutateJobPath = "/mutating-jobs" - // AdmitPodPath is the pattern for the pods admission - AdmitPodPath = "/pods" - // CONTENTTYPE http content-type - CONTENTTYPE = "Content-Type" - // APPLICATIONJSON json content - APPLICATIONJSON = "application/json" -) - -//The AdmitFunc returns response -type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse - -// Controller the Admission Controller type -type Controller struct { - VcClients vcclientset.Interface - SchedulerName string -} - -var scheme = runtime.NewScheme() - -//Codecs is for retrieving serializers for the supported wire formats -//and conversion wrappers to define preferred internal and external versions. -var Codecs = serializer.NewCodecFactory(scheme) - // policyEventMap defines all policy events and whether to allow external use var policyEventMap = map[batchv1alpha1.Event]bool{ batchv1alpha1.AnyEvent: true, @@ -86,45 +50,6 @@ var policyActionMap = map[batchv1alpha1.Action]bool{ batchv1alpha1.EnqueueAction: false, } -func init() { - addToScheme(scheme) -} - -func addToScheme(scheme *runtime.Scheme) { - corev1.AddToScheme(scheme) - admissionregistrationv1beta1.AddToScheme(scheme) -} - -//ToAdmissionResponse updates the admission response with the input error -func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse { - klog.Error(err) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: err.Error(), - }, - } -} - -//DecodeJob decodes the job using deserializer from the raw object -func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (batchv1alpha1.Job, error) { - jobResource := metav1.GroupVersionResource{Group: batchv1alpha1.SchemeGroupVersion.Group, Version: batchv1alpha1.SchemeGroupVersion.Version, Resource: "jobs"} - raw := object.Raw - job := batchv1alpha1.Job{} - - if resource != jobResource { - err := fmt.Errorf("expect resource to be %s", jobResource) - return job, err - } - - deserializer := Codecs.UniversalDeserializer() - if _, _, err := deserializer.Decode(raw, nil, &job); err != nil { - return job, err - } - klog.V(3).Infof("the job struct is %+v", job) - - return job, nil -} - func validatePolicies(policies []batchv1alpha1.LifecyclePolicy, fldPath *field.Path) error { var err error policyEvents := map[batchv1alpha1.Event]struct{}{} diff --git a/pkg/admission/admit_pod.go b/pkg/admission/pods/admit_pod.go similarity index 56% rename from pkg/admission/admit_pod.go rename to pkg/admission/pods/admit_pod.go index 09d891f498c..ace63758678 100644 --- a/pkg/admission/admit_pod.go +++ b/pkg/admission/pods/admit_pod.go @@ -14,38 +14,47 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package pods import ( "fmt" - "net/http" "strings" "k8s.io/api/admission/v1beta1" "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) -// ServerPods is to server pods -func (c *Controller) ServerPods(w http.ResponseWriter, r *http.Request) { - Serve(w, r, c.AdmitPods) +func init() { + router.RegisterAdmission(service) } +var service = &router.AdmissionService{ + Path: "/pods", + Func: AdmitPods, + + Config: config, +} + +var config = &router.AdmissionServiceConfig{} + // AdmitPods is to admit pods and return response -func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { +func AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { klog.V(3).Infof("admitting pods -- %s", ar.Request.Operation) - pod, err := decodePod(ar.Request.Object, ar.Request.Resource) + pod, err := schema.DecodePod(ar.Request.Object, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } var msg string @@ -54,11 +63,11 @@ func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionRes switch ar.Request.Operation { case v1beta1.Create: - msg = c.validatePod(pod, &reviewResponse) + msg = validatePod(pod, &reviewResponse) break default: err := fmt.Errorf("expect operation to be 'CREATE'") - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } if !reviewResponse.Allowed { @@ -67,31 +76,12 @@ func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionRes return &reviewResponse } -func decodePod(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1.Pod, error) { - podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} - raw := object.Raw - pod := v1.Pod{} - - if resource != podResource { - err := fmt.Errorf("expect resource to be %s", podResource) - return pod, err - } - - deserializer := Codecs.UniversalDeserializer() - if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { - return pod, err - } - klog.V(3).Infof("the pod struct is %+v", pod) - - return pod, nil -} - // allow pods to create when // 1. schedulerName of pod isn't volcano // 2. pod has Podgroup whose phase isn't Pending // 3. normal pods whose schedulerName is volcano don't have podgroup -func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionResponse) string { - if pod.Spec.SchedulerName != c.SchedulerName { +func validatePod(pod *v1.Pod, reviewResponse *v1beta1.AdmissionResponse) string { + if pod.Spec.SchedulerName != config.SchedulerName { return "" } @@ -103,7 +93,7 @@ func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionRe pgName = pod.Annotations[v1alpha2.GroupNameAnnotationKey] } if pgName != "" { - if err := c.checkPGPhase(pod, pgName, true); err != nil { + if err := checkPGPhase(pod, pgName, true); err != nil { msg = err.Error() reviewResponse.Allowed = false } @@ -111,8 +101,8 @@ func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionRe } // normal pod, SN == volcano - pgName = helpers.GeneratePodgroupName(&pod) - if err := c.checkPGPhase(pod, pgName, false); err != nil { + pgName = helpers.GeneratePodgroupName(pod) + if err := checkPGPhase(pod, pgName, false); err != nil { msg = err.Error() reviewResponse.Allowed = false } @@ -120,13 +110,13 @@ func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionRe return msg } -func (c *Controller) checkPGPhase(pod v1.Pod, pgName string, isVCJob bool) error { - pg, err := c.VcClients.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) +func checkPGPhase(pod *v1.Pod, pgName string, isVCJob bool) error { + pg, err := config.VolcanoClient.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) if err != nil { - pg, err := c.VcClients.SchedulingV1alpha1().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) + pg, err := config.VolcanoClient.SchedulingV1alpha1().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) if err != nil { if isVCJob || (!isVCJob && !apierrors.IsNotFound(err)) { - return fmt.Errorf("Failed to get PodGroup for pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + return fmt.Errorf("failed to get PodGroup for pod <%s/%s>: %v", pod.Namespace, pod.Name, err) } return nil } @@ -137,6 +127,6 @@ func (c *Controller) checkPGPhase(pod v1.Pod, pgName string, isVCJob bool) error if pg.Status.Phase != v1alpha2.PodGroupPending { return nil } - return fmt.Errorf("Failed to create pod <%s/%s>, because the podgroup phase is Pending", + return fmt.Errorf("failed to create pod <%s/%s> as the podgroup phase is Pending", pod.Namespace, pod.Name) } diff --git a/pkg/admission/admit_pod_test.go b/pkg/admission/pods/admit_pod_test.go similarity index 88% rename from pkg/admission/admit_pod_test.go rename to pkg/admission/pods/admit_pod_test.go index b9b42f1fb04..f71f0cf30f5 100644 --- a/pkg/admission/admit_pod_test.go +++ b/pkg/admission/pods/admit_pod_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package pods import ( "strings" @@ -84,7 +84,7 @@ func TestValidatePod(t *testing.T) { }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: "Failed to create pod , because the podgroup phase is Pending", + ret: "failed to create pod as the podgroup phase is Pending", ExpectErr: true, }, // validate volcano pod with volcano scheduler @@ -106,7 +106,7 @@ func TestValidatePod(t *testing.T) { }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: "Failed to create pod , because the podgroup phase is Pending", + ret: "failed to create pod as the podgroup phase is Pending", ExpectErr: true, }, // validate volcano pod with volcano scheduler when get pg failed @@ -128,7 +128,7 @@ func TestValidatePod(t *testing.T) { }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: `Failed to get PodGroup for pod : podgroups.scheduling "podgroup-p1" not found`, + ret: `failed to get PodGroup for pod : podgroups.scheduling "podgroup-p1" not found`, ExpectErr: true, disabledPG: true, }, @@ -150,21 +150,17 @@ func TestValidatePod(t *testing.T) { } // create fake volcano clientset - VolcanoClientSet = vcclient.NewSimpleClientset() + config.VolcanoClient = vcclient.NewSimpleClientset() + config.SchedulerName = "volcano" if !testCase.disabledPG { - _, err := VolcanoClientSet.SchedulingV1alpha2().PodGroups(namespace).Create(pg) + _, err := config.VolcanoClient.SchedulingV1alpha2().PodGroups(namespace).Create(pg) if err != nil { t.Error("PG Creation Failed") } } - c := Controller{ - VcClients: VolcanoClientSet, - SchedulerName: "volcano", - } - - ret := c.validatePod(testCase.Pod, &testCase.reviewResponse) + ret := validatePod(&testCase.Pod, &testCase.reviewResponse) if testCase.ExpectErr == true && ret == "" { t.Errorf("%s: test case Expect error msg :%s, but got nil.", testCase.Name, testCase.ret) diff --git a/pkg/admission/router/factory.go b/pkg/admission/router/factory.go new file mode 100644 index 00000000000..6461fefea31 --- /dev/null +++ b/pkg/admission/router/factory.go @@ -0,0 +1,36 @@ +package router + +import ( + "fmt" + "net/http" + "sync" +) + +type AdmissionHandler func(w http.ResponseWriter, r *http.Request) + +var admissionMap = make(map[string]*AdmissionService) +var admissionMutex sync.Mutex + +func RegisterAdmission(service *AdmissionService) error { + admissionMutex.Lock() + defer admissionMutex.Unlock() + + if _, found := admissionMap[service.Path]; found { + return fmt.Errorf("duplicated admission service for %s", service.Path) + } + + // Also register handler to the service. + service.Handler = func(w http.ResponseWriter, r *http.Request) { + Serve(w, r, service.Func) + } + + admissionMap[service.Path] = service + + return nil +} + +func ForEachAdmission(handler func(*AdmissionService)) { + for _, f := range admissionMap { + handler(f) + } +} diff --git a/pkg/admission/router/interface.go b/pkg/admission/router/interface.go new file mode 100644 index 00000000000..c54052ece26 --- /dev/null +++ b/pkg/admission/router/interface.go @@ -0,0 +1,25 @@ +package router + +import ( + "k8s.io/api/admission/v1beta1" + "k8s.io/client-go/kubernetes" + + "volcano.sh/volcano/pkg/client/clientset/versioned" +) + +//The AdmitFunc returns response +type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse + +type AdmissionServiceConfig struct { + SchedulerName string + KubeClient kubernetes.Interface + VolcanoClient versioned.Interface +} + +type AdmissionService struct { + Path string + Func AdmitFunc + Handler AdmissionHandler + + Config *AdmissionServiceConfig +} diff --git a/pkg/admission/server.go b/pkg/admission/router/server.go similarity index 85% rename from pkg/admission/server.go rename to pkg/admission/router/server.go index f61a8d9d66c..5132606a928 100644 --- a/pkg/admission/server.go +++ b/pkg/admission/router/server.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package router import ( "encoding/json" @@ -24,8 +24,17 @@ import ( "k8s.io/api/admission/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog" + + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" ) +// CONTENTTYPE http content-type +var CONTENTTYPE = "Content-Type" + +// APPLICATIONJSON json content +var APPLICATIONJSON = "application/json" + // Serve the http request func Serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { var body []byte @@ -44,9 +53,9 @@ func Serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { var reviewResponse *v1beta1.AdmissionResponse ar := v1beta1.AdmissionReview{} - deserializer := Codecs.UniversalDeserializer() + deserializer := schema.Codecs.UniversalDeserializer() if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { - reviewResponse = ToAdmissionResponse(err) + reviewResponse = util.ToAdmissionResponse(err) } else { reviewResponse = admit(ar) } diff --git a/pkg/admission/schema/schema.go b/pkg/admission/schema/schema.go new file mode 100644 index 00000000000..20d60005bbd --- /dev/null +++ b/pkg/admission/schema/schema.go @@ -0,0 +1,69 @@ +package schema + +import ( + "fmt" + + "k8s.io/api/admission/v1beta1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/klog" + corev1 "k8s.io/kubernetes/pkg/apis/core/v1" + + batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" +) + +func init() { + addToScheme(scheme) +} + +var scheme = runtime.NewScheme() + +//Codecs is for retrieving serializers for the supported wire formats +//and conversion wrappers to define preferred internal and external versions. +var Codecs = serializer.NewCodecFactory(scheme) + +func addToScheme(scheme *runtime.Scheme) { + corev1.AddToScheme(scheme) + v1beta1.AddToScheme(scheme) +} + +//DecodeJob decodes the job using deserializer from the raw object +func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (*batchv1alpha1.Job, error) { + jobResource := metav1.GroupVersionResource{Group: batchv1alpha1.SchemeGroupVersion.Group, Version: batchv1alpha1.SchemeGroupVersion.Version, Resource: "jobs"} + raw := object.Raw + job := batchv1alpha1.Job{} + + if resource != jobResource { + err := fmt.Errorf("expect resource to be %s", jobResource) + return &job, err + } + + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &job); err != nil { + return &job, err + } + klog.V(3).Infof("the job struct is %+v", job) + + return &job, nil +} + +func DecodePod(object runtime.RawExtension, resource metav1.GroupVersionResource) (*v1.Pod, error) { + podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + raw := object.Raw + pod := v1.Pod{} + + if resource != podResource { + err := fmt.Errorf("expect resource to be %s", podResource) + return &pod, err + } + + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { + return &pod, err + } + klog.V(3).Infof("the pod struct is %+v", pod) + + return &pod, nil +} diff --git a/pkg/admission/util/util.go b/pkg/admission/util/util.go new file mode 100644 index 00000000000..388039806a8 --- /dev/null +++ b/pkg/admission/util/util.go @@ -0,0 +1,15 @@ +package util + +import "k8s.io/api/admission/v1beta1" +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import "k8s.io/klog" + +//ToAdmissionResponse updates the admission response with the input error +func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse { + klog.Error(err) + return &v1beta1.AdmissionResponse{ + Result: &metav1.Status{ + Message: err.Error(), + }, + } +}