From b11f1c4567dc289f466cae3fb626a6e2aaa255f8 Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Wed, 10 Aug 2022 11:50:48 +0800
Subject: [PATCH] scale-in pd leader after evict the leader (#2005)

---
 pkg/cluster/operation/scale_in.go | 41 +++++++++++++++++++++++++++++++
 pkg/cluster/operation/upgrade.go  |  8 +++++-
 pkg/cluster/spec/cdc.go           | 10 +-------
 3 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/pkg/cluster/operation/scale_in.go b/pkg/cluster/operation/scale_in.go
index 4d27f769b6..ef8135b2d4 100644
--- a/pkg/cluster/operation/scale_in.go
+++ b/pkg/cluster/operation/scale_in.go
@@ -263,6 +263,7 @@ func ScaleInCluster(
 	cdcInstances := make([]spec.Instance, 0)
 	// Delete member from cluster
 	for _, component := range cluster.ComponentsByStartOrder() {
+		deferInstances := make([]spec.Instance, 0)
 		for _, instance := range component.Instances() {
 			if !deletedNodes.Exist(instance.ID()) {
 				continue
@@ -274,6 +275,46 @@
 				continue
 			}
 
+			if component.Role() == spec.ComponentPD {
+				// defer the PD leader so it is scaled in after the other instances
+				isLeader, err := instance.(*spec.PDInstance).IsLeader(ctx, cluster, int(options.APITimeout), tlsCfg)
+				if err != nil {
+					logger.Warnf("cannot find pd leader: %s", err)
+					return err
+				}
+				if isLeader {
+					deferInstances = append(deferInstances, instance)
+					logger.Debugf("Deferred scale-in of PD leader %s", instance.ID())
+					continue
+				}
+			}
+
+			err := deleteMember(ctx, component, instance, pdClient, binlogClient, options.APITimeout)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if !asyncOfflineComps.Exist(instance.ComponentName()) {
+				instCount[instance.GetHost()]--
+				if err := StopAndDestroyInstance(ctx, cluster, instance, options, false, instCount[instance.GetHost()] == 0, tlsCfg); err != nil {
+					return err
+				}
+			} else {
+				logger.Warnf(color.YellowString("The component `%s` will become tombstone, maybe exists in several minutes or hours, after that you can use the prune command to clean it",
+					component.Name()))
+			}
+		}
+
+		// process deferred instances
+		for _, instance := range deferInstances {
+			// at this point the instance must be the PD leader, so `PreRestart` is always triggered to evict the leader first
+			rollingInstance, ok := instance.(spec.RollingUpdateInstance)
+			if ok {
+				if err := rollingInstance.PreRestart(ctx, cluster, int(options.APITimeout), tlsCfg); err != nil {
+					return errors.Trace(err)
+				}
+			}
+
 			err := deleteMember(ctx, component, instance, pdClient, binlogClient, options.APITimeout)
 			if err != nil {
 				return errors.Trace(err)
diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index c9e20b906f..02775d4cf0 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -175,7 +175,13 @@ func Upgrade(
 	return RestartMonitored(ctx, uniqueHosts.Slice(), noAgentHosts, topo.GetMonitoredOptions(), options.OptTimeout)
 }
 
-func upgradeInstance(ctx context.Context, topo spec.Topology, instance spec.Instance, options Options, tlsCfg *tls.Config) (err error) {
+func upgradeInstance(
+	ctx context.Context,
+	topo spec.Topology,
+	instance spec.Instance,
+	options Options,
+	tlsCfg *tls.Config,
+) (err error) {
 	// insert checkpoint
 	point := checkpoint.Acquire(ctx, upgradePoint, map[string]interface{}{"instance": instance.ID()})
 	defer func() {
diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go
index 71750d232a..ac08d6e841 100644
--- a/pkg/cluster/spec/cdc.go
+++ b/pkg/cluster/spec/cdc.go
@@ -241,14 +241,7 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
 	client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg)
 	captures, err := client.GetAllCaptures()
 	if err != nil {
-		logger.Warnf("cdc pre-restart skipped, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
-		return nil
-	}
-
-	// this may happen all other captures crashed, only this one alive,
-	// no need to drain the capture, just return it to trigger hard restart.
-	if len(captures) <= 1 {
-		logger.Debugf("cdc pre-restart finished, only one alive capture found, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
+		logger.Debugf("cdc pre-restart skipped, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
 		return nil
 	}
 
@@ -257,7 +250,6 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
 		found   bool
 		isOwner bool
 	)
-
 	for _, capture := range captures {
 		if address == capture.AdvertiseAddr {
 			found = true
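
Note (not part of the patch): below is a minimal, self-contained sketch of the ordering that the scale_in.go hunk introduces, using simplified stand-in types. Instance, scaleIn, evictLeader, and remove are hypothetical names for illustration only; in the actual change the deferral lives inside ScaleInCluster, and leadership is evicted by calling PreRestart on the deferred *spec.PDInstance (through the spec.RollingUpdateInstance interface) before deleteMember runs.

package main

import "fmt"

// Instance is a hypothetical, simplified stand-in for spec.Instance:
// an ID plus a flag saying whether it currently holds the PD leadership.
type Instance struct {
	ID       string
	IsLeader bool
}

// scaleIn mirrors the ordering of the scale_in.go hunk: non-leader members
// are removed first, any instance that is the PD leader is deferred, and a
// deferred instance has its leadership evicted before it is removed.
func scaleIn(instances []Instance, evictLeader, remove func(Instance) error) error {
	deferred := make([]Instance, 0)

	for _, inst := range instances {
		if inst.IsLeader {
			// defer the PD leader so it is scaled in after the others
			deferred = append(deferred, inst)
			continue
		}
		if err := remove(inst); err != nil {
			return err
		}
	}

	// process the deferred instances: evict leadership first, then remove
	for _, inst := range deferred {
		if err := evictLeader(inst); err != nil {
			return err
		}
		if err := remove(inst); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	nodes := []Instance{
		{ID: "pd-1", IsLeader: true},
		{ID: "pd-2"},
		{ID: "pd-3"},
	}
	evict := func(i Instance) error { fmt.Println("evict leader on", i.ID); return nil }
	remove := func(i Instance) error { fmt.Println("remove", i.ID); return nil }
	if err := scaleIn(nodes, evict, remove); err != nil {
		fmt.Println("scale-in failed:", err)
	}
}

Running the sketch removes pd-2 and pd-3 first, then evicts and removes pd-1, which is the order the patched ScaleInCluster enforces.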