Skip to content

Commit

Permalink
Fix hot peer cache threshold unstable when interval unstable (#3417)
Browse files Browse the repository at this point in the history
* fix hot peer cache

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* add test

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot authored Mar 2, 2021
1 parent 0d801cd commit 0e15869
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
11 changes: 8 additions & 3 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ func (d *dimStat) Add(delta float64, interval time.Duration) {
d.Rolling.Add(delta, interval)
}

func (d *dimStat) isLastAverageHot(thresholds [dimLen]float64) bool {
return d.LastAverage.Get() >= thresholds[d.typ]
}

func (d *dimStat) isHot(thresholds [dimLen]float64) bool {
return d.LastAverage.IsFull() && d.LastAverage.Get() >= thresholds[d.typ]
return d.Rolling.Get() >= thresholds[d.typ]
}

func (d *dimStat) isFull() bool {
Expand Down Expand Up @@ -156,8 +160,9 @@ func (stat *HotPeerStat) Clone() *HotPeerStat {
return &ret
}

func (stat *HotPeerStat) isHot() bool {
return stat.rollingByteRate.isHot(stat.thresholds) || stat.rollingKeyRate.isHot(stat.thresholds)
func (stat *HotPeerStat) isFullAndHot() bool {
return (stat.rollingByteRate.isFull() && stat.rollingByteRate.isLastAverageHot(stat.thresholds)) ||
(stat.rollingKeyRate.isFull() && stat.rollingKeyRate.isLastAverageHot(stat.thresholds))
}

func (stat *HotPeerStat) clearLastAverage() {
Expand Down
12 changes: 6 additions & 6 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,11 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
if interval == 0 {
return nil
}
isHot := bytes/interval.Seconds() >= newItem.thresholds[byteDim] || keys/interval.Seconds() >= newItem.thresholds[keyDim]
if !isHot {
return nil
}
if interval.Seconds() >= RegionHeartBeatReportInterval {
isHot := bytes/interval.Seconds() >= newItem.thresholds[byteDim] || keys/interval.Seconds() >= newItem.thresholds[keyDim]
if !isHot {
return nil
}
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
}
Expand Down Expand Up @@ -423,14 +423,14 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
newItem.AntiCount = oldItem.AntiCount
} else {
if f.isOldColdPeer(oldItem, newItem.StoreID) {
if newItem.isHot() {
if newItem.isFullAndHot() {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
} else {
newItem.needDelete = true
}
} else {
if newItem.isHot() {
if newItem.isFullAndHot() {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
} else {
Expand Down
42 changes: 42 additions & 0 deletions server/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,45 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) {
c.Check(newItem.AntiCount, Equals, 0)
c.Check(newItem.needDelete, Equals, true)
}

func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) {
byteRate := minHotThresholds[ReadFlow][byteDim] * 2
expectThreshold := byteRate * HotThresholdRatio
t.testMetrics(c, 120., byteRate, expectThreshold)
t.testMetrics(c, 60., byteRate, expectThreshold)
t.testMetrics(c, 30., byteRate, expectThreshold)
t.testMetrics(c, 17., byteRate, expectThreshold)
t.testMetrics(c, 1., byteRate, expectThreshold)
}
func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) {
cache := NewHotStoresStats(ReadFlow)
minThresholds := minHotThresholds[cache.kind]
storeID := uint64(1)
c.Assert(byteRate, GreaterEqual, minThresholds[byteDim])
for i := uint64(1); i < TopNN+10; i++ {
var oldItem *HotPeerStat
for {
thresholds := cache.calcHotThresholds(storeID)
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: i,
needDelete: false,
thresholds: thresholds,
ByteRate: byteRate,
KeyRate: 0,
}
oldItem = cache.getOldHotPeerStat(i, storeID)
if oldItem != nil && oldItem.rollingByteRate.isHot(thresholds) == true {
break
}
item := cache.updateHotPeerStat(newItem, oldItem, byteRate*interval, 0, time.Duration(interval)*time.Second)
cache.Update(item)
}
thresholds := cache.calcHotThresholds(storeID)
if i < TopNN {
c.Assert(thresholds[byteDim], Equals, minThresholds[byteDim])
} else {
c.Assert(thresholds[byteDim], Equals, expectThreshold)
}
}
}

0 comments on commit 0e15869

Please sign in to comment.