Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

pdutil: use temp pause config #592

Merged
merged 25 commits into from
Nov 16, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 54 additions & 68 deletions pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/util/codec"
pd "github.com/tikv/pd/client"
"github.com/coreos/go-semver/semver"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/utils"
Expand All @@ -37,6 +38,12 @@ const (
pauseTimeout = 5 * time.Minute
)

var(
// in this version we can use pause configs
// see https://github.com/tikv/pd/pull/3088
pauseConfigVersion = semver.New("4.0.8")
)

// clusterConfig represents a set of scheduler whose config have been modified
// along with their original config.
type clusterConfig struct {
Expand All @@ -62,14 +69,12 @@ var (
"shuffle-hot-region-scheduler": {},
}
// TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972
pdRegionMergeCfg = []string{
"max-merge-region-keys",
"max-merge-region-size",
}
pdScheduleLimitCfg = []string{
"leader-schedule-limit",
"region-schedule-limit",
"max-snapshot-count",
expectPDCfg = map[string]int {
"max-merge-region-keys": 0,
"max-merge-region-size": 0,
"leader-schedule-limit": 1,
"region-schedule-limit": 1,
"max-snapshot-count": 1,
}

// DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml.
Expand Down Expand Up @@ -122,6 +127,7 @@ type PdController struct {
addrs []string
cli *http.Client
pdClient pd.Client
version *semver.Version

// control the pause schedulers goroutine
schedulerPauseCh chan struct{}
Expand All @@ -144,6 +150,7 @@ func NewPdController(
addrs := strings.Split(pdAddrs, ",")
processedAddrs := make([]string, 0, len(addrs))
var failure error
var versionBytes []byte
for _, addr := range addrs {
if addr != "" && !strings.HasPrefix("http", addr) {
if tlsConf != nil {
Expand All @@ -153,14 +160,18 @@ func NewPdController(
}
}
processedAddrs = append(processedAddrs, addr)
_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
versionBytes, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
if failure == nil {
break
}
}
if failure != nil {
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
}
version, err := semver.NewVersion(string(versionBytes))
if err != nil {
return nil, errors.Annotatef(err, "transform pd version failed", string(versionBytes))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you choose an error in pkg/errors?

}

maxCallMsgSize := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
Expand All @@ -180,12 +191,17 @@ func NewPdController(
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
version: version,
// We should make a buffered channel here otherwise when context canceled,
// gracefully shutdown will stick at resuming schedulers.
schedulerPauseCh: make(chan struct{}, 1),
}, nil
}

func (p *PdController) isPauseConfigEnabled() bool {
return p.version.Compare(*pauseConfigVersion) >= 0
}

// SetHTTP set pd addrs and cli for test.
func (p *PdController) SetHTTP(addrs []string, cli *http.Client) {
p.addrs = addrs
Expand Down Expand Up @@ -260,16 +276,14 @@ func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string)
return p.pauseSchedulersWith(ctx, schedulers, pdRequest)
}

func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
removedSchedulers := make([]string, 0, len(schedulers))
func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
// pause this scheduler with 300 seconds
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)})
if err != nil {
return nil, err
}

// first pause this scheduler, if the first time failed. we should return the error
// so put first time out of for loop. and in for loop we could ignore other failed pause.
// PauseSchedulers remove pd scheduler temporarily.
removedSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.addrs {
Expand All @@ -280,11 +294,21 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str
}
}
if err != nil {
log.Error("failed to pause scheduler at beginning",
zap.Strings("name", schedulers), zap.Error(err))
return nil, err
return removedSchedulers, err
}
}
return removedSchedulers, nil
}

func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
// first pause this scheduler, if the first time failed. we should return the error
// so put first time out of for loop. and in for loop we could ignore other failed pause.
removedSchedulers, err := p.doPauseSchedulers(ctx, schedulers, post)
if err != nil {
log.Error("failed to pause scheduler at beginning",
zap.Strings("name", schedulers), zap.Error(err))
return nil, err
}
log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers))

go func() {
Expand All @@ -296,19 +320,11 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str
case <-ctx.Done():
return
case <-tick.C:
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.addrs {
_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
if err == nil {
break
}
}
if err == nil {
log.Info("pause scheduler", zap.String("name", scheduler))
} else {
log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
}
_, err := p.doPauseSchedulers(ctx, schedulers, post)
if err == nil {
log.Info("pause scheduler", zap.Strings("name", removedSchedulers))
} else {
log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
}
case <-p.schedulerPauseCh:
log.Info("exit pause scheduler successful")
Expand Down Expand Up @@ -423,7 +439,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
}
log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
for cfgKey := range expectPDCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
Expand All @@ -435,18 +451,6 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
return errors.Annotate(err, "fail to update PD merge config")
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
scheduleLimitCfg[cfgKey] = value
}
if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return errors.Annotate(err, "fail to update PD schedule config")
}
if locationPlacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok {
log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-placement", locationPlacement))
if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationPlacement}); err != nil {
Expand Down Expand Up @@ -496,38 +500,20 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
log.Debug("saved PD config", zap.Any("config", scheduleCfg))

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := scheduleCfg[cfgKey]
if value == nil {
disablePDCfg := make(map[string]interface{})
for cfgKey, cfgVal := range expectPDCfg {
value, ok := scheduleCfg[cfgKey]
if !ok {
// Ignore non-exist config.
continue
}
// Disable region merge by setting config to 0.
disableMergeCfg[cfgKey] = 0
limit := int(value.(float64))
disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * cfgVal
}
err = p.UpdatePDScheduleConfig(ctx, disableMergeCfg)
err = p.UpdatePDScheduleConfig(ctx, disablePDCfg)
if err != nil {
return
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}

// Speed update PD scheduler by enlarging scheduling limits.
// Multiply limits by store count but no more than 40.
// Larger limit may make cluster unstable.
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
}
if err := p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return undo, err
}
return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"})
}

Expand Down