Skip to content

Commit

Permalink
statistics: merge HotCache and StoreStats (#3278)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>

Co-authored-by: Ti Prow Robot <71242396+ti-community-prow-bot@users.noreply.github.com>
  • Loading branch information
disksing and ti-chi-bot authored Dec 17, 2020
1 parent 361388c commit 6a10e75
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 35 deletions.
8 changes: 3 additions & 5 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ type Cluster struct {
*core.BasicCluster
*mockid.IDAllocator
*placement.RuleManager
*statistics.HotCache
*statistics.StoresStats
*statistics.HotStat
*config.PersistOptions
ID uint64
suspectRegions map[uint64]struct{}
Expand All @@ -56,8 +55,7 @@ func NewCluster(opts *config.PersistOptions) *Cluster {
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotCache: statistics.NewHotCache(),
StoresStats: statistics.NewStoresStats(),
HotStat: statistics.NewHotStat(),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
disabledFeatures: make(map[versioninfo.Feature]struct{}),
Expand Down Expand Up @@ -92,7 +90,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {

// GetStoresStats gets stores statistics.
func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
return mc.StoresStats
return mc.HotStat.StoresStats
}

// GetStoreRegionCount gets region count with a given store.
Expand Down
54 changes: 26 additions & 28 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ type RaftCluster struct {

labelLevelStats *statistics.LabelStatistics
regionStats *statistics.RegionStatistics
storesStats *statistics.StoresStats
hotSpotCache *statistics.HotCache
hotStat *statistics.HotStat

coordinator *coordinator
suspectRegions *cache.TTLUint64 // suspectRegions are regions that may need fix
Expand Down Expand Up @@ -201,10 +200,9 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
c.storage = storage
c.id = id
c.labelLevelStats = statistics.NewLabelStatistics()
c.storesStats = statistics.NewStoresStats()
c.hotStat = statistics.NewHotStat()
c.prepareChecker = newPrepareChecker()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.hotSpotCache = statistics.NewHotCache()
c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
c.suspectKeyRanges = cache.NewStringTTL(c.ctx, time.Minute, 3*time.Minute)
c.traceRegionFlow = opt.GetPDServerConfig().TraceRegionFlow
Expand Down Expand Up @@ -297,7 +295,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
zap.Duration("cost", time.Since(start)),
)
for _, store := range c.GetStores() {
c.storesStats.GetOrCreateRollingStoreStats(store.GetID())
c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
}
return c, nil
}
Expand Down Expand Up @@ -520,13 +518,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
}
}
if store := c.core.GetStore(newStore.GetID()); store != nil {
c.storesStats.UpdateStoreHeartbeatMetrics(store)
c.hotStat.UpdateStoreHeartbeatMetrics(store)
}
c.core.PutStore(newStore)
c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
c.storesStats.UpdateTotalBytesRate(c.core.GetStores)
c.storesStats.UpdateTotalKeysRate(c.core.GetStores)
c.storesStats.FilterUnhealthyStore(c)
c.hotStat.Observe(newStore.GetID(), newStore.GetStoreStats())
c.hotStat.UpdateTotalBytesRate(c.core.GetStores)
c.hotStat.UpdateTotalKeysRate(c.core.GetStores)
c.hotStat.FilterUnhealthyStore(c)

// c.limiter is nil before "start" is called
if c.limiter != nil && c.opt.GetStoreLimitMode() == "auto" {
Expand Down Expand Up @@ -680,10 +678,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

for _, writeItem := range writeItems {
c.hotSpotCache.Update(writeItem)
c.hotStat.Update(writeItem)
}
for _, readItem := range readItems {
c.hotSpotCache.Update(readItem)
c.hotStat.Update(readItem)
}
c.Unlock()

Expand Down Expand Up @@ -801,7 +799,7 @@ func (c *RaftCluster) RandLearnerRegion(storeID uint64, ranges []core.KeyRange,
func (c *RaftCluster) RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
r := c.hotSpotCache.RandHotRegionFromStore(store, kind, c.opt.GetHotRegionCacheHitsThreshold())
r := c.hotStat.RandHotRegionFromStore(store, kind, c.opt.GetHotRegionCacheHitsThreshold())
if r == nil {
return nil
}
Expand Down Expand Up @@ -850,7 +848,7 @@ func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.Region
func (c *RaftCluster) GetStoresStats() *statistics.StoresStats {
c.RLock()
defer c.RUnlock()
return c.storesStats
return c.hotStat.StoresStats
}

// DropCacheRegion removes a region from the cache.
Expand Down Expand Up @@ -888,7 +886,7 @@ func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {
func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
c.RLock()
defer c.RUnlock()
return c.hotSpotCache.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
}

// GetAdjacentRegions returns regions' information that are adjacent with the specific region ID.
Expand Down Expand Up @@ -1149,7 +1147,7 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
}
}
c.core.PutStore(store)
c.storesStats.GetOrCreateRollingStoreStats(store.GetID())
c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
return nil
}

Expand Down Expand Up @@ -1226,15 +1224,15 @@ func (c *RaftCluster) deleteStoreLocked(store *core.StoreInfo) error {
}
}
c.core.DeleteStore(store)
c.storesStats.RemoveRollingStoreStats(store.GetID())
c.hotStat.RemoveRollingStoreStats(store.GetID())
return nil
}

func (c *RaftCluster) collectMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.opt)
stores := c.GetStores()
for _, s := range stores {
statsMap.Observe(s, c.storesStats)
statsMap.Observe(s, c.hotStat.StoresStats)
}
statsMap.Collect()

Expand Down Expand Up @@ -1264,7 +1262,7 @@ func (c *RaftCluster) collectClusterMetrics() {
c.regionStats.Collect()
c.labelLevelStats.Collect()
// collect hot cache metrics
c.hotSpotCache.CollectMetrics()
c.hotStat.CollectMetrics()
}

func (c *RaftCluster) resetClusterMetrics() {
Expand All @@ -1276,7 +1274,7 @@ func (c *RaftCluster) resetClusterMetrics() {
c.regionStats.Reset()
c.labelLevelStats.Reset()
// reset hot cache metrics
c.hotSpotCache.ResetMetrics()
c.hotStat.ResetMetrics()
}

func (c *RaftCluster) collectHealthStatus() {
Expand Down Expand Up @@ -1431,50 +1429,50 @@ func (c *RaftCluster) isPrepared() bool {
func (c *RaftCluster) GetStoresBytesWriteStat() map[uint64]float64 {
c.RLock()
defer c.RUnlock()
return c.storesStats.GetStoresBytesWriteStat()
return c.hotStat.GetStoresBytesWriteStat()
}

// GetStoresBytesReadStat returns the bytes read stat of all StoreInfo.
func (c *RaftCluster) GetStoresBytesReadStat() map[uint64]float64 {
c.RLock()
defer c.RUnlock()
return c.storesStats.GetStoresBytesReadStat()
return c.hotStat.GetStoresBytesReadStat()
}

// GetStoresKeysWriteStat returns the bytes write stat of all StoreInfo.
func (c *RaftCluster) GetStoresKeysWriteStat() map[uint64]float64 {
c.RLock()
defer c.RUnlock()
return c.storesStats.GetStoresKeysWriteStat()
return c.hotStat.GetStoresKeysWriteStat()
}

// GetStoresKeysReadStat returns the bytes read stat of all StoreInfo.
func (c *RaftCluster) GetStoresKeysReadStat() map[uint64]float64 {
c.RLock()
defer c.RUnlock()
return c.storesStats.GetStoresKeysReadStat()
return c.hotStat.GetStoresKeysReadStat()
}

// RegionReadStats returns hot region's read stats.
func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
// RegionStats is a thread-safe method
return c.hotSpotCache.RegionStats(statistics.ReadFlow)
return c.hotStat.RegionStats(statistics.ReadFlow)
}

// RegionWriteStats returns hot region's write stats.
func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
// RegionStats is a thread-safe method
return c.hotSpotCache.RegionStats(statistics.WriteFlow)
return c.hotStat.RegionStats(statistics.WriteFlow)
}

// CheckWriteStatus checks the write status, returns whether need update statistics and item.
func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotSpotCache.CheckWrite(region)
return c.hotStat.CheckWrite(region)
}

// CheckReadStatus checks the read status, returns whether need update statistics and item.
func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotSpotCache.CheckRead(region)
return c.hotStat.CheckRead(region)
}

// TODO: remove me.
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
}
c.Assert(cluster.putStoreLocked(store), IsNil)
c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), NotNil)
c.Assert(cluster.hotStat.GetRollingStoreStats(store.GetID()), NotNil)
}

for _, store := range stores {
Expand All @@ -121,7 +121,7 @@ func (s *testClusterInfoSuite) TestFilterUnhealthyStore(c *C) {
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
c.Assert(cluster.putStoreLocked(newStore), IsNil)
c.Assert(cluster.HandleStoreHeartbeat(storeStats), IsNil)
c.Assert(cluster.storesStats.GetRollingStoreStats(store.GetID()), IsNil)
c.Assert(cluster.hotStat.GetRollingStoreStats(store.GetID()), IsNil)
}
}

Expand Down
28 changes: 28 additions & 0 deletions server/statistics/hotstat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

// HotStat contains cluster's hotspot statistics.
type HotStat struct {
*HotCache
*StoresStats
}

// NewHotStat creates the container to hold cluster's hotspot statistics.
func NewHotStat() *HotStat {
return &HotStat{
HotCache: NewHotCache(),
StoresStats: NewStoresStats(),
}
}

0 comments on commit 6a10e75

Please sign in to comment.