Skip to content

Commit

Permalink
statistics: multidimensional hot peer cache (#2140)
Browse files Browse the repository at this point in the history
  • Loading branch information
Luffbee authored Feb 21, 2020
1 parent 33155fe commit f8db500
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 174 deletions.
2 changes: 1 addition & 1 deletion server/api/trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID
if stat, ok := stats[storeID]; ok {
storeFlow = stat.TotalBytesRate
for _, flow := range stat.Stats {
regionFlows = append(regionFlows, flow.GetBytesRate())
regionFlows = append(regionFlows, flow.GetByteRate())
}
}
return
Expand Down
10 changes: 5 additions & 5 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func summaryStoresLoad(
{
hotSum := 0.0
for _, peer := range filterHotPeers(kind, minHotDegree, storeHotPeers[id]) {
hotSum += peer.GetBytesRate()
hotSum += peer.GetByteRate()
hotPeers = append(hotPeers, peer.Clone())
}
// Use sum of hot peers to estimate leader-only byte rate.
Expand Down Expand Up @@ -628,12 +628,12 @@ func (bs *balanceSolver) selectDstStoreID(candidateIDs map[uint64]struct{}) uint
}
switch bs.opTy {
case movePeer:
return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID])
return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
case transferLeader:
if bs.rwTy == write {
return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID])
return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
}
return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID])
return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
default:
return 0
}
Expand Down Expand Up @@ -685,7 +685,7 @@ func (bs *balanceSolver) buildOperators() []*operator.Operator {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))

infl := Influence{ByteRate: bs.srcPeerStat.GetBytesRate()}
infl := Influence{ByteRate: bs.srcPeerStat.GetByteRate()}
if bs.opTy == transferLeader && bs.rwTy == write {
infl.ByteRate = 0
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
c.Assert(len(stats), Equals, 2)
for _, ss := range stats {
for _, s := range ss {
c.Assert(s.BytesRate, Equals, 512.0*KB)
c.Assert(s.ByteRate, Equals, 512.0*KB)
}
}

Expand Down
53 changes: 39 additions & 14 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ package statistics

import "time"

// Dimension indices used to address per-dimension statistics
// (thresholds, TopN ordering) of a hot peer.
const (
	byteDim int = iota // byte-rate dimension
	keyDim             // key-rate dimension
	dimLen             // number of dimensions; keep last
)

// HotPeerStat records each hot peer's statistics
type HotPeerStat struct {
StoreID uint64 `json:"store_id"`
Expand All @@ -25,11 +31,13 @@ type HotPeerStat struct {
// AntiCount used to eliminate some noise when remove region in cache
AntiCount int

Kind FlowKind `json:"kind"`
BytesRate float64 `json:"flow_bytes"`
KeysRate float64 `json:"flow_keys"`
// RollingBytesRate is a rolling statistics, recording some recently added records.
RollingBytesRate MovingAvg
Kind FlowKind `json:"kind"`
ByteRate float64 `json:"flow_bytes"`
KeyRate float64 `json:"flow_keys"`

// rolling statistics, recording some recently added records.
rollingByteRate MovingAvg
rollingKeyRate MovingAvg

// LastUpdateTime used to calculate average write
LastUpdateTime time.Time `json:"last_update_time"`
Expand All @@ -47,9 +55,16 @@ func (stat *HotPeerStat) ID() uint64 {
}

// Less compares two HotPeerStat.Implementing TopNItem.
func (stat *HotPeerStat) Less(than TopNItem) bool {
func (stat *HotPeerStat) Less(k int, than TopNItem) bool {
rhs := than.(*HotPeerStat)
return stat.BytesRate < rhs.BytesRate
switch k {
case keyDim:
return stat.GetKeyRate() < rhs.GetKeyRate()
case byteDim:
fallthrough
default:
return stat.GetByteRate() < rhs.GetByteRate()
}
}

// IsNeedDelete to delete the item in cache.
Expand All @@ -67,18 +82,28 @@ func (stat *HotPeerStat) IsNew() bool {
return stat.isNew
}

// GetBytesRate returns denoised BytesRate if possible.
func (stat *HotPeerStat) GetBytesRate() float64 {
if stat.RollingBytesRate == nil {
return stat.BytesRate
// GetByteRate returns the denoised byte rate when rolling statistics
// are available, falling back to the raw ByteRate otherwise.
func (stat *HotPeerStat) GetByteRate() float64 {
	// rollingByteRate is only set once the peer has been admitted to the
	// cache; before that, report the last raw observation.
	if stat.rollingByteRate == nil {
		return stat.ByteRate
	}
	return stat.rollingByteRate.Get()
}

// GetKeyRate returns denoised KeysRate if possible.
func (stat *HotPeerStat) GetKeyRate() float64 {
if stat.rollingKeyRate == nil {
return stat.KeyRate
}
return stat.RollingBytesRate.Get()
return stat.rollingKeyRate.Get()
}

// Clone clones the HotPeerStat
func (stat *HotPeerStat) Clone() *HotPeerStat {
ret := *stat
ret.BytesRate = stat.GetBytesRate()
ret.RollingBytesRate = nil
ret.ByteRate = stat.GetByteRate()
ret.rollingByteRate = nil
ret.KeyRate = stat.GetKeyRate()
ret.rollingKeyRate = nil
return &ret
}
83 changes: 51 additions & 32 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@ const (

rollingWindowsSize = 5

hotWriteRegionMinBytesRate = 1 * 1024
hotReadRegionMinBytesRate = 8 * 1024

hotRegionReportMinInterval = 3

hotRegionAntiCount = 1
hotRegionAntiCount = 2
)

var (
minHotThresholds = [2][dimLen]float64{
WriteFlow: {
byteDim: 1 * 1024,
keyDim: 32,
},
ReadFlow: {
byteDim: 8 * 1024,
keyDim: 128,
},
}
)

// hotPeerCache saves the hot peer's statistics.
Expand Down Expand Up @@ -79,7 +89,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
} else {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
peers = NewTopN(topNN, topNTTL)
peers = NewTopN(dimLen, topNN, topNTTL)
f.peersOfStore[item.StoreID] = peers
}
peers.Put(item)
Expand All @@ -94,7 +104,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
}

// CheckRegionFlow checks the flow information of region.
func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresStats) (ret []*HotPeerStat) {
func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *StoresStats) (ret []*HotPeerStat) {
storeIDs := f.getAllStoreIDs(region)

totalBytes := float64(f.getTotalBytes(region))
Expand All @@ -103,8 +113,8 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresSta
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

bytesPerSec := totalBytes / float64(interval)
keysPerSec := totalKeys / float64(interval)
byteRate := totalBytes / float64(interval)
keyRate := totalKeys / float64(interval)

for storeID := range storeIDs {
isExpired := f.isRegionExpired(region, storeID)
Expand All @@ -119,16 +129,15 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresSta
StoreID: storeID,
RegionID: region.GetID(),
Kind: f.kind,
BytesRate: bytesPerSec,
KeysRate: keysPerSec,
ByteRate: byteRate,
KeyRate: keyRate,
LastUpdateTime: time.Now(),
Version: region.GetMeta().GetRegionEpoch().GetVersion(),
needDelete: isExpired,
isLeader: region.GetLeader().GetStoreId() == storeID,
}

hotThreshold := f.calcHotThreshold(stats, storeID)
newItem = updateHotPeerStat(newItem, oldItem, bytesPerSec, hotThreshold)
newItem = f.updateHotPeerStat(newItem, oldItem, storesStats)
if newItem != nil {
ret = append(ret, newItem)
}
Expand All @@ -150,9 +159,12 @@ func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool
func (f *hotPeerCache) CollectMetrics(stats *StoresStats, typ string) {
for storeID, peers := range f.peersOfStore {
store := storeTag(storeID)
threshold := f.calcHotThreshold(stats, storeID)
thresholds := f.calcHotThresholds(stats, storeID)
hotCacheStatusGauge.WithLabelValues("total_length", store, typ).Set(float64(peers.Len()))
hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(threshold)
hotCacheStatusGauge.WithLabelValues("byte-rate-threshold", store, typ).Set(thresholds[byteDim])
hotCacheStatusGauge.WithLabelValues("key-rate-threshold", store, typ).Set(thresholds[keyDim])
// for compatibility
hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(thresholds[byteDim])
}
}

Expand Down Expand Up @@ -195,21 +207,20 @@ func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64)
return false
}

func (f *hotPeerCache) calcHotThreshold(stats *StoresStats, storeID uint64) float64 {
var minHotThreshold float64
switch f.kind {
case WriteFlow:
minHotThreshold = hotWriteRegionMinBytesRate
case ReadFlow:
minHotThreshold = hotReadRegionMinBytesRate
}

func (f *hotPeerCache) calcHotThresholds(stats *StoresStats, storeID uint64) [dimLen]float64 {
minThresholds := minHotThresholds[f.kind]
tn, ok := f.peersOfStore[storeID]
if !ok || tn.Len() < topNN {
return minHotThreshold
return minThresholds
}
ret := [dimLen]float64{
byteDim: tn.GetTopNMin(byteDim).(*HotPeerStat).ByteRate,
keyDim: tn.GetTopNMin(keyDim).(*HotPeerStat).KeyRate,
}
tnMin := tn.GetTopNMin().(*HotPeerStat).BytesRate
return math.Max(tnMin*hotThresholdRatio, minHotThreshold)
for k := 0; k < dimLen; k++ {
ret[k] = math.Max(ret[k]*hotThresholdRatio, minThresholds[k])
}
return ret
}

// gets the storeIDs, including old region and new region
Expand Down Expand Up @@ -259,32 +270,40 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
return false
}

func updateHotPeerStat(newItem, oldItem *HotPeerStat, bytesRate float64, hotThreshold float64) *HotPeerStat {
isHot := bytesRate >= hotThreshold
func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, storesStats *StoresStats) *HotPeerStat {
thresholds := f.calcHotThresholds(storesStats, newItem.StoreID)
isHot := newItem.ByteRate >= thresholds[byteDim] ||
newItem.KeyRate >= thresholds[keyDim]

if newItem.needDelete {
return newItem
}

if oldItem != nil {
newItem.RollingBytesRate = oldItem.RollingBytesRate
newItem.rollingByteRate = oldItem.rollingByteRate
newItem.rollingKeyRate = oldItem.rollingKeyRate
if isHot {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
} else {
newItem.HotDegree = oldItem.HotDegree - 1
newItem.AntiCount = oldItem.AntiCount - 1
if newItem.AntiCount < 0 {
if newItem.AntiCount <= 0 {
newItem.needDelete = true
}
}
} else {
if !isHot {
return nil
}
newItem.RollingBytesRate = NewMedianFilter(rollingWindowsSize)
newItem.rollingByteRate = NewMedianFilter(rollingWindowsSize)
newItem.rollingKeyRate = NewMedianFilter(rollingWindowsSize)
newItem.AntiCount = hotRegionAntiCount
newItem.isNew = true
}
newItem.RollingBytesRate.Add(bytesRate)

newItem.rollingByteRate.Add(newItem.ByteRate)
newItem.rollingKeyRate.Add(newItem.KeyRate)

return newItem
}
Loading

0 comments on commit f8db500

Please sign in to comment.