Skip to content

Commit

Permalink
Admission refactor.
Browse files Browse the repository at this point in the history
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Dec 1, 2019
1 parent 9077081 commit f364d60
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 197 deletions.
65 changes: 26 additions & 39 deletions cmd/admission/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License.
package main

import (
"io/ioutil"
"net/http"
"os"
"os/signal"
Expand All @@ -34,17 +33,13 @@ import (

"volcano.sh/volcano/cmd/admission/app"
"volcano.sh/volcano/cmd/admission/app/options"
"volcano.sh/volcano/pkg/admission"
"volcano.sh/volcano/pkg/admission/router"
"volcano.sh/volcano/pkg/version"
)

func serveJobs(w http.ResponseWriter, r *http.Request) {
admission.Serve(w, r, admission.AdmitJobs)
}

func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
admission.Serve(w, r, admission.MutateJobs)
}
_ "volcano.sh/volcano/pkg/admission/jobs/mutate"
_ "volcano.sh/volcano/pkg/admission/jobs/validate"
_ "volcano.sh/volcano/pkg/admission/pods"
)

var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")

Expand All @@ -64,41 +59,43 @@ func main() {
go wait.Until(klog.Flush, *logFlushFreq, wait.NeverStop)
defer klog.Flush()

http.HandleFunc(admission.AdmitJobPath, serveJobs)
http.HandleFunc(admission.MutateJobPath, serveMutateJobs)

if err := config.CheckPortOrDie(); err != nil {
klog.Fatalf("Configured port is invalid: %v", err)
}
addr := ":" + strconv.Itoa(config.Port)

restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
if err != nil {
klog.Fatalf("Unable to build k8s config: %v", err)
}

admission.VolcanoClientSet = app.GetVolcanoClient(restConfig)

servePods(config)

caBundle, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
klog.Fatalf("Unable to read cacert file: %v", err)
}

err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
if err != nil {
klog.Fatalf("Unable to register webhook configs: %v", err)
}
vClient := app.GetVolcanoClient(restConfig)
router.ForEachAdmission(func(service *router.AdmissionService) {
if service.Config != nil {
service.Config.VolcanoClient = vClient
service.Config.SchedulerName = config.SchedulerName
}
http.HandleFunc(service.Path, service.Handler)
})

//
//caBundle, err := ioutil.ReadFile(config.CaCertFile)
//if err != nil {
// klog.Fatalf("Unable to read cacert file: %v", err)
//}
////
////err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
////if err != nil {
//// klog.Fatalf("Unable to register webhook configs: %v", err)
////}

webhookServeError := make(chan struct{})
stopChannel := make(chan os.Signal)
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)

server := &http.Server{
Addr: addr,
Addr: ":" + strconv.Itoa(config.Port),
TLSConfig: app.ConfigTLS(config, restConfig),
}
webhookServeError := make(chan struct{})
go func() {
err = server.ListenAndServeTLS("", "")
if err != nil && err != http.ErrServerClosed {
Expand All @@ -117,13 +114,3 @@ func main() {
return
}
}

func servePods(config *options.Config) {
admController := &admission.Controller{
VcClients: admission.VolcanoClientSet,
SchedulerName: config.SchedulerName,
}
http.HandleFunc(admission.AdmitPodPath, admController.ServerPods)

return
}
3 changes: 3 additions & 0 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
volcano.sh/volcano/pkg/admission/jobs/mutate
volcano.sh/volcano/pkg/admission/router
volcano.sh/volcano/pkg/admission/schema
volcano.sh/volcano/pkg/apis/scheduling
volcano.sh/volcano/pkg/apis/scheduling/v1alpha1
volcano.sh/volcano/pkg/apis/scheduling/v1alpha2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package admission
package mutate

import (
"encoding/json"
Expand All @@ -25,6 +25,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"

"volcano.sh/volcano/pkg/admission/router"
"volcano.sh/volcano/pkg/admission/schema"
"volcano.sh/volcano/pkg/admission/util"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

Expand All @@ -33,6 +36,11 @@ const (
DefaultQueue = "default"
)

var Service = &router.AdmissionService{
Path: "/mutating-jobs",
Func: MutateJobs,
}

type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Expand All @@ -43,9 +51,9 @@ type patchOperation struct {
func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
klog.V(3).Infof("mutating jobs")

job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
if err != nil {
return ToAdmissionResponse(err)
return util.ToAdmissionResponse(err)
}

reviewResponse := v1beta1.AdmissionResponse{}
Expand All @@ -58,7 +66,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
break
default:
err = fmt.Errorf("expect operation to be 'CREATE' ")
return ToAdmissionResponse(err)
return util.ToAdmissionResponse(err)
}

if err != nil {
Expand All @@ -73,7 +81,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
return &reviewResponse
}

func createPatch(job v1alpha1.Job) ([]byte, error) {
func createPatch(job *v1alpha1.Job) ([]byte, error) {
var patch []patchOperation
pathQueue := patchDefaultQueue(job)
if pathQueue != nil {
Expand All @@ -86,7 +94,7 @@ func createPatch(job v1alpha1.Job) ([]byte, error) {
return json.Marshal(patch)
}

func patchDefaultQueue(job v1alpha1.Job) *patchOperation {
func patchDefaultQueue(job *v1alpha1.Job) *patchOperation {
//Add default queue if not specified.
if job.Spec.Queue == "" {
return &patchOperation{Op: "add", Path: "/spec/queue", Value: DefaultQueue}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package admission
package mutate

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package admission
package validate

import (
"fmt"
Expand All @@ -30,23 +30,33 @@ import (
k8scorev1 "k8s.io/kubernetes/pkg/apis/core/v1"
k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation"

"volcano.sh/volcano/pkg/admission/router"
"volcano.sh/volcano/pkg/admission/schema"
"volcano.sh/volcano/pkg/admission/util"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/job/plugins"
)

// VolcanoClientSet is volcano clientset
// TODO: make it as package local var.
var VolcanoClientSet vcclientset.Interface
func init() {
router.RegisterAdmission(service)
}

var service = &router.AdmissionService{
Path: "/jobs",
Func: AdmitJobs,

Config: config,
}

var config = &router.AdmissionServiceConfig{}

// AdmitJobs is to admit jobs and return response
func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {

klog.V(3).Infof("admitting jobs -- %s", ar.Request.Operation)

job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
if err != nil {
return ToAdmissionResponse(err)
return util.ToAdmissionResponse(err)
}
var msg string
reviewResponse := v1beta1.AdmissionResponse{}
Expand All @@ -57,14 +67,14 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
msg = validateJob(job, &reviewResponse)
break
case v1beta1.Update:
_, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
_, err := schema.DecodeJob(ar.Request.OldObject, ar.Request.Resource)
if err != nil {
return ToAdmissionResponse(err)
return util.ToAdmissionResponse(err)
}
break
default:
err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'")
return ToAdmissionResponse(err)
return util.ToAdmissionResponse(err)
}

if !reviewResponse.Allowed {
Expand All @@ -73,8 +83,7 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
return &reviewResponse
}

func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string {

func validateJob(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string {
var msg string
taskNames := map[string]string{}
var totalReplicas int32
Expand Down Expand Up @@ -151,9 +160,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
}

// Check whether Queue already present or not
if _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
if _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
// TODO: deprecate v1alpha1
if _, err := VolcanoClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
if _, err := config.VolcanoClient.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
msg = msg + fmt.Sprintf(" unable to find job queue: %v", err)
}
}
Expand All @@ -165,7 +174,7 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
return msg
}

func validateTaskTemplate(task v1alpha1.TaskSpec, job v1alpha1.Job, index int) string {
func validateTaskTemplate(task v1alpha1.TaskSpec, job *v1alpha1.Job, index int) string {
var v1PodTemplate v1.PodTemplate
v1PodTemplate.Template = *task.Template.DeepCopy()
k8scorev1.SetObjectDefaults_PodTemplate(&v1PodTemplate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package admission
package validate

import (
"strings"
Expand Down Expand Up @@ -1046,15 +1046,15 @@ func TestValidateExecution(t *testing.T) {
},
}
// create fake volcano clientset
VolcanoClientSet = fakeclient.NewSimpleClientset()
config.VolcanoClient = fakeclient.NewSimpleClientset()

//create default queue
_, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Create(&defaultqueue)
_, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Create(&defaultqueue)
if err != nil {
t.Error("Queue Creation Failed")
}

ret := validateJob(testCase.Job, &testCase.reviewResponse)
ret := validateJob(&testCase.Job, &testCase.reviewResponse)
//fmt.Printf("test-case name:%s, ret:%v testCase.reviewResponse:%v \n", testCase.Name, ret,testCase.reviewResponse)
if testCase.ExpectErr == true && ret == "" {
t.Errorf("Expect error msg :%s, but got nil.", testCase.ret)
Expand Down
Loading

0 comments on commit f364d60

Please sign in to comment.