From bc2a79d76b20480592709be9807a626ad56cc0e5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 28 Jul 2021 21:16:17 +0800 Subject: [PATCH 01/10] update Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 184 +++++++++++++----------- server/schedulers/hot_region_config.go | 10 ++ server/schedulers/utils.go | 18 ++- server/statistics/hot_peer.go | 26 ++++ tools/pd-ctl/pdctl/command/scheduler.go | 8 +- 5 files changed, 156 insertions(+), 90 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index fab9f28ecc6..f560b1fcc74 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -219,7 +219,7 @@ func (h *hotScheduler) gcRegionPendings() { } // summaryStoresLoad Load information of all available stores. -// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store +// it will filtered the hot peer and calculate the current and future stat(rate,count) for each store func summaryStoresLoad( stores []*core.StoreInfo, storesLoads map[uint64][]float64, @@ -228,7 +228,7 @@ func summaryStoresLoad( rwTy rwType, kind core.ResourceKind, ) map[uint64]*storeLoadDetail { - // loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count) + // loadDetail stores the storeID -> hotPeers stat and its current and future stat(rate,count) loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads)) allLoadSum := make([]float64, statistics.DimLen) allCount := 0.0 @@ -418,8 +418,14 @@ type balanceSolver struct { minDst *storeLoad rankStep *storeLoad - byteIsBetter bool - keyIsBetter bool + secondPriority int + firstPriority int + cpuPriority int + anotherPriority int + isSelectedDim func(int) bool + + firstPriorityIsBetter bool + secondPriorityIsBetter bool } type solution struct { @@ -472,6 +478,24 @@ func (bs *balanceSolver) init() { Loads: stepLoads, Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } + + priorities := bs.sche.conf.WritePriorities + if bs.rwTy == read { + priorities = bs.sche.conf.ReadPriorities + } + + bs.firstPriority = statistics.StringToDim(priorities[0]) + bs.secondPriority = statistics.StringToDim(priorities[1]) + bs.isSelectedDim = func(dim int) bool { + return dim == bs.firstPriority || dim == bs.secondPriority + } + + // write leader + bs.cpuPriority = statistics.KeyDim + if bs.firstPriority == statistics.QueryDim { + bs.cpuPriority = statistics.QueryDim + } + bs.anotherPriority = statistics.ByteDim } func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver { @@ -556,7 +580,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { } minLoad := detail.LoadPred.min() if slice.AllOf(minLoad.Loads, func(i int) bool { - if statistics.IsSelectedDim(i) { + if bs.isSelectedDim(i) { return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.Loads[i] } return true @@ -602,32 +626,32 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { } func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum int) map[*statistics.HotPeerStat]struct{} { - byteSort := make([]*statistics.HotPeerStat, len(ret)) - copy(byteSort, ret) - sort.Slice(byteSort, func(i, j int) bool { + firstPrioritySort := make([]*statistics.HotPeerStat, len(ret)) + copy(firstPrioritySort, ret) + sort.Slice(firstPrioritySort, func(i, j int) bool { k := getRegionStatKind(bs.rwTy, statistics.ByteDim) - return byteSort[i].GetLoad(k) 
> byteSort[j].GetLoad(k) + return firstPrioritySort[i].GetLoad(k) > firstPrioritySort[j].GetLoad(k) }) - keySort := make([]*statistics.HotPeerStat, len(ret)) - copy(keySort, ret) - sort.Slice(keySort, func(i, j int) bool { + secondPrioritySort := make([]*statistics.HotPeerStat, len(ret)) + copy(secondPrioritySort, ret) + sort.Slice(secondPrioritySort, func(i, j int) bool { k := getRegionStatKind(bs.rwTy, statistics.KeyDim) - return keySort[i].GetLoad(k) > keySort[j].GetLoad(k) + return secondPrioritySort[i].GetLoad(k) > secondPrioritySort[j].GetLoad(k) }) union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum) for len(union) < maxPeerNum { - for len(byteSort) > 0 { - peer := byteSort[0] - byteSort = byteSort[1:] + for len(firstPrioritySort) > 0 { + peer := firstPrioritySort[0] + firstPrioritySort = firstPrioritySort[1:] if _, ok := union[peer]; !ok { union[peer] = struct{}{} break } } - for len(union) < maxPeerNum && len(keySort) > 0 { - peer := keySort[0] - keySort = keySort[1:] + for len(union) < maxPeerNum && len(secondPrioritySort) > 0 { + peer := secondPrioritySort[0] + secondPrioritySort = secondPrioritySort[1:] if _, ok := union[peer]; !ok { union[peer] = struct{}{} break @@ -742,7 +766,7 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st if filter.Target(bs.cluster.GetOpts(), store, filters) { maxLoads := detail.LoadPred.max().Loads if slice.AllOf(maxLoads, func(i int) bool { - if statistics.IsSelectedDim(i) { + if bs.isSelectedDim(i) { return maxLoads[i]*dstToleranceRatio < detail.LoadPred.Expect.Loads[i] } return true @@ -766,15 +790,15 @@ func (bs *balanceSolver) calcProgressiveRank() { if bs.rwTy == write && bs.opTy == transferLeader { // In this condition, CPU usage is the matter. // Only consider about key rate. - srcKeyRate := srcLd.Loads[statistics.KeyDim] - dstKeyRate := dstLd.Loads[statistics.KeyDim] - peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, statistics.KeyDim)) + srcKeyRate := srcLd.Loads[bs.cpuPriority] + dstKeyRate := dstLd.Loads[bs.cpuPriority] + peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.cpuPriority)) if srcKeyRate-peerKeyRate >= dstKeyRate+peerKeyRate { rank = -1 } } else { - // we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte) rate should still be less - // than the src store's (key/byte) rate after scheduling one peer. + // we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte/query) rate should still be less + // than the src store's (key/byte/query) rate after scheduling one peer. getSrcDecRate := func(a, b float64) float64 { if a-b <= 0 { return 1 @@ -789,31 +813,26 @@ func (bs *balanceSolver) calcProgressiveRank() { isHot := peerRate >= bs.getMinRate(dim) return isHot, decRatio } - keyHot, keyDecRatio := checkHot(statistics.KeyDim) - byteHot, byteDecRatio := checkHot(statistics.ByteDim) + secondPriorityHot, secondPriorityDecRatio := checkHot(bs.secondPriority) + firstPriorityHot, firstPriorityDecRatio := checkHot(bs.firstPriority) greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio() switch { - case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio: - // If belong to the case, both byte rate and key rate will be more balanced, the best choice. + case firstPriorityHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityHot && secondPriorityDecRatio <= greatDecRatio: + // If belong to the case, two dim will be more balanced, the best choice. 
rank = -3 - bs.byteIsBetter = true - bs.keyIsBetter = true - case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio: - // If belong to the case, byte rate will be not worsened, key rate will be more balanced. + bs.firstPriorityIsBetter = true + bs.secondPriorityIsBetter = true + case firstPriorityDecRatio <= minorDecRatio && secondPriorityHot && secondPriorityDecRatio <= greatDecRatio: + // If belong to the case, first priority dim will be not worsened, second priority dim will be more balanced. rank = -2 - bs.keyIsBetter = true - case byteHot && byteDecRatio <= greatDecRatio: - // If belong to the case, byte rate will be more balanced, ignore the key rate. + bs.secondPriorityIsBetter = true + case firstPriorityHot && firstPriorityDecRatio <= greatDecRatio: + // If belong to the case, first priority dim will be more balanced, ignore the second priority dim. rank = -1 - bs.byteIsBetter = true + bs.firstPriorityIsBetter = true } } - log.Debug("calcProgressiveRank", - zap.Uint64("region-id", bs.cur.region.GetID()), - zap.Uint64("from-store-id", bs.cur.srcStoreID), - zap.Uint64("to-store-id", bs.cur.dstStoreID), - zap.Int64("rank", rank)) bs.cur.progressiveRank = rank } @@ -823,6 +842,8 @@ func (bs *balanceSolver) getMinRate(dim int) float64 { return bs.sche.conf.GetMinHotKeyRate() case statistics.ByteDim: return bs.sche.conf.GetMinHotByteRate() + case statistics.QueryDim: + return bs.sche.conf.GetMinHotQueryRate() } return -1 } @@ -856,35 +877,36 @@ func (bs *balanceSolver) betterThan(old *solution) bool { // compare region if bs.rwTy == write && bs.opTy == transferLeader { + kind := getRegionStatKind(write, bs.cpuPriority) switch { - case bs.cur.srcPeerStat.GetLoad(statistics.RegionWriteKeys) > old.srcPeerStat.GetLoad(statistics.RegionWriteKeys): + case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind): return true - case bs.cur.srcPeerStat.GetLoad(statistics.RegionWriteKeys) < old.srcPeerStat.GetLoad(statistics.RegionWriteKeys): + case bs.cur.srcPeerStat.GetLoad(kind) < old.srcPeerStat.GetLoad(kind): return false } } else { - bk, kk := getRegionStatKind(bs.rwTy, statistics.ByteDim), getRegionStatKind(bs.rwTy, statistics.KeyDim) - byteRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(bk), old.srcPeerStat.GetLoad(bk), stepRank(0, 100)) - keyRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(kk), old.srcPeerStat.GetLoad(kk), stepRank(0, 10)) + fk, sk := getRegionStatKind(bs.rwTy, bs.firstPriority), getRegionStatKind(bs.rwTy, bs.secondPriority) + firstPriorityRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(fk), old.srcPeerStat.GetLoad(fk), stepRank(0, 100)) + secondPriorityRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(sk), old.srcPeerStat.GetLoad(sk), stepRank(0, 10)) switch bs.cur.progressiveRank { - case -2: // greatDecRatio < byteDecRatio <= minorDecRatio && keyDecRatio <= greatDecRatio - if keyRkCmp != 0 { - return keyRkCmp > 0 + case -2: // greatDecRatio < firstPriorityDecRatio <= minorDecRatio && secondPriorityDecRatio <= greatDecRatio + if secondPriorityRkCmp != 0 { + return secondPriorityRkCmp > 0 } - if byteRkCmp != 0 { - // prefer smaller byte rate, to reduce oscillation - return byteRkCmp < 0 + if firstPriorityRkCmp != 0 { + // prefer smaller first priority rate, to reduce oscillation + return firstPriorityRkCmp < 0 } - case -3: // byteDecRatio <= greatDecRatio && keyDecRatio <= greatDecRatio - if keyRkCmp != 0 { - return keyRkCmp > 0 + case -3: // firstPriorityDecRatio <= greatDecRatio && secondPriorityDecRatio <= greatDecRatio + if secondPriorityRkCmp != 0 { + 
return secondPriorityRkCmp > 0 } fallthrough - case -1: // byteDecRatio <= greatDecRatio - if byteRkCmp != 0 { - // prefer region with larger byte rate, to converge faster - return byteRkCmp > 0 + case -1: // firstPriorityDecRatio <= greatDecRatio + if firstPriorityRkCmp != 0 { + // prefer region with larger first priority rate, to converge faster + return firstPriorityRkCmp > 0 } } } @@ -901,23 +923,23 @@ func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int { if bs.rwTy == write && bs.opTy == transferLeader { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( - stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), - stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(bs.maxSrc.Loads[bs.cpuPriority], bs.rankStep.Loads[bs.cpuPriority])), + stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(bs.maxSrc.Loads[bs.anotherPriority], bs.rankStep.Loads[bs.anotherPriority])), ))), diffCmp(sliceLoadCmp( stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)), - stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.Loads[statistics.KeyDim])), - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(0, bs.rankStep.Loads[bs.cpuPriority])), + stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(0, bs.rankStep.Loads[bs.anotherPriority])), )), ) } else { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( - stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), - stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), + stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])), + stLdRankCmp(stLdRate(bs.secondPriority), stepRank(bs.maxSrc.Loads[bs.secondPriority], bs.rankStep.Loads[bs.secondPriority])), ))), diffCmp( - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdRate(bs.firstPriority), stepRank(0, bs.rankStep.Loads[bs.firstPriority])), ), ) } @@ -936,22 +958,22 @@ func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int { if bs.rwTy == write && bs.opTy == transferLeader { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( - stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), - stLdRankCmp(stLdByteRate, stepRank(bs.minDst.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(bs.minDst.Loads[bs.cpuPriority], bs.rankStep.Loads[bs.cpuPriority])), + stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(bs.minDst.Loads[bs.anotherPriority], bs.rankStep.Loads[bs.anotherPriority])), )), diffCmp(sliceLoadCmp( stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)), - stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.Loads[statistics.KeyDim])), - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(0, bs.rankStep.Loads[bs.cpuPriority])), + stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(0, bs.rankStep.Loads[bs.anotherPriority])), ))) } else { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( - stLdRankCmp(stLdByteRate, stepRank(bs.minDst.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), - stLdRankCmp(stLdKeyRate, 
stepRank(bs.minDst.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), + stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])), + stLdRankCmp(stLdRate(bs.secondPriority), stepRank(bs.minDst.Loads[bs.secondPriority], bs.rankStep.Loads[bs.secondPriority])), )), diffCmp( - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdRate(bs.firstPriority), stepRank(0, bs.rankStep.Loads[bs.firstPriority])), ), ) } @@ -997,8 +1019,8 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence dstPeer := &metapb.Peer{StoreId: bs.cur.dstStoreID, Role: srcPeer.Role} sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10) targetLabel = strconv.FormatUint(dstPeer.GetStoreId(), 10) - if bs.rwTy == read && bs.cur.region.GetLeader().StoreId == bs.cur.srcStoreID { // move read leader + typ = "move-leader" op, err = operator.CreateMoveLeaderOperator( "move-hot-read-leader", bs.cluster, @@ -1006,7 +1028,6 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence operator.OpHotRegion, bs.cur.srcStoreID, dstPeer) - typ = "move-leader" } else { desc := "move-hot-" + bs.rwTy.String() + "-peer" typ = "move-peer" @@ -1023,6 +1044,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence return nil, nil } desc := "transfer-hot-" + bs.rwTy.String() + "-leader" + typ = "transfer-leader" sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10) targetLabel = strconv.FormatUint(bs.cur.dstStoreID, 10) op, err = operator.CreateTransferLeaderOperator( @@ -1041,12 +1063,12 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence } dim := "" - if bs.byteIsBetter && bs.keyIsBetter { + if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter { dim = "both" - } else if bs.byteIsBetter { - dim = "byte" - } else if bs.keyIsBetter { - dim = "key" + } else if bs.firstPriorityIsBetter { + dim = statistics.DimToString(bs.firstPriority) + } else if bs.secondPriorityIsBetter { + dim = statistics.DimToString(bs.secondPriority) } op.SetPriorityLevel(core.HighPriority) diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index d06308ca0cd..bdc48aeed9e 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -35,6 +35,8 @@ const ( BytePriority = "byte" // KeyPriority indicates hot-region-scheduler prefer key dim KeyPriority = "key" + // QueryPriority indicates hot-region-scheduler prefer query dim + QueryPriority = "qps" ) // params about hot region. 
@@ -42,6 +44,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { return &hotRegionSchedulerConfig{ MinHotByteRate: 100, MinHotKeyRate: 10, + MinHotQueryRate: 10, MaxZombieRounds: 3, ByteRateRankStepRatio: 0.05, KeyRateRankStepRatio: 0.05, @@ -63,6 +66,7 @@ type hotRegionSchedulerConfig struct { MinHotByteRate float64 `json:"min-hot-byte-rate"` MinHotKeyRate float64 `json:"min-hot-key-rate"` + MinHotQueryRate float64 `json:"min-hot-query-rate"` MaxZombieRounds int `json:"max-zombie-rounds"` MaxPeerNum int `json:"max-peer-number"` @@ -170,6 +174,12 @@ func (conf *hotRegionSchedulerConfig) GetMinHotByteRate() float64 { return conf.MinHotByteRate } +func (conf *hotRegionSchedulerConfig) GetMinHotQueryRate() float64 { + conf.RLock() + defer conf.RUnlock() + return conf.MinHotQueryRate +} + func (conf *hotRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := mux.NewRouter() router.HandleFunc("/list", conf.handleGetConfig).Methods("GET") diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 3d87e08ec1d..8c890c5f1d9 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -235,7 +235,7 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) } // summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence -// It makes each key/byte rate or count become (1+w) times to the origin value while f is the function to provide w(weight) +// It makes each dim rate or count become (1+w) times to the origin value while f is the function to provide w(weight) func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]*Influence { ret := make(map[uint64]*Influence) for p := range pendings { @@ -270,9 +270,11 @@ func (load storeLoad) ToLoadPred(rwTy rwType, infl *Influence) *storeLoadPred { case read: future.Loads[statistics.ByteDim] += infl.Loads[statistics.RegionReadBytes] future.Loads[statistics.KeyDim] += infl.Loads[statistics.RegionReadKeys] + future.Loads[statistics.QueryDim] += infl.Loads[statistics.RegionReadQuery] case write: future.Loads[statistics.ByteDim] += infl.Loads[statistics.RegionWriteBytes] future.Loads[statistics.KeyDim] += infl.Loads[statistics.RegionWriteKeys] + future.Loads[statistics.QueryDim] += infl.Loads[statistics.RegionWriteQuery] } future.Count += infl.Count } @@ -282,12 +284,14 @@ func (load storeLoad) ToLoadPred(rwTy rwType, infl *Influence) *storeLoadPred { } } -func stLdByteRate(ld *storeLoad) float64 { - return ld.Loads[statistics.ByteDim] -} - -func stLdKeyRate(ld *storeLoad) float64 { - return ld.Loads[statistics.KeyDim] +func stLdRate(dim int) func(ld *storeLoad) float64 { + return func(ld *storeLoad) float64 { + switch dim { + case statistics.ByteDim, statistics.KeyDim, statistics.QueryDim: + return ld.Loads[dim] + } + return 0 + } } func stLdCount(ld *storeLoad) float64 { diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 00a666edb7b..9ad7c52253b 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -30,6 +30,32 @@ const ( DimLen ) +func DimToString(dim int) string { + switch dim { + case ByteDim: + return "byte" + case KeyDim: + return "key" + case QueryDim: + return "qps" + default: + return "" + } +} + +func StringToDim(name string) int { + switch name { + case DimToString(ByteDim): + return ByteDim + case DimToString(KeyDim): + return KeyDim + case DimToString(QueryDim): + return QueryDim + 
default: + return DimLen + } +} + // IsSelectedDim return whether the dim is selected for hot scheduler func IsSelectedDim(dim int) bool { // TODO: configure diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 43d09dd1327..820f9d7f073 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -532,9 +532,10 @@ func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, ar priorities := make([]string, 0) prioritiesMap := make(map[string]struct{}) for _, priority := range strings.Split(value, ",") { - if priority != schedulers.BytePriority && priority != schedulers.KeyPriority { - cmd.Println(fmt.Sprintf("priority should be one of [%s, %s]", + if priority != schedulers.BytePriority && priority != schedulers.KeyPriority && priority != schedulers.QueryPriority { + cmd.Println(fmt.Sprintf("priority should be one of [%s, %s, %s]", schedulers.BytePriority, + schedulers.QueryPriority, schedulers.KeyPriority)) return } @@ -546,6 +547,9 @@ func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, ar cmd.Println("priorities shouldn't be repeated") return } + if len(priorities) != 2 { + cmd.Println("priorities should be two dims") + } } else { input[key] = val } From 5a137d7d3f84f12f4458de1fc3bf68b4ec33abb5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 28 Jul 2021 23:29:43 +0800 Subject: [PATCH 02/10] add test by yisaer Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 3 +- server/schedulers/hot_region_test.go | 63 +++++++++++++++++++++++++ server/statistics/hot_peer.go | 2 + tests/pdctl/scheduler/scheduler_test.go | 1 + 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index f560b1fcc74..f80c4df98c3 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -1019,8 +1019,8 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence dstPeer := &metapb.Peer{StoreId: bs.cur.dstStoreID, Role: srcPeer.Role} sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10) targetLabel = strconv.FormatUint(dstPeer.GetStoreId(), 10) + if bs.rwTy == read && bs.cur.region.GetLeader().StoreId == bs.cur.srcStoreID { // move read leader - typ = "move-leader" op, err = operator.CreateMoveLeaderOperator( "move-hot-read-leader", bs.cluster, @@ -1028,6 +1028,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence operator.OpHotRegion, bs.cur.srcStoreID, dstPeer) + typ = "move-leader" } else { desc := "move-hot-" + bs.rwTy.String() + "-peer" typ = "move-peer" diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index 24448bc310f..c2ff9501ddd 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1442,3 +1442,66 @@ func (s *testHotSchedulerSuite) TestHotReadPeerSchedule(c *C) { op := hb.Schedule(tc)[0] testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4) } + +func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + c.Assert(err, IsNil) + hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05) + hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05) + + tc := 
mockcluster.NewCluster(ctx, opt) + tc.SetHotRegionCacheHitsThreshold(0) + tc.DisableFeature(versioninfo.JointConsensus) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + tc.AddRegionStore(4, 20) + tc.AddRegionStore(5, 20) + + tc.UpdateStorageWrittenStats(1, 10*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval) + // must transfer peer + schedulePeerPr = 1.0 + addRegionInfo(tc, write, []testRegionInfo{ + {1, []uint64{1, 2, 3}, 2 * MB, 1 * MB}, + {6, []uint64{4, 2, 3}, 1 * MB, 2 * MB}, + }) + hb.(*hotScheduler).conf.WritePriorities = []string{BytePriority, KeyPriority} + ops := hb.Schedule(tc) + c.Assert(len(ops), Equals, 1) + testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5) + hb.(*hotScheduler).clearPendingInfluence() + hb.(*hotScheduler).conf.WritePriorities = []string{KeyPriority, BytePriority} + ops = hb.Schedule(tc) + c.Assert(len(ops), Equals, 1) + testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5) + hb.(*hotScheduler).clearPendingInfluence() + + // assert read priority schedule + hb, err = schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + c.Assert(err, IsNil) + tc.UpdateStorageReadStats(5, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(4, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(1, 10*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(2, 1*MB*statistics.StoreHeartBeatReportInterval, 7*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(3, 7*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval) + addRegionInfo(tc, read, []testRegionInfo{ + {1, []uint64{1, 2, 3}, 2 * MB, 2 * MB}, + }) + hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority} + ops = hb.Schedule(tc) + c.Assert(len(ops), Equals, 1) + testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 2) + hb.(*hotScheduler).clearPendingInfluence() + hb.(*hotScheduler).conf.ReadPriorities = []string{KeyPriority, BytePriority} + ops = hb.Schedule(tc) + c.Assert(len(ops), Equals, 1) + testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 3) +} diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 9ad7c52253b..e05d158f8e6 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -30,6 +30,7 @@ const ( DimLen ) +// DimToString returns string with dim func DimToString(dim int) string { switch dim { case ByteDim: @@ -43,6 +44,7 @@ func DimToString(dim int) string { } } +// StringToDim returns dim with string func StringToDim(name string) int { switch name { case DimToString(ByteDim): diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 
b29e00f0258..bb7021b3f93 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -259,6 +259,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { expected1 := map[string]interface{}{ "min-hot-byte-rate": float64(100), "min-hot-key-rate": float64(10), + "min-hot-query-rate": float64(10), "max-zombie-rounds": float64(3), "max-peer-number": float64(1000), "byte-rate-rank-step-ratio": 0.05, From 105b4e74955be391ddcbffd8d345276f25f61ffe Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 29 Jul 2021 15:30:06 +0800 Subject: [PATCH 03/10] rename Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 44 ++++++++++++++++----------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index f80c4df98c3..825e22a4529 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -418,11 +418,11 @@ type balanceSolver struct { minDst *storeLoad rankStep *storeLoad - secondPriority int - firstPriority int - cpuPriority int - anotherPriority int - isSelectedDim func(int) bool + secondPriority int + firstPriority int + writeLeaderFirstPriority int + writeLeaderSecondPriority int + isSelectedDim func(int) bool firstPriorityIsBetter bool secondPriorityIsBetter bool @@ -491,11 +491,11 @@ func (bs *balanceSolver) init() { } // write leader - bs.cpuPriority = statistics.KeyDim + bs.writeLeaderFirstPriority = statistics.KeyDim if bs.firstPriority == statistics.QueryDim { - bs.cpuPriority = statistics.QueryDim + bs.writeLeaderFirstPriority = statistics.QueryDim } - bs.anotherPriority = statistics.ByteDim + bs.writeLeaderSecondPriority = statistics.ByteDim } func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver { @@ -629,13 +629,13 @@ func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum firstPrioritySort := make([]*statistics.HotPeerStat, len(ret)) copy(firstPrioritySort, ret) sort.Slice(firstPrioritySort, func(i, j int) bool { - k := getRegionStatKind(bs.rwTy, statistics.ByteDim) + k := getRegionStatKind(bs.rwTy, bs.firstPriority) return firstPrioritySort[i].GetLoad(k) > firstPrioritySort[j].GetLoad(k) }) secondPrioritySort := make([]*statistics.HotPeerStat, len(ret)) copy(secondPrioritySort, ret) sort.Slice(secondPrioritySort, func(i, j int) bool { - k := getRegionStatKind(bs.rwTy, statistics.KeyDim) + k := getRegionStatKind(bs.rwTy, bs.secondPriority) return secondPrioritySort[i].GetLoad(k) > secondPrioritySort[j].GetLoad(k) }) @@ -790,9 +790,9 @@ func (bs *balanceSolver) calcProgressiveRank() { if bs.rwTy == write && bs.opTy == transferLeader { // In this condition, CPU usage is the matter. // Only consider about key rate. 
- srcKeyRate := srcLd.Loads[bs.cpuPriority] - dstKeyRate := dstLd.Loads[bs.cpuPriority] - peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.cpuPriority)) + srcKeyRate := srcLd.Loads[bs.writeLeaderFirstPriority] + dstKeyRate := dstLd.Loads[bs.writeLeaderFirstPriority] + peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority)) if srcKeyRate-peerKeyRate >= dstKeyRate+peerKeyRate { rank = -1 } @@ -877,7 +877,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool { // compare region if bs.rwTy == write && bs.opTy == transferLeader { - kind := getRegionStatKind(write, bs.cpuPriority) + kind := getRegionStatKind(write, bs.writeLeaderFirstPriority) switch { case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind): return true @@ -923,13 +923,13 @@ func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int { if bs.rwTy == write && bs.opTy == transferLeader { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( - stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(bs.maxSrc.Loads[bs.cpuPriority], bs.rankStep.Loads[bs.cpuPriority])), - stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(bs.maxSrc.Loads[bs.anotherPriority], bs.rankStep.Loads[bs.anotherPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(bs.maxSrc.Loads[bs.writeLeaderFirstPriority], bs.rankStep.Loads[bs.writeLeaderFirstPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(bs.maxSrc.Loads[bs.writeLeaderSecondPriority], bs.rankStep.Loads[bs.writeLeaderSecondPriority])), ))), diffCmp(sliceLoadCmp( stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)), - stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(0, bs.rankStep.Loads[bs.cpuPriority])), - stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(0, bs.rankStep.Loads[bs.anotherPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderFirstPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderSecondPriority])), )), ) } else { @@ -958,13 +958,13 @@ func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int { if bs.rwTy == write && bs.opTy == transferLeader { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( - stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(bs.minDst.Loads[bs.cpuPriority], bs.rankStep.Loads[bs.cpuPriority])), - stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(bs.minDst.Loads[bs.anotherPriority], bs.rankStep.Loads[bs.anotherPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(bs.minDst.Loads[bs.writeLeaderFirstPriority], bs.rankStep.Loads[bs.writeLeaderFirstPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(bs.minDst.Loads[bs.writeLeaderSecondPriority], bs.rankStep.Loads[bs.writeLeaderSecondPriority])), )), diffCmp(sliceLoadCmp( stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)), - stLdRankCmp(stLdRate(bs.cpuPriority), stepRank(0, bs.rankStep.Loads[bs.cpuPriority])), - stLdRankCmp(stLdRate(bs.anotherPriority), stepRank(0, bs.rankStep.Loads[bs.anotherPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderFirstPriority])), + stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderSecondPriority])), ))) } else { lpCmp = sliceLPCmp( From fb55356779f9bb2f5e1619164e201272d7a9d8ab Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 29 Jul 2021 19:21:24 +0800 Subject: [PATCH 04/10] update Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 
42 +++++++++++++------------ server/statistics/hot_peer.go | 28 ----------------- tools/pd-ctl/pdctl/command/scheduler.go | 3 -- 3 files changed, 22 insertions(+), 51 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 990e21f4065..d8e80d141b4 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -414,14 +414,12 @@ type balanceSolver struct { cur *solution - maxSrc *storeLoad - minDst *storeLoad - rankStep *storeLoad - firstPriority int - secondPriority int + maxSrc *storeLoad + minDst *storeLoad + rankStep *storeLoad - secondPriority int firstPriority int + secondPriority int writeLeaderFirstPriority int writeLeaderSecondPriority int isSelectedDim func(int) bool @@ -480,20 +478,19 @@ func (bs *balanceSolver) init() { Loads: stepLoads, Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } - priorities := bs.sche.conf.ReadPriorities if bs.rwTy == write { priorities = bs.sche.conf.WritePriorities bs.writeLeaderFirstPriority = statistics.KeyDim - if bs.firstPriority == statistics.QueryDim { + if bs.firstPriority == statistics.QueryDim { bs.writeLeaderFirstPriority = statistics.QueryDim } bs.writeLeaderSecondPriority = statistics.ByteDim } - bs.firstPriority = statistics.StringToDim(priorities[0]) - bs.secondPriority = statistics.StringToDim(priorities[1]) + bs.firstPriority = stringToDim(priorities[0]) + bs.secondPriority = stringToDim(priorities[1]) bs.isSelectedDim = func(dim int) bool { return dim == bs.firstPriority || dim == bs.secondPriority } @@ -1095,9 +1092,9 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter { dim = "both" } else if bs.firstPriorityIsBetter { - dim = statistics.DimToString(bs.firstPriority) + dim = dimToString(bs.firstPriority) } else if bs.secondPriorityIsBetter { - dim = statistics.DimToString(bs.secondPriority) + dim = dimToString(bs.secondPriority) } op.SetPriorityLevel(core.HighPriority) @@ -1264,14 +1261,6 @@ func getRegionStatKind(rwTy rwType, dim int) statistics.RegionStatKind { return 0 } -func (bs *balanceSolver) preferPriority() [2]string { - priorities := bs.sche.conf.WritePriorities - if bs.rwTy == read { - priorities = bs.sche.conf.ReadPriorities - } - return [2]string{priorities[0], priorities[1]} -} - func stringToDim(name string) int { switch name { case BytePriority: @@ -1281,3 +1270,16 @@ func stringToDim(name string) int { } return statistics.ByteDim } + +func dimToString(dim int) string { + switch dim { + case statistics.ByteDim: + return BytePriority + case statistics.KeyDim: + return KeyPriority + case statistics.QueryDim: + return QueryPriority + default: + return "" + } +} diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index e05d158f8e6..00a666edb7b 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -30,34 +30,6 @@ const ( DimLen ) -// DimToString returns string with dim -func DimToString(dim int) string { - switch dim { - case ByteDim: - return "byte" - case KeyDim: - return "key" - case QueryDim: - return "qps" - default: - return "" - } -} - -// StringToDim returns dim with string -func StringToDim(name string) int { - switch name { - case DimToString(ByteDim): - return ByteDim - case DimToString(KeyDim): - return KeyDim - case DimToString(QueryDim): - return QueryDim - default: - return DimLen - } -} - // IsSelectedDim return whether the dim is selected for hot scheduler func IsSelectedDim(dim int) bool { 
// TODO: configure diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 8b6a13db0cc..a8e4e8c9e59 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -551,9 +551,6 @@ func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, ar cmd.Println("priorities shouldn't be repeated") return } - if len(priorities) != 2 { - cmd.Println("priorities should be two dims") - } } else { input[key] = val } From 76b8ed3f406741597a83dd45042162f61744efbc Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 29 Jul 2021 19:30:23 +0800 Subject: [PATCH 05/10] use qps Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 1 - server/schedulers/hot_region_config.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index d8e80d141b4..2402408be00 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -773,7 +773,6 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc() } } - } return ret } diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index bdc48aeed9e..eb403edfb87 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -55,7 +55,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { MaxPeerNum: 1000, SrcToleranceRatio: 1.05, // Tolerate 5% difference DstToleranceRatio: 1.05, // Tolerate 5% difference - ReadPriorities: []string{BytePriority, KeyPriority}, + ReadPriorities: []string{QueryPriority, BytePriority}, WritePriorities: []string{BytePriority, KeyPriority}, } } From 3bf7fef02199f7d3c448e3b6ac673afe0bf56421 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 29 Jul 2021 19:44:27 +0800 Subject: [PATCH 06/10] fix test Signed-off-by: lhy1024 --- tests/pdctl/scheduler/scheduler_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 51b09d30988..8ce47a085fe 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -270,7 +270,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { "minor-dec-ratio": 0.99, "src-tolerance-ratio": 1.05, "dst-tolerance-ratio": 1.05, - "read-priorities": []interface{}{"byte", "key"}, + "read-priorities": []interface{}{"qps", "byte"}, "write-priorities": []interface{}{"byte", "key"}, } c.Assert(conf, DeepEquals, expected1) @@ -280,6 +280,11 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + expected1["read-priorities"] = []interface{}{"byte", "key"} + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + c.Assert(conf1, DeepEquals, expected1) + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) From c2ac5c411d68dc02c57dbee393e2ac849bf89342 Mon Sep 17 00:00:00 2001 
From: lhy1024 Date: Thu, 29 Jul 2021 20:31:03 +0800 Subject: [PATCH 07/10] fix test and comments Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 51 ++++++++++++++++------------ server/schedulers/hot_region_test.go | 18 +++++----- 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 2402408be00..f4363d45cdf 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -233,7 +233,6 @@ func summaryStoresLoad( allLoadSum := make([]float64, statistics.DimLen) allCount := 0.0 - // Stores without byte rate statistics is not available to schedule. for _, store := range stores { id := store.GetID() storeLoads, ok := storesLoads[id] @@ -303,7 +302,7 @@ func summaryStoresLoad( } } storeLen := float64(len(storesLoads)) - // store expectation byte/key rate and count for each store-load detail. + // store expectation rate and count for each store-load detail. for id, detail := range loadDetail { expectLoads := make([]float64, len(allLoadSum)) for i := range expectLoads { @@ -469,7 +468,10 @@ func (bs *balanceSolver) init() { maxCur = maxLoad(maxCur, &detail.LoadPred.Current) } - rankStepRatios := []float64{bs.sche.conf.GetByteRankStepRatio(), bs.sche.conf.GetKeyRankStepRatio(), bs.sche.conf.GetQueryRateRankStepRatio()} + rankStepRatios := []float64{ + statistics.ByteDim: bs.sche.conf.GetByteRankStepRatio(), + statistics.KeyDim: bs.sche.conf.GetKeyRankStepRatio(), + statistics.QueryDim: bs.sche.conf.GetQueryRateRankStepRatio()} stepLoads := make([]float64, statistics.DimLen) for i := range stepLoads { stepLoads[i] = maxCur.Loads[i] * rankStepRatios[i] @@ -482,6 +484,11 @@ func (bs *balanceSolver) init() { priorities := bs.sche.conf.ReadPriorities if bs.rwTy == write { priorities = bs.sche.conf.WritePriorities + } + bs.firstPriority = stringToDim(priorities[0]) + bs.secondPriority = stringToDim(priorities[1]) + + if bs.rwTy == write { bs.writeLeaderFirstPriority = statistics.KeyDim if bs.firstPriority == statistics.QueryDim { bs.writeLeaderFirstPriority = statistics.QueryDim @@ -489,8 +496,6 @@ func (bs *balanceSolver) init() { bs.writeLeaderSecondPriority = statistics.ByteDim } - bs.firstPriority = stringToDim(priorities[0]) - bs.secondPriority = stringToDim(priorities[1]) bs.isSelectedDim = func(dim int) bool { return dim == bs.firstPriority || dim == bs.secondPriority } @@ -568,7 +573,7 @@ func (bs *balanceSolver) solve() []*operator.Operator { return []*operator.Operator{op} } -// filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than +// filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than // its expectation * ratio, the store would be selected as hot source store func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail) @@ -796,11 +801,11 @@ func (bs *balanceSolver) calcProgressiveRank() { rank := int64(0) if bs.rwTy == write && bs.opTy == transferLeader { // In this condition, CPU usage is the matter. - // Only consider about key rate. - srcKeyRate := srcLd.Loads[bs.writeLeaderFirstPriority] - dstKeyRate := dstLd.Loads[bs.writeLeaderFirstPriority] - peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority)) - if srcKeyRate-peerKeyRate >= dstKeyRate+peerKeyRate { + // Only consider about key rate or query rate. 
+ srcRate := srcLd.Loads[bs.writeLeaderFirstPriority] + dstRate := dstLd.Loads[bs.writeLeaderFirstPriority] + peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority)) + if srcRate-peerRate >= dstRate+peerRate { rank = -1 } } else { @@ -808,16 +813,16 @@ func (bs *balanceSolver) calcProgressiveRank() { greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio() switch { case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio: - // If belong to the case, both byte rate and key rate will be more balanced, the best choice. + // If belong to the case, two dim will be more balanced, the best choice. rank = -3 bs.firstPriorityIsBetter = true bs.secondPriorityIsBetter = true case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio: - // If belong to the case, byte rate will be not worsened, key rate will be more balanced. + // If belong to the case, first priority dim will be not worsened, second priority dim will be more balanced. rank = -2 bs.secondPriorityIsBetter = true case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio: - // If belong to the case, byte rate will be more balanced, ignore the key rate. + // If belong to the case, first priority dim will be more balanced, ignore the second priority dim. rank = -1 bs.firstPriorityIsBetter = true } @@ -826,8 +831,8 @@ func (bs *balanceSolver) calcProgressiveRank() { } func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) { - // we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte) rate should still be less - // than the src store's (key/byte) rate after scheduling one peer. + // we use DecRatio(Decline Ratio) to expect that the dst store's rate should still be less + // than the src store's rate after scheduling one peer. 
getSrcDecRate := func(a, b float64) float64 { if a-b <= 0 { return 1 @@ -898,22 +903,22 @@ func (bs *balanceSolver) betterThan(old *solution) bool { } else { firstCmp, secondCmp := bs.getRkCmpPriorities(old) switch bs.cur.progressiveRank { - case -2: // greatDecRatio < byteDecRatio <= minorDecRatio && keyDecRatio <= greatDecRatio + case -2: // greatDecRatio < firstPriorityDecRatio <= minorDecRatio && secondPriorityDecRatio <= greatDecRatio if secondCmp != 0 { return secondCmp > 0 } if firstCmp != 0 { - // prefer smaller byte rate, to reduce oscillation + // prefer smaller first priority rate, to reduce oscillation return firstCmp < 0 } - case -3: // byteDecRatio <= greatDecRatio && keyDecRatio <= greatDecRatio + case -3: // firstPriorityDecRatio <= greatDecRatio && secondPriorityDecRatio <= greatDecRatio if secondCmp != 0 { return secondCmp > 0 } fallthrough - case -1: // byteDecRatio <= greatDecRatio + case -1: // firstPriorityDecRatio <= greatDecRatio if firstCmp != 0 { - // prefer region with larger byte rate, to converge faster + // prefer region with larger first priority rate, to converge faster return firstCmp > 0 } } @@ -931,6 +936,8 @@ func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, second return 100 case statistics.KeyDim: return 10 + case statistics.QueryDim: + return 10 } return 100 } @@ -1266,6 +1273,8 @@ func stringToDim(name string) int { return statistics.ByteDim case KeyPriority: return statistics.KeyDim + case QueryPriority: + return statistics.QueryDim } return statistics.ByteDim } diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index c2ff9501ddd..50ba00c9871 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -35,7 +35,9 @@ import ( func init() { schedulePeerPr = 1.0 schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newHotWriteScheduler(opController, initHotRegionScheduleConfig()), nil + cfg := initHotRegionScheduleConfig() + cfg.ReadPriorities = []string{BytePriority, KeyPriority} + return newHotWriteScheduler(opController, cfg), nil }) schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil @@ -842,6 +844,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { hb.(*hotScheduler).conf.GreatDecRatio = 0.99 hb.(*hotScheduler).conf.MinorDecRatio = 1 hb.(*hotScheduler).conf.DstToleranceRatio = 1 + hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority} for i := 0; i < 2; i++ { // 0: byte rate @@ -1295,24 +1298,23 @@ func (s *testHotCacheSuite) TestSortHotPeer(c *C) { c.Assert(err, IsNil) hb := sche.(*hotScheduler) leaderSolver := newBalanceSolver(hb, tc, read, transferLeader) - hotPeers := []*statistics.HotPeerStat{{ RegionID: 1, Loads: []float64{ - statistics.RegionReadBytes: 10, - statistics.RegionReadKeys: 1, + statistics.RegionReadQuery: 10, + statistics.RegionReadBytes: 1, }, }, { RegionID: 2, Loads: []float64{ - statistics.RegionReadBytes: 1, - statistics.RegionReadKeys: 10, + statistics.RegionReadQuery: 1, + statistics.RegionReadBytes: 10, }, }, { RegionID: 3, Loads: []float64{ - statistics.RegionReadBytes: 5, - statistics.RegionReadKeys: 6, + statistics.RegionReadQuery: 5, + 
statistics.RegionReadBytes: 6, }, }} From 9cded0b6028d5ebbe15ab70cdaa34585cade0f04 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 30 Jul 2021 12:38:45 +0800 Subject: [PATCH 08/10] address comments Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 4 ++-- server/schedulers/utils.go | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index f4363d45cdf..576de11cb04 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -219,7 +219,7 @@ func (h *hotScheduler) gcRegionPendings() { } // summaryStoresLoad Load information of all available stores. -// it will filtered the hot peer and calculate the current and future stat(rate,count) for each store +// it will filter the hot peer and calculate the current and future stat(rate,count) for each store func summaryStoresLoad( stores []*core.StoreInfo, storesLoads map[uint64][]float64, @@ -1096,7 +1096,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence dim := "" if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter { - dim = "both" + dim = "all" } else if bs.firstPriorityIsBetter { dim = dimToString(bs.firstPriority) } else if bs.secondPriorityIsBetter { diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 8c890c5f1d9..422e85aa537 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -286,11 +286,7 @@ func (load storeLoad) ToLoadPred(rwTy rwType, infl *Influence) *storeLoadPred { func stLdRate(dim int) func(ld *storeLoad) float64 { return func(ld *storeLoad) float64 { - switch dim { - case statistics.ByteDim, statistics.KeyDim, statistics.QueryDim: - return ld.Loads[dim] - } - return 0 + return ld.Loads[dim] } } From 650207a2021a58637324b1f6a3be060718bea8ef Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 30 Jul 2021 14:55:40 +0800 Subject: [PATCH 09/10] address comment Signed-off-by: lhy1024 --- server/schedulers/hot_region.go | 4 ++-- server/statistics/hot_peer.go | 6 ------ server/statistics/hot_peer_cache.go | 5 +---- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 576de11cb04..c9e8745fbdf 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -594,7 +594,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *storeLoad) bool { return slice.AllOf(minLoad.Loads, func(i int) bool { - if statistics.IsSelectedDim(i) { + if bs.isSelectedDim(i) { return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*expectLoad.Loads[i] } return true @@ -785,7 +785,7 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLoad) bool { dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() return slice.AllOf(maxLoad.Loads, func(i int) bool { - if statistics.IsSelectedDim(i) { + if bs.isSelectedDim(i) { return maxLoad.Loads[i]*dstToleranceRatio < expect.Loads[i] } return true diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 00a666edb7b..a53820f5826 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -30,12 +30,6 @@ const ( DimLen ) -// IsSelectedDim return whether the dim is selected for hot scheduler -func IsSelectedDim(dim int) bool { - // TODO: configure - return dim == 
ByteDim || dim == KeyDim -} - type dimStat struct { typ RegionStatKind Rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed. diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index 47886050d2b..9399174c2b0 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -433,10 +433,7 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f return nil } isHot := slice.AnyOf(regionStats, func(i int) bool { - if IsSelectedDim(i) { - return deltaLoads[regionStats[i]]/interval.Seconds() >= newItem.thresholds[i] - } - return false + return deltaLoads[regionStats[i]]/interval.Seconds() >= newItem.thresholds[i] }) if !isHot { return nil From f4c9402e5977ff6075bd463f924a9a1b7e83f9e7 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 30 Jul 2021 16:21:48 +0800 Subject: [PATCH 10/10] fix test Signed-off-by: lhy1024 --- server/schedulers/hot_region_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index 50ba00c9871..8f9bddf2292 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -685,6 +685,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { tc.DisableFeature(versioninfo.JointConsensus) hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) + hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority} tc.SetHotRegionCacheHitsThreshold(0) // Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0. @@ -790,6 +791,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { c.Assert(err, IsNil) hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority} tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) @@ -1435,6 +1437,7 @@ func (s *testHotSchedulerSuite) TestHotReadPeerSchedule(c *C) { sche, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder([]byte("null"))) c.Assert(err, IsNil) hb := sche.(*hotScheduler) + hb.conf.ReadPriorities = []string{BytePriority, KeyPriority} tc.UpdateStorageReadStats(1, 20*MB, 20*MB) tc.UpdateStorageReadStats(2, 19*MB, 19*MB)
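
Reviewer note (not part of the patches): below is a minimal, self-contained Go sketch of the priority-name-to-dimension mapping this series introduces, including the write-leader special case from PATCH 04/07 and the new "qps,byte" read default from PATCH 05. The constants and helpers here are local stand-ins for statistics.ByteDim/KeyDim/QueryDim and the scheduler's unexported stringToDim/dimToString; it is an illustration under those assumptions, not the PD code itself.

package main

import "fmt"

// Local stand-ins for statistics.ByteDim, statistics.KeyDim, statistics.QueryDim.
const (
	byteDim = iota
	keyDim
	queryDim
)

// stringToDim mirrors the scheduler helper: unknown names fall back to the byte dimension.
func stringToDim(name string) int {
	switch name {
	case "byte":
		return byteDim
	case "key":
		return keyDim
	case "qps":
		return queryDim
	}
	return byteDim
}

// dimToString mirrors the reverse mapping used when labeling operators.
func dimToString(dim int) string {
	switch dim {
	case byteDim:
		return "byte"
	case keyDim:
		return "key"
	case queryDim:
		return "qps"
	}
	return ""
}

func main() {
	// e.g. the new default read-priorities after PATCH 05: ["qps", "byte"].
	priorities := []string{"qps", "byte"}
	first, second := stringToDim(priorities[0]), stringToDim(priorities[1])

	// Write-leader scheduling keeps key rate as its first priority unless the user
	// explicitly prefers the query dimension; its second priority stays byte rate.
	writeLeaderFirst := keyDim
	if first == queryDim {
		writeLeaderFirst = queryDim
	}
	writeLeaderSecond := byteDim

	fmt.Println("first:", dimToString(first), "second:", dimToString(second))
	fmt.Println("write-leader first:", dimToString(writeLeaderFirst), "second:", dimToString(writeLeaderSecond))
}

For operators, the same mapping is what drives the pd-ctl side: the scheduler_test.go changes in this series exercise it with commands such as `scheduler config balance-hot-region-scheduler set read-priorities byte,key`, where each priority must be one of byte, key, or qps and may not repeat.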