Skip to content

Commit

Permalink
Merge pull request #446 from sivanzcw/develop
Browse files Browse the repository at this point in the history
Register healthz interface for controller and scheduler
  • Loading branch information
volcano-sh-bot authored Sep 12, 2019
2 parents 73d3a7b + 7248dec commit 51132ff
Show file tree
Hide file tree
Showing 13 changed files with 670 additions and 13 deletions.
7 changes: 6 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
defaultBurst = 100
defaultWorkers = 3
defaultSchedulerName = "volcano"

defaultHealthzBindAddress = "127.0.0.1:11252"
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -42,6 +44,9 @@ type ServerOption struct {
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
SchedulerName string
// HealthzBindAddress is the IP address and port for the health check server to serve on,
// defaulting to 127.0.0.1:11252
HealthzBindAddress string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -63,6 +68,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
13 changes: 7 additions & 6 deletions cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
HealthzBindAddress: "127.0.0.1:11252",
}

if !reflect.DeepEqual(expected, s) {
Expand Down
5 changes: 5 additions & 0 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/client-go/tools/record"

"volcano.sh/volcano/cmd/controllers/app/options"
"volcano.sh/volcano/pkg/apis/helpers"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
Expand Down Expand Up @@ -80,6 +81,10 @@ func Run(opt *options.ServerOption) error {
return err
}

if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller"); err != nil {
return err
}

run := startControllers(config, opt)

if !opt.EnableLeaderElection {
Expand Down
6 changes: 6 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
defaultQueue = "default"
defaultListenAddress = ":8080"

defaultHealthzBindAddress = "127.0.0.1:11251"

defaultQPS = 50.0
defaultBurst = 100
)
Expand All @@ -48,6 +50,9 @@ type ServerOption struct {
EnablePriorityClass bool
KubeAPIBurst int
KubeAPIQPS float32
// HealthzBindAddress is the IP address and port for the health check server to serve on,
// defaulting to 127.0.0.1:11251
HealthzBindAddress string
}

// ServerOpts server options
Expand Down Expand Up @@ -78,6 +83,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled
Expand Down
13 changes: 7 additions & 6 deletions cmd/scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
SchedulerName: defaultSchedulerName,
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
SchedulerName: defaultSchedulerName,
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
HealthzBindAddress: "127.0.0.1:11251",
}

if !reflect.DeepEqual(expected, s) {
Expand Down
6 changes: 6 additions & 0 deletions cmd/scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/scheduler"
"volcano.sh/volcano/pkg/version"

Expand Down Expand Up @@ -96,6 +98,10 @@ func Run(opt *options.ServerOption) error {
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()

if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-scheduler"); err != nil {
return err
}

run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
Expand Down
80 changes: 80 additions & 0 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ limitations under the License.
package helpers

import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/golang/glog"

"k8s.io/api/core/v1"
Expand All @@ -25,6 +34,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/client-go/kubernetes"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -148,3 +160,71 @@ func GeneratePodgroupName(pod *v1.Pod) string {

return pgName
}

// StartHealthz register healthz interface
func StartHealthz(healthzBindAddress, name string) error {
listener, err := net.Listen("tcp", healthzBindAddress)
if err != nil {
return fmt.Errorf("failed to create listener: %v", err)
}

pathRecorderMux := mux.NewPathRecorderMux(name)
healthz.InstallHandler(pathRecorderMux)

server := &http.Server{
Addr: listener.Addr().String(),
Handler: pathRecorderMux,
MaxHeaderBytes: 1 << 20,
}

return runServer(server, listener)
}

func runServer(server *http.Server, ln net.Listener) error {
if ln == nil || server == nil {
return fmt.Errorf("listener and server must not be nil")
}

stopCh := make(chan os.Signal)
signal.Notify(stopCh, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), 0)
server.Shutdown(ctx)
cancel()
}()

go func() {
defer utilruntime.HandleCrash()

var listener net.Listener
listener = tcpKeepAliveListener{ln.(*net.TCPListener)}

err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", listener.Addr().String())
select {
case <-stopCh:
glog.Info(msg)
default:
glog.Fatalf("%s due to error: %v", msg, err)
}
}()

return nil
}

type tcpKeepAliveListener struct {
*net.TCPListener
}

// Accept waits for and returns the next connection to the listener.
func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(3 * time.Minute)
return tc, nil
}
21 changes: 21 additions & 0 deletions vendor/k8s.io/apiserver/pkg/server/healthz/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 51132ff

Please sign in to comment.