statistics: multidimensional hot peer cache #2140

Merged · 9 commits · Feb 21, 2020
Changes from 8 commits
2 changes: 1 addition & 1 deletion server/api/trend.go
@@ -147,7 +147,7 @@ func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID
if stat, ok := stats[storeID]; ok {
storeFlow = stat.TotalBytesRate
for _, flow := range stat.Stats {
- regionFlows = append(regionFlows, flow.GetBytesRate())
+ regionFlows = append(regionFlows, flow.GetByteRate())
}
}
return
10 changes: 5 additions & 5 deletions server/schedulers/hot_region.go
@@ -274,7 +274,7 @@ func summaryStoresLoad(
{
hotSum := 0.0
for _, peer := range filterHotPeers(kind, minHotDegree, storeHotPeers[id]) {
- hotSum += peer.GetBytesRate()
+ hotSum += peer.GetByteRate()
hotPeers = append(hotPeers, peer.Clone())
}
// Use sum of hot peers to estimate leader-only byte rate.
@@ -628,12 +628,12 @@ func (bs *balanceSolver) selectDstStoreID(candidateIDs map[uint64]struct{}) uint
}
switch bs.opTy {
case movePeer:
- return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID])
+ return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
case transferLeader:
if bs.rwTy == write {
- return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID])
+ return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
}
- return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID])
+ return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID])
default:
return 0
}
@@ -685,7 +685,7 @@ func (bs *balanceSolver) buildOperators() []*operator.Operator {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))

- infl := Influence{ByteRate: bs.srcPeerStat.GetBytesRate()}
+ infl := Influence{ByteRate: bs.srcPeerStat.GetByteRate()}
if bs.opTy == transferLeader && bs.rwTy == write {
infl.ByteRate = 0
}
2 changes: 1 addition & 1 deletion server/schedulers/hot_test.go
@@ -422,7 +422,7 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) {
c.Assert(len(stats), Equals, 2)
for _, ss := range stats {
for _, s := range ss {
- c.Assert(s.BytesRate, Equals, 512.0*KB)
+ c.Assert(s.ByteRate, Equals, 512.0*KB)
}
}

53 changes: 39 additions & 14 deletions server/statistics/hot_peer.go
@@ -15,6 +15,12 @@ package statistics

import "time"

+ const (
+ byteDim int = iota
+ keyDim
+ dimLen
+ )

// HotPeerStat records each hot peer's statistics
type HotPeerStat struct {
StoreID uint64 `json:"store_id"`
@@ -25,11 +31,13 @@ type HotPeerStat struct {
// AntiCount used to eliminate some noise when remove region in cache
AntiCount int

- Kind FlowKind `json:"kind"`
- BytesRate float64 `json:"flow_bytes"`
- KeysRate float64 `json:"flow_keys"`
- // RollingBytesRate is a rolling statistics, recording some recently added records.
- RollingBytesRate MovingAvg
+ Kind FlowKind `json:"kind"`
+ ByteRate float64 `json:"flow_bytes"`
+ KeyRate float64 `json:"flow_keys"`
+
+ // rolling statistics, recording some recently added records.
+ rollingByteRate MovingAvg
+ rollingKeyRate MovingAvg

// LastUpdateTime used to calculate average write
LastUpdateTime time.Time `json:"last_update_time"`
@@ -47,9 +55,16 @@ func (stat *HotPeerStat) ID() uint64 {
}

// Less compares two HotPeerStat.Implementing TopNItem.
- func (stat *HotPeerStat) Less(than TopNItem) bool {
+ func (stat *HotPeerStat) Less(k int, than TopNItem) bool {
rhs := than.(*HotPeerStat)
- return stat.BytesRate < rhs.BytesRate
+ switch k {
+ case keyDim:
+ return stat.GetKeyRate() < rhs.GetKeyRate()
+ case byteDim:
+ fallthrough
+ default:
+ return stat.GetByteRate() < rhs.GetByteRate()
+ }
}

// IsNeedDelete to delete the item in cache.
@@ -67,18 +82,28 @@ func (stat *HotPeerStat) IsNew() bool {
return stat.isNew
}

- // GetBytesRate returns denoised BytesRate if possible.
- func (stat *HotPeerStat) GetBytesRate() float64 {
- if stat.RollingBytesRate == nil {
- return stat.BytesRate
+ // GetByteRate returns denoised BytesRate if possible.
+ func (stat *HotPeerStat) GetByteRate() float64 {
+ if stat.rollingByteRate == nil {
+ return stat.ByteRate
}
+ return stat.rollingByteRate.Get()
+ }
+
+ // GetKeyRate returns denoised KeysRate if possible.
+ func (stat *HotPeerStat) GetKeyRate() float64 {
+ if stat.rollingKeyRate == nil {
+ return stat.KeyRate
+ }
- return stat.RollingBytesRate.Get()
+ return stat.rollingKeyRate.Get()
}

// Clone clones the HotPeerStat
func (stat *HotPeerStat) Clone() *HotPeerStat {
ret := *stat
- ret.BytesRate = stat.GetBytesRate()
- ret.RollingBytesRate = nil
+ ret.ByteRate = stat.GetByteRate()
+ ret.rollingByteRate = nil
+ ret.KeyRate = stat.GetKeyRate()
+ ret.rollingKeyRate = nil
return &ret
}
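The core structural change in hot_peer.go is that TopNItem.Less now takes a dimension index, so a single TopN container can keep an independent ordering per dimension (byte rate and key rate), and the cache can ask for the top-N minimum of either dimension via GetTopNMin(byteDim)/GetTopNMin(keyDim). Below is a minimal, self-contained sketch of that idea; the simplified peer type and the sort-based topNMin helper are hypothetical stand-ins for illustration, not PD's actual TopN implementation.

package main

import (
	"fmt"
	"sort"
)

// Dimension indices, mirroring byteDim/keyDim/dimLen in the patch.
const (
	byteDim = iota
	keyDim
	dimLen
)

// peer is a hypothetical, simplified stand-in for HotPeerStat with just the two rates.
type peer struct {
	id       uint64
	byteRate float64
	keyRate  float64
}

// less compares two peers in dimension k, mirroring HotPeerStat.Less(k, than).
func (p peer) less(k int, other peer) bool {
	if k == keyDim {
		return p.keyRate < other.keyRate
	}
	return p.byteRate < other.byteRate
}

// topNMin returns the smallest of the top-n peers in dimension k, which is the
// quantity calcHotThresholds asks the cache for. Here it is computed with a
// plain sort instead of a real top-N structure.
func topNMin(peers []peer, k, n int) peer {
	sorted := append([]peer(nil), peers...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[j].less(k, sorted[i]) }) // descending in dimension k
	if n > len(sorted) {
		n = len(sorted)
	}
	return sorted[n-1]
}

func main() {
	peers := []peer{
		{id: 1, byteRate: 10 * 1024, keyRate: 5},
		{id: 2, byteRate: 2 * 1024, keyRate: 500},
		{id: 3, byteRate: 64 * 1024, keyRate: 10},
	}
	fmt.Println(topNMin(peers, byteDim, 2).id) // 1: byte-rate order is 3, 1, 2
	fmt.Println(topNMin(peers, keyDim, 2).id)  // 3: key-rate order is 2, 3, 1
}

The point of ordering per dimension is that the same set of peers can yield different top-N boundaries for bytes and for keys, which is what makes the per-dimension thresholds in hot_peer_cache.go meaningful.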
83 changes: 51 additions & 32 deletions server/statistics/hot_peer_cache.go
@@ -28,12 +28,22 @@ const (

rollingWindowsSize = 5

- hotWriteRegionMinBytesRate = 1 * 1024
- hotReadRegionMinBytesRate = 8 * 1024

hotRegionReportMinInterval = 3

- hotRegionAntiCount = 1
+ hotRegionAntiCount = 2
)

+ var (
+ minHotThresholds = [2][dimLen]float64{
+ WriteFlow: {
+ byteDim: 1 * 1024,
+ keyDim: 32,
+ },
+ ReadFlow: {
+ byteDim: 8 * 1024,
+ keyDim: 128,
+ },
+ }
+ )

// hotPeerCache saves the hot peer's statistics.
@@ -79,7 +89,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
} else {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
- peers = NewTopN(topNN, topNTTL)
+ peers = NewTopN(dimLen, topNN, topNTTL)
f.peersOfStore[item.StoreID] = peers
}
peers.Put(item)
@@ -94,7 +104,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
}

// CheckRegionFlow checks the flow information of region.
- func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresStats) (ret []*HotPeerStat) {
+ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *StoresStats) (ret []*HotPeerStat) {
storeIDs := f.getAllStoreIDs(region)

totalBytes := float64(f.getTotalBytes(region))
@@ -103,8 +113,8 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresSta
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

- bytesPerSec := totalBytes / float64(interval)
- keysPerSec := totalKeys / float64(interval)
+ byteRate := totalBytes / float64(interval)
+ keyRate := totalKeys / float64(interval)

for storeID := range storeIDs {
isExpired := f.isRegionExpired(region, storeID)
@@ -119,16 +129,15 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresSta
StoreID: storeID,
RegionID: region.GetID(),
Kind: f.kind,
- BytesRate: bytesPerSec,
- KeysRate: keysPerSec,
+ ByteRate: byteRate,
+ KeyRate: keyRate,
LastUpdateTime: time.Now(),
Version: region.GetMeta().GetRegionEpoch().GetVersion(),
needDelete: isExpired,
isLeader: region.GetLeader().GetStoreId() == storeID,
}

- hotThreshold := f.calcHotThreshold(stats, storeID)
- newItem = updateHotPeerStat(newItem, oldItem, bytesPerSec, hotThreshold)
+ newItem = f.updateHotPeerStat(newItem, oldItem, storesStats)
if newItem != nil {
ret = append(ret, newItem)
}
@@ -150,9 +159,12 @@ func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool
func (f *hotPeerCache) CollectMetrics(stats *StoresStats, typ string) {
for storeID, peers := range f.peersOfStore {
store := storeTag(storeID)
- threshold := f.calcHotThreshold(stats, storeID)
+ thresholds := f.calcHotThresholds(stats, storeID)
hotCacheStatusGauge.WithLabelValues("total_length", store, typ).Set(float64(peers.Len()))
hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(threshold)
hotCacheStatusGauge.WithLabelValues("byte-rate-threshold", store, typ).Set(thresholds[byteDim])
hotCacheStatusGauge.WithLabelValues("key-rate-threshold", store, typ).Set(thresholds[keyDim])
// for compatibility
hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(thresholds[byteDim])
}
}

@@ -195,21 +207,20 @@ func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64)
return false
}

- func (f *hotPeerCache) calcHotThreshold(stats *StoresStats, storeID uint64) float64 {
- var minHotThreshold float64
- switch f.kind {
- case WriteFlow:
- minHotThreshold = hotWriteRegionMinBytesRate
- case ReadFlow:
- minHotThreshold = hotReadRegionMinBytesRate
- }
-
+ func (f *hotPeerCache) calcHotThresholds(stats *StoresStats, storeID uint64) [dimLen]float64 {
+ minThresholds := minHotThresholds[f.kind]
tn, ok := f.peersOfStore[storeID]
if !ok || tn.Len() < topNN {
- return minHotThreshold
+ return minThresholds
}
+ ret := [dimLen]float64{
+ byteDim: tn.GetTopNMin(byteDim).(*HotPeerStat).ByteRate,
+ keyDim: tn.GetTopNMin(keyDim).(*HotPeerStat).KeyRate,
+ }
- tnMin := tn.GetTopNMin().(*HotPeerStat).BytesRate
- return math.Max(tnMin*hotThresholdRatio, minHotThreshold)
+ for k := 0; k < dimLen; k++ {
+ ret[k] = math.Max(ret[k]*hotThresholdRatio, minThresholds[k])
+ }
+ return ret
}

// gets the storeIDs, including old region and new region
@@ -259,32 +270,40 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
return false
}

- func updateHotPeerStat(newItem, oldItem *HotPeerStat, bytesRate float64, hotThreshold float64) *HotPeerStat {
- isHot := bytesRate >= hotThreshold
+ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, storesStats *StoresStats) *HotPeerStat {
+ thresholds := f.calcHotThresholds(storesStats, newItem.StoreID)
+ isHot := newItem.ByteRate >= thresholds[byteDim] ||
+ newItem.KeyRate >= thresholds[keyDim]

if newItem.needDelete {
return newItem
}

if oldItem != nil {
- newItem.RollingBytesRate = oldItem.RollingBytesRate
+ newItem.rollingByteRate = oldItem.rollingByteRate
+ newItem.rollingKeyRate = oldItem.rollingKeyRate
if isHot {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
} else {
newItem.HotDegree = oldItem.HotDegree - 1
newItem.AntiCount = oldItem.AntiCount - 1
- if newItem.AntiCount < 0 {
+ if newItem.AntiCount <= 0 {
newItem.needDelete = true
}
}
} else {
if !isHot {
return nil
}
- newItem.RollingBytesRate = NewMedianFilter(rollingWindowsSize)
+ newItem.rollingByteRate = NewMedianFilter(rollingWindowsSize)
+ newItem.rollingKeyRate = NewMedianFilter(rollingWindowsSize)
newItem.AntiCount = hotRegionAntiCount
newItem.isNew = true
}
- newItem.RollingBytesRate.Add(bytesRate)

+ newItem.rollingByteRate.Add(newItem.ByteRate)
+ newItem.rollingKeyRate.Add(newItem.KeyRate)

return newItem
}
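Putting the cache changes together: calcHotThresholds derives a threshold per dimension as the maximum of the top-N minimum in that dimension scaled by hotThresholdRatio and the per-flow minimum from minHotThresholds, and updateHotPeerStat treats a peer as hot if it crosses the threshold in either dimension, so a region that is hot only by key rate is still admitted and tracked. A rough, self-contained sketch of just that decision logic follows; the helper names and the 0.8 ratio are illustrative assumptions, since the value of hotThresholdRatio is not part of this diff.

package main

import (
	"fmt"
	"math"
)

const (
	byteDim = iota
	keyDim
	dimLen
)

// writeFlowMins mirrors the WriteFlow row of minHotThresholds: 1 KiB/s and 32 keys/s.
var writeFlowMins = [dimLen]float64{byteDim: 1 * 1024, keyDim: 32}

// calcThresholds sketches calcHotThresholds: scale the smallest rate among the
// current top-N hot peers in each dimension by the ratio, then clamp to the
// per-flow minimum. topNMin and ratio are plain inputs here; in the patch they
// come from the TopN cache and the hotThresholdRatio constant.
func calcThresholds(topNMin [dimLen]float64, ratio float64, mins [dimLen]float64) [dimLen]float64 {
	var ret [dimLen]float64
	for k := 0; k < dimLen; k++ {
		ret[k] = math.Max(topNMin[k]*ratio, mins[k])
	}
	return ret
}

// isHot mirrors the check in updateHotPeerStat: a peer is hot if it crosses the
// threshold in either dimension.
func isHot(byteRate, keyRate float64, thresholds [dimLen]float64) bool {
	return byteRate >= thresholds[byteDim] || keyRate >= thresholds[keyDim]
}

func main() {
	topNMin := [dimLen]float64{byteDim: 20 * 1024, keyDim: 10}
	th := calcThresholds(topNMin, 0.8, writeFlowMins) // 0.8 is an assumed ratio for illustration
	// byte threshold = max(20 KiB * 0.8, 1 KiB) = 16 KiB; key threshold = max(10 * 0.8, 32) = 32
	fmt.Println(isHot(4*1024, 100, th)) // true: the key rate crosses its threshold even though the byte rate does not
	fmt.Println(isHot(4*1024, 20, th))  // false: below both thresholds
}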