diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 318de0429614..ce217bac8d51 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -34,7 +34,7 @@ type dimStat struct { } func newDimStat(typ int) *dimStat { - reportInterval := RegionHeartBeatReportInterval * time.Second + reportInterval := HotStatReportInterval * time.Second return &dimStat{ typ: typ, Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval), @@ -140,7 +140,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi // IsNeedCoolDownTransferLeader use cooldown time after transfer leader to avoid unnecessary schedule func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool { - return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*RegionHeartBeatReportInterval) + return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval) } // IsNeedDelete to delete the item in cache. diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index b2755322f2a0..6bf25c5eab0d 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -31,8 +31,8 @@ const ( HotThresholdRatio = 0.8 - // ReportInterval indicates the interval between each data reporting + // HotStatReportInterval indicates the interval between each data reporting // TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data - ReportInterval = RegionHeartBeatReportInterval - topNTTL = 3 * ReportInterval * time.Second + HotStatReportInterval = RegionHeartBeatReportInterval + topNTTL = 3 * HotStatReportInterval * time.Second rollingWindowsSize = 5 @@ -123,24 +123,18 @@ func (f *hotPeerCache) collectPeerMetrics(byteRate, keyRate float64, interval ui // CheckRegionFlow checks the flow information of region.
func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerStat) { - regionID := region.GetID() - storeIDs := f.getAllStoreIDs(region) - var peers []uint64 - for _, peer := range region.GetPeers() { - peers = append(peers, peer.StoreId) - } - reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - { + // TODO: collect metrics by peer stat bytes := float64(f.getRegionBytes(region)) keys := float64(f.getRegionKeys(region)) byteRate := bytes / float64(interval) keyRate := keys / float64(interval) f.collectRegionMetrics(byteRate, keyRate, interval) } - + regionID := region.GetID() + storeIDs := f.getAllStoreIDs(region) for _, storeID := range storeIDs { peer := region.GetStorePeer(storeID) var item *HotPeerStat @@ -152,15 +146,16 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS SetWriteBytes(region.GetBytesWritten()) item = f.CheckPeerFlow(peerInfo, region, interval) } else { - // Not in region anymore - item = f.getOldHotPeerStat(regionID, storeID) - item.needDelete = true + item = f.markExpiredItem(regionID, storeID) } if item != nil { ret = append(ret, item) } } - + var peers []uint64 + for _, peer := range region.GetPeers() { + peers = append(peers, peer.StoreId) + } log.Debug("region heartbeat info", zap.String("type", f.kind.String()), zap.Uint64("region", region.GetID()), @@ -355,7 +350,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb } func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { - return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval*time.Second) + return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second) } func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, keys float64, interval time.Duration) *HotPeerStat { @@ -371,7 +366,7 @@ func (f *hotPeerCache) 
updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k if !isHot { return nil } - if interval.Seconds() >= RegionHeartBeatReportInterval { + if interval.Seconds() >= HotStatReportInterval { newItem.HotDegree = 1 newItem.AntiCount = hotRegionAntiCount } @@ -389,6 +384,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k newItem.rollingByteRate = oldItem.rollingByteRate newItem.rollingKeyRate = oldItem.rollingKeyRate + // TODO: don't inherit hot degree after transfer leader when we report peer by store heartbeat. if newItem.justTransferLeader { // skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate // maintain anticount and hotdegree to avoid store threshold and hot peer are unstable. @@ -453,6 +449,22 @@ func (f *hotPeerCache) getRegionKeys(region *core.RegionInfo) uint64 { return 0 } +// TODO: remove it in future +func (f *hotPeerCache) collectRegionMetrics(byteRate, keyRate float64, interval uint64) { + regionHeartbeatIntervalHist.Observe(float64(interval)) + if interval == 0 { + return + } + if f.kind == ReadFlow { + readByteHist.Observe(byteRate) + readKeyHist.Observe(keyRate) + } + if f.kind == WriteFlow { + writeByteHist.Observe(byteRate) + writeKeyHist.Observe(keyRate) + } +} + // TODO: remove it in future // gets the storeIDs, including old region and new region func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 { @@ -482,18 +494,8 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 { return ret } -// TODO: remove it in future -func (f *hotPeerCache) collectRegionMetrics(byteRate, keyRate float64, interval uint64) { - regionHeartbeatIntervalHist.Observe(float64(interval)) - if interval == 0 { - return - } - if f.kind == ReadFlow { - readByteHist.Observe(byteRate) - readKeyHist.Observe(keyRate) - } - if f.kind == WriteFlow { - writeByteHist.Observe(byteRate) - 
writeKeyHist.Observe(keyRate) - } +func (f *hotPeerCache) markExpiredItem(regionID, storeID uint64) *HotPeerStat { + item := f.getOldHotPeerStat(regionID, storeID) + item.needDelete = true + return item }