diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 039aa7d470e..2c84e7cd87a 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -270,9 +270,14 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en
 }
 
 // AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info.
-func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, reportInterval uint64, followerIds ...uint64) {
+func (mc *Cluster) AddLeaderRegionWithReadInfo(
+	regionID uint64, leaderID uint64,
+	readBytes, readKeys uint64,
+	reportInterval uint64,
+	followerIds []uint64) {
 	r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
 	r = r.Clone(core.SetReadBytes(readBytes))
+	r = r.Clone(core.SetReadKeys(readKeys))
 	r = r.Clone(core.SetReportInterval(reportInterval))
 	items := mc.HotCache.CheckRead(r, mc.StoresStats)
 	for _, item := range items {
@@ -282,9 +287,14 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64,
 }
 
 // AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info.
-func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, reportInterval uint64, followerIds ...uint64) {
+func (mc *Cluster) AddLeaderRegionWithWriteInfo(
+	regionID uint64, leaderID uint64,
+	writtenBytes, writtenKeys uint64,
+	reportInterval uint64,
+	followerIds []uint64) {
 	r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
 	r = r.Clone(core.SetWrittenBytes(writtenBytes))
+	r = r.Clone(core.SetWrittenKeys(writtenKeys))
 	r = r.Clone(core.SetReportInterval(reportInterval))
 	items := mc.HotCache.CheckWrite(r, mc.StoresStats)
 	for _, item := range items {
@@ -404,6 +414,32 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
 	mc.PutStore(newStore)
 }
 
+// UpdateStorageWrittenKeys updates store written keys.
+func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) {
+	store := mc.GetStore(storeID)
+	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
+	newStats.KeysWritten = keysWritten
+	now := time.Now().Second()
+	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
+	newStats.Interval = interval
+	newStore := store.Clone(core.SetStoreStats(newStats))
+	mc.Set(storeID, newStats)
+	mc.PutStore(newStore)
+}
+
+// UpdateStorageReadKeys updates store read keys.
+func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
+	store := mc.GetStore(storeID)
+	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
+	newStats.KeysRead = keysRead
+	now := time.Now().Second()
+	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
+	newStats.Interval = interval
+	newStore := store.Clone(core.SetStoreStats(newStats))
+	mc.Set(storeID, newStats)
+	mc.PutStore(newStore)
+}
+
 // UpdateStoreStatus updates store status.
 func (mc *Cluster) UpdateStoreStatus(id uint64) {
 	leaderCount := mc.Regions.GetStoreLeaderCount(id)
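// ---- editor's note (illustrative sketch, not part of the patch) ----
// How a test is expected to drive the extended mock API: store statistics
// and region flow now carry a key dimension next to the byte dimension.
// The MB/KB constants and the interval encoding are the ones used by the
// tests below; the concrete numbers are an assumed usage example.
//
//	tc := mockcluster.NewCluster(opt)
//	tc.AddRegionStore(1, 20)
//	// Store 1 reports 10 MB/s of written bytes and 1M written keys/s,
//	// encoded as totals over one store heartbeat interval.
//	tc.UpdateStorageWrittenBytes(1, 10*MB*statistics.StoreHeartBeatReportInterval)
//	tc.UpdateStorageWrittenKeys(1, 1*MB*statistics.StoreHeartBeatReportInterval)
//	// Region 1: leader on store 1, followers on stores 2 and 3,
//	// writing 512 KB/s and 100 keys/s on average.
//	tc.AddLeaderRegionWithWriteInfo(1, 1,
//		512*KB*statistics.RegionHeartBeatReportInterval,
//		100*statistics.RegionHeartBeatReportInterval,
//		statistics.RegionHeartBeatReportInterval,
//		[]uint64{2, 3})
// ---- end editor's note ----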
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index bcc496b6e45..b5e9de9d50e 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -420,6 +420,8 @@ func (c *coordinator) collectHotSpotMetrics() {
 		infl := pendings[storeID]
 		// TODO: add to tidb-ansible after merging pending influence into operator influence.
 		hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.ByteRate)
+		hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.KeyRate)
+		hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
 	}
 
 	// Collects hot read region metrics.
@@ -440,6 +442,8 @@ func (c *coordinator) collectHotSpotMetrics() {
 		infl := pendings[storeID]
 		hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.ByteRate)
+		hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.KeyRate)
+		hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
 	}
 }
 
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 4e0eeefe5c8..2db22ee8f90 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -15,6 +15,7 @@ package schedulers
 
 import (
 	"fmt"
+	"math"
 	"math/rand"
 	"sort"
 	"sync"
@@ -60,43 +61,27 @@ const (
 	// HotWriteRegionType is hot write region scheduler type.
 	HotWriteRegionType = "hot-write-region"
 
-	hotRegionLimitFactor    = 0.75
-	hotRegionScheduleFactor = 0.95
+	hotRegionLimitFactor = 0.75
+
+	maxPeerNum = 1000
 
 	maxZombieDur              time.Duration = statistics.StoreHeartBeatReportInterval * time.Second
 	minRegionScheduleInterval time.Duration = statistics.StoreHeartBeatReportInterval * time.Second
-)
 
-// rwType : the perspective of balance
-type rwType int
+	minHotByteRate = 100
+	minHotKeyRate  = 10
 
-const (
-	write rwType = iota
-	read
-)
-
-type opType int
+	// The rank step ratio decides the step when calculating rank:
+	// step = max current * rank step ratio
+	byteRateRankStepRatio = 0.05
+	keyRateRankStepRatio  = 0.05
+	countRankStepRatio    = 0.1
 
-const (
-	movePeer opType = iota
-	transferLeader
+	greatDecRatio = 0.95
+	minorDecRatio = 0.99
 )
 
-type storeLoadInfos struct {
-	ReadLeaders  map[uint64]*storeLoadDetail
-	WriteLeaders map[uint64]*storeLoadDetail
-	WritePeers   map[uint64]*storeLoadDetail
-}
-
-func newStoreLoadInfos() *storeLoadInfos {
-	return &storeLoadInfos{
-		ReadLeaders:  make(map[uint64]*storeLoadDetail),
-		WriteLeaders: make(map[uint64]*storeLoadDetail),
-		WritePeers:   make(map[uint64]*storeLoadDetail),
-	}
-}
-
 type hotScheduler struct {
 	name string
 	*BaseScheduler
@@ -107,31 +92,30 @@ type hotScheduler struct {
 	r *rand.Rand
 
 	// states across multiple `Schedule` calls
-	readPendings   map[*pendingInfluence]struct{}
-	writePendings  map[*pendingInfluence]struct{}
+	pendings       [resourceTypeLen]map[*pendingInfluence]struct{}
 	regionPendings map[uint64][2]*operator.Operator
 
 	// temporary states but exported to API or metrics
-	stLoadInfos     *storeLoadInfos
-	readPendingSum  map[uint64]Influence
-	writePendingSum map[uint64]Influence
+	stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
+	pendingSums [resourceTypeLen]map[uint64]Influence
 }
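// ---- editor's note (illustrative sketch, not part of the patch) ----
// The per-read/per-write fields above collapse into arrays indexed by
// resourceType, which this patch declares near the end of hot_region.go:
// writePeer, writeLeader and readLeader, with resourceTypeLen as the
// array length. toResourceType maps the scheduler's (rwType, opType)
// pair onto them:
//
//	toResourceType(write, movePeer)       // == writePeer
//	toResourceType(write, transferLeader) // == writeLeader
//	toResourceType(read, transferLeader)  // == readLeader
//
// and every "for ty := resourceType(0); ty < resourceTypeLen; ty++" loop
// below walks all three resources at once.
// ---- end editor's note ----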
 
 func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
 	base := NewBaseScheduler(opController)
-	return &hotScheduler{
+	ret := &hotScheduler{
 		name:           HotRegionName,
 		BaseScheduler:  base,
 		leaderLimit:    1,
 		peerLimit:      1,
 		types:          []rwType{write, read},
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		readPendings:   map[*pendingInfluence]struct{}{},
-		writePendings:  map[*pendingInfluence]struct{}{},
 		regionPendings: make(map[uint64][2]*operator.Operator),
-
-		stLoadInfos: newStoreLoadInfos(),
 	}
+	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
+		ret.pendings[ty] = map[*pendingInfluence]struct{}{}
+		ret.stLoadInfos[ty] = map[uint64]*storeLoadDetail{}
+	}
+	return ret
 }
 
 func newHotReadScheduler(opController *schedule.OperatorController) *hotScheduler {
@@ -197,11 +181,13 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
 	{ // update read statistics
 		regionRead := cluster.RegionReadStats()
-		storeRead := storesStat.GetStoresBytesReadStat()
+		storeByte := storesStat.GetStoresBytesReadStat()
+		storeKey := storesStat.GetStoresKeysReadStat()
 
-		h.stLoadInfos.ReadLeaders = summaryStoresLoad(
-			storeRead,
-			h.readPendingSum,
+		h.stLoadInfos[readLeader] = summaryStoresLoad(
+			storeByte,
+			storeKey,
+			h.pendingSums[readLeader],
 			regionRead,
 			minHotDegree,
 			read, core.LeaderKind)
@@ -209,18 +195,21 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	{ // update write statistics
 		regionWrite := cluster.RegionWriteStats()
-		storeWrite := storesStat.GetStoresBytesWriteStat()
+		storeByte := storesStat.GetStoresBytesWriteStat()
+		storeKey := storesStat.GetStoresKeysWriteStat()
 
-		h.stLoadInfos.WriteLeaders = summaryStoresLoad(
-			storeWrite,
-			map[uint64]Influence{},
+		h.stLoadInfos[writeLeader] = summaryStoresLoad(
+			storeByte,
+			storeKey,
+			h.pendingSums[writeLeader],
 			regionWrite,
 			minHotDegree,
 			write, core.LeaderKind)
 
-		h.stLoadInfos.WritePeers = summaryStoresLoad(
-			storeWrite,
-			h.writePendingSum,
+		h.stLoadInfos[writePeer] = summaryStoresLoad(
+			storeByte,
+			storeKey,
+			h.pendingSums[writePeer],
 			regionWrite,
 			minHotDegree,
 			write, core.RegionKind)
@@ -228,8 +217,9 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 }
 
 func (h *hotScheduler) summaryPendingInfluence() {
-	h.readPendingSum = summaryPendingInfluence(h.readPendings, calcPendingWeight)
-	h.writePendingSum = summaryPendingInfluence(h.writePendings, calcPendingWeight)
+	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
+		h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], calcPendingWeight)
+	}
 	h.gcRegionPendings()
 }
 
@@ -258,6 +248,7 @@ func (h *hotScheduler) gcRegionPendings() {
 // Load information of all available stores.
 func summaryStoresLoad(
 	storeByteRate map[uint64]float64,
+	storeKeyRate map[uint64]float64,
 	pendings map[uint64]Influence,
 	storeHotPeers map[uint64][]*statistics.HotPeerStat,
 	minHotDegree int,
@@ -267,30 +258,41 @@ func summaryStoresLoad(
 	loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate))
 	// Stores without byte rate statistics is not available to schedule.
-	for id, rate := range storeByteRate {
+	for id, byteRate := range storeByteRate {
+		keyRate := storeKeyRate[id]
 
 		// Find all hot peers first
 		hotPeers := make([]*statistics.HotPeerStat, 0)
 		{
-			hotSum := 0.0
+			byteSum := 0.0
+			keySum := 0.0
 			for _, peer := range filterHotPeers(kind, minHotDegree, storeHotPeers[id]) {
-				hotSum += peer.GetByteRate()
+				byteSum += peer.GetByteRate()
+				keySum += peer.GetKeyRate()
 				hotPeers = append(hotPeers, peer.Clone())
 			}
 			// Use sum of hot peers to estimate leader-only byte rate.
 			if kind == core.LeaderKind && rwTy == write {
-				rate = hotSum
+				byteRate = byteSum
+				keyRate = keySum
 			}
 
 			// Metric for debug.
-			ty := "byte-rate-" + rwTy.String() + "-" + kind.String()
-			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(hotSum)
+			{
+				ty := "byte-rate-" + rwTy.String() + "-" + kind.String()
+				hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteSum)
+			}
+			{
+				ty := "key-rate-" + rwTy.String() + "-" + kind.String()
+				hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
+			}
 		}
 
 		// Build store load prediction from current load and pending influence.
 		stLoadPred := (&storeLoad{
-			ByteRate: rate,
-			Count:    len(hotPeers),
+			ByteRate: byteRate,
+			KeyRate:  keyRate,
+			Count:    float64(len(hotPeers)),
 		}).ToLoadPred(pendings[id])
 
 		// Construct store load info.
@@ -318,21 +320,19 @@ func filterHotPeers(
 	return ret
 }
 
-func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, balanceType rwType, ty opType) {
+func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) {
 	influence := newPendingInfluence(op, srcStore, dstStore, infl)
 	regionID := op.RegionID()
-	if balanceType == read {
-		h.readPendings[influence] = struct{}{}
-	} else {
-		h.writePendings[influence] = struct{}{}
-	}
+
+	rcTy := toResourceType(rwTy, opTy)
+	h.pendings[rcTy][influence] = struct{}{}
 
 	if _, ok := h.regionPendings[regionID]; !ok {
 		h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
 	}
 	{ // h.pendingOpInfos[regionID][ty] = influence
 		tmp := h.regionPendings[regionID]
-		tmp[ty] = op
+		tmp[opTy] = op
 		h.regionPendings[regionID] = tmp
 	}
 
@@ -357,23 +357,18 @@ func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Op
 	return nil
 }
 
-// balanceHotRetryLimit is the limit to retry schedule for selected balance strategy.
-const balanceHotRetryLimit = 5
-
 func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.Operator {
-	for i := 0; i < balanceHotRetryLimit; i++ {
-		// prefer to balance by peer
-		peerSolver := newBalanceSolver(h, cluster, write, movePeer)
-		ops := peerSolver.solve()
-		if len(ops) > 0 {
-			return ops
-		}
+	// prefer to balance by peer
+	peerSolver := newBalanceSolver(h, cluster, write, movePeer)
+	ops := peerSolver.solve()
+	if len(ops) > 0 {
+		return ops
+	}
 
-		leaderSolver := newBalanceSolver(h, cluster, write, transferLeader)
-		ops = leaderSolver.solve()
-		if len(ops) > 0 {
-			return ops
-		}
+	leaderSolver := newBalanceSolver(h, cluster, write, transferLeader)
+	ops = leaderSolver.solve()
+	if len(ops) > 0 {
+		return ops
 	}
 
 	schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
@@ -387,28 +382,57 @@ type balanceSolver struct {
 	rwTy rwType
 	opTy opType
 
-	// temporary states
+	cur *solution
+
+	maxSrc   *storeLoad
+	minDst   *storeLoad
+	rankStep *storeLoad
+}
+
+type solution struct {
 	srcStoreID  uint64
 	srcPeerStat *statistics.HotPeerStat
 	region      *core.RegionInfo
 	dstStoreID  uint64
+
+	// progressiveRank measures the contribution for balance.
+	// The smaller the rank, the better this solution is.
+	// If rank < 0, this solution makes things better.
+	progressiveRank int64
 }
 
 func (bs *balanceSolver) init() {
-	switch bs.rwTy {
-	case read:
-		bs.stLoadDetail = bs.sche.stLoadInfos.ReadLeaders
-	case write:
-		switch bs.opTy {
-		case movePeer:
-			bs.stLoadDetail = bs.sche.stLoadInfos.WritePeers
-		case transferLeader:
-			bs.stLoadDetail = bs.sche.stLoadInfos.WriteLeaders
-		}
+	switch toResourceType(bs.rwTy, bs.opTy) {
+	case writePeer:
+		bs.stLoadDetail = bs.sche.stLoadInfos[writePeer]
+	case writeLeader:
+		bs.stLoadDetail = bs.sche.stLoadInfos[writeLeader]
+	case readLeader:
+		bs.stLoadDetail = bs.sche.stLoadInfos[readLeader]
 	}
 	for _, id := range getUnhealthyStores(bs.cluster) {
 		delete(bs.stLoadDetail, id)
 	}
+
+	bs.maxSrc = &storeLoad{}
+	bs.minDst = &storeLoad{
+		ByteRate: math.MaxFloat64,
+		KeyRate:  math.MaxFloat64,
+		Count:    math.MaxFloat64,
+	}
+	maxCur := &storeLoad{}
+
+	for _, detail := range bs.stLoadDetail {
+		bs.maxSrc = maxLoad(bs.maxSrc, detail.LoadPred.min())
+		bs.minDst = minLoad(bs.minDst, detail.LoadPred.max())
+		maxCur = maxLoad(maxCur, &detail.LoadPred.Current)
+	}
+
+	bs.rankStep = &storeLoad{
+		ByteRate: maxCur.ByteRate * byteRateRankStepRatio,
+		KeyRate:  maxCur.KeyRate * keyRateRankStepRatio,
+		Count:    maxCur.Count * countRankStepRatio,
+	}
 }
 
 func getUnhealthyStores(cluster opt.Cluster) []uint64 {
@@ -439,7 +463,7 @@ func (bs *balanceSolver) isValid() bool {
 		return false
 	}
 	switch bs.rwTy {
-	case read, write:
+	case write, read:
 	default:
 		return false
 	}
@@ -455,28 +479,43 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 	if !bs.isValid() || !bs.allowBalance() {
 		return nil
 	}
-	bs.srcStoreID = bs.selectSrcStoreID()
-	if bs.srcStoreID == 0 {
-		return nil
-	}
+	bs.cur = &solution{}
+	var (
+		best  *solution
+		ops   []*operator.Operator
+		infls []Influence
+	)
 
-	for _, srcPeerStat := range bs.getPeerList() {
-		bs.srcPeerStat = srcPeerStat
-		bs.region = bs.getRegion()
-		if bs.region == nil {
-			continue
-		}
-		dstCandidates := bs.getDstCandidateIDs()
-		if len(dstCandidates) <= 0 {
-			continue
-		}
-		bs.dstStoreID = bs.selectDstStoreID(dstCandidates)
-		ops := bs.buildOperators()
-		if len(ops) > 0 {
-			return ops
+	for srcStoreID := range bs.filterSrcStores() {
+		bs.cur.srcStoreID = srcStoreID
+
+		for _, srcPeerStat := range bs.filterHotPeers() {
+			bs.cur.srcPeerStat = srcPeerStat
+			bs.cur.region = bs.getRegion()
+			if bs.cur.region == nil {
+				continue
+			}
+
+			for dstStoreID := range bs.filterDstStores() {
+				bs.cur.dstStoreID = dstStoreID
+				bs.calcProgressiveRank()
+
+				if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
+					if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
+						ops = newOps
+						infls = newInfls
+						clone := *bs.cur
+						best = &clone
+					}
+				}
+			}
 		}
 	}
-	return nil
+
+	for i := 0; i < len(ops); i++ {
+		bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy)
+	}
+	return ops
 }
 
 func (bs *balanceSolver) allowBalance() bool {
@@ -490,29 +529,62 @@ func (bs *balanceSolver) allowBalance() bool {
 	}
 }
 
-func (bs *balanceSolver) selectSrcStoreID() uint64 {
-	var id uint64
-	switch bs.opTy {
-	case movePeer:
-		id = selectSrcStoreByByteRate(bs.stLoadDetail)
-	case transferLeader:
-		if bs.rwTy == write {
-			id = selectSrcStoreByCount(bs.stLoadDetail)
-		} else {
-			id = selectSrcStoreByByteRate(bs.stLoadDetail)
+func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
+	ret := make(map[uint64]*storeLoadDetail)
+	for id, detail := range bs.stLoadDetail {
+		if bs.cluster.GetStore(id) == nil {
+			log.Error("failed to get the source store", zap.Uint64("store-id", id))
+			continue
 		}
+		if len(detail.HotPeers) == 0 {
+			continue
+		}
+		ret[id] = detail
 	}
-	if id != 0 && bs.cluster.GetStore(id) == nil {
-		log.Error("failed to get the source store", zap.Uint64("store-id", id))
-	}
-	return id
+	return ret
 }
 
-func (bs *balanceSolver) getPeerList() []*statistics.HotPeerStat {
-	ret := bs.stLoadDetail[bs.srcStoreID].HotPeers
-	bs.sche.r.Shuffle(len(ret), func(i, j int) {
-		ret[i], ret[j] = ret[j], ret[i]
+func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
+	ret := bs.stLoadDetail[bs.cur.srcStoreID].HotPeers
+	// Return at most maxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
+	if len(ret) <= maxPeerNum {
+		return ret
+	}
+
+	byteSort := make([]*statistics.HotPeerStat, len(ret))
+	copy(byteSort, ret)
+	sort.Slice(byteSort, func(i, j int) bool {
+		return byteSort[i].GetByteRate() > byteSort[j].GetByteRate()
 	})
+	keySort := make([]*statistics.HotPeerStat, len(ret))
+	copy(keySort, ret)
+	sort.Slice(keySort, func(i, j int) bool {
+		return keySort[i].GetKeyRate() > keySort[j].GetKeyRate()
+	})
+
+	union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum)
+	for len(union) < maxPeerNum {
+		for len(byteSort) > 0 {
+			peer := byteSort[0]
+			byteSort = byteSort[1:]
+			if _, ok := union[peer]; !ok {
+				union[peer] = struct{}{}
+				break
+			}
+		}
+		for len(keySort) > 0 {
+			peer := keySort[0]
+			keySort = keySort[1:]
+			if _, ok := union[peer]; !ok {
+				union[peer] = struct{}{}
+				break
+			}
+		}
+	}
+	ret = make([]*statistics.HotPeerStat, 0, len(union))
+	for peer := range union {
+		ret = append(ret, peer)
+	}
 	return ret
 }
 
@@ -548,21 +620,21 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
 }
 
 func (bs *balanceSolver) getRegion() *core.RegionInfo {
-	region := bs.cluster.GetRegion(bs.srcPeerStat.ID())
+	region := bs.cluster.GetRegion(bs.cur.srcPeerStat.ID())
 	if !bs.isRegionAvailable(region) {
 		return nil
 	}
 
 	switch bs.opTy {
 	case movePeer:
-		srcPeer := region.GetStorePeer(bs.srcStoreID)
+		srcPeer := region.GetStorePeer(bs.cur.srcStoreID)
 		if srcPeer == nil {
-			log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.srcPeerStat.ID()))
+			log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
 			return nil
 		}
 	case transferLeader:
-		if region.GetLeader().GetStoreId() != bs.srcStoreID {
-			log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.srcPeerStat.ID()))
+		if region.GetLeader().GetStoreId() != bs.cur.srcStoreID {
+			log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
 			return nil
 		}
 	default:
@@ -572,7 +644,7 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
 	return region
 }
 
-func (bs *balanceSolver) getDstCandidateIDs() map[uint64]struct{} {
+func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	var (
 		filters    []filter.Filter
 		candidates []*core.StoreInfo
 	)
 
 	switch bs.opTy {
 	case movePeer:
 		var scoreGuard filter.Filter
 		if bs.cluster.IsPlacementRulesEnabled() {
-			scoreGuard = filter.NewRuleFitFilter(bs.sche.GetName(), bs.cluster, bs.region, bs.srcStoreID)
+			scoreGuard = filter.NewRuleFitFilter(bs.sche.GetName(), bs.cluster, bs.cur.region, bs.cur.srcStoreID)
 		} else {
-			srcStore := bs.cluster.GetStore(bs.srcStoreID)
+			srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
 			if srcStore == nil {
 				return nil
 			}
-			scoreGuard = filter.NewDistinctScoreFilter(bs.sche.GetName(), bs.cluster.GetLocationLabels(), bs.cluster.GetRegionStores(bs.region), srcStore)
+			scoreGuard = filter.NewDistinctScoreFilter(bs.sche.GetName(), bs.cluster.GetLocationLabels(), bs.cluster.GetRegionStores(bs.cur.region), srcStore)
 		}
 
 		filters = []filter.Filter{
 			filter.StoreStateFilter{ActionScope: bs.sche.GetName(), MoveRegion: true},
-			filter.NewExcludedFilter(bs.sche.GetName(), bs.region.GetStoreIds(), bs.region.GetStoreIds()),
+			filter.NewExcludedFilter(bs.sche.GetName(), bs.cur.region.GetStoreIds(), bs.cur.region.GetStoreIds()),
 			filter.NewHealthFilter(bs.sche.GetName()),
 			scoreGuard,
 		}
@@ -606,184 +678,264 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 			filter.NewHealthFilter(bs.sche.GetName()),
 		}
 
-		candidates = bs.cluster.GetFollowerStores(bs.region)
+		candidates = bs.cluster.GetFollowerStores(bs.cur.region)
 
 	default:
 		return nil
 	}
 
-	ret := make(map[uint64]struct{}, len(candidates))
+	ret := make(map[uint64]*storeLoadDetail, len(candidates))
 	for _, store := range candidates {
 		if !filter.Target(bs.cluster, store, filters) {
-			ret[store.GetID()] = struct{}{}
+			ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
 		}
 	}
 	return ret
 }
 
-func (bs *balanceSolver) selectDstStoreID(candidateIDs map[uint64]struct{}) uint64 {
-	candidateLoadDetail := make(map[uint64]*storeLoadDetail, len(candidateIDs))
-	for id := range candidateIDs {
-		candidateLoadDetail[id] = bs.stLoadDetail[id]
-	}
-	switch bs.opTy {
-	case movePeer:
-		return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
-	case transferLeader:
-		if bs.rwTy == write {
-			return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
+// calcProgressiveRank calculates `bs.cur.progressiveRank`.
+// See the comments of `solution.progressiveRank` for more about progressive rank.
+func (bs *balanceSolver) calcProgressiveRank() {
+	srcLd := bs.stLoadDetail[bs.cur.srcStoreID].LoadPred.min()
+	dstLd := bs.stLoadDetail[bs.cur.dstStoreID].LoadPred.max()
+	peer := bs.cur.srcPeerStat
+	rank := int64(0)
+	if bs.rwTy == write && bs.opTy == transferLeader {
+		// In this condition, CPU usage is what matters.
+		// Only consider count and key rate.
+		if srcLd.Count > dstLd.Count &&
+			srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
+			rank = -1
+		}
+	} else {
+		keyDecRatio := (dstLd.KeyRate + peer.GetKeyRate()) / (srcLd.KeyRate + 1)
+		keyHot := peer.GetKeyRate() >= minHotKeyRate
+		byteDecRatio := (dstLd.ByteRate + peer.GetByteRate()) / (srcLd.ByteRate + 1)
+		byteHot := peer.GetByteRate() > minHotByteRate
+		switch {
+		case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
+			// Both byte rate and key rate are balanced, the best choice.
+			rank = -3
+		case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
+			// Byte rate is not worsened, key rate is balanced.
+			rank = -2
+		case byteHot && byteDecRatio <= greatDecRatio:
+			// Byte rate is balanced, ignore the key rate.
+			rank = -1
 		}
-		return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
-	default:
-		return 0
 	}
+	bs.cur.progressiveRank = rank
 }
 
-func (bs *balanceSolver) isReadyToBuild() bool {
-	if bs.srcStoreID == 0 || bs.dstStoreID == 0 ||
-		bs.srcPeerStat == nil || bs.region == nil {
-		return false
+// betterThan checks if `bs.cur` is a better solution than `old`.
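// ---- editor's note (illustrative sketch, not part of the patch) ----
// A worked example of calcProgressiveRank for every case except the
// write-leader one, with the constants defined above (greatDecRatio = 0.95,
// minorDecRatio = 0.99, minHotByteRate = 100, minHotKeyRate = 10).
// All numbers are made up:
//
//	srcLd: ByteRate = 10 MB/s, KeyRate = 10000/s  (min prediction)
//	dstLd: ByteRate =  8 MB/s, KeyRate =  9000/s  (max prediction)
//	peer:  ByteRate = 512 KB/s, KeyRate =   200/s
//
//	byteDecRatio = (8 MB + 512 KB) / (10 MB + 1) ≈ 0.85  (<= 0.95)
//	keyDecRatio  = (9000 + 200) / (10000 + 1)    ≈ 0.92  (<= 0.95)
//
// The peer is hot in both dimensions and both ratios fall under
// greatDecRatio, so the solution gets the best rank, -3. If only the byte
// ratio qualified, the rank would be -1; if the byte rate merely did not
// get worse (ratio <= 0.99) while the key rate improved, -2.
// ---- end editor's note ----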
+func (bs *balanceSolver) betterThan(old *solution) bool {
+	if old == nil {
+		return true
 	}
-	if bs.srcStoreID != bs.srcPeerStat.StoreID ||
-		bs.region.GetID() != bs.srcPeerStat.ID() {
+
+	switch {
+	case bs.cur.progressiveRank < old.progressiveRank:
+		return true
+	case bs.cur.progressiveRank > old.progressiveRank:
 		return false
 	}
-	return true
-}
 
-func (bs *balanceSolver) buildOperators() []*operator.Operator {
-	if !bs.isReadyToBuild() {
-		return nil
+	if r := bs.compareSrcStore(bs.cur.srcStoreID, old.srcStoreID); r < 0 {
+		return true
+	} else if r > 0 {
+		return false
 	}
 
-	var (
-		op  *operator.Operator
-		err error
-	)
-	switch bs.opTy {
-	case movePeer:
-		srcPeer := bs.region.GetStorePeer(bs.srcStoreID) // checked in getRegionAndSrcPeer
-		dstPeer := &metapb.Peer{StoreId: bs.dstStoreID, IsLearner: srcPeer.IsLearner}
-		bs.sche.peerLimit = bs.sche.adjustBalanceLimit(bs.srcStoreID, bs.stLoadDetail)
-		op, err = operator.CreateMovePeerOperator("move-hot-"+bs.rwTy.String()+"-region", bs.cluster, bs.region, operator.OpHotRegion, bs.srcStoreID, dstPeer)
-	case transferLeader:
-		if bs.region.GetStoreVoter(bs.dstStoreID) == nil {
-			return nil
-		}
-		bs.sche.leaderLimit = bs.sche.adjustBalanceLimit(bs.srcStoreID, bs.stLoadDetail)
-		op, err = operator.CreateTransferLeaderOperator("transfer-hot-"+bs.rwTy.String()+"-leader", bs.cluster, bs.region, bs.srcStoreID, bs.dstStoreID, operator.OpHotRegion)
+	if r := bs.compareDstStore(bs.cur.dstStoreID, old.dstStoreID); r < 0 {
+		return true
+	} else if r > 0 {
+		return false
 	}
 
-	if err != nil {
-		log.Debug("fail to create operator", zap.Error(err), zap.Stringer("opType", bs.opTy), zap.Stringer("rwType", bs.rwTy))
-		schedulerCounter.WithLabelValues(bs.sche.GetName(), "create-operator-fail").Inc()
-		return nil
-	}
+	if bs.cur.srcPeerStat != old.srcPeerStat {
+		// compare region
 
-	op.SetPriorityLevel(core.HighPriority)
-	op.Counters = append(op.Counters,
-		schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
-		schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))
+		if bs.rwTy == write && bs.opTy == transferLeader {
+			switch {
+			case bs.cur.srcPeerStat.GetKeyRate() > old.srcPeerStat.GetKeyRate():
+				return true
+			case bs.cur.srcPeerStat.GetKeyRate() < old.srcPeerStat.GetKeyRate():
+				return false
+			}
+		} else {
+			byteRkCmp := rankCmp(bs.cur.srcPeerStat.GetByteRate(), old.srcPeerStat.GetByteRate(), stepRank(0, 100))
+			keyRkCmp := rankCmp(bs.cur.srcPeerStat.GetKeyRate(), old.srcPeerStat.GetKeyRate(), stepRank(0, 10))
 
-	infl := Influence{ByteRate: bs.srcPeerStat.GetByteRate()}
-	if bs.opTy == transferLeader && bs.rwTy == write {
-		infl.ByteRate = 0
+			switch bs.cur.progressiveRank {
+			case -2: // greatDecRatio < byteDecRatio <= minorDecRatio && keyDecRatio <= greatDecRatio
+				if keyRkCmp != 0 {
+					return keyRkCmp > 0
+				}
+				if byteRkCmp != 0 {
+					// prefer smaller byte rate, to reduce oscillation
+					return byteRkCmp < 0
+				}
+			case -3: // byteDecRatio <= greatDecRatio && keyDecRatio <= greatDecRatio
+				if keyRkCmp != 0 {
+					return keyRkCmp > 0
+				}
+				fallthrough
+			case -1: // byteDecRatio <= greatDecRatio
+				if byteRkCmp != 0 {
+					// prefer region with larger byte rate, to converge faster
+					return byteRkCmp > 0
+				}
+			}
+		}
 	}
-	bs.sche.addPendingInfluence(op, bs.srcStoreID, bs.dstStoreID, infl, bs.rwTy, bs.opTy)
-
-	return []*operator.Operator{op}
-}
 
-// Sort stores according to their load prediction.
-func sortStores(loadDetail map[uint64]*storeLoadDetail, better func(lp1, lp2 *storeLoadPred) bool) []uint64 {
-	ids := make([]uint64, 0, len(loadDetail))
-	for id := range loadDetail {
-		ids = append(ids, id)
-	}
-	sort.Slice(ids, func(i, j int) bool {
-		id1, id2 := ids[i], ids[j]
-		return better(loadDetail[id1].LoadPred, loadDetail[id2].LoadPred)
-	})
-	return ids
+	return false
 }
 
-// Prefer store with larger `count`.
-func selectSrcStoreByCount(loadDetail map[uint64]*storeLoadDetail) uint64 {
-	stores := sortStores(loadDetail, func(lp1, lp2 *storeLoadPred) bool {
-		ld1, ld2 := lp1.min(), lp2.min()
-		if ld1.Count > ld2.Count ||
-			(ld1.Count == ld2.Count && ld1.ByteRate > ld2.ByteRate) {
-			return true
+// smaller is better
+func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int {
+	if st1 != st2 {
+		// compare source store
+		var lpCmp storeLPCmp
+		if bs.rwTy == write && bs.opTy == transferLeader {
+			lpCmp = sliceLPCmp(
+				minLPCmp(negLoadCmp(sliceLoadCmp(
+					stLdRankCmp(stLdCount, stepRank(bs.maxSrc.Count, bs.rankStep.Count)),
+					stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.KeyRate, bs.rankStep.KeyRate)),
+					stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.ByteRate, bs.rankStep.ByteRate)),
+				))),
+				diffCmp(sliceLoadCmp(
+					stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)),
+					stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.KeyRate)),
+					stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)),
+				)),
+			)
+		} else {
+			lpCmp = sliceLPCmp(
+				minLPCmp(negLoadCmp(sliceLoadCmp(
+					stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.ByteRate, bs.rankStep.ByteRate)),
+					stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.KeyRate, bs.rankStep.KeyRate)),
+				))),
+				diffCmp(
+					stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)),
+				),
+			)
 		}
-		return false
-	})
-	if len(stores) > 0 && loadDetail[stores[0]].LoadPred.Current.Count > 1 {
-		return stores[0]
+		lp1 := bs.stLoadDetail[st1].LoadPred
+		lp2 := bs.stLoadDetail[st2].LoadPred
+		return lpCmp(lp1, lp2)
 	}
 	return 0
 }
 
-// Prefer store with larger `byteRate`.
-func selectSrcStoreByByteRate(loadDetail map[uint64]*storeLoadDetail) uint64 {
-	stores := sortStores(loadDetail, func(lp1, lp2 *storeLoadPred) bool {
-		ld1, ld2 := lp1.min(), lp2.min()
-		if ld1.ByteRate > ld2.ByteRate ||
-			(ld1.ByteRate == ld2.ByteRate && ld1.Count > ld2.Count) {
-			return true
+// smaller is better
+func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int {
+	if st1 != st2 {
+		// compare destination store
+		var lpCmp storeLPCmp
+		if bs.rwTy == write && bs.opTy == transferLeader {
+			lpCmp = sliceLPCmp(
+				maxLPCmp(sliceLoadCmp(
+					stLdRankCmp(stLdCount, stepRank(bs.minDst.Count, bs.rankStep.Count)),
+					stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.KeyRate, bs.rankStep.KeyRate)),
+					stLdRankCmp(stLdByteRate, stepRank(bs.minDst.ByteRate, bs.rankStep.ByteRate)),
+				)),
+				diffCmp(sliceLoadCmp(
+					stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)),
+					stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.KeyRate)),
+					stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)),
+				)))
+		} else {
+			lpCmp = sliceLPCmp(
+				maxLPCmp(sliceLoadCmp(
+					stLdRankCmp(stLdByteRate, stepRank(bs.minDst.ByteRate, bs.rankStep.ByteRate)),
+					stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.KeyRate, bs.rankStep.KeyRate)),
+				)),
+				diffCmp(
+					stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)),
+				),
+			)
 		}
-		return false
-	})
-	for _, id := range stores {
-		if loadDetail[id].LoadPred.Current.Count > 1 {
-			return id
-		}
+		lp1 := bs.stLoadDetail[st1].LoadPred
+		lp2 := bs.stLoadDetail[st2].LoadPred
+		return lpCmp(lp1, lp2)
 	}
 	return 0
 }
 
-// Prefer store with smaller `count`.
-func selectDstStoreByCount(candidates map[uint64]*storeLoadDetail, regionBytesRate float64, srcLoadDetail *storeLoadDetail) uint64 {
-	stores := sortStores(candidates, func(lp1, lp2 *storeLoadPred) bool {
-		ld1, ld2 := lp1.max(), lp2.max()
-		if ld1.Count < ld2.Count ||
-			(ld1.Count == ld2.Count && ld1.ByteRate < ld2.ByteRate) {
-			return true
-		}
-		return false
-	})
-
-	srcLoad := srcLoadDetail.LoadPred.min()
-	for _, id := range stores {
-		dstLoad := candidates[id].LoadPred.max()
-		if srcLoad.Count-1 >= dstLoad.Count+1 &&
-			srcLoad.ByteRate*hotRegionScheduleFactor > dstLoad.ByteRate+regionBytesRate {
-			return id
-		}
+func stepRank(rk0 float64, step float64) func(float64) int64 {
+	return func(rate float64) int64 {
+		return int64((rate - rk0) / step)
 	}
-	return 0
 }
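// ---- editor's note (illustrative sketch, not part of the patch) ----
// stepRank quantizes a rate into coarse buckets so that near-equal loads
// compare as equal and the tie falls through to the next comparer in
// sliceLoadCmp. E.g. with maxCur.ByteRate = 100 MB/s and
// byteRateRankStepRatio = 0.05, the step is 5 MB/s:
//
//	rank := stepRank(0, 5*MB)
//	rank(12 * MB) // == 2
//	rank(14 * MB) // == 2, same bucket: treated as a tie
//	rank(17 * MB) // == 3, different bucket
//
// MB here is the constant used by the tests below.
// ---- end editor's note ----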
 
-// Prefer store with smaller `byteRate`.
-func selectDstStoreByByteRate(candidates map[uint64]*storeLoadDetail, regionBytesRate float64, srcLoadDetail *storeLoadDetail) uint64 {
-	stores := sortStores(candidates, func(lp1, lp2 *storeLoadPred) bool {
-		ld1, ld2 := lp1.max(), lp2.max()
-		if ld1.ByteRate < ld2.ByteRate ||
-			(ld1.ByteRate == ld2.ByteRate && ld1.Count < ld2.Count) {
-			return true
-		}
-		return false
-	})
-
-	srcLoad := srcLoadDetail.LoadPred.min()
-	for _, id := range stores {
-		dstLoad := candidates[id].LoadPred.max()
-		if srcLoad.ByteRate*hotRegionScheduleFactor > dstLoad.ByteRate+regionBytesRate {
-			return id
-		}
-	}
-	return 0
+func (bs *balanceSolver) isReadyToBuild() bool {
+	if bs.cur.srcStoreID == 0 || bs.cur.dstStoreID == 0 ||
+		bs.cur.srcPeerStat == nil || bs.cur.region == nil {
+		return false
+	}
+	if bs.cur.srcStoreID != bs.cur.srcPeerStat.StoreID ||
+		bs.cur.region.GetID() != bs.cur.srcPeerStat.ID() {
+		return false
+	}
+	return true
+}
+
+func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
+	if !bs.isReadyToBuild() {
+		return nil, nil
+	}
+	var (
+		op  *operator.Operator
+		err error
+	)
+
+	switch bs.opTy {
+	case movePeer:
+		srcPeer := bs.cur.region.GetStorePeer(bs.cur.srcStoreID) // checked in getRegionAndSrcPeer
+		dstPeer := &metapb.Peer{StoreId: bs.cur.dstStoreID, IsLearner: srcPeer.IsLearner}
+		bs.sche.peerLimit = bs.sche.adjustBalanceLimit(bs.cur.srcStoreID, bs.stLoadDetail)
+		op, err = operator.CreateMovePeerOperator(
+			"move-hot-"+bs.rwTy.String()+"-region",
+			bs.cluster,
+			bs.cur.region,
+			operator.OpHotRegion,
+			bs.cur.srcStoreID,
+			dstPeer)
+	case transferLeader:
+		if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil {
+			return nil, nil
+		}
+		bs.sche.leaderLimit = bs.sche.adjustBalanceLimit(bs.cur.srcStoreID, bs.stLoadDetail)
+		op, err = operator.CreateTransferLeaderOperator(
+			"transfer-hot-"+bs.rwTy.String()+"-leader",
+			bs.cluster,
+			bs.cur.region,
+			bs.cur.srcStoreID,
+			bs.cur.dstStoreID,
+			operator.OpHotRegion)
+	}
+
+	if err != nil {
+		log.Debug("fail to create operator", zap.Error(err), zap.Stringer("rwType", bs.rwTy), zap.Stringer("opType", bs.opTy))
+		schedulerCounter.WithLabelValues(bs.sche.GetName(), "create-operator-fail").Inc()
+		return nil, nil
+	}
+
+	op.SetPriorityLevel(core.HighPriority)
+	op.Counters = append(op.Counters,
+		schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
+		schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))
+
+	infl := Influence{
+		ByteRate: bs.cur.srcPeerStat.GetByteRate(),
+		KeyRate:  bs.cur.srcPeerStat.GetKeyRate(),
+		Count:    1,
+	}
+
+	return []*operator.Operator{op}, []Influence{infl}
 }
 
 func (h *hotScheduler) adjustBalanceLimit(storeID uint64, loadDetail map[uint64]*storeLoadDetail) uint64 {
@@ -803,8 +955,8 @@ func (h *hotScheduler) adjustBalanceLimit(storeID uint64, loadDetail map[uint64]
 func (h *hotScheduler) GetHotReadStatus() *statistics.StoreHotPeersInfos {
 	h.RLock()
 	defer h.RUnlock()
-	asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos.ReadLeaders))
-	for id, detail := range h.stLoadInfos.ReadLeaders {
+	asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[readLeader]))
+	for id, detail := range h.stLoadInfos[readLeader] {
		asLeader[id] = detail.toHotPeersStat()
 	}
 	return &statistics.StoreHotPeersInfos{
@@ -815,12 +967,12 @@ func (h *hotScheduler) GetHotReadStatus() *statistics.StoreHotPeersInfos {
 func (h *hotScheduler) GetHotWriteStatus() *statistics.StoreHotPeersInfos {
 	h.RLock()
 	defer h.RUnlock()
-	asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos.WriteLeaders))
-	asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos.WritePeers))
-	for id, detail := range h.stLoadInfos.WriteLeaders {
+	asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[writeLeader]))
+	asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[writePeer]))
+	for id, detail := range h.stLoadInfos[writeLeader] {
 		asLeader[id] = detail.toHotPeersStat()
 	}
-	for id, detail := range h.stLoadInfos.WritePeers {
+	for id, detail := range h.stLoadInfos[writePeer] {
 		asPeer[id] = detail.toHotPeersStat()
 	}
 	return &statistics.StoreHotPeersInfos{
@@ -830,22 +982,17 @@ func (h *hotScheduler) GetHotWriteStatus() *statistics.StoreHotPeersInfos {
 }
 
 func (h *hotScheduler) GetWritePendingInfluence() map[uint64]Influence {
-	return h.copyPendingInfluence(write)
+	return h.copyPendingInfluence(writePeer)
 }
 
 func (h *hotScheduler) GetReadPendingInfluence() map[uint64]Influence {
-	return h.copyPendingInfluence(read)
+	return h.copyPendingInfluence(readLeader)
 }
 
-func (h *hotScheduler) copyPendingInfluence(typ rwType) map[uint64]Influence {
+func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influence {
 	h.RLock()
 	defer h.RUnlock()
-	var pendingSum map[uint64]Influence
-	if typ == read {
-		pendingSum = h.readPendingSum
-	} else {
-		pendingSum = h.writePendingSum
-	}
+	pendingSum := h.pendingSums[ty]
 	ret := make(map[uint64]Influence, len(pendingSum))
 	for id, infl := range pendingSum {
 		ret[id] = infl
@@ -875,13 +1022,21 @@ func calcPendingWeight(op *operator.Operator) float64 {
 }
 
 func (h *hotScheduler) clearPendingInfluence() {
-	h.readPendings = map[*pendingInfluence]struct{}{}
-	h.writePendings = map[*pendingInfluence]struct{}{}
-	h.readPendingSum = nil
-	h.writePendingSum = nil
+	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
+		h.pendings[ty] = map[*pendingInfluence]struct{}{}
+		h.pendingSums[ty] = nil
+	}
 	h.regionPendings = make(map[uint64][2]*operator.Operator)
 }
 
+// rwType : the perspective of balance
+type rwType int
+
+const (
+	write rwType = iota
+	read
+)
+
 func (rw rwType) String() string {
 	switch rw {
 	case read:
@@ -893,6 +1048,13 @@ func (rw rwType) String() string {
 	}
 }
 
+type opType int
+
+const (
+	movePeer opType = iota
+	transferLeader
+)
+
 func (ty opType) String() string {
 	switch ty {
 	case movePeer:
@@ -903,3 +1065,27 @@ func (ty opType) String() string {
 		return ""
 	}
 }
+
+type resourceType int
+
+const (
+	writePeer resourceType = iota
+	writeLeader
+	readLeader
+	resourceTypeLen
+)
+
+func toResourceType(rwTy rwType, opTy opType) resourceType {
+	switch rwTy {
+	case write:
+		switch opTy {
+		case movePeer:
+			return writePeer
+		case transferLeader:
+			return writeLeader
+		}
+	case read:
+		return readLeader
+	}
+	panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy))
+}
diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index 57098fc9476..d31c438e90a 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -114,7 +114,7 @@ func newTestRegion(id uint64) *core.RegionInfo {
 
 type testHotWriteRegionSchedulerSuite struct{}
 
-func (s *testHotWriteRegionSchedulerSuite) TestSchedule(c *C) {
+func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnly(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	statistics.Denoising = false
@@ -125,12 +125,12 @@ func (s *testHotWriteRegionSchedulerSuite) TestSchedule(c *C) {
 	c.Assert(err, IsNil)
 	opt.HotRegionCacheHitsThreshold = 0
 
-	s.checkSchedule(c, tc, opt, hb)
+	s.checkByteRateOnly(c, tc, opt, hb)
 	opt.EnablePlacementRules = true
-	s.checkSchedule(c, tc, opt, hb)
+	s.checkByteRateOnly(c, tc, opt, hb)
 }
 
-func (s *testHotWriteRegionSchedulerSuite) checkSchedule(c *C, tc *mockcluster.Cluster, opt *mockoption.ScheduleOptions, hb schedule.Scheduler) {
+func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockcluster.Cluster, opt *mockoption.ScheduleOptions, hb schedule.Scheduler) {
 	// Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0.
 
 	tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
@@ -163,9 +163,11 @@
 	//| 2 | 1 | 3 | 4 | 512KB |
 	//| 3 | 1 | 2 | 4 | 512KB |
 	// Region 1, 2 and 3 are hot regions.
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 3, 4)
-	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 4)
+	addRegionInfo(tc, write, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 512 * KB, 0},
+		{2, []uint64{1, 3, 4}, 512 * KB, 0},
+		{3, []uint64{1, 2, 4}, 512 * KB, 0},
+	})
 
 	// Will transfer a hot region from store 1, because the total count of peers
 	// which is hot for store 1 is more larger than other stores.
@@ -242,11 +244,13 @@ func (s *testHotWriteRegionSchedulerSuite) checkSchedule(c *C, tc *mockcluster.C
 	//| 3 | 6 | 1 | 4 | 512KB |
 	//| 4 | 5 | 6 | 4 | 512KB |
 	//| 5 | 3 | 4 | 5 | 512KB |
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(3, 6, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 4)
-	tc.AddLeaderRegionWithWriteInfo(4, 5, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 6, 4)
-	tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 4, 5)
+	addRegionInfo(tc, write, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 512 * KB, 0},
+		{2, []uint64{1, 2, 3}, 512 * KB, 0},
+		{3, []uint64{6, 1, 4}, 512 * KB, 0},
+		{4, []uint64{5, 6, 4}, 512 * KB, 0},
+		{5, []uint64{3, 4, 5}, 512 * KB, 0},
+	})
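// ---- editor's note (illustrative sketch, not part of the patch) ----
// The rewritten tests lean on a testRegionInfo struct and an addRegionInfo
// helper that this patch adds in a part of hot_test.go not shown in this
// excerpt. Their shape, inferred from the call sites (an assumption, not
// the literal code):
//
//	type testRegionInfo struct {
//		id       uint64
//		peers    []uint64 // the first store is the leader
//		byteRate float64
//		keyRate  float64
//	}
//
//	func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInfo) {
//		addFunc := tc.AddLeaderRegionWithWriteInfo
//		if rwTy == read {
//			addFunc = tc.AddLeaderRegionWithReadInfo
//		}
//		for _, r := range regions {
//			addFunc(r.id, r.peers[0],
//				uint64(r.byteRate*statistics.RegionHeartBeatReportInterval),
//				uint64(r.keyRate*statistics.RegionHeartBeatReportInterval),
//				statistics.RegionHeartBeatReportInterval,
//				r.peers[1:])
//		}
//	}
// ---- end editor's note ----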
 
 	// 6 possible operator.
 	// Assuming different operators have the same possibility,
@@ -287,81 +291,184 @@
 	hb.(*hotScheduler).clearPendingInfluence()
 }
 
-func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
+func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	statistics.Denoising = false
 	opt := mockoption.NewScheduleOptions()
-	tc := mockcluster.NewCluster(opt)
 	hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
 	c.Assert(err, IsNil)
 	opt.HotRegionCacheHitsThreshold = 0
-	opt.LeaderScheduleLimit = 0
 
+	tc := mockcluster.NewCluster(opt)
 	tc.AddRegionStore(1, 20)
 	tc.AddRegionStore(2, 20)
 	tc.AddRegionStore(3, 20)
 	tc.AddRegionStore(4, 20)
+	tc.AddRegionStore(5, 20)
+
+	tc.UpdateStorageWrittenBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval)
+
+	tc.UpdateStorageWrittenKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval)
+
+	addRegionInfo(tc, write, []testRegionInfo{
+		{1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB},
+		{2, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB},
+		{3, []uint64{2, 4, 3}, 0.05 * MB, 0.1 * MB},
+	})
+
+	for i := 0; i < 100; i++ {
+		hb.(*hotScheduler).clearPendingInfluence()
+		op := hb.Schedule(tc)[0]
+		// byteDecRatio <= 0.95 && keyDecRatio <= 0.95
+		testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4)
+		// store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9
+		// store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2
+
+		op = hb.Schedule(tc)[0]
+		// byteDecRatio <= 0.99 && keyDecRatio <= 0.95
+		testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 3, 5)
+		// store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95)
+		// store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)
+
+		op = hb.Schedule(tc)[0]
+		// byteDecRatio <= 0.95
+		testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 5)
+		// store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45)
+		// store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
+	}
+}
 
-	//| store_id | write_bytes_rate |
-	//|----------|------------------|
-	//| 1 | 8MB |
-	//| 2 | 6MB |
-	//| 3 | 6MB |
-	//| 4 | 4MB |
-	tc.UpdateStorageWrittenBytes(1, 8*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(2, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(4, 4*MB*statistics.StoreHeartBeatReportInterval)
+func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	statistics.Denoising = false
+	opt := mockoption.NewScheduleOptions()
+	hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
+	c.Assert(err, IsNil)
+	opt.HotRegionCacheHitsThreshold = 0
 
-	//| region_id | leader_store | follower_store | follower_store | written_bytes |
-	//|-----------|--------------|----------------|----------------|---------------|
-	//| 1 | 1 | 2 | 3 | 512KB |
-	//| 2 | 1 | 2 | 3 | 512KB |
-	//| 3 | 1 | 2 | 3 | 512KB |
-	//| 4 | 1 | 2 | 3 | 512KB |
-	//| 5 | 1 | 2 | 3 | 512KB |
-	//| 6 | 1 | 2 | 3 | 512KB |
-	// All regions are hot.
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(4, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(5, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(6, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
+	tc := mockcluster.NewCluster(opt)
+	tc.AddRegionStore(1, 20)
+	tc.AddRegionStore(2, 20)
+	tc.AddRegionStore(3, 20)
 
-	for i := 0; i < 20; i++ {
+	tc.UpdateStorageWrittenBytes(1, 10*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(2, 10*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(3, 10*MB*statistics.StoreHeartBeatReportInterval)
+
+	tc.UpdateStorageWrittenKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenKeys(2, 10*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenKeys(3, 10*MB*statistics.StoreHeartBeatReportInterval)
+
+	addRegionInfo(tc, write, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB},
+		{2, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB},
+		{3, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB},
+		{4, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB},
+		{5, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB},
+		{6, []uint64{3, 1, 2}, 0.5 * MB, 1 * MB},
+		{7, []uint64{3, 1, 2}, 0.5 * MB, 1 * MB},
+	})
+
+	for i := 0; i < 100; i++ {
 		hb.(*hotScheduler).clearPendingInfluence()
-		cnt := 0
-	testLoop:
-		for j := 0; j < 1000; j++ {
-			c.Assert(cnt, LessEqual, 5)
-			emptyCnt := 0
-			ops := hb.Schedule(tc)
-			for len(ops) == 0 {
-				emptyCnt++
-				if emptyCnt >= 10 {
-					break testLoop
+		op := hb.Schedule(tc)[0]
+		testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 2)
+
+		c.Assert(hb.Schedule(tc), HasLen, 0)
+	}
+}
+
+func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	statistics.Denoising = false
+	opt := mockoption.NewScheduleOptions()
+	hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
+	c.Assert(err, IsNil)
+	opt.HotRegionCacheHitsThreshold = 0
+	opt.LeaderScheduleLimit = 0
+
+	for i := 0; i < 2; i++ {
+		// 0: byte rate
+		// 1: key rate
+		tc := mockcluster.NewCluster(opt)
+		tc.AddRegionStore(1, 20)
+		tc.AddRegionStore(2, 20)
+		tc.AddRegionStore(3, 20)
+		tc.AddRegionStore(4, 20)
+
+		updateStore := tc.UpdateStorageWrittenBytes // byte rate
+		if i == 1 { // key rate
+			updateStore = tc.UpdateStorageWrittenKeys
+		}
+		updateStore(1, 8*MB*statistics.StoreHeartBeatReportInterval)
+		updateStore(2, 6*MB*statistics.StoreHeartBeatReportInterval)
+		updateStore(3, 6*MB*statistics.StoreHeartBeatReportInterval)
+		updateStore(4, 4*MB*statistics.StoreHeartBeatReportInterval)
+
+		if i == 0 { // byte rate
+			addRegionInfo(tc, write, []testRegionInfo{
+				{1, []uint64{1, 2, 3}, 512 * KB, 0},
+				{2, []uint64{1, 2, 3}, 512 * KB, 0},
+				{3, []uint64{1, 2, 3}, 512 * KB, 0},
+				{4, []uint64{1, 2, 3}, 512 * KB, 0},
+				{5, []uint64{1, 2, 3}, 512 * KB, 0},
+				{6, []uint64{1, 2, 3}, 512 * KB, 0},
+			})
+		} else if i == 1 { // key rate
+			addRegionInfo(tc, write, []testRegionInfo{
+				{1, []uint64{1, 2, 3}, 0, 512 * KB},
+				{2, []uint64{1, 2, 3}, 0, 512 * KB},
+				{3, []uint64{1, 2, 3}, 0, 512 * KB},
+				{4, []uint64{1, 2, 3}, 0, 512 * KB},
+				{5, []uint64{1, 2, 3}, 0, 512 * KB},
+				{6, []uint64{1, 2, 3}, 0, 512 * KB},
+			})
+		}
+
+		for i := 0; i < 20; i++ {
+			hb.(*hotScheduler).clearPendingInfluence()
+			cnt := 0
+		testLoop:
+			for j := 0; j < 1000; j++ {
+				c.Assert(cnt, LessEqual, 5)
+				emptyCnt := 0
+				ops := hb.Schedule(tc)
+				for len(ops) == 0 {
+					emptyCnt++
+					if emptyCnt >= 10 {
+						break testLoop
+					}
+					ops = hb.Schedule(tc)
 				}
-				ops = hb.Schedule(tc)
-			}
-			op := ops[0]
-			switch op.Len() {
-			case 1:
-				// balance by leader selected
-				testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1)
-			case 4:
-				// balance by peer selected
-				testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 4)
-				cnt++
-				if cnt == 3 {
-					c.Assert(op.Cancel(), IsTrue)
+				op := ops[0]
+				switch op.Len() {
+				case 1:
+					// balance by leader selected
+					testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1)
+				case 4:
+					// balance by peer selected
+					testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 4)
+					cnt++
+					if cnt == 3 {
+						c.Assert(op.Cancel(), IsTrue)
+					}
+				default:
+					c.Fatalf("wrong op: %v", op)
 				}
-			default:
-				c.Fatalf("wrong op: %v", op)
 			}
+			c.Assert(cnt, Equals, 5)
 		}
-		c.Assert(cnt, Equals, 5)
 	}
 }
 
@@ -369,7 +476,7 @@ var _ = Suite(&testHotReadRegionSchedulerSuite{})
 
 type testHotReadRegionSchedulerSuite struct{}
 
-func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
+func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	opt := mockoption.NewScheduleOptions()
@@ -388,12 +495,12 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
 	//| store_id | read_bytes_rate |
 	//|----------|-----------------|
 	//| 1 | 7.5MB |
-	//| 2 | 4.6MB |
+	//| 2 | 4.9MB |
 	//| 3 | 4.5MB |
 	//| 4 | 6MB |
 	//| 5 | 0MB |
 	tc.UpdateStorageReadBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(2, 4.6*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(2, 4.9*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(4, 6*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(5, 0)
@@ -403,13 +510,14 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
 	//| 1 | 1 | 2 | 3 | 512KB |
 	//| 2 | 2 | 1 | 3 | 512KB |
 	//| 3 | 1 | 2 | 3 | 512KB |
-	//| 11 | 1 | 2 | 3 | 24KB |
+	//| 11 | 1 | 2 | 3 | 7KB |
 	// Region 1, 2 and 3 are hot regions.
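// ---- editor's note (not part of the patch) ----
// The test tables encode flow as a total over one region heartbeat
// interval; the hot cache divides by the interval to recover a rate.
// For region 1 in the table above:
//
//	bytes := 512 * KB * statistics.RegionHeartBeatReportInterval
//	rate := bytes / statistics.RegionHeartBeatReportInterval // 512 KB/s
//
// so a testRegionInfo entry {1, []uint64{1, 2, 3}, 512 * KB, 0} reads as
// "512 KB/s of read bytes, no key flow".
// ---- end editor's note ----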
-	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3)
-	tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	// lower than hot read flow rate, but higher than write flow rate
-	tc.AddLeaderRegionWithReadInfo(11, 1, 7*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
+	addRegionInfo(tc, read, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 512 * KB, 0},
+		{2, []uint64{2, 1, 3}, 512 * KB, 0},
+		{3, []uint64{1, 2, 3}, 512 * KB, 0},
+		{11, []uint64{1, 2, 3}, 7 * KB, 0},
+	})
 
 	c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue)
 	c.Assert(tc.IsRegionHot(tc.GetRegion(11)), IsFalse)
@@ -426,14 +534,10 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
 		}
 	}
 
-	// Will transfer a hot region leader from store 1 to store 3.
-	// bytes_rate[store 1] * 0.9 > bytes_rate[store 3] + region_bytes_rate
-	// read_bytes_rate[store 3] < read_bytes_rate[store 2]
-	// when select dest store for hot read, we use score.
 	testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
 	hb.(*hotScheduler).clearPendingInfluence()
 	// assume handle the operator
-	tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 2)
+	tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
 
 	// After transfer a hot region leader from store 1 to store 3
 	// the three region leader will be evenly distributed in three stores
 	//| store_id | read_bytes_rate |
@@ -441,13 +545,13 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
 	//|----------|-----------------|
 	//| 1 | 6MB |
 	//| 2 | 5.5MB |
-	//| 3 | 6MB |
-	//| 4 | 3.1MB |
+	//| 3 | 5.5MB |
+	//| 4 | 3.4MB |
 	//| 5 | 3MB |
 	tc.UpdateStorageReadBytes(1, 6*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(2, 5.5*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(4, 3.1*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(3, 5.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(4, 3.4*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(5, 3*MB*statistics.StoreHeartBeatReportInterval)
 
 	//| region_id | leader_store | follower_store | follower_store | read_bytes_rate |
@@ -458,15 +562,12 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
 	//| 4 | 1 | 2 | 3 | 512KB |
 	//| 5 | 4 | 2 | 5 | 512KB |
 	//| 11 | 1 | 2 | 3 | 24KB |
-	tc.AddLeaderRegionWithReadInfo(4, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(5, 4, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 5)
+	addRegionInfo(tc, read, []testRegionInfo{
+		{4, []uint64{1, 2, 3}, 512 * KB, 0},
+		{5, []uint64{4, 2, 5}, 512 * KB, 0},
+	})
 
 	// We will move leader peer of region 1 from 1 to 5
-	// Store 1 will be selected as source store (max rate, count > store 3 count).
-	// When trying to transfer leader:
-	// Store 2 and store 3 are also hot, failed.
-	// Trying to move leader peer:
-	// Store 5 is selected as destination because of less hot region count.
 	testutil.CheckTransferPeerWithLeaderTransfer(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 5)
 	hb.(*hotScheduler).clearPendingInfluence()
 
@@ -478,73 +579,154 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
 	hb.(*hotScheduler).clearPendingInfluence()
 }
 
-func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
+func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	statistics.Denoising = false
 	opt := mockoption.NewScheduleOptions()
-	tc := mockcluster.NewCluster(opt)
 	hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
 	c.Assert(err, IsNil)
 	opt.HotRegionCacheHitsThreshold = 0
 
+	tc := mockcluster.NewCluster(opt)
 	tc.AddRegionStore(1, 20)
 	tc.AddRegionStore(2, 20)
 	tc.AddRegionStore(3, 20)
 	tc.AddRegionStore(4, 20)
+	tc.AddRegionStore(5, 20)
+
+	tc.UpdateStorageReadBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval)
+
+	tc.UpdateStorageReadKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval)
+
+	addRegionInfo(tc, read, []testRegionInfo{
+		{1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB},
+		{2, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB},
+		{3, []uint64{3, 4, 5}, 0.05 * MB, 0.1 * MB},
+	})
+
+	for i := 0; i < 100; i++ {
+		hb.(*hotScheduler).clearPendingInfluence()
+		op := hb.Schedule(tc)[0]
+		// byteDecRatio <= 0.95 && keyDecRatio <= 0.95
+		testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 4)
+		// store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9
+		// store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2
+
+		op = hb.Schedule(tc)[0]
+		// byteDecRatio <= 0.99 && keyDecRatio <= 0.95
+		testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 3, 5)
+		// store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95)
+		// store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)
+
+		op = hb.Schedule(tc)[0]
+		// byteDecRatio <= 0.95
+		testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5)
+		// store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45)
+		// store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
+	}
+}
 
-	//| store_id | write_bytes_rate |
-	//|----------|------------------|
-	//| 1 | 7.1MB |
-	//| 2 | 6.1MB |
-	//| 3 | 6MB |
-	//| 4 | 5MB |
-	tc.UpdateStorageReadBytes(1, 7.1*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(2, 6.1*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(4, 5*MB*statistics.StoreHeartBeatReportInterval)
-	//| store_id | write_bytes_rate |
-	//|----------|------------------|
-	//| 1        | 7.1MB            |
-	//| 2        | 6.1MB            |
-	//| 3        | 6MB              |
-	//| 4        | 5MB              |
-	tc.UpdateStorageReadBytes(1, 7.1*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(2, 6.1*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(4, 5*MB*statistics.StoreHeartBeatReportInterval)
+func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	opt := mockoption.NewScheduleOptions()
+	hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
+	c.Assert(err, IsNil)
+	opt.HotRegionCacheHitsThreshold = 0
-	//| region_id | leader_store | follower_store | follower_store | read_bytes_rate |
-	//|-----------|--------------|----------------|----------------|--------------------|
-	//| 1         | 1            | 2              | 3              | 512KB           |
-	//| 2         | 1            | 2              | 3              | 512KB           |
-	//| 3         | 1            | 2              | 3              | 512KB           |
-	//| 4         | 1            | 2              | 3              | 512KB           |
-	//| 5         | 2            | 1              | 3              | 512KB           |
-	//| 6         | 2            | 1              | 3              | 512KB           |
-	//| 7         | 3            | 1              | 2              | 512KB           |
-	//| 8         | 3            | 1              | 2              | 512KB           |
-	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(4, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(5, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3)
-	tc.AddLeaderRegionWithReadInfo(6, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3)
-	tc.AddLeaderRegionWithReadInfo(7, 3, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 2)
-	tc.AddLeaderRegionWithReadInfo(8, 3, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 2)
+	for i := 0; i < 2; i++ {
+		// 0: byte rate
+		// 1: key rate
+		tc := mockcluster.NewCluster(opt)
+		tc.AddRegionStore(1, 20)
+		tc.AddRegionStore(2, 20)
+		tc.AddRegionStore(3, 20)
+		tc.AddRegionStore(4, 20)
+
+		updateStore := tc.UpdateStorageReadBytes // byte rate
+		if i == 1 { // key rate
+			updateStore = tc.UpdateStorageReadKeys
+		}
+		updateStore(1, 7.1*MB*statistics.StoreHeartBeatReportInterval)
+		updateStore(2, 6.1*MB*statistics.StoreHeartBeatReportInterval)
+		updateStore(3, 6*MB*statistics.StoreHeartBeatReportInterval)
+		updateStore(4, 5*MB*statistics.StoreHeartBeatReportInterval)
+
+		if i == 0 { // byte rate
+			addRegionInfo(tc, read, []testRegionInfo{
+				{1, []uint64{1, 2, 3}, 512 * KB, 0},
+				{2, []uint64{1, 2, 3}, 512 * KB, 0},
+				{3, []uint64{1, 2, 3}, 512 * KB, 0},
+				{4, []uint64{1, 2, 3}, 512 * KB, 0},
+				{5, []uint64{2, 1, 3}, 512 * KB, 0},
+				{6, []uint64{2, 1, 3}, 512 * KB, 0},
+				{7, []uint64{3, 2, 1}, 512 * KB, 0},
+				{8, []uint64{3, 2, 1}, 512 * KB, 0},
+			})
+		} else if i == 1 { // key rate
+			addRegionInfo(tc, read, []testRegionInfo{
+				{1, []uint64{1, 2, 3}, 0, 512 * KB},
+				{2, []uint64{1, 2, 3}, 0, 512 * KB},
+				{3, []uint64{1, 2, 3}, 0, 512 * KB},
+				{4, []uint64{1, 2, 3}, 0, 512 * KB},
+				{5, []uint64{2, 1, 3}, 0, 512 * KB},
+				{6, []uint64{2, 1, 3}, 0, 512 * KB},
+				{7, []uint64{3, 2, 1}, 0, 512 * KB},
+				{8, []uint64{3, 2, 1}, 0, 512 * KB},
+			})
+		}
-	for i := 0; i < 20; i++ {
-		hb.(*hotScheduler).clearPendingInfluence()
-		op1 := hb.Schedule(tc)[0]
-		testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
-		op2 := hb.Schedule(tc)[0]
-		testutil.CheckTransferPeerWithLeaderTransfer(c, op2, operator.OpHotRegion, 1, 4)
-		ops := hb.Schedule(tc)
-		c.Assert(ops, HasLen, 0)
-	}
-	for i := 0; i < 20; i++ {
-		hb.(*hotScheduler).clearPendingInfluence()
-		op1 := hb.Schedule(tc)[0]
-		testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
-		op2 := hb.Schedule(tc)[0]
-		testutil.CheckTransferPeerWithLeaderTransfer(c, op2, operator.OpHotRegion, 1, 4)
-		c.Assert(op2.Cancel(), IsTrue)
-		op2 = hb.Schedule(tc)[0]
-		testutil.CheckTransferPeerWithLeaderTransfer(c, op2, operator.OpHotRegion, 1, 4)
-		c.Assert(op1.Cancel(), IsTrue)
-		op3 := hb.Schedule(tc)[0]
-		testutil.CheckTransferPeerWithLeaderTransfer(c, op3, operator.OpHotRegion, 1, 4)
-		ops := hb.Schedule(tc)
-		c.Assert(ops, HasLen, 0)
+		for i := 0; i < 20; i++ {
+			hb.(*hotScheduler).clearPendingInfluence()
+
+			op1 := hb.Schedule(tc)[0]
+			testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
+			// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+
+			op2 := hb.Schedule(tc)[0]
+			testutil.CheckTransferPeerWithLeaderTransfer(c, op2, operator.OpHotRegion, 1, 4)
+			// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+
+			ops := hb.Schedule(tc)
+			c.Logf("%v", ops)
+			c.Assert(ops, HasLen, 0)
+		}
+		for i := 0; i < 20; i++ {
+			hb.(*hotScheduler).clearPendingInfluence()
+
+			op1 := hb.Schedule(tc)[0]
+			testutil.CheckTransferLeader(c, op1, operator.OpLeader, 1, 3)
+			// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+
+			op2 := hb.Schedule(tc)[0]
+			testutil.CheckTransferPeerWithLeaderTransfer(c, op2, operator.OpHotRegion, 1, 4)
+			// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+			c.Assert(op2.Cancel(), IsTrue)
+			// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | (6, 6.5) | 5
+
+			op2 = hb.Schedule(tc)[0]
+			testutil.CheckTransferPeerWithLeaderTransfer(c, op2, operator.OpHotRegion, 1, 4)
+			// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | (6, 6.5) | (5, 5.5)
+
+			c.Assert(op1.Cancel(), IsTrue)
+			// store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)
+
+			op3 := hb.Schedule(tc)[0]
+			testutil.CheckTransferPeerWithLeaderTransfer(c, op3, operator.OpHotRegion, 1, 4)
+			// store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)
+
+			ops := hb.Schedule(tc)
+			c.Assert(ops, HasLen, 0)
+		}
 	}
 }
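// Editor's note: both loops above depend on pending influence: every operator
// the scheduler issues keeps contributing its expected load shift until it
// finishes or is canceled, which is why the second Schedule call in a round
// picks a different target, why a third returns nothing, and why Cancel frees
// the budget again. A simplified sum over pending entries, mirroring the
// weighted Influence.add this patch introduces in utils.go (keying by store
// ID here is a simplification; the real code tracks *pendingInfluence sets):
func sumPendingInfluence(pendings map[uint64]Influence, weight func(storeID uint64) float64) map[uint64]Influence {
	sums := make(map[uint64]Influence)
	for storeID, infl := range pendings {
		// The weight lets aging or half-done operators count for less.
		sums[storeID] = sums[storeID].add(&infl, weight(storeID))
	}
	return sums
}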
@@ -554,58 +736,44 @@ type testHotCacheSuite struct{}

 func (s *testHotCacheSuite) TestUpdateCache(c *C) {
 	opt := mockoption.NewScheduleOptions()
+	opt.HotRegionCacheHitsThreshold = 0
 	tc := mockcluster.NewCluster(opt)
-	// Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0.
-	tc.AddRegionStore(1, 3)
-	tc.AddRegionStore(2, 2)
-	tc.AddRegionStore(3, 2)
-	tc.AddRegionStore(4, 2)
-	tc.AddRegionStore(5, 0)
-
-	// Report store read bytes.
-	tc.UpdateStorageReadBytes(1, 7.5*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(4, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageReadBytes(5, 0)
-	/// For read flow
-	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3)
-	tc.AddLeaderRegionWithReadInfo(3, 1, 20*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	// lower than hot read flow rate, but higher than write flow rate
-	tc.AddLeaderRegionWithReadInfo(11, 1, 7*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	opt.HotRegionCacheHitsThreshold = 0
+	addRegionInfo(tc, read, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 512 * KB, 0},
+		{2, []uint64{2, 1, 3}, 512 * KB, 0},
+		{3, []uint64{1, 2, 3}, 20 * KB, 0},
+		// lower than hot read flow rate, but higher than write flow rate
+		{11, []uint64{1, 2, 3}, 7 * KB, 0},
+	})
 	stats := tc.RegionStats(statistics.ReadFlow)
 	c.Assert(len(stats[1]), Equals, 2)
 	c.Assert(len(stats[2]), Equals, 1)
 	c.Assert(len(stats[3]), Equals, 0)
-	tc.AddLeaderRegionWithReadInfo(3, 2, 20*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithReadInfo(11, 1, 7*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
+	addRegionInfo(tc, read, []testRegionInfo{
+		{3, []uint64{2, 1, 3}, 20 * KB, 0},
+		{11, []uint64{1, 2, 3}, 7 * KB, 0},
+	})
 	stats = tc.RegionStats(statistics.ReadFlow)
-	c.Assert(len(stats[1]), Equals, 1)
 	c.Assert(len(stats[2]), Equals, 2)
 	c.Assert(len(stats[3]), Equals, 0)
-	// For write flow
-	tc.UpdateStorageWrittenBytes(1, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(2, 3*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(4, 3*MB*statistics.StoreHeartBeatReportInterval)
-	tc.UpdateStorageWrittenBytes(5, 0)
-	tc.AddLeaderRegionWithWriteInfo(4, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(5, 1, 20*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(6, 1, 0.8*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-
+	addRegionInfo(tc, write, []testRegionInfo{
+		{4, []uint64{1, 2, 3}, 512 * KB, 0},
+		{5, []uint64{1, 2, 3}, 20 * KB, 0},
+		{6, []uint64{1, 2, 3}, 0.8 * KB, 0},
+	})
 	stats = tc.RegionStats(statistics.WriteFlow)
 	c.Assert(len(stats[1]), Equals, 2)
 	c.Assert(len(stats[2]), Equals, 2)
 	c.Assert(len(stats[3]), Equals, 2)
-	tc.AddLeaderRegionWithWriteInfo(5, 1, 20*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 5)
+	addRegionInfo(tc, write, []testRegionInfo{
+		{5, []uint64{1, 2, 5}, 20 * KB, 0},
+	})
 	stats = tc.RegionStats(statistics.WriteFlow)
 	c.Assert(len(stats[1]), Equals, 2)
@@ -613,3 +781,141 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) {
 	c.Assert(len(stats[3]), Equals, 1)
 	c.Assert(len(stats[5]), Equals, 1)
 }
+
+func (s *testHotCacheSuite) TestKeyThresholds(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	opt.HotRegionCacheHitsThreshold = 0
+	{ // only a few regions
+		tc := mockcluster.NewCluster(opt)
+		addRegionInfo(tc, read, []testRegionInfo{
+			{1, []uint64{1, 2, 3}, 0, 1},
+			{2, []uint64{1, 2, 3}, 0, 1 * KB},
+		})
+		stats := tc.RegionStats(statistics.ReadFlow)
+		c.Assert(stats[1], HasLen, 1)
+		addRegionInfo(tc, write, []testRegionInfo{
+			{3, []uint64{4, 5, 6}, 0, 1},
+			{4, []uint64{4, 5, 6}, 0, 1 * KB},
+		})
+		stats = tc.RegionStats(statistics.WriteFlow)
+		c.Assert(stats[4], HasLen, 1)
+		c.Assert(stats[5], HasLen, 1)
+		c.Assert(stats[6], HasLen, 1)
+	}
+	{ // many regions
+		tc := mockcluster.NewCluster(opt)
+		regions := []testRegionInfo{}
+		for i := 1; i <= 1000; i += 2 {
+			regions = append(regions, testRegionInfo{
+				id:      uint64(i),
+				peers:   []uint64{1, 2, 3},
+				keyRate: 100 * KB,
+			})
+			regions = append(regions, testRegionInfo{
+				id:      uint64(i + 1),
+				peers:   []uint64{1, 2, 3},
+				keyRate: 10 * KB,
+			})
+		}
+
+		{ // read
+			addRegionInfo(tc, read, regions)
+			stats := tc.RegionStats(statistics.ReadFlow)
+			c.Assert(len(stats[1]), Greater, 500)
+
+			// for AntiCount
+			addRegionInfo(tc, read, regions)
+			addRegionInfo(tc, read, regions)
+			addRegionInfo(tc, read, regions)
+			addRegionInfo(tc, read, regions)
+			stats = tc.RegionStats(statistics.ReadFlow)
+			c.Assert(len(stats[1]), Equals, 500)
+		}
+		{ // write
+			addRegionInfo(tc, write, regions)
+			stats := tc.RegionStats(statistics.WriteFlow)
+			c.Assert(len(stats[1]), Greater, 500)
+			c.Assert(len(stats[2]), Greater, 500)
+			c.Assert(len(stats[3]), Greater, 500)
+
+			// for AntiCount
+			addRegionInfo(tc, write, regions)
+			addRegionInfo(tc, write, regions)
+			addRegionInfo(tc, write, regions)
+			addRegionInfo(tc, write, regions)
+			stats = tc.RegionStats(statistics.WriteFlow)
+			c.Assert(len(stats[1]), Equals, 500)
+			c.Assert(len(stats[2]), Equals, 500)
+			c.Assert(len(stats[3]), Equals, 500)
+		}
+	}
+}
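// Editor's note: TestKeyThresholds above pins down two behaviors of the hot
// cache on the key dimension: with only a few regions a fixed floor applies
// (a 1 B/s key rate is not hot, 1 KB/s is), and with many regions the
// threshold adapts so only roughly the hotter half stays tracked (1000
// reported, 500 kept). TestByteAndKey below adds that the two dimensions
// combine with OR. A toy qualification check capturing that rule (the helper
// and the threshold derivation are simplifications, not code from this
// patch):
func isHotRegion(byteRate, keyRate, byteThreshold, keyThreshold float64) bool {
	// Being hot in either dimension is enough to enter the hot cache.
	return byteRate >= byteThreshold || keyRate >= keyThreshold
}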
+
+func (s *testHotCacheSuite) TestByteAndKey(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	opt.HotRegionCacheHitsThreshold = 0
+	tc := mockcluster.NewCluster(opt)
+	regions := []testRegionInfo{}
+	for i := 1; i <= 500; i++ {
+		regions = append(regions, testRegionInfo{
+			id:       uint64(i),
+			peers:    []uint64{1, 2, 3},
+			byteRate: 100 * KB,
+			keyRate:  100 * KB,
+		})
+	}
+	{ // read
+		addRegionInfo(tc, read, regions)
+		stats := tc.RegionStats(statistics.ReadFlow)
+		c.Assert(len(stats[1]), Equals, 500)
+
+		addRegionInfo(tc, read, []testRegionInfo{
+			{10001, []uint64{1, 2, 3}, 10 * KB, 10 * KB},
+			{10002, []uint64{1, 2, 3}, 500 * KB, 10 * KB},
+			{10003, []uint64{1, 2, 3}, 10 * KB, 500 * KB},
+			{10004, []uint64{1, 2, 3}, 500 * KB, 500 * KB},
+		})
+		stats = tc.RegionStats(statistics.ReadFlow)
+		c.Assert(len(stats[1]), Equals, 503)
+	}
+	{ // write
+		addRegionInfo(tc, write, regions)
+		stats := tc.RegionStats(statistics.WriteFlow)
+		c.Assert(len(stats[1]), Equals, 500)
+		c.Assert(len(stats[2]), Equals, 500)
+		c.Assert(len(stats[3]), Equals, 500)
+		addRegionInfo(tc, write, []testRegionInfo{
+			{10001, []uint64{1, 2, 3}, 10 * KB, 10 * KB},
+			{10002, []uint64{1, 2, 3}, 500 * KB, 10 * KB},
+			{10003, []uint64{1, 2, 3}, 10 * KB, 500 * KB},
+			{10004, []uint64{1, 2, 3}, 500 * KB, 500 * KB},
+		})
+		stats = tc.RegionStats(statistics.WriteFlow)
+		c.Assert(len(stats[1]), Equals, 503)
+		c.Assert(len(stats[2]), Equals, 503)
+		c.Assert(len(stats[3]), Equals, 503)
+	}
+}
+
+type testRegionInfo struct {
+	id       uint64
+	peers    []uint64
+	byteRate float64
+	keyRate  float64
+}
+
+func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInfo) {
+	addFunc := tc.AddLeaderRegionWithReadInfo
+	if rwTy == write {
+		addFunc = tc.AddLeaderRegionWithWriteInfo
+	}
+	for _, r := range regions {
+		addFunc(
+			r.id, r.peers[0],
+			uint64(r.byteRate*statistics.RegionHeartBeatReportInterval),
+			uint64(r.keyRate*statistics.RegionHeartBeatReportInterval),
+			statistics.RegionHeartBeatReportInterval,
+			r.peers[1:],
+		)
+	}
+}
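// Editor's note: addRegionInfo above takes per-second rates, but the mock
// cluster expects totals accumulated over one region heartbeat, hence the
// multiplication by statistics.RegionHeartBeatReportInterval. Spelled out as
// a standalone helper (the name is ours, not part of the patch):
func toReportedTotal(ratePerSec float64, intervalSec uint64) uint64 {
	// e.g. a steady 512 KB/s over a 60 s heartbeat (the usual region
	// heartbeat interval) is reported as 30 MB, from which the hot cache
	// recovers the 512 KB/s rate by dividing by the interval again.
	return uint64(ratePerSec * float64(intervalSec))
}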
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index 1d2f8c0fc3d..a1973dc024f 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -381,9 +381,9 @@ func (s *testShuffleHotRegionSchedulerSuite) checkBalance(c *C, tc *mockcluster.
 	//| 1         | 1            | 2              | 3              | 512KB              |
 	//| 2         | 1            | 3              | 4              | 512KB              |
 	//| 3         | 1            | 2              | 4              | 512KB              |
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 3, 4)
-	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 4)
+	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
+	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4})
+	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 4})
 	opt.HotRegionCacheHitsThreshold = 0

 	// try to get an operator
@@ -421,9 +421,9 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
 	tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)

-	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2)
-	tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 1, 3)
-	tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, statistics.RegionHeartBeatReportInterval, 2, 3)
+	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2})
+	tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
+	tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
 	opt.HotRegionCacheHitsThreshold = 0
 	c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue)
 	c.Assert(hb.Schedule(tc), IsNil)
diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go
index 1ce82731cb1..314ddd5f0eb 100644
--- a/server/schedulers/shuffle_hot_region.go
+++ b/server/schedulers/shuffle_hot_region.go
@@ -75,7 +75,7 @@ type shuffleHotRegionSchedulerConfig struct {
 // the hot peer.
 type shuffleHotRegionScheduler struct {
 	*BaseScheduler
-	stLoadInfos *storeLoadInfos
+	stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
 	r           *rand.Rand
 	conf        *shuffleHotRegionSchedulerConfig
 	types       []rwType
@@ -84,13 +84,16 @@ type shuffleHotRegionScheduler struct {
 // newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions
 func newShuffleHotRegionScheduler(opController *schedule.OperatorController, conf *shuffleHotRegionSchedulerConfig) schedule.Scheduler {
 	base := NewBaseScheduler(opController)
-	return &shuffleHotRegionScheduler{
+	ret := &shuffleHotRegionScheduler{
 		BaseScheduler: base,
 		conf:          conf,
-		stLoadInfos:   newStoreLoadInfos(),
 		types:         []rwType{read, write},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
+	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
+		ret.stLoadInfos[ty] = map[uint64]*storeLoadDetail{}
+	}
+	return ret
 }

 func (s *shuffleHotRegionScheduler) GetName() string {
@@ -122,21 +125,23 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []
 	minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
 	switch typ {
 	case read:
-		s.stLoadInfos.ReadLeaders = summaryStoresLoad(
+		s.stLoadInfos[readLeader] = summaryStoresLoad(
 			storesStats.GetStoresBytesReadStat(),
+			storesStats.GetStoresKeysReadStat(),
 			map[uint64]Influence{},
 			cluster.RegionReadStats(),
 			minHotDegree,
 			read, core.LeaderKind)
-		return s.randomSchedule(cluster, s.stLoadInfos.ReadLeaders)
+		return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
 	case write:
-		s.stLoadInfos.WriteLeaders = summaryStoresLoad(
+		s.stLoadInfos[writeLeader] = summaryStoresLoad(
 			storesStats.GetStoresBytesWriteStat(),
+			storesStats.GetStoresKeysWriteStat(),
 			map[uint64]Influence{},
 			cluster.RegionWriteStats(),
 			minHotDegree,
 			write, core.LeaderKind)
-		return s.randomSchedule(cluster, s.stLoadInfos.WriteLeaders)
+		return s.randomSchedule(cluster, s.stLoadInfos[writeLeader])
 	}
 	return nil
 }
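// Editor's note: the shuffle scheduler now indexes its per-store summaries by
// a small resourceType enum rather than named struct fields. The enum itself
// is defined elsewhere in this patch; a plausible shape, shown only to make
// the indexing above readable (the variant order is a guess; only the
// trailing resourceTypeLen sentinel matters for the array length):
type resourceType int

const (
	writeLeader resourceType = iota
	writePeer
	readLeader
	resourceTypeLen
)

// With that shape, `var infos [resourceTypeLen]map[uint64]*storeLoadDetail`
// gives one map per resource kind, initialized in a loop exactly as
// newShuffleHotRegionScheduler does above.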
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index a12667873c2..7cce93ce527 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -164,10 +164,14 @@ func getKeyRanges(args []string) ([]core.KeyRange, error) {

 // Influence records operator influence.
 type Influence struct {
 	ByteRate float64
+	KeyRate  float64
+	Count    float64
 }

 func (infl Influence) add(rhs *Influence, w float64) Influence {
 	infl.ByteRate += rhs.ByteRate * w
+	infl.KeyRate += rhs.KeyRate * w
+	infl.Count += rhs.Count * w
 	return infl
 }

@@ -202,58 +206,136 @@ func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*op
 type storeLoad struct {
 	ByteRate float64
-	Count    int
+	KeyRate  float64
+	Count    float64
 }

 func (load *storeLoad) ToLoadPred(infl Influence) *storeLoadPred {
 	future := *load
 	future.ByteRate += infl.ByteRate
+	future.KeyRate += infl.KeyRate
+	future.Count += infl.Count
 	return &storeLoadPred{
 		Current: *load,
 		Future:  future,
 	}
 }

+func stLdByteRate(ld *storeLoad) float64 {
+	return ld.ByteRate
+}
+
+func stLdKeyRate(ld *storeLoad) float64 {
+	return ld.KeyRate
+}
+
+func stLdCount(ld *storeLoad) float64 {
+	return ld.Count
+}
+
+type storeLoadCmp func(ld1, ld2 *storeLoad) int
+
+func negLoadCmp(cmp storeLoadCmp) storeLoadCmp {
+	return func(ld1, ld2 *storeLoad) int {
+		return -cmp(ld1, ld2)
+	}
+}
+
+func sliceLoadCmp(cmps ...storeLoadCmp) storeLoadCmp {
+	return func(ld1, ld2 *storeLoad) int {
+		for _, cmp := range cmps {
+			if r := cmp(ld1, ld2); r != 0 {
+				return r
+			}
+		}
+		return 0
+	}
+}
+
+func stLdRankCmp(dim func(ld *storeLoad) float64, rank func(value float64) int64) storeLoadCmp {
+	return func(ld1, ld2 *storeLoad) int {
+		return rankCmp(dim(ld1), dim(ld2), rank)
+	}
+}
+
+func rankCmp(a, b float64, rank func(value float64) int64) int {
+	aRk, bRk := rank(a), rank(b)
+	if aRk < bRk {
+		return -1
+	} else if aRk > bRk {
+		return 1
+	}
+	return 0
+}
+
 // store load prediction
 type storeLoadPred struct {
 	Current storeLoad
 	Future  storeLoad
 }

-func (lp *storeLoadPred) min() storeLoad {
+func (lp *storeLoadPred) min() *storeLoad {
 	return minLoad(&lp.Current, &lp.Future)
 }

-func (lp *storeLoadPred) max() storeLoad {
+func (lp *storeLoadPred) max() *storeLoad {
 	return maxLoad(&lp.Current, &lp.Future)
 }

-func minLoad(a, b *storeLoad) storeLoad {
-	return storeLoad{
-		ByteRate: math.Min(a.ByteRate, b.ByteRate),
-		Count:    minInt(a.Count, b.Count),
+func (lp *storeLoadPred) diff() *storeLoad {
+	mx, mn := lp.max(), lp.min()
+	return &storeLoad{
+		ByteRate: mx.ByteRate - mn.ByteRate,
+		KeyRate:  mx.KeyRate - mn.KeyRate,
+		Count:    mx.Count - mn.Count,
 	}
 }

-func maxLoad(a, b *storeLoad) storeLoad {
-	return storeLoad{
-		ByteRate: math.Max(a.ByteRate, b.ByteRate),
-		Count:    maxInt(a.Count, b.Count),
+type storeLPCmp func(lp1, lp2 *storeLoadPred) int
+
+func sliceLPCmp(cmps ...storeLPCmp) storeLPCmp {
+	return func(lp1, lp2 *storeLoadPred) int {
+		for _, cmp := range cmps {
+			if r := cmp(lp1, lp2); r != 0 {
+				return r
+			}
+		}
+		return 0
 	}
 }

-func minInt(a, b int) int {
-	if a < b {
-		return a
+func minLPCmp(ldCmp storeLoadCmp) storeLPCmp {
+	return func(lp1, lp2 *storeLoadPred) int {
+		return ldCmp(lp1.min(), lp2.min())
 	}
-	return b
 }

-func maxInt(a, b int) int {
-	if a < b {
-		return b
+func maxLPCmp(ldCmp storeLoadCmp) storeLPCmp {
+	return func(lp1, lp2 *storeLoadPred) int {
+		return ldCmp(lp1.max(), lp2.max())
+	}
+}
+
+func diffCmp(ldCmp storeLoadCmp) storeLPCmp {
+	return func(lp1, lp2 *storeLoadPred) int {
+		return ldCmp(lp1.diff(), lp2.diff())
+	}
+}
+
+func minLoad(a, b *storeLoad) *storeLoad {
+	return &storeLoad{
+		ByteRate: math.Min(a.ByteRate, b.ByteRate),
+		KeyRate:  math.Min(a.KeyRate, b.KeyRate),
+		Count:    math.Min(a.Count, b.Count),
+	}
+}
+
+func maxLoad(a, b *storeLoad) *storeLoad {
+	return &storeLoad{
+		ByteRate: math.Max(a.ByteRate, b.ByteRate),
+		KeyRate:  math.Max(a.KeyRate, b.KeyRate),
+		Count:    math.Max(a.Count, b.Count),
 	}
-	return a
 }
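// Editor's note: the comparators introduced in this hunk compose like
// multi-level sort keys: a storeLoadCmp returns -1/0/1, sliceLoadCmp tries
// each comparator in priority order, stLdRankCmp buckets raw rates through a
// rank function so near-equal stores tie (letting lower-priority dimensions
// break the tie), and negLoadCmp flips the order. A sketch of how a caller
// might build one, assuming a step-based rank function (the step sizes here
// are illustrative, not values from this patch):
func exampleLoadCmp() storeLoadCmp {
	rankByStep := func(step float64) func(float64) int64 {
		return func(v float64) int64 { return int64(v / step) }
	}
	// Prefer the store with the lower byte rate; if two stores fall into
	// the same byte-rate bucket, compare key rate, then hot peer count.
	return sliceLoadCmp(
		stLdRankCmp(stLdByteRate, rankByStep(100*KB)),
		stLdRankCmp(stLdKeyRate, rankByStep(10*KB)),
		stLdRankCmp(stLdCount, rankByStep(1)),
	)
}

// minLPCmp, maxLPCmp, and diffCmp then lift such a comparator from storeLoad
// to storeLoadPred, comparing the pessimistic (min), optimistic (max), or
// spread (diff) view of a store's current-vs-future load respectively.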

 type storeLoadDetail struct {