scheduler: support priority for each dimension in hot scheduling (#3920)
* support priority

Signed-off-by: yisaer <disxiaofei@163.com>

* support priority

Signed-off-by: yisaer <disxiaofei@163.com>

* support priority

Signed-off-by: yisaer <disxiaofei@163.com>

* support priority

Signed-off-by: yisaer <disxiaofei@163.com>

* fix test

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

* address the comment

Signed-off-by: yisaer <disxiaofei@163.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Yisaer and ti-chi-bot authored Jul 29, 2021
1 parent 78460ca commit fa0ada7
Showing 4 changed files with 203 additions and 81 deletions.
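At a high level, the hot region scheduler now ranks stores and hot peers by a configurable first and second priority dimension instead of always treating byte rate as primary and key rate as secondary. The sketch below is a minimal illustration of the intended configuration; it reuses the scheduler constructor and the WritePriorities/ReadPriorities fields exercised by the new test in hot_region_test.go, and the surrounding setup (context, storage, operator controller) is assumed from that test rather than prescribed by this commit.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
if err != nil {
panic(err)
}
// Make key rate the first priority dimension and byte rate the second for hot write
// scheduling; the solver then sorts, filters, and ranks stores and peers in that order.
hb.(*hotScheduler).conf.WritePriorities = []string{KeyPriority, BytePriority}
// Hot read scheduling keeps byte rate as the first priority dimension here.
hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority}

In the write test below, putting BytePriority first makes the byte-rate hot spot (store 1) the source of the transfer, while putting KeyPriority first makes the key-rate hot spot (store 4) the source.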
214 changes: 135 additions & 79 deletions server/schedulers/hot_region.go
@@ -414,9 +414,11 @@ type balanceSolver struct {

cur *solution

maxSrc *storeLoad
minDst *storeLoad
rankStep *storeLoad
maxSrc *storeLoad
minDst *storeLoad
rankStep *storeLoad
firstPriority int
secondPriority int

byteIsBetter bool
keyIsBetter bool
@@ -472,6 +474,8 @@ func (bs *balanceSolver) init() {
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}
bs.firstPriority = stringToDim(bs.preferPriority()[0])
bs.secondPriority = stringToDim(bs.preferPriority()[1])
}

func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
@@ -555,20 +559,25 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
continue
}
minLoad := detail.LoadPred.min()
if slice.AllOf(minLoad.Loads, func(i int) bool {
if statistics.IsSelectedDim(i) {
return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.Loads[i]
}
return true
}) {
if bs.checkSrcByDimPriorityAndTolerance(minLoad, &detail.LoadPred.Expect) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
}
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
}
return ret
}

func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *storeLoad) bool {
return slice.AllOf(minLoad.Loads, func(i int) bool {
if statistics.IsSelectedDim(i) {
return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*expectLoad.Loads[i]
}
return true
})
}

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
// The returned hot peer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
@@ -602,32 +611,31 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
}

func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum int) map[*statistics.HotPeerStat]struct{} {
byteSort := make([]*statistics.HotPeerStat, len(ret))
copy(byteSort, ret)
sort.Slice(byteSort, func(i, j int) bool {
k := getRegionStatKind(bs.rwTy, statistics.ByteDim)
return byteSort[i].GetLoad(k) > byteSort[j].GetLoad(k)
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
k := getRegionStatKind(bs.rwTy, bs.firstPriority)
return firstSort[i].GetLoad(k) > firstSort[j].GetLoad(k)
})
keySort := make([]*statistics.HotPeerStat, len(ret))
copy(keySort, ret)
sort.Slice(keySort, func(i, j int) bool {
k := getRegionStatKind(bs.rwTy, statistics.KeyDim)
return keySort[i].GetLoad(k) > keySort[j].GetLoad(k)
secondSort := make([]*statistics.HotPeerStat, len(ret))
copy(secondSort, ret)
sort.Slice(secondSort, func(i, j int) bool {
k := getRegionStatKind(bs.rwTy, bs.secondPriority)
return secondSort[i].GetLoad(k) > secondSort[j].GetLoad(k)
})

union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum)
for len(union) < maxPeerNum {
for len(byteSort) > 0 {
peer := byteSort[0]
byteSort = byteSort[1:]
for len(firstSort) > 0 {
peer := firstSort[0]
firstSort = firstSort[1:]
if _, ok := union[peer]; !ok {
union[peer] = struct{}{}
break
}
}
for len(union) < maxPeerNum && len(keySort) > 0 {
peer := keySort[0]
keySort = keySort[1:]
for len(union) < maxPeerNum && len(secondSort) > 0 {
peer := secondSort[0]
secondSort = secondSort[1:]
if _, ok := union[peer]; !ok {
union[peer] = struct{}{}
break
@@ -736,26 +744,32 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, detail := range candidates {
store := detail.Store
if filter.Target(bs.cluster.GetOpts(), store, filters) {
maxLoads := detail.LoadPred.max().Loads
if slice.AllOf(maxLoads, func(i int) bool {
if statistics.IsSelectedDim(i) {
return maxLoads[i]*dstToleranceRatio < detail.LoadPred.Expect.Loads[i]
}
return true
}) {
ret[store.GetID()] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
id := detail.Store.GetID()
maxLoad := detail.LoadPred.max()
if filter.Target(bs.cluster.GetOpts(), detail.Store, filters) {
if bs.checkDstByPriorityAndTolerance(maxLoad, &detail.LoadPred.Expect) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
}
hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
}

}
return ret
}

func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLoad) bool {
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
return slice.AllOf(maxLoad.Loads, func(i int) bool {
if statistics.IsSelectedDim(i) {
return maxLoad.Loads[i]*dstToleranceRatio < expect.Loads[i]
}
return true
})
}

// calcProgressiveRank calculates `bs.cur.progressiveRank`.
// See the comments of `solution.progressiveRank` for more about progressive rank.
func (bs *balanceSolver) calcProgressiveRank() {
@@ -773,37 +787,19 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank = -1
}
} else {
// we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte) rate should still be less
// than the src store's (key/byte) rate after scheduling one peer.
getSrcDecRate := func(a, b float64) float64 {
if a-b <= 0 {
return 1
}
return a - b
}
checkHot := func(dim int) (bool, float64) {
srcRate := srcLd.Loads[dim]
dstRate := dstLd.Loads[dim]
peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, dim))
decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate)
isHot := peerRate >= bs.getMinRate(dim)
return isHot, decRatio
}
keyHot, keyDecRatio := checkHot(statistics.KeyDim)
byteHot, byteDecRatio := checkHot(statistics.ByteDim)

firstPriorityDimHot, firstPriorityDecRatio, secondPriorityDimHot, secondPriorityDecRatio := bs.getHotDecRatioByPriorities(srcLd, dstLd, peer)
greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
switch {
case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// In this case, both the first and the second priority dimension will be more balanced, which is the best choice.
rank = -3
bs.byteIsBetter = true
bs.keyIsBetter = true
case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// In this case, the first priority dimension will not be worsened and the second priority dimension will be more balanced.
rank = -2
bs.keyIsBetter = true
case byteHot && byteDecRatio <= greatDecRatio:
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio:
// In this case, the first priority dimension will be more balanced and the second priority dimension is ignored.
rank = -1
bs.byteIsBetter = true
@@ -817,6 +813,28 @@
bs.cur.progressiveRank = rank
}

func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) {
// We use DecRatio (Decline Ratio) to make sure that the dst store's rate on a priority dimension
// is still less than the src store's rate on that dimension after scheduling one peer.
getSrcDecRate := func(a, b float64) float64 {
if a-b <= 0 {
return 1
}
return a - b
}
checkHot := func(dim int) (bool, float64) {
srcRate := srcLd.Loads[dim]
dstRate := dstLd.Loads[dim]
peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, dim))
decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate)
isHot := peerRate >= bs.getMinRate(dim)
return isHot, decRatio
}
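// Worked example with hypothetical rates (same unit throughout): srcRate = 10, dstRate = 3,
// peerRate = 2 gives decRatio = (3+2)/(10-2) ≈ 0.63, i.e. after moving the peer the dst store
// would still carry clearly less of this dimension than the src store; a ratio at or below
// great-dec-ratio counts as a clear improvement.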
firstHot, firstDecRatio := checkHot(bs.firstPriority)
secondHot, secondDecRatio := checkHot(bs.secondPriority)
return firstHot, firstDecRatio, secondHot, secondDecRatio
}

func (bs *balanceSolver) getMinRate(dim int) float64 {
switch dim {
case statistics.KeyDim:
@@ -863,28 +881,25 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
return false
}
} else {
bk, kk := getRegionStatKind(bs.rwTy, statistics.ByteDim), getRegionStatKind(bs.rwTy, statistics.KeyDim)
byteRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(bk), old.srcPeerStat.GetLoad(bk), stepRank(0, 100))
keyRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(kk), old.srcPeerStat.GetLoad(kk), stepRank(0, 10))

firstCmp, secondCmp := bs.getRkCmpPriorities(old)
switch bs.cur.progressiveRank {
case -2: // greatDecRatio < firstPriorityDecRatio <= minorDecRatio && secondPriorityDecRatio <= greatDecRatio
if keyRkCmp != 0 {
return keyRkCmp > 0
if secondCmp != 0 {
return secondCmp > 0
}
if byteRkCmp != 0 {
if firstCmp != 0 {
// prefer a smaller first-priority rate, to reduce oscillation
return byteRkCmp < 0
return firstCmp < 0
}
case -3: // firstPriorityDecRatio <= greatDecRatio && secondPriorityDecRatio <= greatDecRatio
if keyRkCmp != 0 {
return keyRkCmp > 0
if secondCmp != 0 {
return secondCmp > 0
}
fallthrough
case -1: // firstPriorityDecRatio <= greatDecRatio
if byteRkCmp != 0 {
if firstCmp != 0 {
// prefer the region with a larger first-priority rate, to converge faster
return byteRkCmp > 0
return firstCmp > 0
}
}
}
@@ -893,6 +908,22 @@
return false
}

func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, secondCmp int) {
fk, sk := getRegionStatKind(bs.rwTy, bs.firstPriority), getRegionStatKind(bs.rwTy, bs.secondPriority)
dimToStep := func(priority int) float64 {
switch priority {
case statistics.ByteDim:
return 100
case statistics.KeyDim:
return 10
}
return 100
}
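// For example, with hypothetical first-priority loads of 730 for the current peer and 520 for
// the old one, stepRank(0, 100) ranks them 7 vs 5 (assuming stepRank buckets a rate into integer
// steps of the given size), so fkRkCmp below is positive: the current peer carries the larger
// first-priority rate, and betterThan decides based on the progressive-rank case above.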
fkRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(fk), old.srcPeerStat.GetLoad(fk), stepRank(0, dimToStep(bs.firstPriority)))
skRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(sk), old.srcPeerStat.GetLoad(sk), stepRank(0, dimToStep(bs.secondPriority)))
return fkRkCmp, skRkCmp
}

// smaller is better
func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int {
if st1 != st2 {
@@ -911,14 +942,21 @@ func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int {
)),
)
} else {
f := func(dim int) func(ld *storeLoad) float64 {
switch dim {
case statistics.ByteDim:
return stLdByteRate
case statistics.KeyDim:
return stLdKeyRate
}
return stLdByteRate
}
lpCmp = sliceLPCmp(
minLPCmp(negLoadCmp(sliceLoadCmp(
stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])),
stLdRankCmp(f(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
stLdRankCmp(f(bs.secondPriority), stepRank(bs.maxSrc.Loads[bs.secondPriority], bs.rankStep.Loads[bs.secondPriority])),
))),
diffCmp(
stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])),
),
diffCmp(stLdRankCmp(f(bs.firstPriority), stepRank(0, bs.rankStep.Loads[bs.firstPriority]))),
)
}
lp1 := bs.stLoadDetail[st1].LoadPred
@@ -1212,3 +1250,21 @@ func getRegionStatKind(rwTy rwType, dim int) statistics.RegionStatKind {
}
return 0
}

func (bs *balanceSolver) preferPriority() [2]string {
priorities := bs.sche.conf.WritePriorities
if bs.rwTy == read {
priorities = bs.sche.conf.ReadPriorities
}
return [2]string{priorities[0], priorities[1]}
}

func stringToDim(name string) int {
switch name {
case BytePriority:
return statistics.ByteDim
case KeyPriority:
return statistics.KeyDim
}
return statistics.ByteDim
}
63 changes: 63 additions & 0 deletions server/schedulers/hot_region_test.go
@@ -1442,3 +1442,66 @@ func (s *testHotSchedulerSuite) TestHotReadPeerSchedule(c *C) {
op := hb.Schedule(tc)[0]
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4)
}

func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)

tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
tc.DisableFeature(versioninfo.JointConsensus)
tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
tc.AddRegionStore(3, 20)
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)

tc.UpdateStorageWrittenStats(1, 10*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
// must transfer peer
schedulePeerPr = 1.0
addRegionInfo(tc, write, []testRegionInfo{
{1, []uint64{1, 2, 3}, 2 * MB, 1 * MB},
{6, []uint64{4, 2, 3}, 1 * MB, 2 * MB},
})
hb.(*hotScheduler).conf.WritePriorities = []string{BytePriority, KeyPriority}
ops := hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5)
hb.(*hotScheduler).clearPendingInfluence()
hb.(*hotScheduler).conf.WritePriorities = []string{KeyPriority, BytePriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5)
hb.(*hotScheduler).clearPendingInfluence()

// assert read priority schedule
hb, err = schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
tc.UpdateStorageReadStats(5, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(4, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(1, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(2, 1*MB*statistics.StoreHeartBeatReportInterval, 7*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(3, 7*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 2 * MB, 2 * MB},
})
hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 2)
hb.(*hotScheduler).clearPendingInfluence()
hb.(*hotScheduler).conf.ReadPriorities = []string{KeyPriority, BytePriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 3)
}