Skip to content

Commit

Permalink
Introduce a more graceful way to check labels (#843)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucklove committed Oct 23, 2020
1 parent 1fb0740 commit 8cb8fd8
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 22 deletions.
33 changes: 33 additions & 0 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,39 @@ func (pc *PDClient) GetLocationLabels() ([]string, error) {
return rc.LocationLabels, nil
}

// StoreList implements StoreLabelProvider.
// It queries the stores through GetStores and returns the address of every
// store in the response; any error from GetStores is returned unchanged.
func (pc *PDClient) StoreList() ([]string, error) {
	stores, err := pc.GetStores()
	if err != nil {
		return nil, err
	}

	// Always hand back a non-nil slice, even when there are no stores.
	addresses := make([]string, 0, len(stores.Stores))
	for _, info := range stores.Stores {
		addresses = append(addresses, info.Store.GetAddress())
	}
	return addresses, nil
}

// GetStoreLabels implements StoreLabelProvider.
// It looks up the store whose address equals the given one and returns its
// labels as a key/value map. An error is returned when GetStores fails or
// when no store has that address.
func (pc *PDClient) GetStoreLabels(address string) (map[string]string, error) {
	stores, err := pc.GetStores()
	if err != nil {
		return nil, err
	}

	for _, info := range stores.Stores {
		if info.Store.GetAddress() != address {
			continue
		}
		// First store with a matching address wins; flatten its label list
		// into a map (non-nil even when the store carries no labels).
		result := map[string]string{}
		for _, label := range info.Store.GetLabels() {
			result[label.GetKey()] = label.GetValue()
		}
		return result, nil
	}
	return nil, errors.Errorf("store %s not found", address)
}

// UpdateScheduleConfig updates the PD schedule config
func (pc *PDClient) UpdateScheduleConfig(body io.Reader) error {
return pc.updateConfig(body, pdConfigSchedule)
Expand Down
11 changes: 4 additions & 7 deletions pkg/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,13 +599,12 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
cliutil.PrintTable(clusterTable, true)
fmt.Printf("Total nodes: %d\n", len(clusterTable)-1)

if topo, ok := topo.(*spec.Specification); ok {
if _, ok := topo.(*spec.Specification); ok {
// Check if TiKV's label set correctly
kvs := topo.TiKVServers
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
if lbs, err := pdClient.GetLocationLabels(); err != nil {
color.Yellow("\nWARN: get location labels from pd failed: %v", err)
} else if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
} else if err := spec.CheckTiKVLocationLabels(lbs, pdClient); err != nil {
color.Yellow("\nWARN: there is something wrong with TiKV labels, which may cause data losing:\n%v", err)
}
}
Expand Down Expand Up @@ -1082,8 +1081,7 @@ func (m *Manager) Deploy(
if err != nil {
return err
}
kvs := topo.TiKVServers
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
if err := spec.CheckTiKVLocationLabels(lbs, topo); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}
Expand Down Expand Up @@ -1522,8 +1520,7 @@ func (m *Manager) ScaleOut(
if err != nil {
return err
}
kvs := mergedTopo.(*spec.Specification).TiKVServers
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
if err := spec.CheckTiKVLocationLabels(lbs, mergedTopo.(*spec.Specification)); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,35 @@ func (s *Specification) LocationLabels() ([]string, error) {
return lbs, nil
}

// StoreList implements StoreLabelProvider.
// It returns the "host:port" address of every TiKV server in the
// specification, except the imported ones.
func (s *Specification) StoreList() ([]string, error) {
	addresses := make([]string, 0, len(s.TiKVServers))
	for _, tikv := range s.TiKVServers {
		// FIXME: StoreLabelProvider is used to detect missing label config,
		//        and for the specification that detection is based on
		//        meta.yaml. The server.labels field there is empty for
		//        imported TiKV servers, so as a workaround they are left out
		//        of the list for now.
		if !tikv.IsImported() {
			addresses = append(addresses, fmt.Sprintf("%s:%d", tikv.Host, tikv.GetMainPort()))
		}
	}
	return addresses, nil
}

// GetStoreLabels implements StoreLabelProvider.
// It returns the result of Labels() for the TiKV server whose "host:port"
// address equals the given one, or an error when no such server exists.
func (s *Specification) GetStoreLabels(address string) (map[string]string, error) {
	for _, tikv := range s.TiKVServers {
		id := fmt.Sprintf("%s:%d", tikv.Host, tikv.GetMainPort())
		if id != address {
			continue
		}
		return tikv.Labels()
	}
	return nil, errors.Errorf("store %s not found", address)
}

// AllComponentNames contains the names of all components.
// should include all components in ComponentsByStartOrder
func AllComponentNames() (roles []string) {
Expand Down
33 changes: 24 additions & 9 deletions pkg/cluster/spec/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,34 +339,49 @@ func (e *TiKVLabelError) Error() string {
return str
}

// StoreLabelProvider provides label information about stores, so that the
// label check can run against different sources of store data.
type StoreLabelProvider interface {
// StoreList returns the addresses of the stores known to the provider.
StoreList() ([]string, error)
// GetStoreLabels returns the labels (name -> value) of the store with the
// given address.
GetStoreLabels(address string) (map[string]string, error)
}

// getHostFromAddress returns the host part of a "host:port" address.
//
// The address is split at the LAST colon, so a bracketed IPv6 address such as
// "[::1]:20160" yields "[::1]" rather than being cut at the first colon inside
// the literal. An address without any colon is returned unchanged.
func getHostFromAddress(addr string) string {
	idx := strings.LastIndex(addr, ":")
	if idx < 0 {
		// No port separator: the whole string is the host.
		return addr
	}
	return addr[:idx]
}

// CheckTiKVLocationLabels will check if tikv missing label or have wrong label
func CheckTiKVLocationLabels(pdLocLabels []string, kvs []TiKVSpec) error {
func CheckTiKVLocationLabels(pdLocLabels []string, slp StoreLabelProvider) error {
lerr := &TiKVLabelError{
TiKVInstances: make(map[string][]error),
}
lbs := set.NewStringSet(pdLocLabels...)
hosts := make(map[string]int)

kvs, err := slp.StoreList()
if err != nil {
return err
}
for _, kv := range kvs {
hosts[kv.Host] = hosts[kv.Host] + 1
host := getHostFromAddress(kv)
hosts[host] = hosts[host] + 1
}
for _, kv := range kvs {
id := fmt.Sprintf("%s:%d", kv.Host, kv.GetMainPort())
ls, err := kv.Labels()
host := getHostFromAddress(kv)
ls, err := slp.GetStoreLabels(kv)
if err != nil {
return err
}
if len(ls) == 0 && hosts[kv.Host] > 1 {
lerr.TiKVInstances[id] = append(
lerr.TiKVInstances[id],
if len(ls) == 0 && hosts[host] > 1 {
lerr.TiKVInstances[kv] = append(
lerr.TiKVInstances[kv],
errors.New("multiple TiKV instances are deployed at the same host but location label missing"),
)
continue
}
for lname := range ls {
if !lbs.Exist(lname) {
lerr.TiKVInstances[id] = append(
lerr.TiKVInstances[id],
lerr.TiKVInstances[kv] = append(
lerr.TiKVInstances[kv],
fmt.Errorf("label name '%s' is not specified in pd config (replication.location-labels: %v)", lname, pdLocLabels),
)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/cluster/spec/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,9 @@ tikv_servers:
status_port: 20180
`), &topo)
c.Assert(err, IsNil)
err = CheckTiKVLocationLabels(nil, topo.TiKVServers)
err = CheckTiKVLocationLabels(nil, &topo)
c.Assert(err, IsNil)
err = CheckTiKVLocationLabels([]string{}, topo.TiKVServers)
err = CheckTiKVLocationLabels([]string{}, &topo)
c.Assert(err, IsNil)

// 2 tikv on the same host without label
Expand All @@ -637,7 +637,7 @@ tikv_servers:
status_port: 20181
`), &topo)
c.Assert(err, IsNil)
err = CheckTiKVLocationLabels(nil, topo.TiKVServers)
err = CheckTiKVLocationLabels(nil, &topo)
c.Assert(err, NotNil)

// 2 tikv on the same host with unacquainted label
Expand All @@ -656,7 +656,7 @@ tikv_servers:
server.labels: { zone: "zone1", host: "172.16.5.140" }
`), &topo)
c.Assert(err, IsNil)
err = CheckTiKVLocationLabels(nil, topo.TiKVServers)
err = CheckTiKVLocationLabels(nil, &topo)
c.Assert(err, NotNil)

// 2 tikv on the same host with correct label
Expand All @@ -675,7 +675,7 @@ tikv_servers:
server.labels: { zone: "zone1", host: "172.16.5.140" }
`), &topo)
c.Assert(err, IsNil)
err = CheckTiKVLocationLabels([]string{"zone", "host"}, topo.TiKVServers)
err = CheckTiKVLocationLabels([]string{"zone", "host"}, &topo)
c.Assert(err, IsNil)

// 2 tikv on the same host with different config style
Expand All @@ -697,6 +697,6 @@ tikv_servers:
host: "172.16.5.140"
`), &topo)
c.Assert(err, IsNil)
err = CheckTiKVLocationLabels([]string{"zone", "host"}, topo.TiKVServers)
err = CheckTiKVLocationLabels([]string{"zone", "host"}, &topo)
c.Assert(err, IsNil)
}

0 comments on commit 8cb8fd8

Please sign in to comment.