cluster: upgrade PD leader after other PD instances (#1086)
Co-authored-by: SIGSEGV <gnu.crazier@gmail.com>
AstroProfundis and lucklove authored Jan 22, 2021
1 parent 9f96db2 commit d85cb88
Showing 2 changed files with 51 additions and 8 deletions.
29 changes: 27 additions & 2 deletions pkg/cluster/operation/upgrade.go
@@ -52,11 +52,36 @@ func Upgrade(
 			continue
 		}
 
-		// Transfer leader of evict leader if the component is TiKV/PD in non-force mode
-		log.Infof("Restarting component %s", component.Name())
+		log.Infof("Upgrading component %s", component.Name())
+
+		// some instances are upgraded after others
+		deferInstances := make([]spec.Instance, 0)
+
 		for _, instance := range instances {
+			switch component.Name() {
+			case spec.ComponentPD:
+				// defer PD leader to be upgraded after others
+				isLeader, err := instance.(*spec.PDInstance).IsLeader(topo, int(options.APITimeout), tlsCfg)
+				if err != nil {
+					return err
+				}
+				if isLeader {
+					deferInstances = append(deferInstances, instance)
+					log.Debugf("Deferred upgrading of PD leader %s", instance.ID())
+					continue
+				}
+			default:
+				// do nothing, kept for future usage with other components
+			}
+
 			if err := upgradeInstance(ctx, topo, instance, options, tlsCfg); err != nil {
 				return err
 			}
 		}
+
+		// process deferred instances
+		for _, instance := range deferInstances {
+			log.Debugf("Upgrading deferred instance %s...", instance.ID())
+			if err := upgradeInstance(ctx, topo, instance, options, tlsCfg); err != nil {
+				return err
+			}
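Read on its own, the new logic in Upgrade is a two-pass loop: every instance that is not the PD leader is upgraded in the first pass, and any deferred leader is upgraded in a second pass once its followers are done. Below is a minimal, self-contained sketch of that ordering; upgradeAll, isLeader, and upgrade are hypothetical stand-ins for the real Upgrade, IsLeader, and upgradeInstance, not tiup APIs.

package main

import "fmt"

// upgradeAll mirrors the ordering introduced above: instances reporting
// themselves as PD leader are held back and upgraded only after all
// other instances have been upgraded.
func upgradeAll(instances []string, isLeader func(string) (bool, error), upgrade func(string) error) error {
	deferred := make([]string, 0)
	for _, inst := range instances {
		leader, err := isLeader(inst)
		if err != nil {
			return err
		}
		if leader {
			// hold the leader back so it restarts last
			deferred = append(deferred, inst)
			continue
		}
		if err := upgrade(inst); err != nil {
			return err
		}
	}
	// second pass: the deferred leader(s)
	for _, inst := range deferred {
		if err := upgrade(inst); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	isLeader := func(inst string) (bool, error) { return inst == "pd-2", nil }
	upgrade := func(inst string) error { fmt.Println("upgrading", inst); return nil }
	// prints pd-1, pd-3, then pd-2: the leader goes last
	_ = upgradeAll([]string{"pd-1", "pd-2", "pd-3"}, isLeader, upgrade)
}

The point of the ordering is that leadership has to move at most once per rolling upgrade: upgrading the leader first would transfer leadership onto a follower that itself still has to be upgraded and evicted again later.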
30 changes: 24 additions & 6 deletions pkg/cluster/spec/pd.go
@@ -289,6 +289,26 @@ func (i *PDInstance) ScaleConfig(

 var _ RollingUpdateInstance = &PDInstance{}
 
+// IsLeader checks if the instance is the PD leader
+func (i *PDInstance) IsLeader(topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) (bool, error) {
+	tidbTopo, ok := topo.(*Specification)
+	if !ok {
+		panic("topo should be type of tidb topology")
+	}
+	pdClient := api.NewPDClient(tidbTopo.GetPDList(), time.Second*5, tlsCfg)
+
+	return i.isLeader(pdClient)
+}
+
+func (i *PDInstance) isLeader(pdClient *api.PDClient) (bool, error) {
+	leader, err := pdClient.GetLeader()
+	if err != nil {
+		return false, errors.Annotatef(err, "failed to get PD leader %s", i.GetHost())
+	}
+
+	return leader.Name == i.Name, nil
+}
+
 // PreRestart implements RollingUpdateInstance interface.
 func (i *PDInstance) PreRestart(topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error {
 	timeoutOpt := &utils.RetryOption{
@@ -300,15 +320,13 @@ func (i *PDInstance) PreRestart(topo Topology, apiTimeoutSeconds int, tlsCfg *tl
 	if !ok {
 		panic("topo should be type of tidb topology")
 	}
-	pdClient := api.NewPDClient(tidbTopo.GetPDList(), time.Second*5, tlsCfg)
-
-	leader, err := pdClient.GetLeader()
+	pdClient := api.NewPDClient(tidbTopo.GetPDList(), 5*time.Second, tlsCfg)
+	isLeader, err := i.isLeader(pdClient)
 	if err != nil {
-		return errors.Annotatef(err, "failed to get PD leader %s", i.GetHost())
+		return err
 	}
-
-	if len(tidbTopo.PDServers) > 1 && leader.Name == i.Name {
+	if len(tidbTopo.PDServers) > 1 && isLeader {
 		if err := pdClient.EvictPDLeader(timeoutOpt); err != nil {
 			return errors.Annotatef(err, "failed to evict PD leader %s", i.GetHost())
 		}
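A note on the shape of the pd.go change: the exported IsLeader builds its own PD client, while the unexported isLeader accepts the client as a parameter, so PreRestart can reuse the client it already constructed instead of opening a second one. A rough sketch of that split follows; leaderAPI, pdNode, and stubClient are illustrative names, not tiup types.

package main

import "fmt"

// leaderAPI is a hypothetical stand-in for the slice of api.PDClient
// used above: just enough surface to ask who the leader is.
type leaderAPI interface {
	GetLeader() (string, error)
}

type pdNode struct{ name string }

// IsLeader owns client construction (injected as a factory here for
// brevity) and delegates the actual check to the helper.
func (n *pdNode) IsLeader(newClient func() leaderAPI) (bool, error) {
	return n.isLeader(newClient())
}

// isLeader takes an existing client, so callers that already hold one,
// as PreRestart does above, avoid building a second client.
func (n *pdNode) isLeader(client leaderAPI) (bool, error) {
	leader, err := client.GetLeader()
	if err != nil {
		return false, fmt.Errorf("get PD leader: %w", err)
	}
	return leader == n.name, nil
}

type stubClient struct{ leader string }

func (s stubClient) GetLeader() (string, error) { return s.leader, nil }

func main() {
	node := &pdNode{name: "pd-1"}
	ok, _ := node.IsLeader(func() leaderAPI { return stubClient{leader: "pd-1"} })
	fmt.Println("is leader:", ok) // is leader: true
}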
