Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register webhook in codes #332

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 144 additions & 72 deletions cmd/admission/app/configure/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ limitations under the License.
package configure

import (
"encoding/json"
"flag"
"fmt"
"github.com/golang/glog"

"k8s.io/api/admissionregistration/v1beta1"
"k8s.io/client-go/kubernetes"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
admissionregistrationv1beta1client "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1"
)

Expand All @@ -42,6 +42,8 @@ type Config struct {
ValidateWebhookConfigName string
ValidateWebhookName string
PrintVersion bool
AdmissionServiceName string
AdmissionServiceNamespace string
}

// NewConfig create new config
Expand All @@ -60,17 +62,30 @@ func (c *Config) AddFlags() {
flag.StringVar(&c.KeyFile, "tls-private-key-file", c.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
flag.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.")
flag.IntVar(&c.Port, "port", 443, "the port used by admission-controller-server.")
flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "volcano-mutate-job",
"Name of the mutatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job",
"Name of the mutatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "",
hzxuzhonghu marked this conversation as resolved.
Show resolved Hide resolved
"Name of the mutatingwebhookconfiguration resource in Kubernetes [Deprecated]: it will be generated when not specified.")
flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "",
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "",
"Name of the mutatingwebhookconfiguration resource in Kubernetes. [Deprecated]: it will be generated when not specified")
flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "",
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
flag.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook")
flag.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service")
}

const (
// ValidateConfigName ValidatingWebhookConfiguration name format
ValidateConfigName = "%s-validate-job"
// MutateConfigName MutatingWebhookConfiguration name format
MutateConfigName = "%s-mutate-job"
// ValidateHookName Default name for webhooks in ValidatingWebhookConfiguration
ValidateHookName = "validatejob.volcano.sh"
// MutateHookName Default name for webhooks in MutatingWebhookConfiguration
MutateHookName = "mutatejob.volcano.sh"
)

// CheckPortOrDie check valid port range
func (c *Config) CheckPortOrDie() error {
if c.Port < 1 || c.Port > 65535 {
Expand All @@ -79,78 +94,135 @@ func (c *Config) CheckPortOrDie() error {
return nil
}

// PatchMutateWebhookConfig patches a CA bundle into the specified webhook config.
func PatchMutateWebhookConfig(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface,
webhookConfigName, webhookName string, caBundle []byte) error {
config, err := client.Get(webhookConfigName, metav1.GetOptions{})
if err != nil {
return err
}
prev, err := json.Marshal(config)
if err != nil {
return err
}
found := false
for i, w := range config.Webhooks {
if w.Name == webhookName {
config.Webhooks[i].ClientConfig.CABundle = caBundle[:]
found = true
break
}
}
if !found {
return apierrors.NewInternalError(fmt.Errorf(
"webhook entry %q not found in config %q", webhookName, webhookConfigName))
func useGeneratedNameIfRequired(configured, generated string) string {
if configured != "" {
return configured
}
curr, err := json.Marshal(config)
if err != nil {
return err
return generated
}

// RegisterWebhooks register webhooks for admission service
func RegisterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte) error {
ignorePolicy := v1beta1.Ignore

//Prepare validate webhooks
path := "/jobs"
JobValidateHooks := v1beta1.ValidatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired(c.ValidateWebhookConfigName,
fmt.Sprintf(ValidateConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired(c.ValidateWebhookName, ValidateHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create, v1beta1.Update},
Rule: v1beta1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}
patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.MutatingWebhookConfiguration{})
if err != nil {

if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
[]v1beta1.ValidatingWebhookConfiguration{JobValidateHooks}); err != nil {
return err
}

if string(patch) != "{}" {
_, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch)
//Prepare mutate jobs
path = "/mutating-jobs"
JobMutateHooks := v1beta1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired(c.MutateWebhookConfigName,
fmt.Sprintf(MutateConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired(c.MutateWebhookName, MutateHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create},
Rule: v1beta1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}
return err
}

// PatchValidateWebhookConfig patches a CA bundle into the specified webhook config.
func PatchValidateWebhookConfig(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface,
webhookConfigName, webhookName string, caBundle []byte) error {
config, err := client.Get(webhookConfigName, metav1.GetOptions{})
if err != nil {
return err
}
prev, err := json.Marshal(config)
if err != nil {
if err := registerMutateWebhook(clienset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(),
[]v1beta1.MutatingWebhookConfiguration{JobMutateHooks}); err != nil {
return err
}
found := false
for i, w := range config.Webhooks {
if w.Name == webhookName {
config.Webhooks[i].ClientConfig.CABundle = caBundle[:]
found = true
break

return nil

}

func registerMutateWebhook(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface,
webhooks []v1beta1.MutatingWebhookConfiguration) error {
for _, hook := range webhooks {
existing, err := client.Get(hook.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err == nil && existing != nil {
glog.Infof("Updating MutatingWebhookConfiguration %v", hook)
existing.Webhooks = hook.Webhooks
if _, err := client.Update(existing); err != nil {
return err
}
} else {
glog.Infof("Creating MutatingWebhookConfiguration %v", hook)
if _, err := client.Create(&hook); err != nil {
return err
}
}
}
if !found {
return apierrors.NewInternalError(fmt.Errorf(
"webhook entry %q not found in config %q", webhookName, webhookConfigName))
}
curr, err := json.Marshal(config)
if err != nil {
return err
}
patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.ValidatingWebhookConfiguration{})
if err != nil {
return err
}
return nil
}

if string(patch) != "{}" {
_, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch)
func registerValidateWebhook(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface,
webhooks []v1beta1.ValidatingWebhookConfiguration) error {
for _, hook := range webhooks {
existing, err := client.Get(hook.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err == nil && existing != nil {
existing.Webhooks = hook.Webhooks
glog.Infof("Updating ValidatingWebhookConfiguration %v", hook)
if _, err := client.Update(existing); err != nil {
return err
}
} else {
glog.Infof("Creating ValidatingWebhookConfiguration %v", hook)
if _, err := client.Create(&hook); err != nil {
return err
}
}
}
return err
return nil
}
57 changes: 35 additions & 22 deletions cmd/admission/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ package main

import (
"flag"
"fmt"
"github.com/golang/glog"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

"k8s.io/client-go/tools/clientcmd"

"volcano.sh/volcano/cmd/admission/app"
appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
"volcano.sh/volcano/pkg/version"

"k8s.io/client-go/tools/clientcmd"
)

func serveJobs(w http.ResponseWriter, r *http.Request) {
Expand All @@ -52,39 +54,50 @@ func main() {
http.HandleFunc(admissioncontroller.MutateJobPath, serveMutateJobs)

if err := config.CheckPortOrDie(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
glog.Fatalf("Configured port is invalid: %v\n", err)
TommyLike marked this conversation as resolved.
Show resolved Hide resolved
}
addr := ":" + strconv.Itoa(config.Port)

restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
glog.Fatalf("Unable to build k8s config: %v\n", err)
}

clientset := app.GetClient(restConfig)

admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig)

caCertPem, err := ioutil.ReadFile(config.CaCertFile)
caBundle, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
} else {
// patch caBundle in webhook
if err = appConf.PatchMutateWebhookConfig(clientset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(),
config.MutateWebhookConfigName, config.MutateWebhookName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
glog.Fatalf("Unable to read cacert file: %v\n", err)
}

err = appConf.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
if err != nil {
glog.Fatalf("Unable to register webhook configs: %v\n", err)
}

stopChannel := make(chan os.Signal)
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)

server := &http.Server{
Addr: addr,
TLSConfig: app.ConfigTLS(config, restConfig),
}
server.ListenAndServeTLS("", "")
webhookServeError := make(chan struct{})
go func() {
err = server.ListenAndServeTLS("", "")
if err != nil && err != http.ErrServerClosed {
glog.Fatalf("ListenAndServeTLS for admission webhook failed: %v\n", err)
close(webhookServeError)
}
}()

select {
case <-stopChannel:
if err := server.Close(); err != nil {
glog.Fatalf("Close admission server failed: %v\n", err)
}
return
case <-webhookServeError:
return
}
}
Loading