Skip to content

Commit

Permalink
Feature/cache pd status (#986)
Browse files Browse the repository at this point in the history
* typo(cluster/pdapi): misused arg name

* enhance(cluster/pd): pd.Status check by itself

* feat(cluster/spec): add GetDashardAddress

* enhance(utils): set http clien't dial timeout to 5second

* feat(cluster/manager): cache pd's status

* feat(cluster/manager): display dashboard url

* tests(cluster): add display result check

* feat(dm): get master's status by local host

* feat(cluster/manager): cache dm's master status

* feat(cluster/manager): highlight display dm's status

* feat(cluster/manager): highlight display dm's status

* refact(cluster/pd): rm notused suffix

* typo(spec/cluster): UP -> Up

Co-authored-by: SIGSEGV <gnu.crazier@gmail.com>
  • Loading branch information
jsvisa and lucklove committed Dec 31, 2020
1 parent edb12b8 commit 8dd0cb5
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 87 deletions.
14 changes: 6 additions & 8 deletions components/dm/spec/topology_dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ type MasterSpec struct {
}

// Status queries current status of the instance
func (s MasterSpec) Status(tlsCfg *tls.Config, masterList ...string) string {
if len(masterList) < 1 {
return "N/A"
}
masterapi := api.NewDMMasterClient(masterList, statusQueryTimeout, tlsCfg)
isFound, isActive, isLeader, err := masterapi.GetMaster(s.Name)
func (s MasterSpec) Status(tlsCfg *tls.Config, _ ...string) string {
addr := fmt.Sprintf("%s:%d", s.Host, s.Port)
dc := api.NewDMMasterClient([]string{addr}, statusQueryTimeout, tlsCfg)
isFound, isActive, isLeader, err := dc.GetMaster(s.Name)
if err != nil {
return "Down"
}
Expand Down Expand Up @@ -193,8 +191,8 @@ func (s WorkerSpec) Status(tlsCfg *tls.Config, masterList ...string) string {
if len(masterList) < 1 {
return "N/A"
}
masterapi := api.NewDMMasterClient(masterList, statusQueryTimeout, tlsCfg)
stage, err := masterapi.GetWorker(s.Name)
dc := api.NewDMMasterClient(masterList, statusQueryTimeout, tlsCfg)
stage, err := dc.GetWorker(s.Name)
if err != nil {
return "Down"
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byt
return bytes, nil
}
if len(endpoints) > 1 && err != nil {
err = errors.Errorf("no endpoint available, the last err is: %s", err)
err = errors.Errorf("no endpoint available, the last err was: %s", err)
}
return bytes, err
}

func (pc *PDClient) getEndpoints(cmd string) (endpoints []string) {
func (pc *PDClient) getEndpoints(uri string) (endpoints []string) {
for _, addr := range pc.addrs {
endpoint := fmt.Sprintf("%s/%s", pc.GetURL(addr), cmd)
endpoint := fmt.Sprintf("%s/%s", pc.GetURL(addr), uri)
endpoints = append(endpoints, endpoint)
}

Expand Down
137 changes: 88 additions & 49 deletions pkg/cluster/manager/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func (m *Manager) Display(name string, opt operator.Options) error {
base := metadata.GetBaseMeta()
// display cluster meta
cyan := color.New(color.FgCyan, color.Bold)
fmt.Printf("Cluster type: %s\n", cyan.Sprint(m.sysName))
fmt.Printf("Cluster name: %s\n", cyan.Sprint(name))
fmt.Printf("Cluster version: %s\n", cyan.Sprint(base.Version))
fmt.Printf("SSH type: %s\n", cyan.Sprint(topo.BaseTopo().GlobalOptions.SSHType))
fmt.Printf("Cluster type: %s\n", cyan.Sprint(m.sysName))
fmt.Printf("Cluster name: %s\n", cyan.Sprint(name))
fmt.Printf("Cluster version: %s\n", cyan.Sprint(base.Version))
fmt.Printf("SSH type: %s\n", cyan.Sprint(topo.BaseTopo().GlobalOptions.SSHType))

// display TLS info
if topo.BaseTopo().GlobalOptions.TLSEnabled {
fmt.Printf("TLS encryption: %s\n", cyan.Sprint("enabled"))
fmt.Printf("TLS encryption: %s\n", cyan.Sprint("enabled"))
fmt.Printf("CA certificate: %s\n", cyan.Sprint(
m.specManager.Path(name, spec.TLSCertKeyDir, spec.TLSCACert),
))
Expand All @@ -71,8 +71,7 @@ func (m *Manager) Display(name string, opt operator.Options) error {
}

ctx := task.NewContext()
err = ctx.SetSSHKeySet(m.specManager.Path(name, "ssh", "id_rsa"),
m.specManager.Path(name, "ssh", "id_rsa.pub"))
err = ctx.SetSSHKeySet(m.specManager.Path(name, "ssh", "id_rsa"), m.specManager.Path(name, "ssh", "id_rsa.pub"))
if err != nil {
return err
}
Expand All @@ -84,56 +83,96 @@ func (m *Manager) Display(name string, opt operator.Options) error {

filterRoles := set.NewStringSet(opt.Roles...)
filterNodes := set.NewStringSet(opt.Nodes...)
pdList := topo.BaseTopo().MasterList
masterList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
if err != nil {
return err
}
for _, comp := range topo.ComponentsByStartOrder() {
for _, ins := range comp.Instances() {
// apply role filter
if len(filterRoles) > 0 && !filterRoles.Exist(ins.Role()) {
continue
}
// apply node filter
if len(filterNodes) > 0 && !filterNodes.Exist(ins.ID()) {
continue

masterActive := make([]string, 0)
masterStatus := make(map[string]string)

topo.IterInstance(func(ins spec.Instance) {
if ins.ComponentName() != spec.ComponentPD && ins.ComponentName() != spec.ComponentDMMaster {
return
}
status := ins.Status(tlsCfg, masterList...)
if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") {
instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
masterActive = append(masterActive, instAddr)
}
masterStatus[ins.ID()] = status
})

var dashboardAddr string
if t, ok := topo.(*spec.Specification); ok {
var err error
dashboardAddr, err = t.GetDashboardAddress(tlsCfg, masterActive...)
if dashboardAddr != "" && err == nil {
schema := "http"
if tlsCfg != nil {
schema = "https"
}
fmt.Printf("Dashboard URL: %s\n", cyan.Sprintf("%s://%s/dashboard", schema, dashboardAddr))
}
}

dataDir := "-"
insDirs := ins.UsedDirs()
deployDir := insDirs[0]
if len(insDirs) > 1 {
dataDir = insDirs[1]
topo.IterInstance(func(ins spec.Instance) {
// apply role filter
if len(filterRoles) > 0 && !filterRoles.Exist(ins.Role()) {
return
}
// apply node filter
if len(filterNodes) > 0 && !filterNodes.Exist(ins.ID()) {
return
}

dataDir := "-"
insDirs := ins.UsedDirs()
deployDir := insDirs[0]
if len(insDirs) > 1 {
dataDir = insDirs[1]
}

var status string
switch ins.ComponentName() {
case spec.ComponentPD:
status = masterStatus[ins.ID()]
instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
if dashboardAddr == instAddr {
status += "|UI"
}
case spec.ComponentDMMaster:
status = masterStatus[ins.ID()]
default:
status = ins.Status(tlsCfg, masterActive...)
}

status := ins.Status(tlsCfg, pdList...)
// Query the service status
if status == "-" {
e, found := ctx.GetExecutor(ins.GetHost())
if found {
active, _ := operator.GetServiceStatus(e, ins.ServiceName())
if parts := strings.Split(strings.TrimSpace(active), " "); len(parts) > 2 {
if parts[1] == "active" {
status = "Up"
} else {
status = parts[1]
}
// Query the service status
if status == "-" {
e, found := ctx.GetExecutor(ins.GetHost())
if found {
active, _ := operator.GetServiceStatus(e, ins.ServiceName())
if parts := strings.Split(strings.TrimSpace(active), " "); len(parts) > 2 {
if parts[1] == "active" {
status = "Up"
} else {
status = parts[1]
}
}
}
clusterTable = append(clusterTable, []string{
color.CyanString(ins.ID()),
ins.Role(),
ins.GetHost(),
utils.JoinInt(ins.UsedPorts(), "/"),
cliutil.OsArch(ins.OS(), ins.Arch()),
formatInstanceStatus(status),
dataDir,
deployDir,
})
}
}
clusterTable = append(clusterTable, []string{
color.CyanString(ins.ID()),
ins.Role(),
ins.GetHost(),
utils.JoinInt(ins.UsedPorts(), "/"),
cliutil.OsArch(ins.OS(), ins.Arch()),
formatInstanceStatus(status),
dataDir,
deployDir,
})
})

// Sort by role,host,ports
sort.Slice(clusterTable[1:], func(i, j int) bool {
Expand All @@ -152,7 +191,7 @@ func (m *Manager) Display(name string, opt operator.Options) error {

if t, ok := topo.(*spec.Specification); ok {
// Check if TiKV's label set correctly
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
pdClient := api.NewPDClient(masterActive, 10*time.Second, tlsCfg)
if lbs, err := pdClient.GetLocationLabels(); err != nil {
log.Debugf("get location labels from pd failed: %v", err)
} else if err := spec.CheckTiKVLabels(lbs, pdClient); err != nil {
Expand Down Expand Up @@ -182,13 +221,13 @@ func formatInstanceStatus(status string) string {
}

switch {
case startsWith("up|l"): // up|l, up|l|ui
case startsWith("up|l", "healthy|l"): // up|l, up|l|ui, healthy|l
return color.HiGreenString(status)
case startsWith("up"):
case startsWith("up", "healthy", "free"):
return color.GreenString(status)
case startsWith("down", "err"): // down, down|ui
return color.RedString(status)
case startsWith("tombstone", "disconnected"), strings.Contains(status, "offline"):
case startsWith("tombstone", "disconnected", "n/a"), strings.Contains(status, "offline"):
return color.YellowString(status)
default:
return status
Expand Down
32 changes: 10 additions & 22 deletions pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -52,37 +51,26 @@ type PDSpec struct {
}

// Status queries current status of the instance
func (s PDSpec) Status(tlsCfg *tls.Config, pdList ...string) string {
curAddr := fmt.Sprintf("%s:%d", s.Host, s.ClientPort)
curPdAPI := api.NewPDClient([]string{curAddr}, statusQueryTimeout, tlsCfg)
allPdAPI := api.NewPDClient(pdList, statusQueryTimeout, tlsCfg)
suffix := ""

// find dashboard node
dashboardAddr, _ := allPdAPI.GetDashboardAddress()
if strings.HasPrefix(dashboardAddr, "http") {
r := strings.NewReplacer("http://", "", "https://", "")
dashboardAddr = r.Replace(dashboardAddr)
}
if dashboardAddr == curAddr {
suffix = "|UI"
}
func (s PDSpec) Status(tlsCfg *tls.Config, _ ...string) string {
addr := fmt.Sprintf("%s:%d", s.Host, s.ClientPort)
pc := api.NewPDClient([]string{addr}, statusQueryTimeout, tlsCfg)

// check health
err := curPdAPI.CheckHealth()
err := pc.CheckHealth()
if err != nil {
return "Down" + suffix
return "Down"
}

// find leader node
leader, err := curPdAPI.GetLeader()
leader, err := pc.GetLeader()
if err != nil {
return "ERR" + suffix
return "ERR"
}
res := "Up"
if s.Name == leader.Name {
suffix = "|L" + suffix
res += "|L"
}
return "Up" + suffix
return res
}

// Role returns the component role of the instance
Expand Down
15 changes: 15 additions & 0 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/creasty/defaults"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/executor"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
"github.com/pingcap/tiup/pkg/logger/log"
Expand Down Expand Up @@ -370,6 +371,20 @@ func (s *Specification) GetPDList() []string {
return pdList
}

// GetDashboardAddress returns the cluster's dashboard addr
func (s *Specification) GetDashboardAddress(tlsCfg *tls.Config, pdList ...string) (string, error) {
pc := api.NewPDClient(pdList, statusQueryTimeout, tlsCfg)
dashboardAddr, err := pc.GetDashboardAddress()
if err != nil {
return "", err
}
if strings.HasPrefix(dashboardAddr, "http") {
r := strings.NewReplacer("http://", "", "https://", "")
dashboardAddr = r.Replace(dashboardAddr)
}
return dashboardAddr, nil
}

// GetEtcdClient load EtcdClient of current cluster
func (s *Specification) GetEtcdClient(tlsCfg *tls.Config) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Expand Down
5 changes: 1 addition & 4 deletions pkg/cluster/spec/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ func statusByURL(url string, tlsCfg *tls.Config) string {

// body doesn't have any status section needed
body, err := client.Get(url)
if err != nil {
return "Down"
}
if body == nil {
if err != nil || body == nil {
return "Down"
}
return "Up"
Expand Down
2 changes: 2 additions & 0 deletions pkg/utils/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"time"
)
Expand All @@ -37,6 +38,7 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient {
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
Dial: (&net.Dialer{Timeout: 5 * time.Second}).Dial,
},
},
}
Expand Down
7 changes: 6 additions & 1 deletion tests/tiup-cluster/script/cmd_subtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ function cmd_subtest() {

tiup-cluster $client _test $name data

tiup-cluster $client display $name
display_result=`tiup-cluster $client display $name`
echo "$display_result" | grep "Cluster type"
echo "$display_result" | grep "Cluster name"
echo "$display_result" | grep "Cluster version"
echo "$display_result" | grep "Dashboard URL"
echo "$display_result" | grep "Total nodes"

# Test rename
tiup-cluster $client rename $name "tmp-cluster-name"
Expand Down

0 comments on commit 8dd0cb5

Please sign in to comment.