Merge branch 'master' into label-delete
nolouch authored Sep 15, 2022
2 parents 161ee36 + bca2afa commit e92f535
Showing 9 changed files with 228 additions and 72 deletions.
3 changes: 3 additions & 0 deletions server/api/region.go
@@ -60,6 +60,9 @@ type PDPeerStats struct {
 }
 
 func fromPeer(peer *metapb.Peer) MetaPeer {
+    if peer == nil {
+        return MetaPeer{}
+    }
     return MetaPeer{
         Peer:     peer,
         RoleName: peer.GetRole().String(),
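Why the guard is needed: protobuf-generated getters in Go tolerate a nil receiver and return the zero value, so the unguarded code would not crash on a leaderless region; it would instead fabricate a leader whose RoleName is the default role. A minimal sketch of that behavior, assuming only the kvproto metapb package (MetaPeer itself is PD's type):

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

func main() {
	// A region with no leader hands fromPeer a nil *metapb.Peer.
	var peer *metapb.Peer
	// gogoproto getters are nil-receiver safe and return the zero value,
	// so without the new guard the API would report RoleName "Voter"
	// for a region that has no leader at all.
	fmt.Println(peer.GetRole().String()) // prints "Voter"
}
```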
4 changes: 4 additions & 0 deletions server/api/region_test.go
@@ -223,6 +223,10 @@ func (suite *regionTestSuite) TestRegionCheck() {
 }
 
 func (suite *regionTestSuite) TestRegions() {
+    r := NewAPIRegionInfo(core.NewRegionInfo(&metapb.Region{Id: 1}, nil))
+    suite.Nil(r.Leader.Peer)
+    suite.Len(r.Leader.RoleName, 0)
+
     rs := []*core.RegionInfo{
         newTestRegionInfo(2, 1, []byte("a"), []byte("b")),
         newTestRegionInfo(3, 1, []byte("b"), []byte("c")),
27 changes: 19 additions & 8 deletions server/schedule/filter/filters.go
@@ -605,18 +605,20 @@ type ruleLeaderFitFilter struct {
     region           *core.RegionInfo
     oldFit           *placement.RegionFit
     srcLeaderStoreID uint64
+    allowMoveLeader  bool
 }
 
 // newRuleLeaderFitFilter creates a filter that ensures that, after a leader transfer
 // to the new store, the isolation level will not decrease.
-func newRuleLeaderFitFilter(scope string, cluster *core.BasicCluster, ruleManager *placement.RuleManager, region *core.RegionInfo, srcLeaderStoreID uint64) Filter {
+func newRuleLeaderFitFilter(scope string, cluster *core.BasicCluster, ruleManager *placement.RuleManager, region *core.RegionInfo, srcLeaderStoreID uint64, allowMoveLeader bool) Filter {
     return &ruleLeaderFitFilter{
         scope:            scope,
         cluster:          cluster,
         ruleManager:      ruleManager,
         region:           region,
         oldFit:           ruleManager.FitRegion(cluster, region),
         srcLeaderStoreID: srcLeaderStoreID,
+        allowMoveLeader:  allowMoveLeader,
     }
 }

@@ -633,14 +635,23 @@ func (f *ruleLeaderFitFilter) Source(options *config.PersistOptions, store *core
 }
 
 func (f *ruleLeaderFitFilter) Target(options *config.PersistOptions, store *core.StoreInfo) *plan.Status {
-    targetPeer := f.region.GetStorePeer(store.GetID())
+    targetStoreID := store.GetID()
+    sourcePeer := f.region.GetStorePeer(f.srcLeaderStoreID)
+    targetPeer := f.region.GetStorePeer(targetStoreID)
+    newRegionOptions := []core.RegionCreateOption{core.WithLeader(targetPeer)}
     if targetPeer == nil {
-        log.Warn("ruleLeaderFitFilter couldn't find peer on target Store", zap.Uint64("target-store", store.GetID()))
-        return statusStoreNotMatchRule
+        if !f.allowMoveLeader {
+            log.Warn("ruleLeaderFitFilter couldn't find peer on target Store", zap.Uint64("target-store", store.GetID()))
+            return statusStoreNotMatchRule
+        }
+        newRegionOptions = []core.RegionCreateOption{
+            core.WithReplacePeerStore(f.srcLeaderStoreID, targetStoreID),
+            core.WithLeader(&metapb.Peer{Id: sourcePeer.GetId(), StoreId: targetStoreID}),
+        }
     }
     copyRegion := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
-        f.region.GetPeers(), f.region.GetLeader(),
-        core.WithLeader(targetPeer))
+        f.region.GetPeers(), f.region.GetLeader(), newRegionOptions...,
+    )
     newFit := f.ruleManager.FitRegion(f.cluster, copyRegion)
     if placement.CompareRegionFit(f.oldFit, newFit) <= 0 {
         return statusOK
@@ -664,9 +675,9 @@ func NewPlacementSafeguard(scope string, opt *config.PersistOptions, cluster *co
 // NewPlacementLeaderSafeguard creates a filter that ensures that, after transferring a leader to
 // an existing peer, the placement restriction will not become worse.
 // Note that it only works when placement rules are enabled; otherwise it always permits the sourceStore.
-func NewPlacementLeaderSafeguard(scope string, opt *config.PersistOptions, cluster *core.BasicCluster, ruleManager *placement.RuleManager, region *core.RegionInfo, sourceStore *core.StoreInfo) Filter {
+func NewPlacementLeaderSafeguard(scope string, opt *config.PersistOptions, cluster *core.BasicCluster, ruleManager *placement.RuleManager, region *core.RegionInfo, sourceStore *core.StoreInfo, allowMoveLeader bool) Filter {
     if opt.IsPlacementRulesEnabled() {
-        return newRuleLeaderFitFilter(scope, cluster, ruleManager, region, sourceStore.GetID())
+        return newRuleLeaderFitFilter(scope, cluster, ruleManager, region, sourceStore.GetID(), allowMoveLeader)
     }
     return nil
 }
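With allowMoveLeader set, a target store that holds no peer of the region is no longer rejected outright. Instead, the filter scores a hypothetical layout in which the leader's peer is re-homed onto the target store (core.WithReplacePeerStore plus a synthesized leader peer) and lets placement.CompareRegionFit decide. A conceptual sketch of that rewrite, using a hypothetical movePeerStore helper in place of PD's region options:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

// movePeerStore returns a copy of peers in which the peer living on
// srcStoreID is re-homed to targetStoreID, mirroring what
// core.WithReplacePeerStore does to the copied region.
func movePeerStore(peers []*metapb.Peer, srcStoreID, targetStoreID uint64) []*metapb.Peer {
	out := make([]*metapb.Peer, 0, len(peers))
	for _, p := range peers {
		if p.GetStoreId() == srcStoreID {
			out = append(out, &metapb.Peer{Id: p.GetId(), StoreId: targetStoreID, Role: p.GetRole()})
			continue
		}
		out = append(out, p)
	}
	return out
}

func main() {
	peers := []*metapb.Peer{
		{Id: 101, StoreId: 1}, // current leader
		{Id: 102, StoreId: 2},
		{Id: 103, StoreId: 3},
	}
	// "What if the leader's peer lived on store 4 instead of store 1?"
	// The filter fits this hypothetical region against the placement rules.
	for _, p := range movePeerStore(peers, 1, 4) {
		fmt.Println(p.GetId(), "->", p.GetStoreId())
	}
}
```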
122 changes: 70 additions & 52 deletions server/schedulers/balance_leader.go
@@ -347,6 +347,10 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
     l.conf.mu.RLock()
     defer l.conf.mu.RUnlock()
     basePlan := NewBalanceSchedulerPlan()
+    var collector *plan.Collector
+    if dryRun {
+        collector = plan.NewCollector(basePlan)
+    }
     batch := l.conf.Batch
     schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc()

@@ -359,56 +363,58 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
     scoreFunc := func(store *core.StoreInfo) float64 {
         return store.LeaderScore(solver.kind.Policy, solver.GetOpInfluence(store.GetID()))
     }
-    sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts(), nil), false, scoreFunc)
+    sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts(), collector), false, scoreFunc)
     targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts(), nil), true, scoreFunc)
     usedRegions := make(map[uint64]struct{})
 
     result := make([]*operator.Operator, 0, batch)
     for sourceCandidate.hasStore() || targetCandidate.hasStore() {
         // first choose source
         if sourceCandidate.hasStore() {
-            op := createTransferLeaderOperator(sourceCandidate, transferOut, l, solver, usedRegions)
+            op := createTransferLeaderOperator(sourceCandidate, transferOut, l, solver, usedRegions, collector)
             if op != nil {
                 result = append(result, op)
                 if len(result) >= batch {
-                    return result, nil
+                    return result, collector.GetPlans()
                 }
                 makeInfluence(op, solver, usedRegions, sourceCandidate, targetCandidate)
             }
         }
         // next choose target
         if targetCandidate.hasStore() {
-            op := createTransferLeaderOperator(targetCandidate, transferIn, l, solver, usedRegions)
+            op := createTransferLeaderOperator(targetCandidate, transferIn, l, solver, usedRegions, nil)
             if op != nil {
                 result = append(result, op)
                 if len(result) >= batch {
-                    return result, nil
+                    return result, collector.GetPlans()
                 }
                 makeInfluence(op, solver, usedRegions, sourceCandidate, targetCandidate)
             }
         }
     }
     l.retryQuota.GC(append(sourceCandidate.stores, targetCandidate.stores...))
-    return result, nil
+    return result, collector.GetPlans()
 }
 
 func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler,
-    plan *solver, usedRegions map[uint64]struct{}) *operator.Operator {
+    ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator {
     store := cs.getStore()
+    ssolver.step++
+    defer func() { ssolver.step-- }()
     retryLimit := l.retryQuota.GetLimit(store)
-    var creator func(*solver) *operator.Operator
+    var creator func(*solver, *plan.Collector) *operator.Operator
     switch dir {
     case transferOut:
-        plan.source, plan.target = store, nil
+        ssolver.source, ssolver.target = store, nil
         creator = l.transferLeaderOut
     case transferIn:
-        plan.source, plan.target = nil, store
+        ssolver.source, ssolver.target = nil, store
         creator = l.transferLeaderIn
     }
     var op *operator.Operator
     for i := 0; i < retryLimit; i++ {
         schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
-        if op = creator(plan); op != nil {
+        if op = creator(ssolver, collector); op != nil {
             if _, ok := usedRegions[op.RegionID()]; !ok {
                 break
             }
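Note that collector is allocated only for dry runs, yet Collect and GetPlans are called unconditionally, including the createTransferLeaderOperator call that passes a literal nil for the transfer-in direction. This is sound only if plan.Collector's methods check for a nil receiver, which PD's plan package is assumed to guarantee. A minimal sketch of the pattern, with illustrative names:

```go
package main

import "fmt"

// Collector stands in for PD's plan.Collector; the names are illustrative.
type Collector struct{ plans []string }

// Collect is a no-op on a nil *Collector, so call sites need no nil checks.
func (c *Collector) Collect(p string) {
	if c == nil {
		return
	}
	c.plans = append(c.plans, p)
}

// GetPlans returns nil on a nil *Collector.
func (c *Collector) GetPlans() []string {
	if c == nil {
		return nil
	}
	return c.plans
}

func main() {
	var c *Collector // dryRun == false: no collector was allocated
	c.Collect("store 1 rejected") // safe no-op
	fmt.Println(c.GetPlans() == nil) // true
}
```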
@@ -443,105 +449,117 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s
 // transferLeaderOut transfers leader from the source store.
 // It randomly selects a healthy region from the source store, then picks
 // the best follower peer and transfers the leader.
-func (l *balanceLeaderScheduler) transferLeaderOut(plan *solver) *operator.Operator {
-    plan.region = filter.SelectOneRegion(plan.RandLeaderRegions(plan.SourceStoreID(), l.conf.Ranges),
-        nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
-    if plan.region == nil {
-        log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.SourceStoreID()))
+func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator {
+    solver.region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.SourceStoreID(), l.conf.Ranges),
+        collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
+    if solver.region == nil {
+        log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.SourceStoreID()))
         schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc()
         return nil
     }
-    targets := plan.GetFollowerStores(plan.region)
+    if solver.IsRegionHot(solver.region) {
+        log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+        collector.Collect(plan.SetResource(solver.region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot)))
+        schedulerCounter.WithLabelValues(l.GetName(), "region-hot").Inc()
+        return nil
+    }
+    solver.step++
+    defer func() { solver.step-- }()
+    targets := solver.GetFollowerStores(solver.region)
     finalFilters := l.filters
-    opts := plan.GetOpts()
-    if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), opts, plan.GetBasicCluster(), plan.GetRuleManager(), plan.region, plan.source); leaderFilter != nil {
+    opts := solver.GetOpts()
+    if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), opts, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil {
         finalFilters = append(l.filters, leaderFilter)
     }
     targets = filter.SelectTargetStores(targets, finalFilters, opts, nil)
     leaderSchedulePolicy := opts.GetLeaderSchedulePolicy()
     sort.Slice(targets, func(i, j int) bool {
-        iOp := plan.GetOpInfluence(targets[i].GetID())
-        jOp := plan.GetOpInfluence(targets[j].GetID())
+        iOp := solver.GetOpInfluence(targets[i].GetID())
+        jOp := solver.GetOpInfluence(targets[j].GetID())
         return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < targets[j].LeaderScore(leaderSchedulePolicy, jOp)
     })
-    for _, plan.target = range targets {
-        if op := l.createOperator(plan); op != nil {
+    for _, solver.target = range targets {
+        if op := l.createOperator(solver, collector); op != nil {
             return op
         }
     }
-    log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", plan.region.GetID()))
+    log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID()))
     schedulerCounter.WithLabelValues(l.GetName(), "no-target-store").Inc()
     return nil
 }
 
 // transferLeaderIn transfers leader to the target store.
 // It randomly selects a healthy region from the target store, then picks
 // the worst follower peer and transfers the leader.
-func (l *balanceLeaderScheduler) transferLeaderIn(plan *solver) *operator.Operator {
-    plan.region = filter.SelectOneRegion(plan.RandFollowerRegions(plan.TargetStoreID(), l.conf.Ranges),
+func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator {
+    solver.region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.Ranges),
         nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
-    if plan.region == nil {
-        log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.TargetStoreID()))
+    if solver.region == nil {
+        log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.TargetStoreID()))
         schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc()
         return nil
     }
-    leaderStoreID := plan.region.GetLeader().GetStoreId()
-    plan.source = plan.GetStore(leaderStoreID)
-    if plan.source == nil {
+    if solver.IsRegionHot(solver.region) {
+        log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+        schedulerCounter.WithLabelValues(l.GetName(), "region-hot").Inc()
+        return nil
+    }
+    leaderStoreID := solver.region.GetLeader().GetStoreId()
+    solver.source = solver.GetStore(leaderStoreID)
+    if solver.source == nil {
         log.Debug("region has no leader or leader store cannot be found",
             zap.String("scheduler", l.GetName()),
-            zap.Uint64("region-id", plan.region.GetID()),
+            zap.Uint64("region-id", solver.region.GetID()),
             zap.Uint64("store-id", leaderStoreID),
         )
         schedulerCounter.WithLabelValues(l.GetName(), "no-leader").Inc()
         return nil
     }
     finalFilters := l.filters
-    opts := plan.GetOpts()
-    if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), opts, plan.GetBasicCluster(), plan.GetRuleManager(), plan.region, plan.source); leaderFilter != nil {
+    opts := solver.GetOpts()
+    if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), opts, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil {
         finalFilters = append(l.filters, leaderFilter)
     }
-    target := filter.NewCandidates([]*core.StoreInfo{plan.target}).
+    target := filter.NewCandidates([]*core.StoreInfo{solver.target}).
         FilterTarget(opts, nil, finalFilters...).
         PickFirst()
     if target == nil {
-        log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", plan.region.GetID()))
+        log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID()))
         schedulerCounter.WithLabelValues(l.GetName(), "no-target-store").Inc()
         return nil
     }
-    return l.createOperator(plan)
+    return l.createOperator(solver, collector)
 }
 
 // createOperator creates the operator according to the source and target store.
 // If the region is hot or the difference between the two stores is tolerable, then
 // no new operator needs to be created; otherwise it creates an operator that transfers
 // the leader from the source store to the target store for the region.
-func (l *balanceLeaderScheduler) createOperator(plan *solver) *operator.Operator {
-    if plan.IsRegionHot(plan.region) {
-        log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", plan.region.GetID()))
-        schedulerCounter.WithLabelValues(l.GetName(), "region-hot").Inc()
-        return nil
-    }
-
-    if !plan.shouldBalance(l.GetName()) {
+func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator {
+    solver.step++
+    defer func() { solver.step-- }()
+    if !solver.shouldBalance(l.GetName()) {
         schedulerCounter.WithLabelValues(l.GetName(), "skip").Inc()
+        collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
         return nil
     }
-
-    op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, plan, plan.region, plan.region.GetLeader().GetStoreId(), plan.TargetStoreID(), []uint64{}, operator.OpLeader)
+    solver.step++
+    defer func() { solver.step-- }()
+    op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.region, solver.region.GetLeader().GetStoreId(), solver.TargetStoreID(), []uint64{}, operator.OpLeader)
     if err != nil {
         log.Debug("fail to create balance leader operator", errs.ZapError(err))
+        collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusCreateOperatorFailed)))
         return nil
     }
     op.Counters = append(op.Counters,
         schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
     )
     op.FinishedCounters = append(op.FinishedCounters,
-        balanceDirectionCounter.WithLabelValues(l.GetName(), plan.SourceMetricLabel(), plan.TargetMetricLabel()),
-        l.counter.WithLabelValues("move-leader", plan.SourceMetricLabel()+"-out"),
-        l.counter.WithLabelValues("move-leader", plan.TargetMetricLabel()+"-in"),
+        balanceDirectionCounter.WithLabelValues(l.GetName(), solver.SourceMetricLabel(), solver.TargetMetricLabel()),
+        l.counter.WithLabelValues("move-leader", solver.SourceMetricLabel()+"-out"),
+        l.counter.WithLabelValues("move-leader", solver.TargetMetricLabel()+"-in"),
     )
-    op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(plan.sourceScore, 'f', 2, 64)
-    op.AdditionalInfos["targetScore"] = strconv.FormatFloat(plan.targetScore, 'f', 2, 64)
+    op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(solver.sourceScore, 'f', 2, 64)
+    op.AdditionalInfos["targetScore"] = strconv.FormatFloat(solver.targetScore, 'f', 2, 64)
     return op
 }
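The paired solver.step++ and defer func() { solver.step-- }() calls added throughout these functions maintain a depth counter, so a plan collected anywhere in the search records the stage (source, region, target, shouldBalance) at which it was rejected. A stripped-down sketch of the idiom, with assumed names:

```go
package main

import "fmt"

type solver struct{ step int }

// withStep runs fn one stage deeper and restores the depth on exit,
// the same push/pop effect as the inline step++ / defer step-- pairs.
func (s *solver) withStep(fn func()) {
	s.step++
	defer func() { s.step-- }()
	fn()
}

func main() {
	s := &solver{}
	s.withStep(func() { // picking a region
		s.withStep(func() { // picking a target
			fmt.Println("rejected at step", s.step) // 2
		})
	})
	fmt.Println("unwound to step", s.step) // 0
}
```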
12 changes: 8 additions & 4 deletions server/schedulers/balance_plan.go
@@ -24,8 +24,8 @@ const (
     pickSource = iota
     pickRegion
     pickTarget
-    // The following two steps may appear in future implementations
-    // shouldBalance
+    shouldBalance
+    // The following step may appear in future implementations
     // createOperator
 )

@@ -123,10 +123,14 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error)
         if !p.status.IsNormal() {
             normal = false
         }
-        // we don't consider the situation for verification step
-        if step > pickTarget {
+        // we don't consider the situation for the createOperator step
+        if step > shouldBalance {
             continue
         }
+        // We can think of shouldBalance as a filtering step for the target, except that the current implementation is separate.
+        if step == shouldBalance {
+            step = pickTarget
+        }
         if step > maxStep {
             storeStatusCounter = make(map[uint64]map[plan.Status]int)
             maxStep = step
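Because shouldBalance is effectively one more filter applied to an already-chosen target, the summary charges a shouldBalance failure to the target store. A small sketch of that normalization, assuming the step constants above:

```go
package main

import "fmt"

const (
	pickSource = iota
	pickRegion
	pickTarget
	shouldBalance
)

// normalizeStep mirrors the mapping in BalancePlanSummary: steps past
// shouldBalance are skipped entirely, and shouldBalance itself is folded
// into pickTarget so the failure is attributed to the candidate target.
func normalizeStep(step int) (normalized int, summarized bool) {
	switch {
	case step > shouldBalance:
		return 0, false // createOperator and beyond are not summarized
	case step == shouldBalance:
		return pickTarget, true
	default:
		return step, true
	}
}

func main() {
	for _, s := range []int{pickTarget, shouldBalance, shouldBalance + 1} {
		n, ok := normalizeStep(s)
		fmt.Println(s, "->", n, ok)
	}
}
```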
28 changes: 28 additions & 0 deletions server/schedulers/balance_plan_test.go
@@ -220,3 +220,31 @@ func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult4() {
         5: plan.NewStatus(plan.StatusStoreDown),
     }))
 }
+
+func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() {
+    plans := make([]plan.Plan, 0)
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreDown)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[1], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[4], step: 2, status: plan.NewStatus(plan.StatusStoreDown)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[1], step: 3, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)})
+    plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[4], step: 2, status: plan.NewStatus(plan.StatusStoreDown)})
+    statuses, isNormal, err := BalancePlanSummary(plans)
+    suite.NoError(err)
+    suite.False(isNormal)
+    suite.True(suite.check(statuses,
+        map[uint64]*plan.Status{
+            1: plan.NewStatus(plan.StatusStoreAlreadyHasPeer),
+            2: plan.NewStatus(plan.StatusStoreAlreadyHasPeer),
+            3: plan.NewStatus(plan.StatusStoreNotMatchRule),
+            4: plan.NewStatus(plan.StatusStoreNotMatchRule),
+            5: plan.NewStatus(plan.StatusStoreDown),
+        }))
+}
