Skip to content

Commit

Permalink
refactor: divide the yurthubServer into hubServer and proxyServert (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch authored Mar 18, 2021
1 parent 09ce108 commit 3d2eba7
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 67 deletions.
13 changes: 9 additions & 4 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"
"net"
"net/url"
"strings"

Expand All @@ -15,8 +16,8 @@ import (
type YurtHubConfiguration struct {
LBMode string
RemoteServers []*url.URL
YurtHubHost string
YurtHubPort int
YurtHubServerAddr string
YurtHubProxyServerAddr string
GCFrequency int
CertMgrMode string
NodeName string
Expand All @@ -26,6 +27,7 @@ type YurtHubConfiguration struct {
MaxRequestInFlight int
JoinToken string
RootDir string
EnableProfiling bool
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand All @@ -35,11 +37,13 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
return nil, err
}

hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort)
proxyServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxyPort)
cfg := &YurtHubConfiguration{
LBMode: options.LBMode,
RemoteServers: us,
YurtHubHost: options.YurtHubHost,
YurtHubPort: options.YurtHubPort,
YurtHubServerAddr: hubServerAddr,
YurtHubProxyServerAddr: proxyServerAddr,
GCFrequency: options.GCFrequency,
CertMgrMode: options.CertMgrMode,
NodeName: options.NodeName,
Expand All @@ -49,6 +53,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
MaxRequestInFlight: options.MaxRequestInFlight,
JoinToken: options.JoinToken,
RootDir: options.RootDir,
EnableProfiling: options.EnableProfiling,
}

return cfg, nil
Expand Down
12 changes: 9 additions & 3 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
type YurtHubOptions struct {
ServerAddr string
YurtHubHost string
YurtHubPort int
YurtHubPort string
YurtHubProxyPort string
GCFrequency int
CertMgrMode string
NodeName string
Expand All @@ -25,13 +26,15 @@ type YurtHubOptions struct {
JoinToken string
RootDir string
Version bool
EnableProfiling bool
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
func NewYurtHubOptions() *YurtHubOptions {
o := &YurtHubOptions{
YurtHubHost: "127.0.0.1",
YurtHubPort: 10261,
YurtHubProxyPort: "10261",
YurtHubPort: "10267",
GCFrequency: 120,
CertMgrMode: "hubself",
LBMode: "rr",
Expand All @@ -40,6 +43,7 @@ func NewYurtHubOptions() *YurtHubOptions {
HeartbeatTimeoutSeconds: 2,
MaxRequestInFlight: 250,
RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()),
EnableProfiling: true,
}

return o
Expand Down Expand Up @@ -69,7 +73,8 @@ func ValidateOptions(options *YurtHubOptions) error {
// AddFlags returns flags for a specific yurthub by section name
func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.YurtHubHost, "bind-address", o.YurtHubHost, "the IP address on which to listen for the --serve-port port.")
fs.IntVar(&o.YurtHubPort, "serve-port", o.YurtHubPort, "the port on which to serve HTTP.")
fs.StringVar(&o.YurtHubPort, "serve-port", o.YurtHubPort, "the port on which to serve HTTP requests(like profiling, metrics) for hub agent.")
fs.StringVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver,the format is: \"server1,server2,...\"")
fs.StringVar(&o.CertMgrMode, "cert-mgr-mode", o.CertMgrMode, "the cert manager mode, kubelet: use certificates that belongs to kubelet, hubself: auto generate client cert for hub agent.")
fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).")
Expand All @@ -82,4 +87,5 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
fs.BoolVar(&o.Version, "version", o.Version, "print the version information.")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
klog.Infof("%d. new %s server and begin to serve, proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubServerAddr)
s := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler)
s.Run()
<-stopCh
Expand Down
4 changes: 2 additions & 2 deletions config/yaml-template/yurthub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ spec:
- name: pem-dir
mountPath: /var/lib/kubelet/pki
command:
- __project_prefix__hub
- __project_prefix__hub
- --v=2
- --server-addr=__server_addr__
- --node-name=$(NODE_NAME)
livenessProbe:
httpGet:
host: 127.0.0.1
path: /v1/healthz
port: 10261
port: 10267
initialDelaySeconds: 300
periodSeconds: 5
failureThreshold: 3
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurtctl/util/edgenode/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ spec:
httpGet:
host: 127.0.0.1
path: /v1/healthz
port: 10261
port: 10267
initialDelaySeconds: 300
periodSeconds: 5
failureThreshold: 3
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (

// Install adds the Profiling webservice to the given mux.
func Install(c *mux.Router) {
c.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/"))
c.HandleFunc("/debug/pprof/", http.HandlerFunc(pprof.Index))
c.HandleFunc("/debug/pprof/profile", pprof.Profile)
c.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
c.HandleFunc("/debug/pprof/trace", pprof.Trace)
c.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/"))
c.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
}

// redirectTo redirects request to a certain destination.
Expand Down
59 changes: 30 additions & 29 deletions pkg/yurthub/server/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,43 @@ import (
"net/http"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
)

const (
tokenKey = "jointoken"
)

// updateToken update bootstrap token in the bootstrap-hub.conf file
// in order to update node certificate when both node certificate and
// old join token expires
func (s *yurtHubServer) updateToken(w http.ResponseWriter, r *http.Request) {
tokens := make(map[string]string)
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&tokens)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "could not decode tokens, %v", err)
return
}
// updateTokenHandler returns a http handler that update bootstrap token in the bootstrap-hub.conf file
// in order to update node certificate when both node certificate and old join token expires
func updateTokenHandler(certificateMgr interfaces.YurtCertificateManager) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokens := make(map[string]string)
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&tokens)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "could not decode tokens, %v", err)
return
}

joinToken := tokens[tokenKey]
if len(joinToken) == 0 {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "no join token is set")
return
}
joinToken := tokens[tokenKey]
if len(joinToken) == 0 {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "no join token is set")
return
}

err = s.certificateMgr.Update(&config.YurtHubConfiguration{JoinToken: joinToken})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "could not update bootstrap token, %v", err)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "update bootstrap token successfully")
return
err = certificateMgr.Update(&config.YurtHubConfiguration{JoinToken: joinToken})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "could not update bootstrap token, %v", err)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "update bootstrap token successfully")
return
})
}
64 changes: 39 additions & 25 deletions pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,55 +31,69 @@ type Server interface {
Run()
}

// yutHubServer includes hubServer and proxyServer,
// and hubServer handles requests by hub agent itself, like profiling, metrics, healthz
// and proxyServer does not handle requests locally and proxy requests to kube-apiserver
type yurtHubServer struct {
mux *mux.Router
certificateMgr interfaces.YurtCertificateManager
proxyHandler http.Handler
cfg *config.YurtHubConfiguration
hubServer *http.Server
proxyServer *http.Server
}

// NewYurtHubServer creates a Server object
func NewYurtHubServer(cfg *config.YurtHubConfiguration,
certificateMgr interfaces.YurtCertificateManager,
proxyHandler http.Handler) Server {
return &yurtHubServer{
mux: mux.NewRouter(),
certificateMgr: certificateMgr,
proxyHandler: proxyHandler,
cfg: cfg,
hubMux := mux.NewRouter()
registerHandlers(hubMux, cfg, certificateMgr)
hubServer := &http.Server{
Addr: cfg.YurtHubServerAddr,
Handler: hubMux,
MaxHeaderBytes: 1 << 20,
}
}

func (s *yurtHubServer) Run() {
s.registerHandler()
proxyServer := &http.Server{
Addr: cfg.YurtHubProxyServerAddr,
Handler: proxyHandler,
MaxHeaderBytes: 1 << 20,
}

server := &http.Server{
Addr: fmt.Sprintf("%s:%d", s.cfg.YurtHubHost, s.cfg.YurtHubPort),
Handler: s.mux,
return &yurtHubServer{
hubServer: hubServer,
proxyServer: proxyServer,
}
}

err := server.ListenAndServe()
// Run will start hub server and proxy server
func (s *yurtHubServer) Run() {
go func() {
err := s.hubServer.ListenAndServe()
if err != nil {
panic(err)
}
}()

err := s.proxyServer.ListenAndServe()
if err != nil {
panic(err)
}
}

func (s *yurtHubServer) registerHandler() {
// registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token.
func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager) {
// register handlers for update join token
s.mux.HandleFunc("/v1/token", s.updateToken).Methods("POST", "PUT")
c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT")

// register handler for health check
s.mux.HandleFunc("/v1/healthz", s.healthz).Methods("GET")
c.HandleFunc("/v1/healthz", healthz).Methods("GET")

// register handler for profile
profile.Install(s.mux)

// attention: "/" route must be put at the end of registerHandler
// register handlers for proxy to kube-apiserver
s.mux.PathPrefix("/").Handler(s.proxyHandler)
if cfg.EnableProfiling {
profile.Install(c)
}
}

func (s *yurtHubServer) healthz(w http.ResponseWriter, r *http.Request) {
// healthz returns ok for healthz request
func healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "OK")
}

0 comments on commit 3d2eba7

Please sign in to comment.