diff --git a/server/api/trend.go b/server/api/trend.go index a893546a541..60357965004 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -125,7 +125,6 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { if err != nil { return nil, err } - trendStores := make([]trendStore, 0, len(stores)) for _, store := range stores { info := newStoreInfo(h.svr.GetScheduleConfig(), store) @@ -141,21 +140,21 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { LastHeartbeatTS: info.Status.LastHeartbeatTS, Uptime: info.Status.Uptime, } - s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetID()) - s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetID()) + s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, statistics.RegionReadBytes, store.GetID()) + s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, statistics.RegionWriteBytes, store.GetID()) trendStores = append(trendStores, s) } return trendStores, nil } -func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID uint64) (storeFlow float64, regionFlows []float64) { +func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, regionStatKind statistics.RegionStatKind, storeID uint64) (storeFlow float64, regionFlows []float64) { if stats == nil { return } if stat, ok := stats[storeID]; ok { - storeFlow = stat.TotalBytesRate + storeFlow = stat.TotalLoads[regionStatKind] for _, flow := range stat.Stats { - regionFlows = append(regionFlows, flow.GetByteRate()) + regionFlows = append(regionFlows, flow.GetLoad(regionStatKind)) } } return diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index ff9e1a1cac2..dadbcef4e1f 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -426,8 +426,8 @@ func (c *coordinator) stop() { type hasHotStatus interface { GetHotReadStatus() *statistics.StoreHotPeersInfos GetHotWriteStatus() *statistics.StoreHotPeersInfos - GetWritePendingInfluence() map[uint64]schedulers.Influence - GetReadPendingInfluence() map[uint64]schedulers.Influence + GetWritePendingInfluence() map[uint64]*schedulers.Influence + GetReadPendingInfluence() map[uint64]*schedulers.Influence } func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos { @@ -512,8 +512,8 @@ func (c *coordinator) collectHotSpotMetrics() { storeLabel := fmt.Sprintf("%d", storeID) stat, ok := status.AsPeer[storeID] if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalKeysRate) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalLoads[statistics.RegionWriteBytes]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalLoads[statistics.RegionWriteKeys]) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0) @@ -523,8 +523,8 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok = status.AsLeader[storeID] if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalKeysRate) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionWriteBytes]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalLoads[statistics.RegionWriteKeys]) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0) @@ -532,11 +532,12 @@ func (c *coordinator) collectHotSpotMetrics() { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0) } - infl := pendings[storeID] // TODO: add to tidb-ansible after merging pending influence into operator influence. - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.ByteRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.KeyRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count) + if infl := pendings[storeID]; infl != nil { + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count) + } } // Collects hot read region metrics. @@ -548,8 +549,8 @@ func (c *coordinator) collectHotSpotMetrics() { storeLabel := fmt.Sprintf("%d", storeID) stat, ok := status.AsLeader[storeID] if ok { - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalKeysRate) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionReadBytes]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalLoads[statistics.RegionReadKeys]) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0) @@ -557,10 +558,11 @@ func (c *coordinator) collectHotSpotMetrics() { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0) } - infl := pendings[storeID] - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.ByteRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.KeyRate) - hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count) + if infl := pendings[storeID]; infl != nil { + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim]) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count) + } } } diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index b35de504822..0a6e333233c 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/filter" @@ -100,7 +101,7 @@ type hotScheduler struct { stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail // pendingSums indicates the [resourceType] storeID -> pending Influence // This stores the pending Influence for each store by resource type. - pendingSums [resourceTypeLen]map[uint64]Influence + pendingSums [resourceTypeLen]map[uint64]*Influence // config of hot scheduler conf *hotRegionSchedulerConfig } @@ -271,64 +272,66 @@ func (h *hotScheduler) gcRegionPendings() { // it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store func summaryStoresLoad( storesLoads map[uint64][]float64, - storePendings map[uint64]Influence, + storePendings map[uint64]*Influence, storeHotPeers map[uint64][]*statistics.HotPeerStat, rwTy rwType, kind core.ResourceKind, ) map[uint64]*storeLoadDetail { // loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count) loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads)) - allByteSum := 0.0 - allKeySum := 0.0 + allLoadSum := make([]float64, statistics.DimLen) allCount := 0.0 - for id, loads := range storesLoads { - var byteRate, keyRate float64 + // Stores without byte rate statistics is not available to schedule. + for id, storeLoads := range storesLoads { + loads := make([]float64, statistics.DimLen) switch rwTy { case read: - byteRate, keyRate = loads[statistics.StoreReadBytes], loads[statistics.StoreReadKeys] + loads[statistics.ByteDim] = storeLoads[statistics.StoreReadBytes] + loads[statistics.KeyDim] = storeLoads[statistics.StoreReadKeys] case write: - byteRate, keyRate = loads[statistics.StoreWriteBytes], loads[statistics.StoreWriteKeys] + loads[statistics.ByteDim] = storeLoads[statistics.StoreWriteBytes] + loads[statistics.KeyDim] = storeLoads[statistics.StoreWriteKeys] } - // Find all hot peers first var hotPeers []*statistics.HotPeerStat { - byteSum := 0.0 - keySum := 0.0 + peerLoadSum := make([]float64, statistics.DimLen) + // TODO: To remove `filterHotPeers`, we need to: + // HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leader. for _, peer := range filterHotPeers(kind, storeHotPeers[id]) { - byteSum += peer.GetByteRate() - keySum += peer.GetKeyRate() + for i := range peerLoadSum { + peerLoadSum[i] += peer.GetLoad(getRegionStatKind(rwTy, i)) + } hotPeers = append(hotPeers, peer.Clone()) } // Use sum of hot peers to estimate leader-only byte rate. // For write requests, Write{Bytes, Keys} is applied to all Peers at the same time, while the Leader and Follower are under different loads (usually the Leader consumes more CPU). // But none of the current dimension reflect this difference, so we create a new dimension to reflect it. if kind == core.LeaderKind && rwTy == write { - byteRate = byteSum - keyRate = keySum + loads = peerLoadSum } // Metric for debug. { ty := "byte-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteSum) + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.ByteDim]) } { ty := "key-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum) + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim]) } } - allByteSum += byteRate - allKeySum += keyRate + for i := range allLoadSum { + allLoadSum[i] += loads[i] + } allCount += float64(len(hotPeers)) // Build store load prediction from current load and pending influence. stLoadPred := (&storeLoad{ - ByteRate: byteRate, - KeyRate: keyRate, - Count: float64(len(hotPeers)), - }).ToLoadPred(storePendings[id]) + Loads: loads, + Count: float64(len(hotPeers)), + }).ToLoadPred(rwTy, storePendings[id]) // Construct store load info. loadDetail[id] = &storeLoadDetail{ @@ -336,28 +339,28 @@ func summaryStoresLoad( HotPeers: hotPeers, } } - storeLen := float64(len(storesLoads)) // store expectation byte/key rate and count for each store-load detail. for id, detail := range loadDetail { - byteExp := allByteSum / storeLen - keyExp := allKeySum / storeLen - countExp := allCount / storeLen - detail.LoadPred.Expect.ByteRate = byteExp - detail.LoadPred.Expect.KeyRate = keyExp - detail.LoadPred.Expect.Count = countExp + expectLoads := make([]float64, len(allLoadSum)) + for i := range expectLoads { + expectLoads[i] = allLoadSum[i] / storeLen + } + expectCount := allCount / storeLen + detail.LoadPred.Expect.Loads = expectLoads + detail.LoadPred.Expect.Count = expectCount // Debug { ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteExp) + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.ByteDim]) } { ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keyExp) + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.KeyDim]) } { ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(countExp) + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount) } } return loadDetail @@ -479,13 +482,15 @@ func (bs *balanceSolver) init() { } // And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat - bs.maxSrc = &storeLoad{} + bs.maxSrc = &storeLoad{Loads: make([]float64, statistics.DimLen)} bs.minDst = &storeLoad{ - ByteRate: math.MaxFloat64, - KeyRate: math.MaxFloat64, - Count: math.MaxFloat64, + Loads: make([]float64, statistics.DimLen), + Count: math.MaxFloat64, } - maxCur := &storeLoad{} + for i := range bs.minDst.Loads { + bs.minDst.Loads[i] = math.MaxFloat64 + } + maxCur := &storeLoad{Loads: make([]float64, statistics.DimLen)} for _, detail := range bs.stLoadDetail { bs.maxSrc = maxLoad(bs.maxSrc, detail.LoadPred.min()) @@ -493,10 +498,14 @@ func (bs *balanceSolver) init() { maxCur = maxLoad(maxCur, &detail.LoadPred.Current) } + rankStepRatios := []float64{bs.sche.conf.GetByteRankStepRatio(), bs.sche.conf.GetKeyRankStepRatio()} + stepLoads := make([]float64, statistics.DimLen) + for i := range stepLoads { + stepLoads[i] = maxCur.Loads[i] * rankStepRatios[i] + } bs.rankStep = &storeLoad{ - ByteRate: maxCur.ByteRate * bs.sche.conf.GetByteRankStepRatio(), - KeyRate: maxCur.KeyRate * bs.sche.conf.GetKeyRankStepRatio(), - Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), + Loads: stepLoads, + Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } } @@ -598,8 +607,10 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { if len(detail.HotPeers) == 0 { continue } - if detail.LoadPred.min().ByteRate > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.ByteRate && - detail.LoadPred.min().KeyRate > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.KeyRate { + minLoad := detail.LoadPred.min() + if slice.AllOf(minLoad.Loads, func(i int) bool { + return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.Loads[i] + }) { ret[id] = detail hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() } @@ -635,12 +646,14 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { byteSort := make([]*statistics.HotPeerStat, len(ret)) copy(byteSort, ret) sort.Slice(byteSort, func(i, j int) bool { - return byteSort[i].GetByteRate() > byteSort[j].GetByteRate() + k := getRegionStatKind(bs.rwTy, statistics.ByteDim) + return byteSort[i].GetLoad(k) > byteSort[j].GetLoad(k) }) keySort := make([]*statistics.HotPeerStat, len(ret)) copy(keySort, ret) sort.Slice(keySort, func(i, j int) bool { - return keySort[i].GetKeyRate() > keySort[j].GetKeyRate() + k := getRegionStatKind(bs.rwTy, statistics.KeyDim) + return byteSort[i].GetLoad(k) > byteSort[j].GetLoad(k) }) union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum) @@ -775,8 +788,10 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*co for _, store := range candidates { if filter.Target(bs.cluster.GetOpts(), store, filters) { detail := bs.stLoadDetail[store.GetID()] - if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Expect.ByteRate && - detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Expect.KeyRate { + maxLoads := detail.LoadPred.max().Loads + if slice.AllOf(maxLoads, func(i int) bool { + return maxLoads[i]*dstToleranceRatio < detail.LoadPred.Expect.Loads[i] + }) { ret[store.GetID()] = bs.stLoadDetail[store.GetID()] hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc() } @@ -796,22 +811,32 @@ func (bs *balanceSolver) calcProgressiveRank() { if bs.rwTy == write && bs.opTy == transferLeader { // In this condition, CPU usage is the matter. // Only consider about key rate. - if srcLd.KeyRate-peer.GetKeyRate() >= dstLd.KeyRate+peer.GetKeyRate() { + srcKeyRate := srcLd.Loads[statistics.KeyDim] + dstKeyRate := dstLd.Loads[statistics.KeyDim] + peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, statistics.KeyDim)) + if srcKeyRate-peerKeyRate >= dstKeyRate+peerKeyRate { rank = -1 } } else { + // we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte) rate should still be less + // than the src store's (key/byte) rate after scheduling one peer. getSrcDecRate := func(a, b float64) float64 { if a-b <= 0 { return 1 } return a - b } - // we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte) rate should still be less - // than the src store's (key/byte) rate after scheduling one peer. - keyDecRatio := (dstLd.KeyRate + peer.GetKeyRate()) / getSrcDecRate(srcLd.KeyRate, peer.GetKeyRate()) - keyHot := peer.GetKeyRate() >= bs.sche.conf.GetMinHotKeyRate() - byteDecRatio := (dstLd.ByteRate + peer.GetByteRate()) / getSrcDecRate(srcLd.ByteRate, peer.GetByteRate()) - byteHot := peer.GetByteRate() > bs.sche.conf.GetMinHotByteRate() + checkHot := func(dim int) (bool, float64) { + srcRate := srcLd.Loads[dim] + dstRate := dstLd.Loads[dim] + peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, dim)) + decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate) + isHot := peerRate >= bs.sche.conf.GetMinHotKeyRate() + return isHot, decRatio + } + keyHot, keyDecRatio := checkHot(statistics.KeyDim) + byteHot, byteDecRatio := checkHot(statistics.ByteDim) + greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio() switch { case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio: @@ -858,14 +883,15 @@ func (bs *balanceSolver) betterThan(old *solution) bool { if bs.rwTy == write && bs.opTy == transferLeader { switch { - case bs.cur.srcPeerStat.GetKeyRate() > old.srcPeerStat.GetKeyRate(): + case bs.cur.srcPeerStat.GetLoad(statistics.RegionWriteKeys) > old.srcPeerStat.GetLoad(statistics.RegionWriteKeys): return true - case bs.cur.srcPeerStat.GetKeyRate() < old.srcPeerStat.GetKeyRate(): + case bs.cur.srcPeerStat.GetLoad(statistics.RegionWriteKeys) < old.srcPeerStat.GetLoad(statistics.RegionWriteKeys): return false } } else { - byteRkCmp := rankCmp(bs.cur.srcPeerStat.GetByteRate(), old.srcPeerStat.GetByteRate(), stepRank(0, 100)) - keyRkCmp := rankCmp(bs.cur.srcPeerStat.GetKeyRate(), old.srcPeerStat.GetKeyRate(), stepRank(0, 10)) + bk, kk := getRegionStatKind(bs.rwTy, statistics.ByteDim), getRegionStatKind(bs.rwTy, statistics.KeyDim) + byteRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(bk), old.srcPeerStat.GetLoad(bk), stepRank(0, 100)) + keyRkCmp := rankCmp(bs.cur.srcPeerStat.GetLoad(kk), old.srcPeerStat.GetLoad(kk), stepRank(0, 10)) switch bs.cur.progressiveRank { case -2: // greatDecRatio < byteDecRatio <= minorDecRatio && keyDecRatio <= greatDecRatio @@ -901,27 +927,26 @@ func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int { if bs.rwTy == write && bs.opTy == transferLeader { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( - stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.KeyRate, bs.rankStep.KeyRate)), - stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.ByteRate, bs.rankStep.ByteRate)), + stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), + stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), ))), diffCmp(sliceLoadCmp( stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)), - stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.KeyRate)), - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)), + stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.Loads[statistics.KeyDim])), + stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), )), ) } else { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( - stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.ByteRate, bs.rankStep.ByteRate)), - stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.KeyRate, bs.rankStep.KeyRate)), + stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), ))), diffCmp( - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)), + stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), ), ) } - lp1 := bs.stLoadDetail[st1].LoadPred lp2 := bs.stLoadDetail[st2].LoadPred return lpCmp(lp1, lp2) @@ -937,26 +962,25 @@ func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int { if bs.rwTy == write && bs.opTy == transferLeader { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( - stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.KeyRate, bs.rankStep.KeyRate)), - stLdRankCmp(stLdByteRate, stepRank(bs.minDst.ByteRate, bs.rankStep.ByteRate)), + stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), + stLdRankCmp(stLdByteRate, stepRank(bs.minDst.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), )), diffCmp(sliceLoadCmp( stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)), - stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.KeyRate)), - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)), + stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.Loads[statistics.KeyDim])), + stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), ))) } else { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( - stLdRankCmp(stLdByteRate, stepRank(bs.minDst.ByteRate, bs.rankStep.ByteRate)), - stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.KeyRate, bs.rankStep.KeyRate)), + stLdRankCmp(stLdByteRate, stepRank(bs.minDst.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])), + stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])), )), diffCmp( - stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.ByteRate)), + stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])), ), ) } - lp1 := bs.stLoadDetail[st1].LoadPred lp2 := bs.stLoadDetail[st2].LoadPred return lpCmp(lp1, lp2) @@ -1038,9 +1062,8 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) { schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String())) infl := Influence{ - ByteRate: bs.cur.srcPeerStat.GetByteRate(), - KeyRate: bs.cur.srcPeerStat.GetKeyRate(), - Count: 1, + Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...), + Count: 1, } return []*operator.Operator{op}, []Influence{infl} @@ -1075,21 +1098,21 @@ func (h *hotScheduler) GetHotWriteStatus() *statistics.StoreHotPeersInfos { } } -func (h *hotScheduler) GetWritePendingInfluence() map[uint64]Influence { +func (h *hotScheduler) GetWritePendingInfluence() map[uint64]*Influence { return h.copyPendingInfluence(writePeer) } -func (h *hotScheduler) GetReadPendingInfluence() map[uint64]Influence { +func (h *hotScheduler) GetReadPendingInfluence() map[uint64]*Influence { return h.copyPendingInfluence(readLeader) } -func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influence { +func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]*Influence { h.RLock() defer h.RUnlock() pendingSum := h.pendingSums[ty] - ret := make(map[uint64]Influence, len(pendingSum)) + ret := make(map[uint64]*Influence, len(pendingSum)) for id, infl := range pendingSum { - ret[id] = infl + ret[id] = infl.add(infl, 0) // copy } return ret } @@ -1185,3 +1208,17 @@ func toResourceType(rwTy rwType, opTy opType) resourceType { } panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy)) } + +func getRegionStatKind(rwTy rwType, dim int) statistics.RegionStatKind { + switch { + case rwTy == read && dim == statistics.ByteDim: + return statistics.RegionReadBytes + case rwTy == read && dim == statistics.KeyDim: + return statistics.RegionReadKeys + case rwTy == write && dim == statistics.ByteDim: + return statistics.RegionWriteBytes + case rwTy == write && dim == statistics.KeyDim: + return statistics.RegionWriteKeys + } + return 0 +} diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 909cfce314d..45673d20173 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -683,7 +683,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { c.Assert(len(stats), Equals, 2) for _, ss := range stats { for _, s := range ss { - c.Assert(s.GetByteRate(), Equals, 512.0*KB) + c.Assert(s.GetLoad(statistics.RegionReadBytes), Equals, 512.0*KB) } } diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index 7269d4b1190..33b33961255 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -137,14 +137,14 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) [] case read: s.stLoadInfos[readLeader] = summaryStoresLoad( storesLoads, - map[uint64]Influence{}, + map[uint64]*Influence{}, cluster.RegionReadStats(), read, core.LeaderKind) return s.randomSchedule(cluster, s.stLoadInfos[readLeader]) case write: s.stLoadInfos[writeLeader] = summaryStoresLoad( storesLoads, - map[uint64]Influence{}, + map[uint64]*Influence{}, cluster.RegionWriteStats(), write, core.LeaderKind) return s.randomSchedule(cluster, s.stLoadInfos[writeLeader]) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index af04b267897..0d8ad60b0c6 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -147,16 +147,17 @@ func getKeyRanges(args []string) ([]core.KeyRange, error) { // Influence records operator influence. type Influence struct { - ByteRate float64 - KeyRate float64 - Count float64 + Loads []float64 + Count float64 } -func (infl Influence) add(rhs *Influence, w float64) Influence { - infl.ByteRate += rhs.ByteRate * w - infl.KeyRate += rhs.KeyRate * w - infl.Count += rhs.Count * w - return infl +func (lhs *Influence) add(rhs *Influence, w float64) *Influence { + var infl Influence + for i := range lhs.Loads { + infl.Loads = append(infl.Loads, lhs.Loads[i]+rhs.Loads[i]*w) + } + infl.Count = infl.Count + rhs.Count*w + return &infl } // TODO: merge it into OperatorInfluence. @@ -177,42 +178,58 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) // summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence // It makes each key/byte rate or count become (1+w) times to the origin value while f is the function to provide w(weight) -func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]Influence { - ret := map[uint64]Influence{} +func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]*Influence { + ret := make(map[uint64]*Influence) for p := range pendings { w := f(p.op) if w == 0 { delete(pendings, p) } + if _, ok := ret[p.to]; !ok { + ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))} + } ret[p.to] = ret[p.to].add(&p.origin, w) + if _, ok := ret[p.from]; !ok { + ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))} + } ret[p.from] = ret[p.from].add(&p.origin, -w) } return ret } type storeLoad struct { - ByteRate float64 - KeyRate float64 - Count float64 + Loads []float64 + Count float64 } -func (load *storeLoad) ToLoadPred(infl Influence) *storeLoadPred { - future := *load - future.ByteRate += infl.ByteRate - future.KeyRate += infl.KeyRate - future.Count += infl.Count +func (load storeLoad) ToLoadPred(rwTy rwType, infl *Influence) *storeLoadPred { + future := storeLoad{ + Loads: append(load.Loads[:0:0], load.Loads...), + Count: load.Count, + } + if infl != nil { + switch rwTy { + case read: + future.Loads[statistics.ByteDim] += infl.Loads[statistics.RegionReadBytes] + future.Loads[statistics.KeyDim] += infl.Loads[statistics.RegionReadKeys] + case write: + future.Loads[statistics.ByteDim] += infl.Loads[statistics.RegionWriteBytes] + future.Loads[statistics.KeyDim] += infl.Loads[statistics.RegionWriteKeys] + } + future.Count += infl.Count + } return &storeLoadPred{ - Current: *load, + Current: load, Future: future, } } func stLdByteRate(ld *storeLoad) float64 { - return ld.ByteRate + return ld.Loads[statistics.ByteDim] } func stLdKeyRate(ld *storeLoad) float64 { - return ld.KeyRate + return ld.Loads[statistics.KeyDim] } func stLdCount(ld *storeLoad) float64 { @@ -271,10 +288,13 @@ func (lp *storeLoadPred) max() *storeLoad { func (lp *storeLoadPred) diff() *storeLoad { mx, mn := lp.max(), lp.min() + loads := make([]float64, len(mx.Loads)) + for i := range loads { + loads[i] = mx.Loads[i] - mn.Loads[i] + } return &storeLoad{ - ByteRate: mx.ByteRate - mn.ByteRate, - KeyRate: mx.KeyRate - mn.KeyRate, - Count: mx.Count - mn.Count, + Loads: loads, + Count: mx.Count - mn.Count, } } @@ -310,18 +330,24 @@ func diffCmp(ldCmp storeLoadCmp) storeLPCmp { } func minLoad(a, b *storeLoad) *storeLoad { + loads := make([]float64, len(a.Loads)) + for i := range loads { + loads[i] = math.Min(a.Loads[i], b.Loads[i]) + } return &storeLoad{ - ByteRate: math.Min(a.ByteRate, b.ByteRate), - KeyRate: math.Min(a.KeyRate, b.KeyRate), - Count: math.Min(a.Count, b.Count), + Loads: loads, + Count: math.Min(a.Count, b.Count), } } func maxLoad(a, b *storeLoad) *storeLoad { + loads := make([]float64, len(a.Loads)) + for i := range loads { + loads[i] = math.Max(a.Loads[i], b.Loads[i]) + } return &storeLoad{ - ByteRate: math.Max(a.ByteRate, b.ByteRate), - KeyRate: math.Max(a.KeyRate, b.KeyRate), - Count: math.Max(a.Count, b.Count), + Loads: loads, + Count: math.Max(a.Count, b.Count), } } @@ -332,18 +358,18 @@ type storeLoadDetail struct { func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat { peers := make([]statistics.HotPeerStat, 0, len(li.HotPeers)) - var totalBytesRate, totalKeysRate float64 + totalLoads := make([]float64, statistics.RegionStatCount) for _, peer := range li.HotPeers { if peer.HotDegree > 0 { peers = append(peers, *peer.Clone()) - totalBytesRate += peer.ByteRate - totalKeysRate += peer.KeyRate + for i := range totalLoads { + totalLoads[i] += peer.GetLoad(statistics.RegionStatKind(i)) + } } } return &statistics.HotPeersStat{ - TotalBytesRate: math.Round(totalBytesRate), - TotalKeysRate: math.Round(totalKeysRate), - Count: len(peers), - Stats: peers, + TotalLoads: totalLoads, + Count: len(peers), + Stats: peers, } } diff --git a/server/statistics/flowkind.go b/server/statistics/flowkind.go index 75923e626df..001a07c6902 100644 --- a/server/statistics/flowkind.go +++ b/server/statistics/flowkind.go @@ -31,3 +31,14 @@ func (k FlowKind) String() string { } return "unimplemented" } + +// RegionStats returns hot items according to kind +func (k FlowKind) RegionStats() []RegionStatKind { + switch k { + case WriteFlow: + return []RegionStatKind{RegionWriteBytes, RegionWriteKeys} + case ReadFlow: + return []RegionStatKind{RegionReadBytes, RegionReadKeys} + } + return nil +} diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 16670f2972a..1df72721830 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -18,22 +18,24 @@ import ( "time" "github.com/tikv/pd/pkg/movingaverage" + "github.com/tikv/pd/pkg/slice" "go.uber.org/zap" ) +// Indicator dims. const ( - byteDim int = iota - keyDim - dimLen + ByteDim int = iota + KeyDim + DimLen ) type dimStat struct { - typ int + typ RegionStatKind Rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed. LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed. } -func newDimStat(typ int) *dimStat { +func newDimStat(typ RegionStatKind) *dimStat { reportInterval := RegionHeartBeatReportInterval * time.Second return &dimStat{ typ: typ, @@ -47,12 +49,12 @@ func (d *dimStat) Add(delta float64, interval time.Duration) { d.Rolling.Add(delta, interval) } -func (d *dimStat) isLastAverageHot(thresholds [dimLen]float64) bool { - return d.LastAverage.Get() >= thresholds[d.typ] +func (d *dimStat) isLastAverageHot(threshold float64) bool { + return d.LastAverage.Get() >= threshold } -func (d *dimStat) isHot(thresholds [dimLen]float64) bool { - return d.Rolling.Get() >= thresholds[d.typ] +func (d *dimStat) isHot(threshold float64) bool { + return d.Rolling.Get() >= threshold } func (d *dimStat) isFull() bool { @@ -77,13 +79,11 @@ type HotPeerStat struct { // AntiCount used to eliminate some noise when remove region in cache AntiCount int `json:"anti_count"` - Kind FlowKind `json:"-"` - ByteRate float64 `json:"flow_bytes"` - KeyRate float64 `json:"flow_keys"` + Kind FlowKind `json:"-"` + Loads []float64 `json:"loads"` // rolling statistics, recording some recently added records. - rollingByteRate *dimStat - rollingKeyRate *dimStat + rollingLoads []*dimStat // LastUpdateTime used to calculate average write LastUpdateTime time.Time `json:"last_update_time"` @@ -93,7 +93,7 @@ type HotPeerStat struct { isNew bool justTransferLeader bool interval uint64 - thresholds [dimLen]float64 + thresholds []float64 peers []uint64 lastTransferLeaderTime time.Time } @@ -105,15 +105,7 @@ func (stat *HotPeerStat) ID() uint64 { // Less compares two HotPeerStat.Implementing TopNItem. func (stat *HotPeerStat) Less(k int, than TopNItem) bool { - rhs := than.(*HotPeerStat) - switch k { - case keyDim: - return stat.GetKeyRate() < rhs.GetKeyRate() - case byteDim: - fallthrough - default: - return stat.GetByteRate() < rhs.GetByteRate() - } + return stat.GetLoad(RegionStatKind(k)) < than.(*HotPeerStat).GetLoad(RegionStatKind(k)) } // Log is used to output some info @@ -122,12 +114,9 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi zap.Uint64("interval", stat.interval), zap.Uint64("region-id", stat.RegionID), zap.Uint64("store", stat.StoreID), - zap.Float64("byte-rate", stat.GetByteRate()), - zap.Float64("byte-rate-instant", stat.ByteRate), - zap.Float64("byte-rate-threshold", stat.thresholds[byteDim]), - zap.Float64("key-rate", stat.GetKeyRate()), - zap.Float64("key-rate-instant", stat.KeyRate), - zap.Float64("key-rate-threshold", stat.thresholds[keyDim]), + zap.Float64s("loads", stat.GetLoads()), + zap.Float64s("loads-instant", stat.Loads), + zap.Float64s("thresholds", stat.thresholds), zap.Int("hot-degree", stat.HotDegree), zap.Int("hot-anti-count", stat.AntiCount), zap.Bool("just-transfer-leader", stat.justTransferLeader), @@ -157,43 +146,48 @@ func (stat *HotPeerStat) IsNew() bool { return stat.isNew } -// GetByteRate returns denoised BytesRate if possible. -func (stat *HotPeerStat) GetByteRate() float64 { - if stat.rollingByteRate == nil { - return math.Round(stat.ByteRate) +// GetLoad returns denoised load if possible. +func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 { + if len(stat.rollingLoads) > int(k) { + return math.Round(stat.rollingLoads[int(k)].Get()) } - return math.Round(stat.rollingByteRate.Get()) + return math.Round(stat.Loads[int(k)]) } -// GetKeyRate returns denoised KeysRate if possible. -func (stat *HotPeerStat) GetKeyRate() float64 { - if stat.rollingKeyRate == nil { - return math.Round(stat.KeyRate) +// GetLoads returns denoised load if possible. +func (stat *HotPeerStat) GetLoads() []float64 { + regionStats := stat.Kind.RegionStats() + loads := make([]float64, len(regionStats)) + for i, k := range regionStats { + loads[i] = stat.GetLoad(k) } - return math.Round(stat.rollingKeyRate.Get()) + return loads } // GetThresholds returns thresholds -func (stat *HotPeerStat) GetThresholds() [dimLen]float64 { +func (stat *HotPeerStat) GetThresholds() []float64 { return stat.thresholds } // Clone clones the HotPeerStat func (stat *HotPeerStat) Clone() *HotPeerStat { ret := *stat - ret.ByteRate = stat.GetByteRate() - ret.rollingByteRate = nil - ret.KeyRate = stat.GetKeyRate() - ret.rollingKeyRate = nil + ret.Loads = make([]float64, RegionStatCount) + for i := RegionStatKind(0); i < RegionStatCount; i++ { + ret.Loads[i] = stat.GetLoad(i) // replace with denoised loads + } + ret.rollingLoads = nil return &ret } func (stat *HotPeerStat) isFullAndHot() bool { - return (stat.rollingByteRate.isFull() && stat.rollingByteRate.isLastAverageHot(stat.thresholds)) || - (stat.rollingKeyRate.isFull() && stat.rollingKeyRate.isLastAverageHot(stat.thresholds)) + return slice.AnyOf(stat.rollingLoads, func(i int) bool { + return (stat.rollingLoads[i].isFull() && stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i])) + }) } func (stat *HotPeerStat) clearLastAverage() { - stat.rollingByteRate.clearLastAverage() - stat.rollingKeyRate.clearLastAverage() + for _, l := range stat.rollingLoads { + l.clearLastAverage() + } } diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index 891c1ade553..aca38dee14c 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/movingaverage" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/core" "go.uber.org/zap" ) @@ -39,18 +40,12 @@ const ( hotRegionAntiCount = 2 ) -var ( - minHotThresholds = [2][dimLen]float64{ - WriteFlow: { - byteDim: 1 * 1024, - keyDim: 32, - }, - ReadFlow: { - byteDim: 8 * 1024, - keyDim: 128, - }, - } -) +var minHotThresholds = [RegionStatCount]float64{ + RegionWriteBytes: 1 * 1024, + RegionWriteKeys: 32, + RegionReadBytes: 8 * 1024, + RegionReadKeys: 128, +} // hotPeerCache saves the hot peer's statistics. type hotPeerCache struct { @@ -98,7 +93,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) { } else { peers, ok := f.peersOfStore[item.StoreID] if !ok { - peers = NewTopN(dimLen, TopNN, topNTTL) + peers = NewTopN(DimLen, TopNN, topNTTL) f.peersOfStore[item.StoreID] = peers } peers.Put(item) @@ -113,34 +108,55 @@ func (f *hotPeerCache) Update(item *HotPeerStat) { } } -func (f *hotPeerCache) collectRegionMetrics(byteRate, keyRate float64, interval uint64) { +func (f *hotPeerCache) collectRegionMetrics(loads []float64, interval uint64) { regionHeartbeatIntervalHist.Observe(float64(interval)) if interval == 0 { return } - if f.kind == ReadFlow { - readByteHist.Observe(byteRate) - readKeyHist.Observe(keyRate) + // TODO: use unified metrics. (keep backward compatibility at the same time) + for _, k := range f.kind.RegionStats() { + switch k { + case RegionReadBytes: + readByteHist.Observe(loads[int(k)]) + case RegionReadKeys: + readKeyHist.Observe(loads[int(k)]) + case RegionWriteBytes: + writeByteHist.Observe(loads[int(k)]) + case RegionWriteKeys: + writeKeyHist.Observe(loads[int(k)]) + } } - if f.kind == WriteFlow { - writeByteHist.Observe(byteRate) - writeKeyHist.Observe(keyRate) +} + +func (f *hotPeerCache) getRegionDeltaLoads(region *core.RegionInfo) []float64 { + ret := make([]float64, RegionStatCount) + for k := RegionStatKind(0); k < RegionStatCount; k++ { + switch k { + case RegionReadBytes: + ret[k] = float64(region.GetBytesRead()) + case RegionReadKeys: + ret[k] = float64(region.GetKeysRead()) + case RegionWriteBytes: + ret[k] = float64(region.GetBytesWritten()) + case RegionWriteKeys: + ret[k] = float64(region.GetKeysWritten()) + } } + return ret } // CheckRegionFlow checks the flow information of region. func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerStat) { - - bytes := float64(f.getRegionBytes(region)) - keys := float64(f.getRegionKeys(region)) - reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - byteRate := bytes / float64(interval) - keyRate := keys / float64(interval) + deltaLoads := f.getRegionDeltaLoads(region) + loads := make([]float64, len(deltaLoads)) + for i := range deltaLoads { + loads[i] = deltaLoads[i] / float64(interval) + } + f.collectRegionMetrics(loads, interval) - f.collectRegionMetrics(byteRate, keyRate, interval) // old region is in the front and new region is in the back // which ensures it will hit the cache if moving peer or transfer leader occurs with the same replica number @@ -170,8 +186,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS StoreID: storeID, RegionID: region.GetID(), Kind: f.kind, - ByteRate: byteRate, - KeyRate: keyRate, + Loads: loads, LastUpdateTime: time.Now(), needDelete: isExpired, isLeader: region.GetLeader().GetStoreId() == storeID, @@ -194,7 +209,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS } } - newItem = f.updateHotPeerStat(newItem, oldItem, bytes, keys, time.Duration(interval)*time.Second) + newItem = f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) if newItem != nil { ret = append(ret, newItem) } @@ -224,33 +239,13 @@ func (f *hotPeerCache) CollectMetrics(typ string) { store := storeTag(storeID) thresholds := f.calcHotThresholds(storeID) hotCacheStatusGauge.WithLabelValues("total_length", store, typ).Set(float64(peers.Len())) - hotCacheStatusGauge.WithLabelValues("byte-rate-threshold", store, typ).Set(thresholds[byteDim]) - hotCacheStatusGauge.WithLabelValues("key-rate-threshold", store, typ).Set(thresholds[keyDim]) + hotCacheStatusGauge.WithLabelValues("byte-rate-threshold", store, typ).Set(thresholds[ByteDim]) + hotCacheStatusGauge.WithLabelValues("key-rate-threshold", store, typ).Set(thresholds[KeyDim]) // for compatibility - hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(thresholds[byteDim]) + hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(thresholds[ByteDim]) } } -func (f *hotPeerCache) getRegionBytes(region *core.RegionInfo) uint64 { - switch f.kind { - case WriteFlow: - return region.GetBytesWritten() - case ReadFlow: - return region.GetBytesRead() - } - return 0 -} - -func (f *hotPeerCache) getRegionKeys(region *core.RegionInfo) uint64 { - switch f.kind { - case WriteFlow: - return region.GetKeysWritten() - case ReadFlow: - return region.GetKeysRead() - } - return 0 -} - func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat { if hotPeers, ok := f.peersOfStore[storeID]; ok { if v := hotPeers.Get(regionID); v != nil { @@ -270,18 +265,19 @@ func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64) return false } -func (f *hotPeerCache) calcHotThresholds(storeID uint64) [dimLen]float64 { - minThresholds := minHotThresholds[f.kind] +func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 { + statKinds := f.kind.RegionStats() + mins := make([]float64, len(statKinds)) + for i, k := range statKinds { + mins[i] = minHotThresholds[k] + } tn, ok := f.peersOfStore[storeID] if !ok || tn.Len() < TopNN { - return minThresholds + return mins } - ret := [dimLen]float64{ - byteDim: tn.GetTopNMin(byteDim).(*HotPeerStat).GetByteRate(), - keyDim: tn.GetTopNMin(keyDim).(*HotPeerStat).GetKeyRate(), - } - for k := 0; k < dimLen; k++ { - ret[k] = math.Max(ret[k]*HotThresholdRatio, minThresholds[k]) + ret := make([]float64, len(statKinds)) + for i := range ret { + ret[i] = math.Max(tn.GetTopNMin(i).(*HotPeerStat).GetLoad(statKinds[i])*HotThresholdRatio, mins[i]) } return ret } @@ -378,16 +374,20 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval*time.Second) } -func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, keys float64, interval time.Duration) *HotPeerStat { +func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { if newItem.needDelete { return newItem } + regionStats := f.kind.RegionStats() + if oldItem == nil { if interval == 0 { return nil } - isHot := bytes/interval.Seconds() >= newItem.thresholds[byteDim] || keys/interval.Seconds() >= newItem.thresholds[keyDim] + isHot := slice.AnyOf(regionStats, func(i int) bool { + return deltaLoads[regionStats[i]]/interval.Seconds() >= newItem.thresholds[i] + }) if !isHot { return nil } @@ -396,18 +396,19 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k newItem.AntiCount = hotRegionAntiCount } newItem.isNew = true - newItem.rollingByteRate = newDimStat(byteDim) - newItem.rollingKeyRate = newDimStat(keyDim) - newItem.rollingByteRate.Add(bytes, interval) - newItem.rollingKeyRate.Add(keys, interval) - if newItem.rollingKeyRate.isFull() { - newItem.clearLastAverage() + newItem.rollingLoads = make([]*dimStat, len(regionStats)) + for i, k := range regionStats { + ds := newDimStat(k) + ds.Add(deltaLoads[k], interval) + if ds.isFull() { + ds.clearLastAverage() + } + newItem.rollingLoads[i] = ds } return newItem } - newItem.rollingByteRate = oldItem.rollingByteRate - newItem.rollingKeyRate = oldItem.rollingKeyRate + newItem.rollingLoads = oldItem.rollingLoads if newItem.justTransferLeader { // skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate @@ -419,10 +420,13 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k } newItem.lastTransferLeaderTime = oldItem.lastTransferLeaderTime - newItem.rollingByteRate.Add(bytes, interval) - newItem.rollingKeyRate.Add(keys, interval) - if !newItem.rollingKeyRate.isFull() { + for i, k := range regionStats { + newItem.rollingLoads[i].Add(deltaLoads[k], interval) + } + + isFull := newItem.rollingLoads[0].isFull() // The intervals of dims are the same, so it is only necessary to determine whether any of them + if !isFull { // not update hot degree and anti count newItem.HotDegree = oldItem.HotDegree newItem.AntiCount = oldItem.AntiCount diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index bb359084953..a0f8198b786 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -219,57 +219,57 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { cache := NewHotStoresStats(ReadFlow) // skip interval=0 - newItem := &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}} - newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 0) + newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0}} + newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0}, 0) c.Check(newItem, IsNil) // new peer, interval is larger than report interval, but no hot - newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{1.0, 1.0}} - newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 60*time.Second) + newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0}} + newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0}, 60*time.Second) c.Check(newItem, IsNil) // new peer, interval is less than report interval - newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}} - newItem = cache.updateHotPeerStat(newItem, nil, 60, 60, 30*time.Second) + newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0}} + newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0}, 30*time.Second) c.Check(newItem, NotNil) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is less than report interval oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 30*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0}, 30*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2) // sum of interval is less than report interval oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 50*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0}, 50*time.Second) c.Check(newItem.HotDegree, Equals, 2) c.Check(newItem.AntiCount, Equals, 2) // sum of interval is larger than report interval, and cold oldItem = newItem - newItem.thresholds = [2]float64{10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second) + newItem.thresholds = []float64{10.0, 10.0} + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0}, 60*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 1) // sum of interval is larger than report interval, and cold oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0}, 60*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) c.Check(newItem.needDelete, Equals, true) } func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { - byteRate := minHotThresholds[ReadFlow][byteDim] * 2 + byteRate := minHotThresholds[RegionReadBytes] * 2 expectThreshold := byteRate * HotThresholdRatio t.testMetrics(c, 120., byteRate, expectThreshold) t.testMetrics(c, 60., byteRate, expectThreshold) @@ -280,33 +280,34 @@ func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) { cache := NewHotStoresStats(ReadFlow) - minThresholds := minHotThresholds[cache.kind] storeID := uint64(1) - c.Assert(byteRate, GreaterEqual, minThresholds[byteDim]) + c.Assert(byteRate, GreaterEqual, minHotThresholds[RegionReadBytes]) for i := uint64(1); i < TopNN+10; i++ { var oldItem *HotPeerStat for { thresholds := cache.calcHotThresholds(storeID) newItem := &HotPeerStat{ + Kind: cache.kind, StoreID: storeID, RegionID: i, needDelete: false, thresholds: thresholds, - ByteRate: byteRate, - KeyRate: 0, + Loads: make([]float64, DimLen), } + newItem.Loads[RegionReadBytes] = byteRate + newItem.Loads[RegionReadKeys] = 0 oldItem = cache.getOldHotPeerStat(i, storeID) - if oldItem != nil && oldItem.rollingByteRate.isHot(thresholds) == true { + if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true { break } - item := cache.updateHotPeerStat(newItem, oldItem, byteRate*interval, 0, time.Duration(interval)*time.Second) + item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0}, time.Duration(interval)*time.Second) cache.Update(item) } thresholds := cache.calcHotThresholds(storeID) if i < TopNN { - c.Assert(thresholds[byteDim], Equals, minThresholds[byteDim]) + c.Assert(thresholds[RegionReadBytes], Equals, minHotThresholds[RegionReadBytes]) } else { - c.Assert(thresholds[byteDim], Equals, expectThreshold) + c.Assert(thresholds[RegionReadBytes], Equals, expectThreshold) } } } diff --git a/server/statistics/hot_regions_stat.go b/server/statistics/hot_regions_stat.go index a466c102562..b48afb28357 100644 --- a/server/statistics/hot_regions_stat.go +++ b/server/statistics/hot_regions_stat.go @@ -15,8 +15,7 @@ package statistics // HotPeersStat records all hot regions statistics type HotPeersStat struct { - TotalBytesRate float64 `json:"total_flow_bytes"` - TotalKeysRate float64 `json:"total_flow_keys"` - Count int `json:"regions_count"` - Stats []HotPeerStat `json:"statistics"` + TotalLoads []float64 `json:"total_loads"` + Count int `json:"regions_count"` + Stats []HotPeerStat `json:"statistics"` } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index 978ec18ce80..b20309df3c6 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -13,6 +13,33 @@ package statistics +// RegionStatKind represents the statistics type of region. +type RegionStatKind int + +// Different region statistics kinds. +const ( + RegionReadBytes RegionStatKind = iota + RegionReadKeys + RegionWriteBytes + RegionWriteKeys + + RegionStatCount +) + +func (k RegionStatKind) String() string { + switch k { + case RegionReadBytes: + return "read_bytes" + case RegionReadKeys: + return "read_keys" + case RegionWriteBytes: + return "write_bytes" + case RegionWriteKeys: + return "write_keys" + } + return "unknown RegionStatKind" +} + // StoreStatKind represents the statistics type of store. type StoreStatKind int