cherry pick #3361 to release-4.0
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Jan 28, 2021
1 parent 0899703 commit 2417ff1
Showing 2 changed files with 116 additions and 30 deletions.
92 changes: 80 additions & 12 deletions server/schedulers/balance_test.go
@@ -1245,24 +1245,87 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {

hb, err := schedule.CreateScheduler(ScatterRangeType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
c.Assert(err, IsNil)
limit := 0
for {
if limit > 100 {
break
}
ops := hb.Schedule(tc)
if ops == nil {
limit++
continue
}
schedule.ApplyOperator(tc, ops[0])
}

scheduleAndApplyOperator(tc, hb, 100)
for i := 1; i <= 5; i++ {
leaderCount := tc.Regions.GetStoreLeaderCount(uint64(i))
c.Check(leaderCount, LessEqual, 12)
regionCount := tc.Regions.GetStoreRegionCount(uint64(i))
c.Check(regionCount, LessEqual, 32)
}
}

func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
// Add stores 1,2,3,4,5.
tc.AddRegionStore(1, 0)
tc.AddRegionStore(2, 0)
tc.AddRegionStore(3, 0)
tc.AddRegionStore(4, 0)
tc.AddRegionStore(5, 0)
var (
id uint64
regions []*metapb.Region
)
for i := 0; i < 50; i++ {
peers := []*metapb.Peer{
{Id: id + 1, StoreId: 1},
{Id: id + 2, StoreId: 2},
{Id: id + 3, StoreId: 3},
}
regions = append(regions, &metapb.Region{
Id: id + 4,
Peers: peers,
StartKey: []byte(fmt.Sprintf("s_%02d", i)),
EndKey: []byte(fmt.Sprintf("s_%02d", i+1)),
})
id += 4
}

// Empty end key: the last region covers the rest of the key space.
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
regionInfo := core.NewRegionInfo(
meta,
meta.Peers[leader],
core.SetApproximateKeys(96),
core.SetApproximateSize(96),
)

tc.Regions.SetRegion(regionInfo)
}

for i := 0; i < 100; i++ {
_, err := tc.AllocPeer(1)
c.Assert(err, IsNil)
}
for i := 1; i <= 5; i++ {
tc.UpdateStoreStatus(uint64(i))
}
oc := schedule.NewOperatorController(s.ctx, nil, nil)

// Test the case where leader scheduling is not allowed.
opt.LeaderScheduleLimit = 0
hb, err := schedule.CreateScheduler(ScatterRangeType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
c.Assert(err, IsNil)

scheduleAndApplyOperator(tc, hb, 100)
maxLeaderCount := 0
minLeaderCount := 99
for i := 1; i <= 5; i++ {
leaderCount := tc.Regions.GetStoreLeaderCount(uint64(i))
c.Check(leaderCount, LessEqual, 12)
if leaderCount < minLeaderCount {
minLeaderCount = leaderCount
}
if leaderCount > maxLeaderCount {
maxLeaderCount = leaderCount
}
regionCount := tc.Regions.GetStoreRegionCount(uint64(i))
c.Check(regionCount, LessEqual, 32)
}
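// With LeaderScheduleLimit set to 0 the scheduler cannot create leader operators,
// so the leaders stay where they were randomly placed and the spread across stores remains large.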
c.Check(maxLeaderCount-minLeaderCount, Greater, 10)
}

func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
@@ -1344,9 +1407,14 @@ func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
hb, err := schedule.CreateScheduler(ScatterRangeType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"}))
c.Assert(err, IsNil)

scheduleAndApplyOperator(tc, hb, 100)
}

// scheduleAndApplyOperator tries to schedule up to `count` times and applies the operator whenever one is created.
func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb schedule.Scheduler, count int) {
limit := 0
for {
if limit > 100 {
if limit > count {
break
}
ops := hb.Schedule(tc)
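The body of scheduleAndApplyOperator is cut off by the collapsed diff above. Judging from the loop it replaces in TestBalance, the full helper is presumably along these lines (a reconstruction for reference, not part of the rendered diff):

func scheduleAndApplyOperator(tc *mockcluster.Cluster, hb schedule.Scheduler, count int) {
	limit := 0
	for {
		// Stop after `count` rounds in which the scheduler produced nothing.
		if limit > count {
			break
		}
		ops := hb.Schedule(tc)
		if ops == nil {
			limit++
			continue
		}
		// Apply the first operator so the mock cluster state reflects it.
		schedule.ApplyOperator(tc, ops[0])
	}
}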
54 changes: 36 additions & 18 deletions server/schedulers/scatter_range.go
@@ -193,6 +193,18 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) {
}

func (l *scatterRangeScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return l.allowBalanceLeader(cluster) || l.allowBalanceRegion(cluster)
}

func (l *scatterRangeScheduler) allowBalanceLeader(cluster opt.Cluster) bool {
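// Leader balancing is allowed only while this scheduler's pending range operators
// stay below the cluster's leader schedule limit; otherwise record the rejection.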
allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetLeaderScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpLeader.String()).Inc()
}
return allowed
}

func (l *scatterRangeScheduler) allowBalanceRegion(cluster opt.Cluster) bool {
allowed := l.OpController.OperatorCount(operator.OpRange) < cluster.GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpRegion.String()).Inc()
@@ -205,26 +217,32 @@ func (l *scatterRangeScheduler) Schedule(cluster opt.Cluster) []*operator.Operator
// isolate a new cluster according to the key range
c := schedule.GenRangeCluster(cluster, l.config.GetStartKey(), l.config.GetEndKey())
c.SetTolerantSizeRatio(2)
ops := l.balanceLeader.Schedule(c)
if len(ops) > 0 {
ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.RangeName))
ops[0].AttachKind(operator.OpRange)
ops[0].Counters = append(ops[0].Counters,
schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(l.GetName(), "new-leader-operator"))
return ops
if l.allowBalanceLeader(cluster) {
ops := l.balanceLeader.Schedule(c)
if len(ops) > 0 {
ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.RangeName))
ops[0].AttachKind(operator.OpRange)
ops[0].Counters = append(ops[0].Counters,
schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(l.GetName(), "new-leader-operator"))
return ops
}
schedulerCounter.WithLabelValues(l.GetName(), "no-need-balance-leader").Inc()
}
ops = l.balanceRegion.Schedule(c)
if len(ops) > 0 {
ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.RangeName))
ops[0].AttachKind(operator.OpRange)
ops[0].Counters = append(ops[0].Counters,
schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(l.GetName(), "new-region-operator"),
)
return ops
if l.allowBalanceRegion(cluster) {
ops := l.balanceRegion.Schedule(c)
if len(ops) > 0 {
ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.RangeName))
ops[0].AttachKind(operator.OpRange)
ops[0].Counters = append(ops[0].Counters,
schedulerCounter.WithLabelValues(l.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(l.GetName(), "new-region-operator"),
)
return ops
}
schedulerCounter.WithLabelValues(l.GetName(), "no-need-balance-region").Inc()
}
schedulerCounter.WithLabelValues(l.GetName(), "no-need").Inc()

return nil
}

