From f8db5007eae436b76e7ac3225e05f104e28ad939 Mon Sep 17 00:00:00 2001 From: Luffbee Date: Fri, 21 Feb 2020 16:58:43 +0800 Subject: [PATCH] statistics: multidimensional hot peer cache (#2140) --- server/api/trend.go | 2 +- server/schedulers/hot_region.go | 10 +- server/schedulers/hot_test.go | 2 +- server/statistics/hot_peer.go | 53 +++++-- server/statistics/hot_peer_cache.go | 83 ++++++---- server/statistics/topn.go | 180 ++++++++++++++-------- server/statistics/topn_test.go | 229 +++++++++++++++++++++------- server/statistics/util.go | 7 - 8 files changed, 392 insertions(+), 174 deletions(-) diff --git a/server/api/trend.go b/server/api/trend.go index 35fb73107e0..c73f136bd09 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -147,7 +147,7 @@ func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID if stat, ok := stats[storeID]; ok { storeFlow = stat.TotalBytesRate for _, flow := range stat.Stats { - regionFlows = append(regionFlows, flow.GetBytesRate()) + regionFlows = append(regionFlows, flow.GetByteRate()) } } return diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 98038a6f9c4..4db90c93ee1 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -274,7 +274,7 @@ func summaryStoresLoad( { hotSum := 0.0 for _, peer := range filterHotPeers(kind, minHotDegree, storeHotPeers[id]) { - hotSum += peer.GetBytesRate() + hotSum += peer.GetByteRate() hotPeers = append(hotPeers, peer.Clone()) } // Use sum of hot peers to estimate leader-only byte rate. @@ -628,12 +628,12 @@ func (bs *balanceSolver) selectDstStoreID(candidateIDs map[uint64]struct{}) uint } switch bs.opTy { case movePeer: - return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID]) + return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID]) case transferLeader: if bs.rwTy == write { - return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID]) + return selectDstStoreByCount(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID]) } - return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetBytesRate(), bs.stLoadDetail[bs.srcStoreID]) + return selectDstStoreByByteRate(candidateLoadDetail, bs.srcPeerStat.GetByteRate(), bs.stLoadDetail[bs.srcStoreID]) default: return 0 } @@ -685,7 +685,7 @@ func (bs *balanceSolver) buildOperators() []*operator.Operator { schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"), schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String())) - infl := Influence{ByteRate: bs.srcPeerStat.GetBytesRate()} + infl := Influence{ByteRate: bs.srcPeerStat.GetByteRate()} if bs.opTy == transferLeader && bs.rwTy == write { infl.ByteRate = 0 } diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index b0aeeca1c8c..41f1e407db5 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -422,7 +422,7 @@ func (s *testHotReadRegionSchedulerSuite) TestSchedule(c *C) { c.Assert(len(stats), Equals, 2) for _, ss := range stats { for _, s := range ss { - c.Assert(s.BytesRate, Equals, 512.0*KB) + c.Assert(s.ByteRate, Equals, 512.0*KB) } } diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 59acb8c2056..aa565dbddcd 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -15,6 +15,12 @@ package statistics import "time" +const ( + byteDim int = iota + keyDim + dimLen +) + // HotPeerStat records each hot peer's statistics type HotPeerStat struct { StoreID uint64 `json:"store_id"` @@ -25,11 +31,13 @@ type HotPeerStat struct { // AntiCount used to eliminate some noise when remove region in cache AntiCount int - Kind FlowKind `json:"kind"` - BytesRate float64 `json:"flow_bytes"` - KeysRate float64 `json:"flow_keys"` - // RollingBytesRate is a rolling statistics, recording some recently added records. - RollingBytesRate MovingAvg + Kind FlowKind `json:"kind"` + ByteRate float64 `json:"flow_bytes"` + KeyRate float64 `json:"flow_keys"` + + // rolling statistics, recording some recently added records. + rollingByteRate MovingAvg + rollingKeyRate MovingAvg // LastUpdateTime used to calculate average write LastUpdateTime time.Time `json:"last_update_time"` @@ -47,9 +55,16 @@ func (stat *HotPeerStat) ID() uint64 { } // Less compares two HotPeerStat.Implementing TopNItem. -func (stat *HotPeerStat) Less(than TopNItem) bool { +func (stat *HotPeerStat) Less(k int, than TopNItem) bool { rhs := than.(*HotPeerStat) - return stat.BytesRate < rhs.BytesRate + switch k { + case keyDim: + return stat.GetKeyRate() < rhs.GetKeyRate() + case byteDim: + fallthrough + default: + return stat.GetByteRate() < rhs.GetByteRate() + } } // IsNeedDelete to delete the item in cache. @@ -67,18 +82,28 @@ func (stat *HotPeerStat) IsNew() bool { return stat.isNew } -// GetBytesRate returns denoised BytesRate if possible. -func (stat *HotPeerStat) GetBytesRate() float64 { - if stat.RollingBytesRate == nil { - return stat.BytesRate +// GetByteRate returns denoised BytesRate if possible. +func (stat *HotPeerStat) GetByteRate() float64 { + if stat.rollingByteRate == nil { + return stat.ByteRate + } + return stat.rollingByteRate.Get() +} + +// GetKeyRate returns denoised KeysRate if possible. +func (stat *HotPeerStat) GetKeyRate() float64 { + if stat.rollingKeyRate == nil { + return stat.KeyRate } - return stat.RollingBytesRate.Get() + return stat.rollingKeyRate.Get() } // Clone clones the HotPeerStat func (stat *HotPeerStat) Clone() *HotPeerStat { ret := *stat - ret.BytesRate = stat.GetBytesRate() - ret.RollingBytesRate = nil + ret.ByteRate = stat.GetByteRate() + ret.rollingByteRate = nil + ret.KeyRate = stat.GetKeyRate() + ret.rollingKeyRate = nil return &ret } diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index 0f18a6eb934..4bdb812131b 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -28,12 +28,22 @@ const ( rollingWindowsSize = 5 - hotWriteRegionMinBytesRate = 1 * 1024 - hotReadRegionMinBytesRate = 8 * 1024 - hotRegionReportMinInterval = 3 - hotRegionAntiCount = 1 + hotRegionAntiCount = 2 +) + +var ( + minHotThresholds = [2][dimLen]float64{ + WriteFlow: { + byteDim: 1 * 1024, + keyDim: 32, + }, + ReadFlow: { + byteDim: 8 * 1024, + keyDim: 128, + }, + } ) // hotPeerCache saves the hot peer's statistics. @@ -79,7 +89,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) { } else { peers, ok := f.peersOfStore[item.StoreID] if !ok { - peers = NewTopN(topNN, topNTTL) + peers = NewTopN(dimLen, topNN, topNTTL) f.peersOfStore[item.StoreID] = peers } peers.Put(item) @@ -94,7 +104,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) { } // CheckRegionFlow checks the flow information of region. -func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresStats) (ret []*HotPeerStat) { +func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *StoresStats) (ret []*HotPeerStat) { storeIDs := f.getAllStoreIDs(region) totalBytes := float64(f.getTotalBytes(region)) @@ -103,8 +113,8 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresSta reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - bytesPerSec := totalBytes / float64(interval) - keysPerSec := totalKeys / float64(interval) + byteRate := totalBytes / float64(interval) + keyRate := totalKeys / float64(interval) for storeID := range storeIDs { isExpired := f.isRegionExpired(region, storeID) @@ -119,16 +129,15 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, stats *StoresSta StoreID: storeID, RegionID: region.GetID(), Kind: f.kind, - BytesRate: bytesPerSec, - KeysRate: keysPerSec, + ByteRate: byteRate, + KeyRate: keyRate, LastUpdateTime: time.Now(), Version: region.GetMeta().GetRegionEpoch().GetVersion(), needDelete: isExpired, isLeader: region.GetLeader().GetStoreId() == storeID, } - hotThreshold := f.calcHotThreshold(stats, storeID) - newItem = updateHotPeerStat(newItem, oldItem, bytesPerSec, hotThreshold) + newItem = f.updateHotPeerStat(newItem, oldItem, storesStats) if newItem != nil { ret = append(ret, newItem) } @@ -150,9 +159,12 @@ func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool func (f *hotPeerCache) CollectMetrics(stats *StoresStats, typ string) { for storeID, peers := range f.peersOfStore { store := storeTag(storeID) - threshold := f.calcHotThreshold(stats, storeID) + thresholds := f.calcHotThresholds(stats, storeID) hotCacheStatusGauge.WithLabelValues("total_length", store, typ).Set(float64(peers.Len())) - hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(threshold) + hotCacheStatusGauge.WithLabelValues("byte-rate-threshold", store, typ).Set(thresholds[byteDim]) + hotCacheStatusGauge.WithLabelValues("key-rate-threshold", store, typ).Set(thresholds[keyDim]) + // for compatibility + hotCacheStatusGauge.WithLabelValues("hotThreshold", store, typ).Set(thresholds[byteDim]) } } @@ -195,21 +207,20 @@ func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64) return false } -func (f *hotPeerCache) calcHotThreshold(stats *StoresStats, storeID uint64) float64 { - var minHotThreshold float64 - switch f.kind { - case WriteFlow: - minHotThreshold = hotWriteRegionMinBytesRate - case ReadFlow: - minHotThreshold = hotReadRegionMinBytesRate - } - +func (f *hotPeerCache) calcHotThresholds(stats *StoresStats, storeID uint64) [dimLen]float64 { + minThresholds := minHotThresholds[f.kind] tn, ok := f.peersOfStore[storeID] if !ok || tn.Len() < topNN { - return minHotThreshold + return minThresholds + } + ret := [dimLen]float64{ + byteDim: tn.GetTopNMin(byteDim).(*HotPeerStat).ByteRate, + keyDim: tn.GetTopNMin(keyDim).(*HotPeerStat).KeyRate, } - tnMin := tn.GetTopNMin().(*HotPeerStat).BytesRate - return math.Max(tnMin*hotThresholdRatio, minHotThreshold) + for k := 0; k < dimLen; k++ { + ret[k] = math.Max(ret[k]*hotThresholdRatio, minThresholds[k]) + } + return ret } // gets the storeIDs, including old region and new region @@ -259,20 +270,25 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb return false } -func updateHotPeerStat(newItem, oldItem *HotPeerStat, bytesRate float64, hotThreshold float64) *HotPeerStat { - isHot := bytesRate >= hotThreshold +func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, storesStats *StoresStats) *HotPeerStat { + thresholds := f.calcHotThresholds(storesStats, newItem.StoreID) + isHot := newItem.ByteRate >= thresholds[byteDim] || + newItem.KeyRate >= thresholds[keyDim] + if newItem.needDelete { return newItem } + if oldItem != nil { - newItem.RollingBytesRate = oldItem.RollingBytesRate + newItem.rollingByteRate = oldItem.rollingByteRate + newItem.rollingKeyRate = oldItem.rollingKeyRate if isHot { newItem.HotDegree = oldItem.HotDegree + 1 newItem.AntiCount = hotRegionAntiCount } else { newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 - if newItem.AntiCount < 0 { + if newItem.AntiCount <= 0 { newItem.needDelete = true } } @@ -280,11 +296,14 @@ func updateHotPeerStat(newItem, oldItem *HotPeerStat, bytesRate float64, hotThre if !isHot { return nil } - newItem.RollingBytesRate = NewMedianFilter(rollingWindowsSize) + newItem.rollingByteRate = NewMedianFilter(rollingWindowsSize) + newItem.rollingKeyRate = NewMedianFilter(rollingWindowsSize) newItem.AntiCount = hotRegionAntiCount newItem.isNew = true } - newItem.RollingBytesRate.Add(bytesRate) + + newItem.rollingByteRate.Add(newItem.ByteRate) + newItem.rollingKeyRate.Add(newItem.KeyRate) return newItem } diff --git a/server/statistics/topn.go b/server/statistics/topn.go index 46f911e17a4..98d5c439ba4 100644 --- a/server/statistics/topn.go +++ b/server/statistics/topn.go @@ -16,6 +16,7 @@ package statistics import ( "container/heap" "container/list" + "fmt" "sync" "time" ) @@ -24,27 +25,31 @@ import ( type TopNItem interface { // ID is used to check identity. ID() uint64 - // Less tests whether the current item is less than the given argument. - Less(then TopNItem) bool + // Less tests whether the current item is less than the given argument in the `k`th dimension. + Less(k int, than TopNItem) bool } -// TopN maintains the N largest items. +// TopN maintains the N largest items of multiple dimensions. type TopN struct { rw sync.RWMutex - n int - topn *indexedHeap - rest *indexedHeap + topns []*singleTopN ttlLst *ttlList } -// NewTopN returns a TopN with given TTL. -func NewTopN(n int, ttl time.Duration) *TopN { - return &TopN{ - n: maxInt(n, 1), - topn: newTopNHeap(n), - rest: newRevTopNHeap(n), +// NewTopN returns a k-dimensional TopN with given TTL. +// NOTE: panic if k <= 0 or n <= 0. +func NewTopN(k, n int, ttl time.Duration) *TopN { + if k <= 0 || n <= 0 { + panic(fmt.Sprintf("invalid arguments for NewTopN: k = %d, n = %d", k, n)) + } + ret := &TopN{ + topns: make([]*singleTopN, k), ttlLst: newTTLList(ttl), } + for i := 0; i < k; i++ { + ret.topns[i] = newSingleTopN(i, n) + } + return ret } // Len returns number of all items. @@ -54,61 +59,46 @@ func (tn *TopN) Len() int { return tn.ttlLst.Len() } -// GetTopNMin returns the min item in top N. -func (tn *TopN) GetTopNMin() TopNItem { +// GetTopNMin returns the min item in top N of the `k`th dimension. +func (tn *TopN) GetTopNMin(k int) TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topn.Top() + return tn.topns[k].GetTopNMin() } -// GetAllTopN returns the top N items. -func (tn *TopN) GetAllTopN() []TopNItem { +// GetAllTopN returns the top N items of the `k`th dimension. +func (tn *TopN) GetAllTopN(k int) []TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topn.GetAll() + return tn.topns[k].GetAllTopN() } // GetAll returns all items. func (tn *TopN) GetAll() []TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - topn := tn.topn.GetAll() - return append(topn, tn.rest.GetAll()...) + return tn.topns[0].GetAll() } // Get returns the item with given id, nil if there is no such item. func (tn *TopN) Get(id uint64) TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - if item := tn.topn.Get(id); item != nil { - return item - } - return tn.rest.Get(id) + return tn.topns[0].Get(id) } // Put inserts item or updates the old item if it exists. func (tn *TopN) Put(item TopNItem) (isUpdate bool) { tn.rw.Lock() defer tn.rw.Unlock() - if tn.topn.Get(item.ID()) != nil { - isUpdate = true - tn.topn.Put(item) - } else { - isUpdate = tn.rest.Put(item) + for _, stn := range tn.topns { + isUpdate = stn.Put(item) } tn.ttlLst.Put(item.ID()) tn.maintain() return } -func (tn *TopN) removeItemLocked(id uint64) TopNItem { - item := tn.topn.Remove(id) - if item == nil { - item = tn.rest.Remove(id) - } - return item -} - // RemoveExpired deletes all expired items. func (tn *TopN) RemoveExpired() { tn.rw.Lock() @@ -117,59 +107,129 @@ func (tn *TopN) RemoveExpired() { } // Remove deletes the item by given ID and returns it. -func (tn *TopN) Remove(id uint64) TopNItem { +func (tn *TopN) Remove(id uint64) (item TopNItem) { tn.rw.Lock() defer tn.rw.Unlock() - item := tn.removeItemLocked(id) + for _, stn := range tn.topns { + item = stn.Remove(id) + } _ = tn.ttlLst.Remove(id) tn.maintain() - return item + return } -func (tn *TopN) promote() { - heap.Push(tn.topn, heap.Pop(tn.rest)) +func (tn *TopN) maintain() { + for _, id := range tn.ttlLst.TakeExpired() { + for _, stn := range tn.topns { + stn.Remove(id) + } + } } -func (tn *TopN) demote() { - heap.Push(tn.rest, heap.Pop(tn.topn)) +type singleTopN struct { + k int + n int + topn *indexedHeap + rest *indexedHeap } -func (tn *TopN) maintain() { - for _, id := range tn.ttlLst.takeExpired() { - _ = tn.removeItemLocked(id) +func newSingleTopN(k, n int) *singleTopN { + return &singleTopN{ + k: k, + n: n, + topn: newTopNHeap(k, n), + rest: newRevTopNHeap(k, n), } - for tn.topn.Len() < tn.n && tn.rest.Len() > 0 { - tn.promote() +} + +func (stn *singleTopN) Len() int { + return stn.topn.Len() + stn.rest.Len() +} + +func (stn *singleTopN) GetTopNMin() TopNItem { + return stn.topn.Top() +} + +func (stn *singleTopN) GetAllTopN() []TopNItem { + return stn.topn.GetAll() +} + +func (stn *singleTopN) GetAll() []TopNItem { + topn := stn.topn.GetAll() + return append(topn, stn.rest.GetAll()...) +} + +func (stn *singleTopN) Get(id uint64) TopNItem { + if item := stn.topn.Get(id); item != nil { + return item + } + return stn.rest.Get(id) +} + +func (stn *singleTopN) Put(item TopNItem) (isUpdate bool) { + if stn.topn.Get(item.ID()) != nil { + isUpdate = true + stn.topn.Put(item) + } else { + isUpdate = stn.rest.Put(item) + } + stn.maintain() + return +} + +func (stn *singleTopN) Remove(id uint64) TopNItem { + item := stn.topn.Remove(id) + if item == nil { + item = stn.rest.Remove(id) + } + stn.maintain() + return item +} + +func (stn *singleTopN) promote() { + heap.Push(stn.topn, heap.Pop(stn.rest)) +} + +func (stn *singleTopN) demote() { + heap.Push(stn.rest, heap.Pop(stn.topn)) +} + +func (stn *singleTopN) maintain() { + for stn.topn.Len() < stn.n && stn.rest.Len() > 0 { + stn.promote() } - rest1 := tn.rest.Top() + rest1 := stn.rest.Top() if rest1 == nil { return } - for top1 := tn.topn.Top(); top1.Less(rest1); { - tn.demote() - tn.promote() - rest1 = tn.rest.Top() - top1 = tn.topn.Top() + for topn1 := stn.topn.Top(); topn1.Less(stn.k, rest1); { + stn.demote() + stn.promote() + rest1 = stn.rest.Top() + topn1 = stn.topn.Top() } } // indexedHeap is a heap with index. type indexedHeap struct { + k int rev bool items []TopNItem index map[uint64]int } -func newTopNHeap(hint int) *indexedHeap { +func newTopNHeap(k, hint int) *indexedHeap { return &indexedHeap{ + k: k, rev: false, items: make([]TopNItem, 0, hint), index: map[uint64]int{}, } } -func newRevTopNHeap(hint int) *indexedHeap { +func newRevTopNHeap(k, hint int) *indexedHeap { return &indexedHeap{ + k: k, rev: true, items: make([]TopNItem, 0, hint), index: map[uint64]int{}, @@ -184,9 +244,9 @@ func (hp *indexedHeap) Len() int { // Implementing heap.Interface. func (hp *indexedHeap) Less(i, j int) bool { if !hp.rev { - return hp.items[i].Less(hp.items[j]) + return hp.items[i].Less(hp.k, hp.items[j]) } - return hp.items[j].Less(hp.items[i]) + return hp.items[j].Less(hp.k, hp.items[i]) } // Implementing heap.Interface. @@ -282,7 +342,7 @@ func (tl *ttlList) Len() int { return tl.lst.Len() } -func (tl *ttlList) takeExpired() []uint64 { +func (tl *ttlList) TakeExpired() []uint64 { expired := []uint64{} now := time.Now() for ele := tl.lst.Front(); ele != nil; ele = tl.lst.Front() { diff --git a/server/statistics/topn_test.go b/server/statistics/topn_test.go index 0668d82d22a..a5d865f6cbc 100644 --- a/server/statistics/topn_test.go +++ b/server/statistics/topn_test.go @@ -15,6 +15,7 @@ package statistics import ( "math/rand" + "sort" "time" . "github.com/pingcap/check" @@ -25,118 +26,238 @@ var _ = Suite(&testTopNSuite{}) type testTopNSuite struct{} type item struct { - id uint64 - value float64 + id uint64 + values []float64 } func (it *item) ID() uint64 { return it.id } -func (it *item) Less(than TopNItem) bool { - return it.value < than.(*item).value +func (it *item) Less(k int, than TopNItem) bool { + return it.values[k] < than.(*item).values[k] } func (s *testTopNSuite) TestPut(c *C) { const Total = 10000 + const K = 3 const N = 50 - tn := NewTopN(N, 1*time.Hour) - for _, x := range rand.Perm(Total) { - c.Assert(tn.Put(&item{id: uint64(x), value: float64(-x) + 1}), IsFalse) - } - for _, x := range rand.Perm(Total) { - c.Assert(tn.Put(&item{id: uint64(x), value: float64(-x)}), IsTrue) - } - c.Assert(tn.GetTopNMin().(*item), DeepEquals, &item{id: N - 1, value: 1 - N}) - topns := make([]float64, N) - for _, it := range tn.GetAllTopN() { - it := it.(*item) - topns[it.id] = it.value + tn := NewTopN(K, N, 1*time.Hour) + + putPerm(c, tn, K, Total, func(x int) float64 { + return float64(-x) + 1 + }, false /*insert*/) + + putPerm(c, tn, K, Total, func(x int) float64 { + return float64(-x) + }, true /*update*/) + + // check GetTopNMin + for k := 0; k < K; k++ { + c.Assert(tn.GetTopNMin(k).(*item).values[k], Equals, float64(1-N)) } - for i, v := range topns { - c.Assert(v, Equals, float64(-i)) + + { + topns := make([]float64, N) + // check GetAllTopN + for _, it := range tn.GetAllTopN(0) { + it := it.(*item) + topns[it.id] = it.values[0] + } + // check update worked + for i, v := range topns { + c.Assert(v, Equals, float64(-i)) + } } - all := make([]float64, Total) - for _, it := range tn.GetAll() { - it := it.(*item) - all[it.id] = it.value + + { + all := make([]float64, Total) + // check GetAll + for _, it := range tn.GetAll() { + it := it.(*item) + all[it.id] = it.values[0] + } + // check update worked + for i, v := range all { + c.Assert(v, Equals, float64(-i)) + } } - for i, v := range all { - c.Assert(v, Equals, float64(-i)) + + { // check all dimensions + for k := 1; k < K; k++ { + topn := make([]float64, 0, N) + for _, it := range tn.GetAllTopN(k) { + topn = append(topn, it.(*item).values[k]) + } + sort.Sort(sort.Reverse(sort.Float64Slice(topn))) + + all := make([]float64, 0, Total) + for _, it := range tn.GetAll() { + all = append(all, it.(*item).values[k]) + } + sort.Sort(sort.Reverse(sort.Float64Slice(all))) + + c.Assert(topn, DeepEquals, all[:N]) + } } + + // check Get for i := uint64(0); i < Total; i++ { it := tn.Get(i).(*item) c.Assert(it.id, Equals, i) - c.Assert(it.value, Equals, -float64(i)) + c.Assert(it.values[0], Equals, -float64(i)) + } +} + +func putPerm(c *C, tn *TopN, dimNum, total int, f func(x int) float64, isUpdate bool) { + { // insert + dims := make([][]int, dimNum) + for k := 0; k < dimNum; k++ { + dims[k] = rand.Perm(total) + } + for i := 0; i < total; i++ { + item := &item{ + id: uint64(dims[0][i]), + values: make([]float64, dimNum), + } + for k := 0; k < dimNum; k++ { + item.values[k] = f(dims[k][i]) + } + c.Assert(tn.Put(item), Equals, isUpdate) + } } } func (s *testTopNSuite) TestRemove(c *C) { const Total = 10000 + const K = 3 const N = 50 - tn := NewTopN(N, 1*time.Hour) - for _, x := range rand.Perm(Total) { - c.Assert(tn.Put(&item{id: uint64(x), value: float64(-x)}), IsFalse) - } + tn := NewTopN(K, N, 1*time.Hour) + + putPerm(c, tn, K, Total, func(x int) float64 { + return float64(-x) + }, false /*insert*/) + + // check Remove for i := 0; i < Total; i++ { if i%3 != 0 { it := tn.Remove(uint64(i)).(*item) c.Assert(it.id, Equals, uint64(i)) } } + + // check Remove worked for i := 0; i < Total; i++ { if i%3 != 0 { c.Assert(tn.Remove(uint64(i)), IsNil) } } - c.Assert(tn.GetTopNMin().(*item), DeepEquals, &item{id: 3 * (N - 1), value: 3 * (1 - N)}) - topns := make([]float64, N) - for _, it := range tn.GetAllTopN() { - it := it.(*item) - topns[it.id/3] = it.value - c.Assert(it.id%3, Equals, uint64(0)) - } - for i, v := range topns { - c.Assert(v, Equals, float64(-i*3)) + + c.Assert(tn.GetTopNMin(0).(*item).id, Equals, uint64(3*(N-1))) + + { + topns := make([]float64, N) + for _, it := range tn.GetAllTopN(0) { + it := it.(*item) + topns[it.id/3] = it.values[0] + c.Assert(it.id%3, Equals, uint64(0)) + } + for i, v := range topns { + c.Assert(v, Equals, float64(-i*3)) + } } - all := make([]float64, Total/3+1) - for _, it := range tn.GetAll() { - it := it.(*item) - all[it.id/3] = it.value - c.Assert(it.id%3, Equals, uint64(0)) + + { + all := make([]float64, Total/3+1) + for _, it := range tn.GetAll() { + it := it.(*item) + all[it.id/3] = it.values[0] + c.Assert(it.id%3, Equals, uint64(0)) + } + for i, v := range all { + c.Assert(v, Equals, float64(-i*3)) + } } - for i, v := range all { - c.Assert(v, Equals, float64(-i*3)) + + { // check all dimensions + for k := 1; k < K; k++ { + topn := make([]float64, 0, N) + for _, it := range tn.GetAllTopN(k) { + topn = append(topn, it.(*item).values[k]) + } + sort.Sort(sort.Reverse(sort.Float64Slice(topn))) + + all := make([]float64, 0, Total/3+1) + for _, it := range tn.GetAll() { + all = append(all, it.(*item).values[k]) + } + sort.Sort(sort.Reverse(sort.Float64Slice(all))) + + c.Assert(topn, DeepEquals, all[:N]) + } } + for i := uint64(0); i < Total; i += 3 { it := tn.Get(i).(*item) c.Assert(it.id, Equals, i) - c.Assert(it.value, Equals, -float64(i)) + c.Assert(it.values[0], Equals, -float64(i)) } } func (s *testTopNSuite) TestTTL(c *C) { const Total = 1000 + const K = 3 const N = 50 - tn := NewTopN(50, 900*time.Millisecond) - for _, x := range rand.Perm(Total) { - c.Assert(tn.Put(&item{id: uint64(x), value: float64(-x)}), IsFalse) - } + tn := NewTopN(K, 50, 900*time.Millisecond) + + putPerm(c, tn, K, Total, func(x int) float64 { + return float64(-x) + }, false /*insert*/) + time.Sleep(900 * time.Millisecond) - c.Assert(tn.Put(&item{id: 0, value: 100}), IsTrue) + { + item := &item{id: 0, values: []float64{100}} + for k := 1; k < K; k++ { + item.values = append(item.values, rand.NormFloat64()) + } + c.Assert(tn.Put(item), IsTrue) + } for i := 3; i < Total; i += 3 { - c.Assert(tn.Put(&item{id: uint64(i), value: float64(-i) + 100}), IsFalse) + item := &item{id: uint64(i), values: []float64{float64(-i) + 100}} + for k := 1; k < K; k++ { + item.values = append(item.values, rand.NormFloat64()) + } + c.Assert(tn.Put(item), IsFalse) } tn.RemoveExpired() + c.Assert(tn.Len(), Equals, Total/3+1) - items := tn.GetAllTopN() + items := tn.GetAllTopN(0) v := make([]float64, N) for _, it := range items { it := it.(*item) c.Assert(it.id%3, Equals, uint64(0)) - v[it.id/3] = it.value + v[it.id/3] = it.values[0] } for i, x := range v { c.Assert(x, Equals, float64(-i*3)+100) } + + { // check all dimensions + for k := 1; k < K; k++ { + topn := make([]float64, 0, N) + for _, it := range tn.GetAllTopN(k) { + topn = append(topn, it.(*item).values[k]) + } + sort.Sort(sort.Reverse(sort.Float64Slice(topn))) + + all := make([]float64, 0, Total/3+1) + for _, it := range tn.GetAll() { + all = append(all, it.(*item).values[k]) + } + sort.Sort(sort.Reverse(sort.Float64Slice(all))) + + c.Assert(topn, DeepEquals, all[:N]) + } + } } diff --git a/server/statistics/util.go b/server/statistics/util.go index 1855057092d..c8fda0fc802 100644 --- a/server/statistics/util.go +++ b/server/statistics/util.go @@ -82,10 +82,3 @@ func (r *MedianFilter) Set(n float64) { r.records[0] = n r.count = 1 } - -func maxInt(x int, y int) int { - if x > y { - return x - } - return y -}