diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index f5a9afef17a..bcd49e144e1 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -333,13 +333,14 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en // AddRegionWithReadInfo adds region with specified leader, followers and read info. func (mc *Cluster) AddRegionWithReadInfo( regionID uint64, leaderStoreID uint64, - readBytes, readKeys uint64, + readBytes, readKeys, readQuery uint64, reportInterval uint64, otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat { r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...) r = r.Clone(core.SetReadBytes(readBytes)) r = r.Clone(core.SetReadKeys(readKeys)) r = r.Clone(core.SetReportInterval(reportInterval)) + r = r.Clone(core.SetReadQuery(readQuery)) filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow) if len(filledNums) > 0 { filledNum = filledNums[0] @@ -381,12 +382,13 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor // AddRegionLeaderWithReadInfo add region leader read info func (mc *Cluster) AddRegionLeaderWithReadInfo( regionID uint64, leaderStoreID uint64, - readBytes, readKeys uint64, + readBytes, readKeys, readQuery uint64, reportInterval uint64, otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat { r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...) r = r.Clone(core.SetReadBytes(readBytes)) r = r.Clone(core.SetReadKeys(readKeys)) + r = r.Clone(core.SetReadQuery(readQuery)) r = r.Clone(core.SetReportInterval(reportInterval)) filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow) if len(filledNums) > 0 { @@ -407,13 +409,14 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo( // AddLeaderRegionWithWriteInfo adds region with specified leader and peers write info. func (mc *Cluster) AddLeaderRegionWithWriteInfo( regionID uint64, leaderStoreID uint64, - writtenBytes, writtenKeys uint64, + writtenBytes, writtenKeys, writtenQuery uint64, reportInterval uint64, otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat { r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...) r = r.Clone(core.SetWrittenBytes(writtenBytes)) r = r.Clone(core.SetWrittenKeys(writtenKeys)) r = r.Clone(core.SetReportInterval(reportInterval)) + r = r.Clone(core.SetWrittenQuery(writtenQuery)) filledNum := mc.HotCache.GetFilledPeriod(statistics.WriteFlow) if len(filledNums) > 0 { @@ -491,11 +494,9 @@ func (mc *Cluster) UpdateRegionCount(storeID uint64, regionCount int) { // UpdateSnapshotCount updates store snapshot count. func (mc *Cluster) UpdateSnapshotCount(storeID uint64, snapshotCount int) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.ReceivingSnapCount = uint32(snapshotCount) - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.PutStore(newStore) + mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) { + newStats.ReceivingSnapCount = uint32(snapshotCount) + }) } // UpdatePendingPeerCount updates store pending peer count. @@ -507,91 +508,92 @@ func (mc *Cluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int) // UpdateStorageRatio updates store storage ratio count. 
func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.Capacity = defaultStoreCapacity - newStats.UsedSize = uint64(float64(newStats.Capacity) * usedRatio) - newStats.Available = uint64(float64(newStats.Capacity) * availableRatio) - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.PutStore(newStore) + mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) { + newStats.Capacity = defaultStoreCapacity + newStats.UsedSize = uint64(float64(newStats.Capacity) * usedRatio) + newStats.Available = uint64(float64(newStats.Capacity) * availableRatio) + }) } // UpdateStorageWrittenStats updates store written bytes. func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.BytesWritten = bytesWritten - newStats.KeysWritten = keysWritten - now := time.Now().Second() - interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} - newStats.Interval = interval - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.Set(storeID, newStats) - mc.PutStore(newStore) + mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) { + newStats.BytesWritten = bytesWritten + newStats.KeysWritten = keysWritten + }) } // UpdateStorageReadStats updates store written bytes. func (mc *Cluster) UpdateStorageReadStats(storeID, bytesRead, keysRead uint64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.BytesRead = bytesRead - newStats.KeysRead = keysRead - now := time.Now().Second() - interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} - newStats.Interval = interval - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.Set(storeID, newStats) - mc.PutStore(newStore) + mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) { + newStats.BytesRead = bytesRead + newStats.KeysRead = keysRead + }) } // UpdateStorageWrittenBytes updates store written bytes. func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.BytesWritten = bytesWritten - newStats.KeysWritten = bytesWritten / 100 - now := time.Now().Second() - interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} - newStats.Interval = interval - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.Set(storeID, newStats) - mc.PutStore(newStore) + mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) { + newStats.BytesWritten = bytesWritten + newStats.KeysWritten = bytesWritten / 100 + newStats.QueryStats = &pdpb.QueryStats{ + Put: bytesWritten / 100, + } + }) } // UpdateStorageReadBytes updates store read bytes. 
 func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
-	store := mc.GetStore(storeID)
-	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
-	newStats.BytesRead = bytesRead
-	newStats.KeysRead = bytesRead / 100
-	now := time.Now().Second()
-	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
-	newStats.Interval = interval
-	newStore := store.Clone(core.SetStoreStats(newStats))
-	mc.Set(storeID, newStats)
-	mc.PutStore(newStore)
+	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
+		newStats.BytesRead = bytesRead
+		newStats.KeysRead = bytesRead / 100
+	})
 }
 
 // UpdateStorageWrittenKeys updates store written keys.
 func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) {
-	store := mc.GetStore(storeID)
-	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
-	newStats.KeysWritten = keysWritten
-	newStats.BytesWritten = keysWritten * 100
-	now := time.Now().Second()
-	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
-	newStats.Interval = interval
-	newStore := store.Clone(core.SetStoreStats(newStats))
-	mc.Set(storeID, newStats)
-	mc.PutStore(newStore)
+	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
+		newStats.KeysWritten = keysWritten
+		newStats.BytesWritten = keysWritten * 100
+	})
 }
 
 // UpdateStorageReadKeys updates store read bytes.
 func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
+	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
+		newStats.KeysRead = keysRead
+		newStats.BytesRead = keysRead * 100
+	})
+}
+
+// UpdateStorageReadQuery updates store read query.
+func (mc *Cluster) UpdateStorageReadQuery(storeID uint64, queryRead uint64) {
+	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
+		newStats.QueryStats = &pdpb.QueryStats{
+			Coprocessor: queryRead / 3,
+			Scan:        queryRead / 3,
+			Get:         queryRead / 3,
+		}
+		newStats.BytesRead = queryRead * 100
+	})
+}
+
+// UpdateStorageWriteQuery updates store write query.
+func (mc *Cluster) UpdateStorageWriteQuery(storeID uint64, queryWrite uint64) {
+	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
+		newStats.QueryStats = &pdpb.QueryStats{
+			Put:         queryWrite / 3,
+			Delete:      queryWrite / 3,
+			DeleteRange: queryWrite / 3,
+		}
+		newStats.BytesWritten = queryWrite * 100
+	})
+}
+
+// updateStorageStatistics clones the store stats, applies the given update, and refreshes the report interval before saving the store.
+func (mc *Cluster) updateStorageStatistics(storeID uint64, update func(*pdpb.StoreStats)) {
 	store := mc.GetStore(storeID)
 	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
-	newStats.KeysRead = keysRead
-	newStats.BytesRead = keysRead * 100
+	update(newStats)
 	now := time.Now().Second()
 	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
 	newStats.Interval = interval
diff --git a/server/core/region_option.go b/server/core/region_option.go
index 88df6363024..d148f3f9ab1 100644
--- a/server/core/region_option.go
+++ b/server/core/region_option.go
@@ -187,6 +187,26 @@ func SetReadKeys(v uint64) RegionCreateOption {
 	}
 }
 
+// SetReadQuery sets the read query for the region.
+func SetReadQuery(v uint64) RegionCreateOption {
+	q := &pdpb.QueryStats{
+		Coprocessor: v / 3,
+		Get:         v / 3,
+		Scan:        v / 3,
+	}
+	return SetQueryStats(q)
+}
+
+// SetWrittenQuery sets the write query for the region.
+func SetWrittenQuery(v uint64) RegionCreateOption {
+	q := &pdpb.QueryStats{
+		Put:         v / 3,
+		Delete:      v / 3,
+		DeleteRange: v / 3,
+	}
+	return SetQueryStats(q)
+}
+
 // SetQueryStats sets the query stats for the region.
 func SetQueryStats(v *pdpb.QueryStats) RegionCreateOption {
 	return func(region *RegionInfo) {
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index eec1fb683da..afde1e46f53 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -70,6 +70,9 @@ const (
 // schedulePeerPr the probability of schedule the hot peer.
 var schedulePeerPr = 0.66
 
+// pendingAmpFactor amplifies the impact of pending influence, making scheduling slower, or even serial, when the loads of two stores are close.
+var pendingAmpFactor = 8.0
+
 type hotScheduler struct {
 	name string
 	*BaseScheduler
@@ -393,13 +396,13 @@ func (bs *balanceSolver) init() {
 	// For write, they are different
 	switch bs.rwTy {
 	case read:
-		bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), []string{BytePriority, KeyPriority})
+		bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), getReadLeaderPriorities)
	case write:
		switch bs.opTy {
		case transferLeader:
-			bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWriteLeaderPriorites(), []string{KeyPriority, BytePriority})
+			bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
		case movePeer:
-			bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWritePeerPriorites(), []string{BytePriority, KeyPriority})
+			bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
		}
	}
 }
@@ -410,16 +413,31 @@ func (bs *balanceSolver) isSelectedDim(dim int) bool {
 
 // adjustConfig will adjust config for cluster with low version tikv
 // because tikv below 5.2.0 does not report query information, we will use byte and key as the scheduling dimensions
-func (bs *balanceSolver) adjustConfig(origins, defaults []string) (first, second int) {
+func (bs *balanceSolver) adjustConfig(origins []string, getPriorities func(*prioritiesConfig) []string) (first, second int) {
 	querySupport := bs.cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery)
 	withQuery := slice.AnyOf(origins, func(i int) bool {
 		return origins[i] == QueryPriority
 	})
-	priorities := origins
+	compatibles := getPriorities(&compatibleConfig)
 	if !querySupport && withQuery {
-		priorities = defaults
+		schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc()
+		return prioritiesToDim(compatibles)
+	}
+
+	defaults := getPriorities(&defaultConfig)
+	isLegal := slice.AllOf(origins, func(i int) bool {
+		return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority
+	})
+	if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] {
+		return prioritiesToDim(origins)
+	}
+
+	if !querySupport {
+		schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc()
+		return prioritiesToDim(compatibles)
 	}
-	return prioritiesToDim(priorities)
+	schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-default-config").Inc()
+	return prioritiesToDim(defaults)
 }
 
 func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
@@ -496,9 +514,9 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
 	// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
 	var maxZombieDur time.Duration
 	switch {
-	case bs.rwTy == write && bs.opTy == transferLeader:
+	case bs.isForWriteLeader():
 		maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
-	case bs.rwTy == write && bs.opTy == movePeer:
+	case bs.isForWritePeer():
 		if bs.best.srcDetail.Info.IsTiFlash {
 			maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
 		} else {
@@ -510,6 +528,14 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
 	return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcDetail.getID(), bs.best.dstDetail.getID(), bs.infl, maxZombieDur)
 }
 
+func (bs *balanceSolver) isForWriteLeader() bool {
+	return bs.rwTy == write && bs.opTy == transferLeader
+}
+
+func (bs *balanceSolver) isForWritePeer() bool {
+	return bs.rwTy == write && bs.opTy == movePeer
+}
+
 // filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than
 // its expectation * ratio, the store would be selected as hot source store
 func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
@@ -762,18 +788,22 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLo
 // calcProgressiveRank calculates `bs.cur.progressiveRank`.
 // See the comments of `solution.progressiveRank` for more about progressive rank.
 func (bs *balanceSolver) calcProgressiveRank() {
-	srcLd := bs.cur.srcDetail.LoadPred.min()
-	dstLd := bs.cur.dstDetail.LoadPred.max()
+	src := bs.cur.srcDetail
+	dst := bs.cur.dstDetail
+	srcLd := src.LoadPred.min()
+	dstLd := dst.LoadPred.max()
+	bs.cur.progressiveRank = 0
 	peer := bs.cur.srcPeerStat
-	rank := int64(0)
-	if bs.rwTy == write && bs.opTy == transferLeader {
-		// In this condition, CPU usage is the matter.
-		// Only consider key rate or query rate.
+
+	if bs.isForWriteLeader() {
+		if !bs.isTolerance(src, dst, bs.firstPriority) {
+			return
+		}
 		srcRate := srcLd.Loads[bs.firstPriority]
 		dstRate := dstLd.Loads[bs.firstPriority]
 		peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.firstPriority))
 		if srcRate-peerRate >= dstRate+peerRate {
-			rank = -1
+			bs.cur.progressiveRank = -1
 		}
 	} else {
 		firstPriorityDimHot, firstPriorityDecRatio, secondPriorityDimHot, secondPriorityDecRatio := bs.getHotDecRatioByPriorities(srcLd, dstLd, peer)
@@ -781,20 +811,43 @@ func (bs *balanceSolver) calcProgressiveRank() {
 		switch {
 		case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
 			// If belong to the case, two dim will be more balanced, the best choice.
-			rank = -3
+			if !bs.isTolerance(src, dst, bs.firstPriority) || !bs.isTolerance(src, dst, bs.secondPriority) {
+				return
+			}
+			bs.cur.progressiveRank = -3
 			bs.firstPriorityIsBetter = true
 			bs.secondPriorityIsBetter = true
 		case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
 			// If belong to the case, first priority dim will be not worsened, second priority dim will be more balanced.
-			rank = -2
+			if !bs.isTolerance(src, dst, bs.secondPriority) {
+				return
+			}
+			bs.cur.progressiveRank = -2
 			bs.secondPriorityIsBetter = true
 		case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio:
 			// If belong to the case, first priority dim will be more balanced, ignore the second priority dim.
-			rank = -1
+			if !bs.isTolerance(src, dst, bs.firstPriority) {
+				return
+			}
+			bs.cur.progressiveRank = -1
 			bs.firstPriorityIsBetter = true
 		}
 	}
-	bs.cur.progressiveRank = rank
+}
+
+// isTolerance checks whether the rate difference between the source store and the target store is still larger than their pending influence amplified by pendingAmp (derived from pendingAmpFactor).
+// This makes the hot region scheduling slow, or even serial, when the pending influence of every two stores is close.
+func (bs *balanceSolver) isTolerance(src, dst *storeLoadDetail, dim int) bool {
+	srcRate := src.LoadPred.Current.Loads[dim]
+	dstRate := dst.LoadPred.Current.Loads[dim]
+	if srcRate <= dstRate {
+		return false
+	}
+	pendingAmp := (1 + pendingAmpFactor*srcRate/(srcRate-dstRate))
+	srcPending := src.LoadPred.pending().Loads[dim]
+	dstPending := dst.LoadPred.pending().Loads[dim]
+	hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(src.getID(), 10), strconv.FormatUint(dst.getID(), 10)).Set(pendingAmp)
+	return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending
 }
 
 func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) {
@@ -859,7 +912,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
 
 	if bs.cur.srcPeerStat != old.srcPeerStat {
 		// compare region
-		if bs.rwTy == write && bs.opTy == transferLeader {
+		if bs.isForWriteLeader() {
 			kind := getRegionStatKind(write, bs.firstPriority)
 			switch {
 			case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind):
@@ -918,7 +971,7 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *storeLoadDetail) int
 	if detail1 != detail2 {
 		// compare source store
 		var lpCmp storeLPCmp
-		if bs.rwTy == write && bs.opTy == transferLeader {
+		if bs.isForWriteLeader() {
 			lpCmp = sliceLPCmp(
 				minLPCmp(negLoadCmp(sliceLoadCmp(
 					stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
@@ -951,7 +1004,7 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *storeLoadDetail) int
 	if detail1 != detail2 {
 		// compare destination store
 		var lpCmp storeLPCmp
-		if bs.rwTy == write && bs.opTy == transferLeader {
+		if bs.isForWriteLeader() {
 			lpCmp = sliceLPCmp(
 				maxLPCmp(sliceLoadCmp(
 					stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go
index a8266135dca..bc0e28872c3 100644
--- a/server/schedulers/hot_region_config.go
+++ b/server/schedulers/hot_region_config.go
@@ -43,6 +43,19 @@ const (
 	tiflashToleranceRatioCorrection = 0.1
 )
 
+var defaultConfig = prioritiesConfig{
+	readLeader:  []string{QueryPriority, BytePriority},
+	writeLeader: []string{KeyPriority, BytePriority},
+	writePeer:   []string{BytePriority, KeyPriority},
+}
+
+// Because TiKV below v5.2.0 does not report query information, byte and key are used as the scheduling dimensions for compatibility.
+var compatibleConfig = prioritiesConfig{
+	readLeader:  []string{BytePriority, KeyPriority},
+	writeLeader: []string{KeyPriority, BytePriority},
+	writePeer:   []string{BytePriority, KeyPriority},
+}
+
 // params about hot region.
func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { return &hotRegionSchedulerConfig{ @@ -59,9 +72,9 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { MinorDecRatio: 0.99, SrcToleranceRatio: 1.05, // Tolerate 5% difference DstToleranceRatio: 1.05, // Tolerate 5% difference - ReadPriorities: []string{QueryPriority, BytePriority}, - WriteLeaderPriorities: []string{KeyPriority, BytePriority}, - WritePeerPriorities: []string{BytePriority, KeyPriority}, + ReadPriorities: defaultConfig.readLeader, + WriteLeaderPriorities: defaultConfig.writeLeader, + WritePeerPriorities: defaultConfig.writePeer, StrictPickingStore: true, EnableForTiFlash: true, } @@ -88,9 +101,11 @@ type hotRegionSchedulerConfig struct { SrcToleranceRatio float64 `json:"src-tolerance-ratio"` DstToleranceRatio float64 `json:"dst-tolerance-ratio"` ReadPriorities []string `json:"read-priorities"` - WriteLeaderPriorities []string `json:"write-leader-priorities"` - WritePeerPriorities []string `json:"write-peer-priorities"` - StrictPickingStore bool `json:"strict-picking-store,string"` + + // For first priority of write leader, it is better to consider key rate or query rather than byte + WriteLeaderPriorities []string `json:"write-leader-priorities"` + WritePeerPriorities []string `json:"write-peer-priorities"` + StrictPickingStore bool `json:"strict-picking-store,string"` // Separately control whether to start hotspot scheduling for TiFlash EnableForTiFlash bool `json:"enable-for-tiflash,string"` @@ -216,13 +231,13 @@ func (conf *hotRegionSchedulerConfig) GetReadPriorities() []string { return conf.ReadPriorities } -func (conf *hotRegionSchedulerConfig) GetWriteLeaderPriorites() []string { +func (conf *hotRegionSchedulerConfig) GetWriteLeaderPriorities() []string { conf.RLock() defer conf.RUnlock() return conf.WriteLeaderPriorities } -func (conf *hotRegionSchedulerConfig) GetWritePeerPriorites() []string { +func (conf *hotRegionSchedulerConfig) GetWritePeerPriorities() []string { conf.RLock() defer conf.RUnlock() return conf.WritePeerPriorities @@ -298,3 +313,21 @@ func (conf *hotRegionSchedulerConfig) persist() error { } return conf.storage.SaveScheduleConfig(HotRegionName, data) } + +type prioritiesConfig struct { + readLeader []string + writeLeader []string + writePeer []string +} + +func getReadLeaderPriorities(c *prioritiesConfig) []string { + return c.readLeader +} + +func getWriteLeaderPriorities(c *prioritiesConfig) []string { + return c.writeLeader +} + +func getWritePeerPriorities(c *prioritiesConfig) []string { + return c.writePeer +} diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index a6caa70f725..20f0a6ff0d0 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -206,9 +206,9 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust //| 3 | 1 | 2 | 4 | 512KB | // Region 1, 2 and 3 are hot regions. 
addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 512 * KB, 0}, - {2, []uint64{1, 3, 4}, 512 * KB, 0}, - {3, []uint64{1, 2, 4}, 512 * KB, 0}, + {1, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {2, []uint64{1, 3, 4}, 512 * KB, 0, 0}, + {3, []uint64{1, 2, 4}, 512 * KB, 0, 0}, }) c.Assert(len(hb.Schedule(tc)) == 0, IsFalse) hb.(*hotScheduler).clearPendingInfluence() @@ -288,11 +288,11 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust //| 4 | 5 | 6 | 4 | 512KB | //| 5 | 3 | 4 | 5 | 512KB | addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 512 * KB, 0}, - {2, []uint64{1, 2, 3}, 512 * KB, 0}, - {3, []uint64{6, 1, 4}, 512 * KB, 0}, - {4, []uint64{5, 6, 4}, 512 * KB, 0}, - {5, []uint64{3, 4, 5}, 512 * KB, 0}, + {1, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {2, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {3, []uint64{6, 1, 4}, 512 * KB, 0, 0}, + {4, []uint64{5, 6, 4}, 512 * KB, 0, 0}, + {5, []uint64{3, 4, 5}, 512 * KB, 0, 0}, }) // 6 possible operator. @@ -401,23 +401,27 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { //| 4 | 2 | | | 10 | 100 B | // Region 1, 2 and 3 are hot regions. testRegions := []testRegionInfo{ - {1, []uint64{1, 2, 3, 8}, 512 * KB, 5 * KB}, - {2, []uint64{1, 3, 4, 8}, 512 * KB, 5 * KB}, - {3, []uint64{1, 2, 4, 9}, 512 * KB, 5 * KB}, - {4, []uint64{2, 10}, 100, 1}, + {1, []uint64{1, 2, 3, 8}, 512 * KB, 5 * KB, 3000}, + {2, []uint64{1, 3, 4, 8}, 512 * KB, 5 * KB, 3000}, + {3, []uint64{1, 2, 4, 9}, 512 * KB, 5 * KB, 3000}, + {4, []uint64{2, 10}, 100, 1, 1}, } addRegionInfo(tc, write, testRegions) regionBytesSum := 0.0 regionKeysSum := 0.0 + regionQuerySum := 0.0 hotRegionBytesSum := 0.0 hotRegionKeysSum := 0.0 + hotRegionQuerySum := 0.0 for _, r := range testRegions { regionBytesSum += r.byteRate regionKeysSum += r.keyRate + regionQuerySum += r.queryRate } for _, r := range testRegions[0:3] { hotRegionBytesSum += r.byteRate hotRegionKeysSum += r.keyRate + hotRegionQuerySum += r.queryRate } // Will transfer a hot learner from store 8, because the total count of peers // which is hot for store 8 is larger than other TiFlash stores. 
@@ -461,10 +465,11 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { 3: 4.5 * MB * statistics.StoreHeartBeatReportInterval, 4: 6 * MB * statistics.StoreHeartBeatReportInterval, } - tikvBytesSum, tikvKeysSum := 0.0, 0.0 + tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0 for i := aliveTiKVStartID; i <= aliveTiKVLastID; i++ { tikvBytesSum += float64(storesBytes[i]) / 10 tikvKeysSum += float64(storesBytes[i]/100) / 10 + tikvQuerySum += float64(storesBytes[i]/100) / 10 } for i := uint64(1); i <= storeCount; i++ { if i != downStoreID { @@ -479,8 +484,9 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { c.Assert( loadsEqual( hb.stLoadInfos[writeLeader][1].LoadPred.Expect.Loads, - []float64{hotRegionBytesSum / aliveTiKVCount, hotRegionKeysSum / aliveTiKVCount, 0}), + []float64{hotRegionBytesSum / aliveTiKVCount, hotRegionKeysSum / aliveTiKVCount, tikvQuerySum / aliveTiKVCount}), IsTrue) + c.Assert(tikvQuerySum != hotRegionQuerySum, IsTrue) c.Assert( loadsEqual( hb.stLoadInfos[writePeer][1].LoadPred.Expect.Loads, @@ -530,6 +536,44 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { } } +func (s *testHotWriteRegionSchedulerSuite) TestWithQuery(c *C) { + originValue := schedulePeerPr + defer func() { + schedulePeerPr = originValue + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + c.Assert(err, IsNil) + hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) + hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"qps", "byte"} + + tc := mockcluster.NewCluster(ctx, opt) + tc.SetHotRegionCacheHitsThreshold(0) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + + tc.UpdateStorageWriteQuery(1, 11000*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWriteQuery(2, 10000*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWriteQuery(3, 9000*statistics.StoreHeartBeatReportInterval) + + addRegionInfo(tc, write, []testRegionInfo{ + {1, []uint64{1, 2, 3}, 500, 0, 500}, + {2, []uint64{1, 2, 3}, 500, 0, 500}, + {3, []uint64{2, 1, 3}, 500, 0, 500}, + }) + schedulePeerPr = 0.0 + for i := 0; i < 100; i++ { + hb.(*hotScheduler).clearPendingInfluence() + op := hb.Schedule(tc)[0] + testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 3) + } +} + func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -556,9 +600,9 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { tc.UpdateStorageWrittenStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, - {2, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, - {3, []uint64{2, 4, 3}, 0.05 * MB, 0.1 * MB}, + {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB, 0}, + {2, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB, 0}, + {3, []uint64{2, 4, 3}, 0.05 * MB, 0.1 * MB, 0}, }) for i := 0; i < 100; i++ { @@ -607,9 +651,9 @@ func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { tc.UpdateStorageWrittenStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval) 
tc.UpdateStorageWrittenStats(4, 0*MB*statistics.StoreHeartBeatReportInterval, 0*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 0.5 * MB, 0.5 * MB}, - {2, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, - {3, []uint64{3, 2, 1}, 0.5 * MB, 0.5 * MB}, + {1, []uint64{1, 2, 3}, 0.5 * MB, 0.5 * MB, 0}, + {2, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB, 0}, + {3, []uint64{3, 2, 1}, 0.5 * MB, 0.5 * MB, 0}, }) intervals := []time.Duration{ @@ -656,9 +700,9 @@ func (s *testHotWriteRegionSchedulerSuite) TestCheckHot(c *C) { tc.UpdateStorageWrittenStats(4, 9.5*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 90, 0.5 * MB}, // no hot - {1, []uint64{2, 1, 3}, 90, 0.5 * MB}, // no hot - {2, []uint64{3, 2, 1}, 0.5 * MB, 0.5 * MB}, // byteDecRatio is greater than greatDecRatio + {1, []uint64{1, 2, 3}, 90, 0.5 * MB, 0}, // no hot + {1, []uint64{2, 1, 3}, 90, 0.5 * MB, 0}, // no hot + {2, []uint64{3, 2, 1}, 0.5 * MB, 0.5 * MB, 0}, // byteDecRatio is greater than greatDecRatio }) c.Check(hb.Schedule(tc), HasLen, 0) @@ -691,13 +735,13 @@ func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) { // store3 has 2 peer as leader // If transfer leader from store2 to store1 or store3, it will keep on looping, which introduces a lot of unnecessary scheduling addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB}, - {2, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB}, - {3, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {4, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {5, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {6, []uint64{3, 1, 2}, 0.5 * MB, 1 * MB}, - {7, []uint64{3, 1, 2}, 0.5 * MB, 1 * MB}, + {1, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB, 0}, + {2, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB, 0}, + {3, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {4, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {5, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {6, []uint64{3, 1, 2}, 0.5 * MB, 1 * MB, 0}, + {7, []uint64{3, 1, 2}, 0.5 * MB, 1 * MB, 0}, }) for i := 0; i < 100; i++ { @@ -706,7 +750,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) { } addRegionInfo(tc, write, []testRegionInfo{ - {8, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, + {8, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, }) // store1 has 2 peer as leader @@ -728,6 +772,11 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) { opt := config.NewTestOptions() hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) + old := pendingAmpFactor + pendingAmpFactor = 0.0 + defer func() { + pendingAmpFactor = old + }() for i := 0; i < 2; i++ { // 0: byte rate // 1: key rate @@ -750,21 +799,21 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) { if i == 0 { // byte rate addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 512 * KB, 0}, - {2, []uint64{1, 2, 3}, 512 * KB, 0}, - {3, []uint64{1, 2, 3}, 512 * KB, 0}, - {4, []uint64{1, 2, 3}, 512 * KB, 0}, - {5, []uint64{1, 2, 3}, 512 * KB, 0}, - {6, []uint64{1, 2, 3}, 512 * KB, 0}, + {1, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {2, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {3, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {4, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {5, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {6, []uint64{1, 2, 3}, 512 * KB, 0, 0}, }) } else if i == 1 { // key rate addRegionInfo(tc, write, []testRegionInfo{ - 
{1, []uint64{1, 2, 3}, 0, 512 * KB}, - {2, []uint64{1, 2, 3}, 0, 512 * KB}, - {3, []uint64{1, 2, 3}, 0, 512 * KB}, - {4, []uint64{1, 2, 3}, 0, 512 * KB}, - {5, []uint64{1, 2, 3}, 0, 512 * KB}, - {6, []uint64{1, 2, 3}, 0, 512 * KB}, + {1, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {2, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {3, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {4, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {5, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {6, []uint64{1, 2, 3}, 0, 512 * KB, 0}, }) } @@ -860,13 +909,13 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithRuleEnabled(c *C) { tc.UpdateStorageWrittenKeys(3, 10*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB}, - {2, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB}, - {3, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {4, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {5, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {6, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, - {7, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB}, + {1, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB, 0}, + {2, []uint64{1, 2, 3}, 0.5 * MB, 1 * MB, 0}, + {3, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {4, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {5, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {6, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, + {7, []uint64{2, 1, 3}, 0.5 * MB, 1 * MB, 0}, }) for i := 0; i < 100; i++ { @@ -917,10 +966,10 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { //| 11 | 1 | 2 | 3 | 7KB | // Region 1, 2 and 3 are hot regions. addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 512 * KB, 0}, - {2, []uint64{2, 1, 3}, 512 * KB, 0}, - {3, []uint64{1, 2, 3}, 512 * KB, 0}, - {11, []uint64{1, 2, 3}, 7 * KB, 0}, + {1, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {2, []uint64{2, 1, 3}, 512 * KB, 0, 0}, + {3, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {11, []uint64{1, 2, 3}, 7 * KB, 0, 0}, }) c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue) @@ -946,7 +995,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { hb.(*hotScheduler).clearPendingInfluence() // assume handle the transfer leader operator rather than move leader - tc.AddRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2}) + tc.AddRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, 0, statistics.ReadReportInterval, []uint64{1, 2}) // After transfer a hot region leader from store 1 to store 3 // the three region leader will be evenly distributed in three stores @@ -972,8 +1021,8 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { //| 5 | 4 | 2 | 5 | 512KB | //| 11 | 1 | 2 | 3 | 24KB | addRegionInfo(tc, read, []testRegionInfo{ - {4, []uint64{1, 2, 3}, 512 * KB, 0}, - {5, []uint64{4, 2, 5}, 512 * KB, 0}, + {4, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {5, []uint64{4, 2, 5}, 512 * KB, 0, 0}, }) // We will move leader peer of region 1 from 1 to 5 @@ -988,6 +1037,38 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { hb.(*hotScheduler).clearPendingInfluence() } +func (s *testHotReadRegionSchedulerSuite) TestWithQuery(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + statistics.Denoising = false + opt := config.NewTestOptions() + hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + c.Assert(err, IsNil) + hb.(*hotScheduler).conf.SetSrcToleranceRatio(1) + hb.(*hotScheduler).conf.SetDstToleranceRatio(1) + + tc := 
mockcluster.NewCluster(ctx, opt) + tc.SetHotRegionCacheHitsThreshold(0) + tc.AddRegionStore(1, 20) + tc.AddRegionStore(2, 20) + tc.AddRegionStore(3, 20) + + tc.UpdateStorageReadQuery(1, 10500*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadQuery(2, 10000*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadQuery(3, 9000*statistics.StoreHeartBeatReportInterval) + + addRegionInfo(tc, read, []testRegionInfo{ + {1, []uint64{1, 2, 3}, 0, 0, 500}, + {2, []uint64{2, 1, 3}, 0, 0, 500}, + }) + + for i := 0; i < 100; i++ { + hb.(*hotScheduler).clearPendingInfluence() + op := hb.Schedule(tc)[0] + testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 3) + } +} + func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1014,9 +1095,9 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { tc.UpdateStorageReadStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB}, - {2, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB}, - {3, []uint64{3, 4, 5}, 0.05 * MB, 0.1 * MB}, + {1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB, 0}, + {2, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB, 0}, + {3, []uint64{3, 4, 5}, 0.05 * MB, 0.1 * MB, 0}, }) for i := 0; i < 100; i++ { @@ -1053,7 +1134,11 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { hb.(*hotScheduler).conf.MinorDecRatio = 1 hb.(*hotScheduler).conf.DstToleranceRatio = 1 hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority} - + old := pendingAmpFactor + pendingAmpFactor = 0.0 + defer func() { + pendingAmpFactor = old + }() for i := 0; i < 2; i++ { // 0: byte rate // 1: key rate @@ -1076,25 +1161,25 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { if i == 0 { // byte rate addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 512 * KB, 0}, - {2, []uint64{1, 2, 3}, 512 * KB, 0}, - {3, []uint64{1, 2, 3}, 512 * KB, 0}, - {4, []uint64{1, 2, 3}, 512 * KB, 0}, - {5, []uint64{2, 1, 3}, 512 * KB, 0}, - {6, []uint64{2, 1, 3}, 512 * KB, 0}, - {7, []uint64{3, 2, 1}, 512 * KB, 0}, - {8, []uint64{3, 2, 1}, 512 * KB, 0}, + {1, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {2, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {3, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {4, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {5, []uint64{2, 1, 3}, 512 * KB, 0, 0}, + {6, []uint64{2, 1, 3}, 512 * KB, 0, 0}, + {7, []uint64{3, 2, 1}, 512 * KB, 0, 0}, + {8, []uint64{3, 2, 1}, 512 * KB, 0, 0}, }) } else if i == 1 { // key rate addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 0, 512 * KB}, - {2, []uint64{1, 2, 3}, 0, 512 * KB}, - {3, []uint64{1, 2, 3}, 0, 512 * KB}, - {4, []uint64{1, 2, 3}, 0, 512 * KB}, - {5, []uint64{2, 1, 3}, 0, 512 * KB}, - {6, []uint64{2, 1, 3}, 0, 512 * KB}, - {7, []uint64{3, 2, 1}, 0, 512 * KB}, - {8, []uint64{3, 2, 1}, 0, 512 * KB}, + {1, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {2, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {3, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {4, []uint64{1, 2, 3}, 0, 512 * KB, 0}, + {5, []uint64{2, 1, 3}, 0, 512 * KB, 0}, + {6, []uint64{2, 1, 3}, 0, 512 * KB, 0}, + {7, []uint64{3, 2, 1}, 0, 512 * KB, 0}, + {8, []uint64{3, 2, 1}, 0, 512 * KB, 0}, }) } @@ -1107,11 +1192,16 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4) // After move-peer, store 
byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5) + pendingAmpFactor = old + ops := hb.Schedule(tc) + c.Assert(ops, HasLen, 0) + pendingAmpFactor = 0.0 + op2 := hb.Schedule(tc)[0] testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4) // After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6) - ops := hb.Schedule(tc) + ops = hb.Schedule(tc) c.Logf("%v", ops) c.Assert(ops, HasLen, 0) } @@ -1155,11 +1245,11 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { /// For read flow addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 512 * KB, 0}, - {2, []uint64{2, 1, 3}, 512 * KB, 0}, - {3, []uint64{1, 2, 3}, 20 * KB, 0}, + {1, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {2, []uint64{2, 1, 3}, 512 * KB, 0, 0}, + {3, []uint64{1, 2, 3}, 20 * KB, 0, 0}, // lower than hot read flow rate, but higher than write flow rate - {11, []uint64{1, 2, 3}, 7 * KB, 0}, + {11, []uint64{1, 2, 3}, 7 * KB, 0, 0}, }) stats := tc.RegionStats(statistics.ReadFlow, 0) c.Assert(len(stats[1]), Equals, 3) @@ -1167,8 +1257,8 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { c.Assert(len(stats[3]), Equals, 3) addRegionInfo(tc, read, []testRegionInfo{ - {3, []uint64{2, 1, 3}, 20 * KB, 0}, - {11, []uint64{1, 2, 3}, 7 * KB, 0}, + {3, []uint64{2, 1, 3}, 20 * KB, 0, 0}, + {11, []uint64{1, 2, 3}, 7 * KB, 0, 0}, }) stats = tc.RegionStats(statistics.ReadFlow, 0) c.Assert(len(stats[1]), Equals, 3) @@ -1176,9 +1266,9 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { c.Assert(len(stats[3]), Equals, 3) addRegionInfo(tc, write, []testRegionInfo{ - {4, []uint64{1, 2, 3}, 512 * KB, 0}, - {5, []uint64{1, 2, 3}, 20 * KB, 0}, - {6, []uint64{1, 2, 3}, 0.8 * KB, 0}, + {4, []uint64{1, 2, 3}, 512 * KB, 0, 0}, + {5, []uint64{1, 2, 3}, 20 * KB, 0, 0}, + {6, []uint64{1, 2, 3}, 0.8 * KB, 0, 0}, }) stats = tc.RegionStats(statistics.WriteFlow, 0) c.Assert(len(stats[1]), Equals, 2) @@ -1186,7 +1276,7 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { c.Assert(len(stats[3]), Equals, 2) addRegionInfo(tc, write, []testRegionInfo{ - {5, []uint64{1, 2, 5}, 20 * KB, 0}, + {5, []uint64{1, 2, 5}, 20 * KB, 0, 0}, }) stats = tc.RegionStats(statistics.WriteFlow, 0) @@ -1197,11 +1287,11 @@ func (s *testHotCacheSuite) TestUpdateCache(c *C) { // For leader read flow addRegionLeaderReadInfo(tc, []testRegionInfo{ - {21, []uint64{4, 5, 6}, 512 * KB, 0}, - {22, []uint64{5, 4, 6}, 512 * KB, 0}, - {23, []uint64{4, 5, 6}, 20 * KB, 0}, + {21, []uint64{4, 5, 6}, 512 * KB, 0, 0}, + {22, []uint64{5, 4, 6}, 512 * KB, 0, 0}, + {23, []uint64{4, 5, 6}, 20 * KB, 0, 0}, // lower than hot read flow rate, but higher than write flow rate - {31, []uint64{4, 5, 6}, 7 * KB, 0}, + {31, []uint64{4, 5, 6}, 7 * KB, 0, 0}, }) stats = tc.RegionStats(statistics.ReadFlow, 0) c.Assert(len(stats[4]), Equals, 2) @@ -1217,14 +1307,14 @@ func (s *testHotCacheSuite) TestKeyThresholds(c *C) { tc := mockcluster.NewCluster(ctx, opt) tc.SetHotRegionCacheHitsThreshold(0) addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 0, 1}, - {2, []uint64{1, 2, 3}, 0, 1 * KB}, + {1, []uint64{1, 2, 3}, 0, 1, 0}, + {2, []uint64{1, 2, 3}, 0, 1 * KB, 0}, }) stats := tc.RegionStats(statistics.ReadFlow, 0) c.Assert(stats[1], HasLen, 1) addRegionInfo(tc, write, []testRegionInfo{ - {3, []uint64{4, 5, 6}, 0, 1}, - {4, []uint64{4, 5, 6}, 0, 1 * KB}, + {3, []uint64{4, 5, 6}, 0, 1, 0}, + {4, []uint64{4, 5, 6}, 0, 1 * KB, 0}, }) stats = tc.RegionStats(statistics.WriteFlow, 0) c.Assert(stats[4], HasLen, 1) @@ -1303,10 +1393,10 @@ func (s *testHotCacheSuite) 
TestByteAndKey(c *C) { c.Assert(len(stats[1]), Equals, 500) addRegionInfo(tc, read, []testRegionInfo{ - {10001, []uint64{1, 2, 3}, 10 * KB, 10 * KB}, - {10002, []uint64{1, 2, 3}, 500 * KB, 10 * KB}, - {10003, []uint64{1, 2, 3}, 10 * KB, 500 * KB}, - {10004, []uint64{1, 2, 3}, 500 * KB, 500 * KB}, + {10001, []uint64{1, 2, 3}, 10 * KB, 10 * KB, 0}, + {10002, []uint64{1, 2, 3}, 500 * KB, 10 * KB, 0}, + {10003, []uint64{1, 2, 3}, 10 * KB, 500 * KB, 0}, + {10004, []uint64{1, 2, 3}, 500 * KB, 500 * KB, 0}, }) stats = tc.RegionStats(statistics.ReadFlow, 0) c.Assert(len(stats[1]), Equals, 503) @@ -1318,10 +1408,10 @@ func (s *testHotCacheSuite) TestByteAndKey(c *C) { c.Assert(len(stats[2]), Equals, 500) c.Assert(len(stats[3]), Equals, 500) addRegionInfo(tc, write, []testRegionInfo{ - {10001, []uint64{1, 2, 3}, 10 * KB, 10 * KB}, - {10002, []uint64{1, 2, 3}, 500 * KB, 10 * KB}, - {10003, []uint64{1, 2, 3}, 10 * KB, 500 * KB}, - {10004, []uint64{1, 2, 3}, 500 * KB, 500 * KB}, + {10001, []uint64{1, 2, 3}, 10 * KB, 10 * KB, 0}, + {10002, []uint64{1, 2, 3}, 500 * KB, 10 * KB, 0}, + {10003, []uint64{1, 2, 3}, 10 * KB, 500 * KB, 0}, + {10004, []uint64{1, 2, 3}, 500 * KB, 500 * KB, 0}, }) stats = tc.RegionStats(statistics.WriteFlow, 0) c.Assert(len(stats[1]), Equals, 503) @@ -1333,9 +1423,10 @@ func (s *testHotCacheSuite) TestByteAndKey(c *C) { type testRegionInfo struct { id uint64 // the storeID list for the peers, the leader is stored in the first store - peers []uint64 - byteRate float64 - keyRate float64 + peers []uint64 + byteRate float64 + keyRate float64 + queryRate float64 } func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInfo) { @@ -1352,6 +1443,7 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInf r.id, r.peers[0], uint64(r.byteRate*float64(reportIntervalSecs)), uint64(r.keyRate*float64(reportIntervalSecs)), + uint64(r.queryRate*float64(reportIntervalSecs)), uint64(reportIntervalSecs), r.peers[1:], ) @@ -1366,6 +1458,7 @@ func addRegionLeaderReadInfo(tc *mockcluster.Cluster, regions []testRegionInfo) r.id, r.peers[0], uint64(r.byteRate*float64(reportIntervalSecs)), uint64(r.keyRate*float64(reportIntervalSecs)), + uint64(r.queryRate*float64(reportIntervalSecs)), uint64(reportIntervalSecs), r.peers[1:], ) @@ -1420,15 +1513,15 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { reportInterval = uint64(statistics.ReadReportInterval) } // hot degree increase - heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1) - heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1) - items := heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1) + heartbeat(1, 1, 512*KB*reportInterval, 0, 0, reportInterval, []uint64{2, 3}, 1) + heartbeat(1, 1, 512*KB*reportInterval, 0, 0, reportInterval, []uint64{2, 3}, 1) + items := heartbeat(1, 1, 512*KB*reportInterval, 0, 0, reportInterval, []uint64{2, 3}, 1) c.Check(len(items), Greater, 0) for _, item := range items { c.Check(item.HotDegree, Equals, 3) } // transfer leader - items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3}, 1) + items = heartbeat(1, 2, 512*KB*reportInterval, 0, 0, reportInterval, []uint64{1, 3}, 1) for _, item := range items { if item.StoreID == 2 { c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader) @@ -1448,12 +1541,12 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { } // move peer: add peer and remove peer - items = heartbeat(1, 2, 512*KB*reportInterval, 0, 
reportInterval, []uint64{1, 3, 4}, 1) + items = heartbeat(1, 2, 512*KB*reportInterval, 0, 0, reportInterval, []uint64{1, 3, 4}, 1) c.Check(len(items), Greater, 0) for _, item := range items { c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+1) } - items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 4}, 1) + items = heartbeat(1, 2, 512*KB*reportInterval, 0, 0, reportInterval, []uint64{1, 4}, 1) c.Check(len(items), Greater, 0) for _, item := range items { if item.StoreID == 3 { @@ -1479,16 +1572,16 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) { rate := uint64(512 * KB) for i := 0; i < statistics.TopNN; i++ { for j := 0; j < statistics.DefaultAotSize; j++ { - tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1) + tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1) } } - items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1) + items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1) c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio) // Threshold of store 1,2,3 is 409.6 KB and others are 1 KB // Make the hot threshold of some store is high and the others are low rate = 10 * KB - tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3, 4}, 1) - items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1) + tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3, 4}, 1) + items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1) for _, item := range items { if item.StoreID < 4 { c.Check(item.IsNeedDelete(), IsTrue) @@ -1575,10 +1668,10 @@ func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) { tc.UpdateStorageWrittenStats(3, 2*MB*statistics.StoreHeartBeatReportInterval, 2*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenStats(4, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, + {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB, 0}, }) addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, + {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB, 0}, }) // must move peer schedulePeerPr = 1.0 @@ -1596,16 +1689,13 @@ func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) { c.Assert(nearlyAbout(stInfos[4].PendingSum.Loads[statistics.RegionReadKeys], 0.5*MB), IsTrue) c.Assert(nearlyAbout(stInfos[4].PendingSum.Loads[statistics.RegionReadBytes], 0.5*MB), IsTrue) - addRegionInfo(tc, write, []testRegionInfo{ - {2, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, - {3, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, - {4, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, - }) - addRegionInfo(tc, read, []testRegionInfo{ - {2, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, - {3, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, - {4, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB}, - }) + // consider pending amp, there are 
nine regions or more. + for i := 2; i < 13; i++ { + addRegionInfo(tc, write, []testRegionInfo{ + {uint64(i), []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB, 0}, + }) + } + // must transfer leader schedulePeerPr = 0 // must transfer leader from 1 to 3 @@ -1696,8 +1786,8 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { // must transfer peer schedulePeerPr = 1.0 addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 2 * MB, 1 * MB}, - {6, []uint64{4, 2, 3}, 1 * MB, 2 * MB}, + {1, []uint64{1, 2, 3}, 2 * MB, 1 * MB, 0}, + {6, []uint64{4, 2, 3}, 1 * MB, 2 * MB, 0}, }) hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority} ops := hb.Schedule(tc) @@ -1719,7 +1809,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { tc.UpdateStorageReadStats(2, 1*MB*statistics.StoreHeartBeatReportInterval, 7*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageReadStats(3, 7*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, read, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 2 * MB, 2 * MB}, + {1, []uint64{1, 2, 3}, 2 * MB, 2 * MB, 0}, }) hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority} ops = hb.Schedule(tc) @@ -1757,6 +1847,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5) hb.(*hotScheduler).clearPendingInfluence() } + func (s *testHotSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1778,23 +1869,25 @@ func (s *testHotSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) { tc.UpdateStorageWrittenStats(3, 1*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ - {1, []uint64{1, 2, 3}, 10 * MB, 10 * MB}, - {2, []uint64{1, 2, 3}, 10 * MB, 10 * MB}, - {3, []uint64{1, 2, 3}, 10 * MB, 10 * MB}, - {4, []uint64{2, 1, 3}, 10 * MB, 0 * MB}, - {5, []uint64{3, 2, 1}, 0 * MB, 10 * MB}, + {1, []uint64{1, 2, 3}, 10 * MB, 10 * MB, 0}, + {2, []uint64{1, 2, 3}, 10 * MB, 10 * MB, 0}, + {3, []uint64{1, 2, 3}, 10 * MB, 10 * MB, 0}, + {4, []uint64{2, 1, 3}, 10 * MB, 0 * MB, 0}, + {5, []uint64{3, 2, 1}, 0 * MB, 10 * MB, 0}, }) - old := schedulePeerPr - schedulePeerPr = 0.0 + old1, old2 := schedulePeerPr, pendingAmpFactor + schedulePeerPr, pendingAmpFactor = 0.0, 0.0 + defer func() { + schedulePeerPr, pendingAmpFactor = old1, old2 + }() hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{KeyPriority, BytePriority} ops := hb.Schedule(tc) c.Assert(len(ops), Equals, 1) testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 2) hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{BytePriority, KeyPriority} ops = hb.Schedule(tc) - c.Assert(len(ops), Equals, 1) + c.Assert(ops, HasLen, 1) testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 3) - schedulePeerPr = old } func (s *testHotSchedulerSuite) TestCompatibility(c *C) { @@ -1811,6 +1904,15 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) { {statistics.KeyDim, statistics.ByteDim}, {statistics.ByteDim, statistics.KeyDim}, }) + // config error value + hb.(*hotScheduler).conf.ReadPriorities = []string{"hahaha"} + hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"hahaha", "byte"} + hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte", "key"} + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + 
		{statistics.QueryDim, statistics.ByteDim},
+		{statistics.KeyDim, statistics.ByteDim},
+		{statistics.ByteDim, statistics.KeyDim},
+	})
 	// low version
 	tc.DisableFeature(versioninfo.HotScheduleWithQuery)
 	checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
@@ -1836,6 +1938,15 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
 		{statistics.KeyDim, statistics.ByteDim},
 		{statistics.ByteDim, statistics.KeyDim},
 	})
+	// config error value
+	hb.(*hotScheduler).conf.ReadPriorities = []string{"error", "error"}
+	hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{}
+	hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte", "key"}
+	checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
+		{statistics.ByteDim, statistics.KeyDim},
+		{statistics.KeyDim, statistics.ByteDim},
+		{statistics.ByteDim, statistics.KeyDim},
+	})
 }
 
 func checkPriority(c *C, hb *hotScheduler, tc *mockcluster.Cluster, dims [3][2]int) {
diff --git a/server/schedulers/metrics.go b/server/schedulers/metrics.go
index 1f8ce0c308b..8da55e1ce07 100644
--- a/server/schedulers/metrics.go
+++ b/server/schedulers/metrics.go
@@ -111,6 +111,14 @@ var scatterRangeRegionCounter = prometheus.NewCounterVec(
 		Help:      "Counter of scatter range region scheduler.",
 	}, []string{"type", "store"})
 
+var hotPendingStatus = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "pd",
+		Subsystem: "scheduler",
+		Name:      "hot_pending",
+		Help:      "Pending influence status in hotspot scheduler.",
+	}, []string{"type", "source", "target"})
+
 func init() {
 	prometheus.MustRegister(schedulerCounter)
 	prometheus.MustRegister(schedulerStatus)
@@ -124,4 +132,5 @@ func init() {
 	prometheus.MustRegister(scatterRangeRegionCounter)
 	prometheus.MustRegister(opInfluenceStatus)
 	prometheus.MustRegister(tolerantResourceStatus)
+	prometheus.MustRegister(hotPendingStatus)
 }
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index a36d072120e..fd77265dd98 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -192,9 +192,9 @@ func (s *testShuffleHotRegionSchedulerSuite) checkBalance(c *C, tc *mockcluster.
//| 1 | 1 | 2 | 3 | 512KB | //| 2 | 1 | 3 | 4 | 512KB | //| 3 | 1 | 2 | 4 | 512KB | - tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}) - tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4}) - tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 4}) + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}) + tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 4}) tc.SetHotRegionCacheHitsThreshold(0) // try to get an operator @@ -232,9 +232,9 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) { tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval) - tc.AddRegionWithReadInfo(1, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2}) - tc.AddRegionWithReadInfo(2, 2, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 3}) - tc.AddRegionWithReadInfo(3, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2, 3}) + tc.AddRegionWithReadInfo(1, 1, 512*KB*statistics.ReadReportInterval, 0, 0, statistics.ReadReportInterval, []uint64{2}) + tc.AddRegionWithReadInfo(2, 2, 512*KB*statistics.ReadReportInterval, 0, 0, statistics.ReadReportInterval, []uint64{1, 3}) + tc.AddRegionWithReadInfo(3, 1, 512*KB*statistics.ReadReportInterval, 0, 0, statistics.ReadReportInterval, []uint64{2, 3}) tc.SetHotRegionCacheHitsThreshold(0) c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue) c.Assert(hb.IsScheduleAllowed(tc), IsFalse) @@ -401,11 +401,11 @@ func (s *testSpecialUseSuite) TestSpecialUseHotRegion(c *C) { tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenBytes(4, 0) tc.UpdateStorageWrittenBytes(5, 0) - tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}) - tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}) - tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}) - tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 3}) - tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 2}) + tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}) + tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}) + tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{2, 3}) + tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{1, 3}) + tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.WriteReportInterval, 0, 0, 
statistics.WriteReportInterval, []uint64{1, 2}) ops = hs.Schedule(tc) c.Assert(ops, HasLen, 1) testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 4) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 545ac6fcbcf..082dfc23727 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -317,6 +317,18 @@ func (lp *storeLoadPred) max() *storeLoad { return maxLoad(&lp.Current, &lp.Future) } +func (lp *storeLoadPred) pending() *storeLoad { + mx, mn := lp.max(), lp.min() + loads := make([]float64, len(mx.Loads)) + for i := range loads { + loads[i] = mx.Loads[i] - mn.Loads[i] + } + return &storeLoad{ + Loads: loads, + Count: 0, + } +} + func (lp *storeLoadPred) diff() *storeLoad { mx, mn := lp.max(), lp.min() loads := make([]float64, len(mx.Loads))