Skip to content

Commit

Permalink
scheduler: support range for schedulers (#1791)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and sre-bot committed Nov 6, 2019
1 parent b7d614e commit cb56454
Show file tree
Hide file tree
Showing 21 changed files with 447 additions and 144 deletions.
16 changes: 8 additions & 8 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,23 +628,23 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo {
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *RaftCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, opts...)
func (c *RaftCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, ranges, opts...)
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *RaftCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, opts...)
func (c *RaftCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, ranges, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *RaftCluster) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, opts...)
func (c *RaftCluster) RandPendingRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, ranges, opts...)
}

// RandLearnerRegion returns a random region that has a learner peer on the store.
func (c *RaftCluster) RandLearnerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLearnerRegion(storeID, opts...)
func (c *RaftCluster) RandLearnerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLearnerRegion(storeID, ranges, opts...)
}

// RandHotRegionFromStore randomly picks a hot region in specified store.
Expand Down
10 changes: 5 additions & 5 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,10 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

for i := uint64(0); i < n; i++ {
region := cluster.RandLeaderRegion(i, core.HealthRegion())
region := cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
c.Assert(region.GetLeader().GetStoreId(), Equals, i)

region = cluster.RandFollowerRegion(i, core.HealthRegion())
region = cache.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")})
c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i)

c.Assert(region.GetStorePeer(i), NotNil)
Expand All @@ -859,14 +859,14 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := cluster.RandLeaderRegion(i, core.HealthRegion())
region := cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion())
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
}
c.Assert(cluster.RandLeaderRegion(i, core.HealthRegion()), IsNil)
c.Assert(cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(cluster.RandFollowerRegion(i, core.HealthRegion()), IsNil)
c.Assert(cluster.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil)
}
}

Expand Down
14 changes: 7 additions & 7 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,13 +715,14 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
co.run()
storage = tc.RaftCluster.storage
c.Assert(co.schedulers, HasLen, 3)
bls, err := schedule.CreateScheduler("balance-leader", oc, storage, nil)
bls, err := schedule.CreateScheduler("balance-leader", oc, storage, schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(bls), IsNil)
brs, err := schedule.CreateScheduler("balance-region", oc, storage, nil)
brs, err := schedule.CreateScheduler("balance-region", oc, storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(brs), IsNil)
c.Assert(co.schedulers, HasLen, 5)

// the scheduler option should contain 7 items
// the `hot scheduler` and `label scheduler` are disabled
c.Assert(co.cluster.opt.GetSchedulers(), HasLen, 7)
Expand All @@ -732,7 +733,6 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
c.Assert(co.cluster.opt.Persist(co.cluster.storage), IsNil)
co.stop()
co.wg.Wait()

_, newOpt, err = newTestScheduleConfig()
c.Assert(err, IsNil)
c.Assert(newOpt.Reload(co.cluster.storage), IsNil)
Expand Down Expand Up @@ -931,7 +931,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
}, nil, nil, c)
defer cleanup()
oc := co.opController
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil)
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(tc.addRegionStore(4, 100), IsNil)
c.Assert(tc.addRegionStore(3, 100), IsNil)
Expand Down Expand Up @@ -971,7 +971,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
}, nil, nil, c)
defer cleanup()
oc := co.opController
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil)
lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 100), IsNil)
Expand Down Expand Up @@ -1031,7 +1031,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {

c.Assert(tc.addLeaderRegion(1, 1), IsNil)
c.Assert(tc.addLeaderRegion(2, 2), IsNil)
scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), nil)
scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
lb := &mockLimitScheduler{
Scheduler: scheduler,
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
_, co, cleanup := prepare(nil, nil, nil, c)
defer cleanup()

lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), nil)
lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
sc := newScheduleController(co, lb)

Expand Down
41 changes: 29 additions & 12 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,39 +155,42 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regio
const randomRegionMaxRetry = 10

// RandFollowerRegion returns a random region that has a follower on the store.
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandFollowerRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandFollowerRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandLeaderRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandLeaderRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandPendingRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandPendingRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

// RandLearnerRegion returns a random region that has a learner peer on the store.
func (bc *BasicCluster) RandLearnerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
bc.RLock()
regions := bc.Regions.RandLearnerRegions(storeID, randomRegionMaxRetry)
regions := bc.Regions.RandLearnerRegions(storeID, ranges, randomRegionMaxRetry)
bc.RUnlock()
return bc.selectRegion(regions, opts...)
}

func (bc *BasicCluster) selectRegion(regions []*RegionInfo, opts ...RegionOption) *RegionInfo {
for _, r := range regions {
if r == nil {
break
}
if slice.AllOf(opts, func(i int) bool { return opts[i](r) }) {
return r
}
Expand Down Expand Up @@ -331,10 +334,10 @@ func (bc *BasicCluster) Length() int {

// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandLearnerRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *RegionInfo
Expand All @@ -359,3 +362,17 @@ type StoreSetController interface {

AttachAvailableFunc(id uint64, f func() bool)
}

// KeyRange is a key range.
type KeyRange struct {
StartKey []byte `json:"start-key"`
EndKey []byte `json:"end-key"`
}

// NewKeyRange create a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange{
StartKey: []byte(startKey),
EndKey: []byte(endKey),
}
}
58 changes: 41 additions & 17 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"reflect"
"strings"

Expand Down Expand Up @@ -507,9 +508,14 @@ func (rst *regionSubTree) RandomRegions(n int, startKey, endKey []byte) []*Regio
if rst.length() == 0 {
return nil
}

regions := make([]*RegionInfo, 0, n)
for i := 0; i < n; i++ {
regions = append(regions, rst.regionTree.RandomRegion(startKey, endKey))
region := rst.regionTree.RandomRegion(startKey, endKey)
if region == nil || !isInvolved(region, startKey, endKey) {
continue
}
regions = append(regions, region)
}
return regions
}
Expand Down Expand Up @@ -742,43 +748,57 @@ func (r *RegionsInfo) GetStoreLearnerCount(storeID uint64) int {
}

// RandPendingRegion randomly gets a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64) *RegionInfo {
return r.pendingPeers[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandPendingRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.pendingPeers[storeID].RandomRegion(startKey, endKey)
}

// RandPendingRegions randomly gets a store's n regions with a pending peer.
func (r *RegionsInfo) RandPendingRegions(storeID uint64, n int) []*RegionInfo {
return r.pendingPeers[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandPendingRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.pendingPeers[storeID].RandomRegions(n, startKey, endKey)
}

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64) *RegionInfo {
return r.leaders[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.leaders[storeID].RandomRegion(startKey, endKey)
}

// RandLeaderRegions randomly gets a store's n leader regions.
func (r *RegionsInfo) RandLeaderRegions(storeID uint64, n int) []*RegionInfo {
return r.leaders[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandLeaderRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.leaders[storeID].RandomRegions(n, startKey, endKey)
}

// RandFollowerRegion randomly gets a store's follower region.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64) *RegionInfo {
return r.followers[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.followers[storeID].RandomRegion(startKey, endKey)
}

// RandFollowerRegions randomly gets a store's n follower regions.
func (r *RegionsInfo) RandFollowerRegions(storeID uint64, n int) []*RegionInfo {
return r.followers[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandFollowerRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.followers[storeID].RandomRegions(n, startKey, endKey)
}

// RandLearnerRegion randomly gets a store's learner region.
func (r *RegionsInfo) RandLearnerRegion(storeID uint64) *RegionInfo {
return r.learners[storeID].RandomRegion(nil, nil)
func (r *RegionsInfo) RandLearnerRegion(storeID uint64, ranges []KeyRange) *RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.learners[storeID].RandomRegion(startKey, endKey)
}

// RandLearnerRegions randomly gets a store's n learner regions.
func (r *RegionsInfo) RandLearnerRegions(storeID uint64, n int) []*RegionInfo {
return r.learners[storeID].RandomRegions(n, nil, nil)
func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo {
startKey, endKey := r.GetKeys(ranges)
return r.learners[storeID].RandomRegions(n, startKey, endKey)
}

// GetKeys gets the start key and end key from random key range.
func (r *RegionsInfo) GetKeys(ranges []KeyRange) ([]byte, []byte) {
idx := rand.Intn(len(ranges))
return ranges[idx].StartKey, ranges[idx].EndKey
}

// GetLeader return leader RegionInfo by storeID and regionID(now only used in test)
Expand Down Expand Up @@ -883,6 +903,10 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string {
return strings.Join(ret, ", ")
}

func isInvolved(region *RegionInfo, startKey, endKey []byte) bool {
return bytes.Compare(region.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), endKey) <= 0))
}

// HexRegionKey converts region key to hex format. Used for formating region in
// logs.
func HexRegionKey(key []byte) []byte {
Expand Down
2 changes: 1 addition & 1 deletion server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func BenchmarkRandomRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
regions.RandLeaderRegion(1)
regions.RandLeaderRegion(1, []KeyRange{NewKeyRange("", "")})
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ func (r *RangeCluster) GetTolerantSizeRatio() float64 {
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (r *RangeCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandFollowerRegion(storeID, opts...)
func (r *RangeCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandFollowerRegion(storeID, ranges, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (r *RangeCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandLeaderRegion(storeID, opts...)
func (r *RangeCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.subCluster.RandLeaderRegion(storeID, ranges, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
Expand Down
4 changes: 3 additions & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func init() {
}
conf.LeaderLimit = defaultAdjacentLeaderLimit
conf.PeerLimit = defaultAdjacentPeerLimit
conf.Name = balanceAdjacentRegionName
return nil
}
})
Expand All @@ -77,6 +78,7 @@ func init() {
}

type balanceAdjacentRegionConfig struct {
Name string `json:"name"`
LeaderLimit uint64 `json:"leader-limit"`
PeerLimit uint64 `json:"peer-limit"`
}
Expand Down Expand Up @@ -128,7 +130,7 @@ func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController
}

func (l *balanceAdjacentRegionScheduler) GetName() string {
return balanceAdjacentRegionName
return l.conf.Name
}

func (l *balanceAdjacentRegionScheduler) GetType() string {
Expand Down
Loading

0 comments on commit cb56454

Please sign in to comment.