Skip to content

Commit

Permalink
revise
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Apr 27, 2021
1 parent dbd617f commit 6e91990
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
4 changes: 2 additions & 2 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type dimStat struct {
}

func newDimStat(typ int) *dimStat {
reportInterval := RegionHeartBeatReportInterval * time.Second
reportInterval := HotStatReportInterval * time.Second
return &dimStat{
typ: typ,
Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
Expand Down Expand Up @@ -140,7 +140,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi

// IsNeedCoolDownTransferLeader use cooldown time after transfer leader to avoid unnecessary schedule
func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*RegionHeartBeatReportInterval)
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval)
}

// IsNeedDelete to delete the item in cache.
Expand Down
64 changes: 33 additions & 31 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const (
HotThresholdRatio = 0.8
// ReportInterval indicates the interval between each data reporting
// TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data
ReportInterval = RegionHeartBeatReportInterval
topNTTL = 3 * ReportInterval * time.Second
HotStatReportInterval = RegionHeartBeatReportInterval
topNTTL = 3 * HotStatReportInterval * time.Second

rollingWindowsSize = 5

Expand Down Expand Up @@ -123,24 +123,18 @@ func (f *hotPeerCache) collectPeerMetrics(byteRate, keyRate float64, interval ui

// CheckRegionFlow checks the flow information of region.
func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerStat) {
regionID := region.GetID()
storeIDs := f.getAllStoreIDs(region)
var peers []uint64
for _, peer := range region.GetPeers() {
peers = append(peers, peer.StoreId)
}

reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

{
// TODO: collect metrics by peer stat
bytes := float64(f.getRegionBytes(region))
keys := float64(f.getRegionKeys(region))
byteRate := bytes / float64(interval)
keyRate := keys / float64(interval)
f.collectRegionMetrics(byteRate, keyRate, interval)
}

regionID := region.GetID()
storeIDs := f.getAllStoreIDs(region)
for _, storeID := range storeIDs {
peer := region.GetStorePeer(storeID)
var item *HotPeerStat
Expand All @@ -152,15 +146,16 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
SetWriteBytes(region.GetBytesWritten())
item = f.CheckPeerFlow(peerInfo, region, interval)
} else {
// Not in region anymore
item = f.getOldHotPeerStat(regionID, storeID)
item.needDelete = true
item = f.markExpiredItem(regionID, storeID)
}
if item != nil {
ret = append(ret, item)
}
}

var peers []uint64
for _, peer := range region.GetPeers() {
peers = append(peers, peer.StoreId)
}
log.Debug("region heartbeat info",
zap.String("type", f.kind.String()),
zap.Uint64("region", region.GetID()),
Expand Down Expand Up @@ -355,7 +350,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
}

func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval*time.Second)
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second)
}

func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, keys float64, interval time.Duration) *HotPeerStat {
Expand All @@ -371,7 +366,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
if !isHot {
return nil
}
if interval.Seconds() >= RegionHeartBeatReportInterval {
if interval.Seconds() >= HotStatReportInterval {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
}
Expand All @@ -389,6 +384,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
newItem.rollingByteRate = oldItem.rollingByteRate
newItem.rollingKeyRate = oldItem.rollingKeyRate

// TODO: don't inherit hot degree after transfer leader when we report peer by store heartbeat.
if newItem.justTransferLeader {
// skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate
// maintain anticount and hotdegree to avoid store threshold and hot peer are unstable.
Expand Down Expand Up @@ -453,6 +449,22 @@ func (f *hotPeerCache) getRegionKeys(region *core.RegionInfo) uint64 {
return 0
}

// TODO: remove it in future
func (f *hotPeerCache) collectRegionMetrics(byteRate, keyRate float64, interval uint64) {
regionHeartbeatIntervalHist.Observe(float64(interval))
if interval == 0 {
return
}
if f.kind == ReadFlow {
readByteHist.Observe(byteRate)
readKeyHist.Observe(keyRate)
}
if f.kind == WriteFlow {
writeByteHist.Observe(byteRate)
writeKeyHist.Observe(keyRate)
}
}

// TODO: remove it in future
// gets the storeIDs, including old region and new region
func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
Expand Down Expand Up @@ -482,18 +494,8 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
return ret
}

// TODO: remove it in future
func (f *hotPeerCache) collectRegionMetrics(byteRate, keyRate float64, interval uint64) {
regionHeartbeatIntervalHist.Observe(float64(interval))
if interval == 0 {
return
}
if f.kind == ReadFlow {
readByteHist.Observe(byteRate)
readKeyHist.Observe(keyRate)
}
if f.kind == WriteFlow {
writeByteHist.Observe(byteRate)
writeKeyHist.Observe(keyRate)
}
func (f *hotPeerCache) markExpiredItem(regionID, storeID uint64) *HotPeerStat {
item := f.getOldHotPeerStat(regionID, storeID)
item.needDelete = true
return item
}

0 comments on commit 6e91990

Please sign in to comment.