From 7248dec3c78c265818a90f7a7851b2748295ccfb Mon Sep 17 00:00:00 2001 From: Zhang Jinghui Date: Mon, 9 Sep 2019 19:13:14 +0800 Subject: [PATCH] register healthz interface --- Gopkg.lock | 7 +- cmd/controllers/app/options/options.go | 6 + cmd/controllers/app/options/options_test.go | 13 +- cmd/controllers/app/server.go | 5 + cmd/scheduler/app/options/options.go | 6 + cmd/scheduler/app/options/options_test.go | 13 +- cmd/scheduler/app/server.go | 6 + pkg/apis/helpers/helpers.go | 80 +++++ .../apiserver/pkg/server/healthz/doc.go | 21 ++ .../apiserver/pkg/server/healthz/healthz.go | 228 ++++++++++++++ vendor/k8s.io/apiserver/pkg/server/mux/OWNERS | 2 + vendor/k8s.io/apiserver/pkg/server/mux/doc.go | 18 ++ .../apiserver/pkg/server/mux/pathrecorder.go | 278 ++++++++++++++++++ 13 files changed, 670 insertions(+), 13 deletions(-) create mode 100644 vendor/k8s.io/apiserver/pkg/server/healthz/doc.go create mode 100644 vendor/k8s.io/apiserver/pkg/server/healthz/healthz.go create mode 100755 vendor/k8s.io/apiserver/pkg/server/mux/OWNERS create mode 100644 vendor/k8s.io/apiserver/pkg/server/mux/doc.go create mode 100644 vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go diff --git a/Gopkg.lock b/Gopkg.lock index 98b764da6f..c8127ee1d3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -663,13 +663,15 @@ version = "kubernetes-1.13.5" [[projects]] - digest = "1:0028163a4c5fa7a710b24c537514ac27b9dcd37c1ec440cdfa82ec859be98223" + digest = "1:5e49d3702125aa421bbd705a620fa7f5117589d06642ed29243f830154be1e60" name = "k8s.io/apiserver" packages = [ "pkg/authentication/authenticator", "pkg/authentication/serviceaccount", "pkg/authentication/user", "pkg/features", + "pkg/server/healthz", + "pkg/server/mux", "pkg/util/feature", "pkg/util/flag", ] @@ -1052,6 +1054,7 @@ "k8s.io/apimachinery/pkg/api/meta", "k8s.io/apimachinery/pkg/api/resource", "k8s.io/apimachinery/pkg/apis/meta/v1", + "k8s.io/apimachinery/pkg/conversion", "k8s.io/apimachinery/pkg/labels", "k8s.io/apimachinery/pkg/runtime", "k8s.io/apimachinery/pkg/runtime/schema", @@ -1065,6 +1068,8 @@ "k8s.io/apimachinery/pkg/util/validation/field", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/watch", + "k8s.io/apiserver/pkg/server/healthz", + "k8s.io/apiserver/pkg/server/mux", "k8s.io/apiserver/pkg/util/flag", "k8s.io/client-go/discovery", "k8s.io/client-go/discovery/fake", diff --git a/cmd/controllers/app/options/options.go b/cmd/controllers/app/options/options.go index b37ef94dd2..698c67de2d 100644 --- a/cmd/controllers/app/options/options.go +++ b/cmd/controllers/app/options/options.go @@ -27,6 +27,8 @@ const ( defaultBurst = 100 defaultWorkers = 3 defaultSchedulerName = "volcano" + + defaultHealthzBindAddress = "127.0.0.1:11252" ) // ServerOption is the main context object for the controller manager. @@ -42,6 +44,9 @@ type ServerOption struct { // concurrently. Larger number = faster job updating, but more CPU load. WorkerThreads uint32 SchedulerName string + // HealthzBindAddress is the IP address and port for the health check server to serve on, + // defaulting to 127.0.0.1:11252 + HealthzBindAddress string } // NewServerOption creates a new CMServer with a default config. @@ -63,6 +68,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+ "Larger number = faster job updating, but more CPU load") fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name") + fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.") } // CheckOptionOrDie checks the LockObjectNamespace diff --git a/cmd/controllers/app/options/options_test.go b/cmd/controllers/app/options/options_test.go index 1726969523..94abb404b5 100644 --- a/cmd/controllers/app/options/options_test.go +++ b/cmd/controllers/app/options/options_test.go @@ -36,12 +36,13 @@ func TestAddFlags(t *testing.T) { // This is a snapshot of expected options parsed by args. expected := &ServerOption{ - Master: "127.0.0.1", - KubeAPIQPS: defaultQPS, - KubeAPIBurst: 200, - PrintVersion: false, - WorkerThreads: defaultWorkers, - SchedulerName: defaultSchedulerName, + Master: "127.0.0.1", + KubeAPIQPS: defaultQPS, + KubeAPIBurst: 200, + PrintVersion: false, + WorkerThreads: defaultWorkers, + SchedulerName: defaultSchedulerName, + HealthzBindAddress: "127.0.0.1:11252", } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/controllers/app/server.go b/cmd/controllers/app/server.go index 565bebaeed..df1e083246 100644 --- a/cmd/controllers/app/server.go +++ b/cmd/controllers/app/server.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/tools/record" "volcano.sh/volcano/cmd/controllers/app/options" + "volcano.sh/volcano/pkg/apis/helpers" vkclient "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/garbagecollector" "volcano.sh/volcano/pkg/controllers/job" @@ -80,6 +81,10 @@ func Run(opt *options.ServerOption) error { return err } + if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller"); err != nil { + return err + } + run := startControllers(config, opt) if !opt.EnableLeaderElection { diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index 85f9e92ffc..2092e0c52e 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -29,6 +29,8 @@ const ( defaultQueue = "default" defaultListenAddress = ":8080" + defaultHealthzBindAddress = "127.0.0.1:11251" + defaultQPS = 50.0 defaultBurst = 100 ) @@ -48,6 +50,9 @@ type ServerOption struct { EnablePriorityClass bool KubeAPIBurst int KubeAPIQPS float32 + // HealthzBindAddress is the IP address and port for the health check server to serve on, + // defaulting to 127.0.0.1:11251 + HealthzBindAddress string } // ServerOpts server options @@ -78,6 +83,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { "Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false") fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") + fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.") } // CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go index 92c570a204..490a4a64d7 100644 --- a/cmd/scheduler/app/options/options_test.go +++ b/cmd/scheduler/app/options/options_test.go @@ -37,12 +37,13 @@ func TestAddFlags(t *testing.T) { // This is a snapshot of expected options parsed by args. expected := &ServerOption{ - SchedulerName: defaultSchedulerName, - SchedulePeriod: 5 * time.Minute, - DefaultQueue: defaultQueue, - ListenAddress: defaultListenAddress, - KubeAPIBurst: defaultBurst, - KubeAPIQPS: defaultQPS, + SchedulerName: defaultSchedulerName, + SchedulePeriod: 5 * time.Minute, + DefaultQueue: defaultQueue, + ListenAddress: defaultListenAddress, + KubeAPIBurst: defaultBurst, + KubeAPIQPS: defaultQPS, + HealthzBindAddress: "127.0.0.1:11251", } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/scheduler/app/server.go b/cmd/scheduler/app/server.go index 698ca03d06..d7c1784d7e 100644 --- a/cmd/scheduler/app/server.go +++ b/cmd/scheduler/app/server.go @@ -25,7 +25,9 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus/promhttp" + "volcano.sh/volcano/cmd/scheduler/app/options" + "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/scheduler" "volcano.sh/volcano/pkg/version" @@ -96,6 +98,10 @@ func Run(opt *options.ServerOption) error { glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil)) }() + if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-scheduler"); err != nil { + return err + } + run := func(ctx context.Context) { sched.Run(ctx.Done()) <-ctx.Done() diff --git a/pkg/apis/helpers/helpers.go b/pkg/apis/helpers/helpers.go index c6c137d270..95f2afa68a 100644 --- a/pkg/apis/helpers/helpers.go +++ b/pkg/apis/helpers/helpers.go @@ -17,6 +17,15 @@ limitations under the License. package helpers import ( + "context" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + "github.com/golang/glog" "k8s.io/api/core/v1" @@ -25,6 +34,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/server/healthz" + "k8s.io/apiserver/pkg/server/mux" "k8s.io/client-go/kubernetes" vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" @@ -148,3 +160,71 @@ func GeneratePodgroupName(pod *v1.Pod) string { return pgName } + +// StartHealthz register healthz interface +func StartHealthz(healthzBindAddress, name string) error { + listener, err := net.Listen("tcp", healthzBindAddress) + if err != nil { + return fmt.Errorf("failed to create listener: %v", err) + } + + pathRecorderMux := mux.NewPathRecorderMux(name) + healthz.InstallHandler(pathRecorderMux) + + server := &http.Server{ + Addr: listener.Addr().String(), + Handler: pathRecorderMux, + MaxHeaderBytes: 1 << 20, + } + + return runServer(server, listener) +} + +func runServer(server *http.Server, ln net.Listener) error { + if ln == nil || server == nil { + return fmt.Errorf("listener and server must not be nil") + } + + stopCh := make(chan os.Signal) + signal.Notify(stopCh, syscall.SIGTERM, syscall.SIGINT) + + go func() { + <-stopCh + ctx, cancel := context.WithTimeout(context.Background(), 0) + server.Shutdown(ctx) + cancel() + }() + + go func() { + defer utilruntime.HandleCrash() + + var listener net.Listener + listener = tcpKeepAliveListener{ln.(*net.TCPListener)} + + err := server.Serve(listener) + msg := fmt.Sprintf("Stopped listening on %s", listener.Addr().String()) + select { + case <-stopCh: + glog.Info(msg) + default: + glog.Fatalf("%s due to error: %v", msg, err) + } + }() + + return nil +} + +type tcpKeepAliveListener struct { + *net.TCPListener +} + +// Accept waits for and returns the next connection to the listener. +func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { + tc, err := ln.AcceptTCP() + if err != nil { + return nil, err + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(3 * time.Minute) + return tc, nil +} diff --git a/vendor/k8s.io/apiserver/pkg/server/healthz/doc.go b/vendor/k8s.io/apiserver/pkg/server/healthz/doc.go new file mode 100644 index 0000000000..06e67f6fe3 --- /dev/null +++ b/vendor/k8s.io/apiserver/pkg/server/healthz/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package healthz implements basic http server health checking. +// Usage: +// import "k8s.io/apiserver/pkg/server/healthz" +// healthz.DefaultHealthz() +package healthz // import "k8s.io/apiserver/pkg/server/healthz" diff --git a/vendor/k8s.io/apiserver/pkg/server/healthz/healthz.go b/vendor/k8s.io/apiserver/pkg/server/healthz/healthz.go new file mode 100644 index 0000000000..17d85fbe63 --- /dev/null +++ b/vendor/k8s.io/apiserver/pkg/server/healthz/healthz.go @@ -0,0 +1,228 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthz + +import ( + "bytes" + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "k8s.io/klog" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" +) + +// HealthzChecker is a named healthz checker. +type HealthzChecker interface { + Name() string + Check(req *http.Request) error +} + +var defaultHealthz = sync.Once{} + +// DefaultHealthz installs the default healthz check to the http.DefaultServeMux. +func DefaultHealthz(checks ...HealthzChecker) { + defaultHealthz.Do(func() { + InstallHandler(http.DefaultServeMux, checks...) + }) +} + +// PingHealthz returns true automatically when checked +var PingHealthz HealthzChecker = ping{} + +// ping implements the simplest possible healthz checker. +type ping struct{} + +func (ping) Name() string { + return "ping" +} + +// PingHealthz is a health check that returns true. +func (ping) Check(_ *http.Request) error { + return nil +} + +// LogHealthz returns true if logging is not blocked +var LogHealthz HealthzChecker = &log{} + +type log struct { + startOnce sync.Once + lastVerified atomic.Value +} + +func (l *log) Name() string { + return "log" +} + +func (l *log) Check(_ *http.Request) error { + l.startOnce.Do(func() { + l.lastVerified.Store(time.Now()) + go wait.Forever(func() { + klog.Flush() + l.lastVerified.Store(time.Now()) + }, time.Minute) + }) + + lastVerified := l.lastVerified.Load().(time.Time) + if time.Since(lastVerified) < (2 * time.Minute) { + return nil + } + return fmt.Errorf("logging blocked") +} + +// NamedCheck returns a healthz checker for the given name and function. +func NamedCheck(name string, check func(r *http.Request) error) HealthzChecker { + return &healthzCheck{name, check} +} + +// InstallHandler registers handlers for health checking on the path +// "/healthz" to mux. *All handlers* for mux must be specified in +// exactly one call to InstallHandler. Calling InstallHandler more +// than once for the same mux will result in a panic. +func InstallHandler(mux mux, checks ...HealthzChecker) { + InstallPathHandler(mux, "/healthz", checks...) +} + +// InstallPathHandler registers handlers for health checking on +// a specific path to mux. *All handlers* for the path must be +// specified in exactly one call to InstallPathHandler. Calling +// InstallPathHandler more than once for the same path and mux will +// result in a panic. +func InstallPathHandler(mux mux, path string, checks ...HealthzChecker) { + if len(checks) == 0 { + klog.V(5).Info("No default health checks specified. Installing the ping handler.") + checks = []HealthzChecker{PingHealthz} + } + + klog.V(5).Info("Installing healthz checkers:", formatQuoted(checkerNames(checks...)...)) + + mux.Handle(path, handleRootHealthz(checks...)) + for _, check := range checks { + mux.Handle(fmt.Sprintf("%s/%v", path, check.Name()), adaptCheckToHandler(check.Check)) + } +} + +// mux is an interface describing the methods InstallHandler requires. +type mux interface { + Handle(pattern string, handler http.Handler) +} + +// healthzCheck implements HealthzChecker on an arbitrary name and check function. +type healthzCheck struct { + name string + check func(r *http.Request) error +} + +var _ HealthzChecker = &healthzCheck{} + +func (c *healthzCheck) Name() string { + return c.name +} + +func (c *healthzCheck) Check(r *http.Request) error { + return c.check(r) +} + +// getExcludedChecks extracts the health check names to be excluded from the query param +func getExcludedChecks(r *http.Request) sets.String { + checks, found := r.URL.Query()["exclude"] + if found { + return sets.NewString(checks...) + } + return sets.NewString() +} + +// handleRootHealthz returns an http.HandlerFunc that serves the provided checks. +func handleRootHealthz(checks ...HealthzChecker) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + failed := false + excluded := getExcludedChecks(r) + var verboseOut bytes.Buffer + for _, check := range checks { + // no-op the check if we've specified we want to exclude the check + if excluded.Has(check.Name()) { + excluded.Delete(check.Name()) + fmt.Fprintf(&verboseOut, "[+]%v excluded: ok\n", check.Name()) + continue + } + if err := check.Check(r); err != nil { + // don't include the error since this endpoint is public. If someone wants more detail + // they should have explicit permission to the detailed checks. + klog.V(6).Infof("healthz check %v failed: %v", check.Name(), err) + fmt.Fprintf(&verboseOut, "[-]%v failed: reason withheld\n", check.Name()) + failed = true + } else { + fmt.Fprintf(&verboseOut, "[+]%v ok\n", check.Name()) + } + } + if excluded.Len() > 0 { + fmt.Fprintf(&verboseOut, "warn: some health checks cannot be excluded: no matches for %v\n", formatQuoted(excluded.List()...)) + klog.Warningf("cannot exclude some health checks, no health checks are installed matching %v", + formatQuoted(excluded.List()...)) + } + // always be verbose on failure + if failed { + http.Error(w, fmt.Sprintf("%vhealthz check failed", verboseOut.String()), http.StatusInternalServerError) + return + } + + if _, found := r.URL.Query()["verbose"]; !found { + fmt.Fprint(w, "ok") + return + } + + verboseOut.WriteTo(w) + fmt.Fprint(w, "healthz check passed\n") + }) +} + +// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks. +func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := c(r) + if err != nil { + http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError) + } else { + fmt.Fprint(w, "ok") + } + }) +} + +// checkerNames returns the names of the checks in the same order as passed in. +func checkerNames(checks ...HealthzChecker) []string { + // accumulate the names of checks for printing them out. + checkerNames := make([]string, 0, len(checks)) + for _, check := range checks { + checkerNames = append(checkerNames, check.Name()) + } + return checkerNames +} + +// formatQuoted returns a formatted string of the health check names, +// preserving the order passed in. +func formatQuoted(names ...string) string { + quoted := make([]string, 0, len(names)) + for _, name := range names { + quoted = append(quoted, fmt.Sprintf("%q", name)) + } + return strings.Join(quoted, ",") +} diff --git a/vendor/k8s.io/apiserver/pkg/server/mux/OWNERS b/vendor/k8s.io/apiserver/pkg/server/mux/OWNERS new file mode 100755 index 0000000000..9d268c4d14 --- /dev/null +++ b/vendor/k8s.io/apiserver/pkg/server/mux/OWNERS @@ -0,0 +1,2 @@ +reviewers: +- sttts diff --git a/vendor/k8s.io/apiserver/pkg/server/mux/doc.go b/vendor/k8s.io/apiserver/pkg/server/mux/doc.go new file mode 100644 index 0000000000..da9fb8ed7f --- /dev/null +++ b/vendor/k8s.io/apiserver/pkg/server/mux/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mux contains abstractions for http multiplexing of APIs. +package mux diff --git a/vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go b/vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go new file mode 100644 index 0000000000..16857cc8a6 --- /dev/null +++ b/vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go @@ -0,0 +1,278 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mux + +import ( + "fmt" + "net/http" + "runtime/debug" + "sort" + "strings" + "sync" + "sync/atomic" + + "k8s.io/klog" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +// PathRecorderMux wraps a mux object and records the registered exposedPaths. +type PathRecorderMux struct { + // name is used for logging so you can trace requests through + name string + + lock sync.Mutex + notFoundHandler http.Handler + pathToHandler map[string]http.Handler + prefixToHandler map[string]http.Handler + + // mux stores a pathHandler and is used to handle the actual serving. + // Turns out, we want to accept trailing slashes, BUT we don't care about handling + // everything under them. This does exactly matches only unless its explicitly requested to + // do something different + mux atomic.Value + + // exposedPaths is the list of paths that should be shown at / + exposedPaths []string + + // pathStacks holds the stacks of all registered paths. This allows us to show a more helpful message + // before the "http: multiple registrations for %s" panic. + pathStacks map[string]string +} + +// pathHandler is an http.Handler that will satify requests first by exact match, then by prefix, +// then by notFoundHandler +type pathHandler struct { + // muxName is used for logging so you can trace requests through + muxName string + + // pathToHandler is a map of exactly matching request to its handler + pathToHandler map[string]http.Handler + + // this has to be sorted by most slashes then by length + prefixHandlers []prefixHandler + + // notFoundHandler is the handler to use for satisfying requests with no other match + notFoundHandler http.Handler +} + +// prefixHandler holds the prefix it should match and the handler to use +type prefixHandler struct { + // prefix is the prefix to test for a request match + prefix string + // handler is used to satisfy matching requests + handler http.Handler +} + +// NewPathRecorderMux creates a new PathRecorderMux +func NewPathRecorderMux(name string) *PathRecorderMux { + ret := &PathRecorderMux{ + name: name, + pathToHandler: map[string]http.Handler{}, + prefixToHandler: map[string]http.Handler{}, + mux: atomic.Value{}, + exposedPaths: []string{}, + pathStacks: map[string]string{}, + } + + ret.mux.Store(&pathHandler{notFoundHandler: http.NotFoundHandler()}) + return ret +} + +// ListedPaths returns the registered handler exposedPaths. +func (m *PathRecorderMux) ListedPaths() []string { + handledPaths := append([]string{}, m.exposedPaths...) + sort.Strings(handledPaths) + + return handledPaths +} + +func (m *PathRecorderMux) trackCallers(path string) { + if existingStack, ok := m.pathStacks[path]; ok { + utilruntime.HandleError(fmt.Errorf("registered %q from %v", path, existingStack)) + } + m.pathStacks[path] = string(debug.Stack()) +} + +// refreshMuxLocked creates a new mux and must be called while locked. Otherwise the view of handlers may +// not be consistent +func (m *PathRecorderMux) refreshMuxLocked() { + newMux := &pathHandler{ + muxName: m.name, + pathToHandler: map[string]http.Handler{}, + prefixHandlers: []prefixHandler{}, + notFoundHandler: http.NotFoundHandler(), + } + if m.notFoundHandler != nil { + newMux.notFoundHandler = m.notFoundHandler + } + for path, handler := range m.pathToHandler { + newMux.pathToHandler[path] = handler + } + + keys := sets.StringKeySet(m.prefixToHandler).List() + sort.Sort(sort.Reverse(byPrefixPriority(keys))) + for _, prefix := range keys { + newMux.prefixHandlers = append(newMux.prefixHandlers, prefixHandler{ + prefix: prefix, + handler: m.prefixToHandler[prefix], + }) + } + + m.mux.Store(newMux) +} + +// NotFoundHandler sets the handler to use if there's no match for a give path +func (m *PathRecorderMux) NotFoundHandler(notFoundHandler http.Handler) { + m.lock.Lock() + defer m.lock.Unlock() + + m.notFoundHandler = notFoundHandler + + m.refreshMuxLocked() +} + +// Unregister removes a path from the mux. +func (m *PathRecorderMux) Unregister(path string) { + m.lock.Lock() + defer m.lock.Unlock() + + delete(m.pathToHandler, path) + delete(m.prefixToHandler, path) + delete(m.pathStacks, path) + for i := range m.exposedPaths { + if m.exposedPaths[i] == path { + m.exposedPaths = append(m.exposedPaths[:i], m.exposedPaths[i+1:]...) + break + } + } + + m.refreshMuxLocked() +} + +// Handle registers the handler for the given pattern. +// If a handler already exists for pattern, Handle panics. +func (m *PathRecorderMux) Handle(path string, handler http.Handler) { + m.lock.Lock() + defer m.lock.Unlock() + m.trackCallers(path) + + m.exposedPaths = append(m.exposedPaths, path) + m.pathToHandler[path] = handler + m.refreshMuxLocked() +} + +// HandleFunc registers the handler function for the given pattern. +// If a handler already exists for pattern, Handle panics. +func (m *PathRecorderMux) HandleFunc(path string, handler func(http.ResponseWriter, *http.Request)) { + m.Handle(path, http.HandlerFunc(handler)) +} + +// UnlistedHandle registers the handler for the given pattern, but doesn't list it. +// If a handler already exists for pattern, Handle panics. +func (m *PathRecorderMux) UnlistedHandle(path string, handler http.Handler) { + m.lock.Lock() + defer m.lock.Unlock() + m.trackCallers(path) + + m.pathToHandler[path] = handler + m.refreshMuxLocked() +} + +// UnlistedHandleFunc registers the handler function for the given pattern, but doesn't list it. +// If a handler already exists for pattern, Handle panics. +func (m *PathRecorderMux) UnlistedHandleFunc(path string, handler func(http.ResponseWriter, *http.Request)) { + m.UnlistedHandle(path, http.HandlerFunc(handler)) +} + +// HandlePrefix is like Handle, but matches for anything under the path. Like a standard golang trailing slash. +func (m *PathRecorderMux) HandlePrefix(path string, handler http.Handler) { + if !strings.HasSuffix(path, "/") { + panic(fmt.Sprintf("%q must end in a trailing slash", path)) + } + + m.lock.Lock() + defer m.lock.Unlock() + m.trackCallers(path) + + m.exposedPaths = append(m.exposedPaths, path) + m.prefixToHandler[path] = handler + m.refreshMuxLocked() +} + +// UnlistedHandlePrefix is like UnlistedHandle, but matches for anything under the path. Like a standard golang trailing slash. +func (m *PathRecorderMux) UnlistedHandlePrefix(path string, handler http.Handler) { + if !strings.HasSuffix(path, "/") { + panic(fmt.Sprintf("%q must end in a trailing slash", path)) + } + + m.lock.Lock() + defer m.lock.Unlock() + m.trackCallers(path) + + m.prefixToHandler[path] = handler + m.refreshMuxLocked() +} + +// ServeHTTP makes it an http.Handler +func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.mux.Load().(*pathHandler).ServeHTTP(w, r) +} + +// ServeHTTP makes it an http.Handler +func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok { + klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path) + exactHandler.ServeHTTP(w, r) + return + } + + for _, prefixHandler := range h.prefixHandlers { + if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) { + klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix) + prefixHandler.handler.ServeHTTP(w, r) + return + } + } + + klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path) + h.notFoundHandler.ServeHTTP(w, r) +} + +// byPrefixPriority sorts url prefixes by the order in which they should be tested by the mux +// this has to be sorted by most slashes then by length so that we can iterate straight +// through to match the "best" one first. +type byPrefixPriority []string + +func (s byPrefixPriority) Len() int { return len(s) } +func (s byPrefixPriority) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s byPrefixPriority) Less(i, j int) bool { + lhsNumParts := strings.Count(s[i], "/") + rhsNumParts := strings.Count(s[j], "/") + if lhsNumParts != rhsNumParts { + return lhsNumParts < rhsNumParts + } + + lhsLen := len(s[i]) + rhsLen := len(s[j]) + if lhsLen != rhsLen { + return lhsLen < rhsLen + } + + return strings.Compare(s[i], s[j]) < 0 +}