Skip to content

Commit

Permalink
cluster: calculate StoreWriteRate based on the stats of RegionHeartbeat (#3819)
Browse files Browse the repository at this point in the history

* cluster: calculate StoreWriteRate based on the stats of RegionHeartbeat

* add StoreRegionWriteBytes and StoreRegionWriteKeys
* count the write rate in the region tree
* observe region stats on a regular basis

Signed-off-by: HunDunDM <hundundm@gmail.com>

* address comment

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HunDunDM and ti-chi-bot authored Jul 29, 2021
1 parent 85d02ac commit 78460ca
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 40 deletions.
25 changes: 18 additions & 7 deletions server/api/hot_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"

"github.com/tikv/pd/server"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/statistics"
"github.com/unrolled/render"
)
Expand Down Expand Up @@ -131,13 +132,23 @@ func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request)
QueryWriteStats: make(map[uint64]float64),
QueryReadStats: make(map[uint64]float64),
}
for id, loads := range h.GetStoresLoads() {
stats.BytesWriteStats[id] = loads[statistics.StoreWriteBytes]
stats.BytesReadStats[id] = loads[statistics.StoreReadBytes]
stats.KeysWriteStats[id] = loads[statistics.StoreWriteKeys]
stats.KeysReadStats[id] = loads[statistics.StoreReadKeys]
stats.QueryWriteStats[id] = loads[statistics.StoreWriteQuery]
stats.QueryReadStats[id] = loads[statistics.StoreReadQuery]
stores, _ := h.GetStores()
storesLoads := h.GetStoresLoads()
for _, store := range stores {
id := store.GetID()
if loads, ok := storesLoads[id]; ok {
if core.IsTiFlashStore(store.GetMeta()) {
stats.BytesWriteStats[id] = loads[statistics.StoreRegionsWriteBytes]
stats.KeysWriteStats[id] = loads[statistics.StoreRegionsWriteKeys]
} else {
stats.BytesWriteStats[id] = loads[statistics.StoreWriteBytes]
stats.KeysWriteStats[id] = loads[statistics.StoreWriteKeys]
}
stats.BytesReadStats[id] = loads[statistics.StoreReadBytes]
stats.KeysReadStats[id] = loads[statistics.StoreReadKeys]
stats.QueryWriteStats[id] = loads[statistics.StoreWriteQuery]
stats.QueryReadStats[id] = loads[statistics.StoreReadQuery]
}
}
h.rd.JSON(w, http.StatusOK, stats)
}
26 changes: 25 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,13 @@ func (c *RaftCluster) Start(s Server) error {
c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())

c.wg.Add(4)
c.wg.Add(5)
go c.runCoordinator()
failpoint.Inject("highFrequencyClusterJobs", func() {
backgroundJobInterval = 100 * time.Microsecond
})
go c.runBackgroundJobs(backgroundJobInterval)
go c.runStatsBackgroundJobs()
go c.syncRegions()
go c.runReplicationMode()
c.running = true
Expand Down Expand Up @@ -325,6 +326,29 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
}
}

// runStatsBackgroundJobs periodically observes the per-store write rates
// derived from region heartbeats and feeds them into the hot-region
// statistics (hotStat). It runs until the cluster is stopped via c.quit.
func (c *RaftCluster) runStatsBackgroundJobs() {
	defer logutil.LogPanic()
	defer c.wg.Done()

	ticker := time.NewTicker(statistics.RegionsStatsObserveInterval)
	defer ticker.Stop()

	for {
		select {
		case <-c.quit:
			log.Info("statistics background jobs has been stopped")
			return
		case <-ticker.C:
			// Collect the rates under the read lock, then record them under
			// the write lock. NOTE(review): the gap between RUnlock and Lock
			// means the observed rates may be slightly stale relative to the
			// moment they are recorded — presumably acceptable for periodic
			// statistics; confirm if exactness ever matters.
			c.RLock()
			storeIDs, writeBytesRates, writeKeysRates := c.core.GetStoresWriteRate()
			c.RUnlock()
			c.Lock()
			c.hotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
			c.Unlock()
		}
	}
}

func (c *RaftCluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
Expand Down
31 changes: 30 additions & 1 deletion server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (bc *BasicCluster) GetStoreLeaderRegionSize(storeID uint64) int64 {
// GetStoreRegionSize returns the total approximate size of all region peers
// (leaders, followers and learners) on the given store.
func (bc *BasicCluster) GetStoreRegionSize(storeID uint64) int64 {
	bc.RLock()
	defer bc.RUnlock()
	return bc.Regions.GetStoreRegionSize(storeID)
}

// GetAverageRegionSize returns the average region approximate size.
Expand All @@ -268,6 +268,35 @@ func (bc *BasicCluster) GetAverageRegionSize() int64 {
return bc.Regions.GetAverageRegionSize()
}

// getWriteRate collects, for every store in the cluster, the write rate
// reported by the supplied lookup function, and returns three parallel
// slices: the store IDs and their byte/key write rates.
func (bc *BasicCluster) getWriteRate(
	f func(storeID uint64) (bytesRate, keysRate float64),
) (storeIDs []uint64, bytesRates, keysRates []float64) {
	bc.RLock()
	defer bc.RUnlock()
	n := len(bc.Stores.stores)
	storeIDs = make([]uint64, 0, n)
	bytesRates = make([]float64, 0, n)
	keysRates = make([]float64, 0, n)
	for _, s := range bc.Stores.stores {
		storeID := s.GetID()
		b, k := f(storeID)
		storeIDs = append(storeIDs, storeID)
		bytesRates = append(bytesRates, b)
		keysRates = append(keysRates, k)
	}
	return
}

// GetStoresLeaderWriteRate returns, for every store, the total write rate
// (bytes/s and keys/s) contributed by the store's leader regions.
func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) {
	return bc.getWriteRate(bc.Regions.GetStoreLeaderWriteRate)
}

// GetStoresWriteRate returns, for every store, the total write rate
// (bytes/s and keys/s) contributed by all of the store's region peers.
func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) {
	return bc.getWriteRate(bc.Regions.GetStoreWriteRate)
}

// PutStore put a store.
func (bc *BasicCluster) PutStore(store *StoreInfo) {
bc.Lock()
Expand Down
32 changes: 32 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (
// They need to be filtered so as not to affect downstream.
// (flow size >= 1024TB)
ImpossibleFlowSize = 1 << 50
// Only statistics within this interval limit are valid.
statsReportMinInterval = 3 // 3s
statsReportMaxInterval = 5 * 60 // 5min
)

// RegionFromHeartbeat constructs a Region from region heartbeat.
Expand Down Expand Up @@ -433,6 +436,16 @@ func (r *RegionInfo) GetKeysRead() uint64 {
return r.readKeys
}

// GetWriteRate returns the written-bytes and written-keys rates of the
// region, computed over the heartbeat report interval. Reports whose
// interval lies outside [statsReportMinInterval, statsReportMaxInterval]
// are treated as invalid and yield zero rates.
func (r *RegionInfo) GetWriteRate() (bytesRate, keysRate float64) {
	interval := r.GetInterval()
	seconds := interval.GetEndTimestamp() - interval.GetStartTimestamp()
	if seconds < statsReportMinInterval || seconds > statsReportMaxInterval {
		return 0, 0
	}
	return float64(r.writtenBytes) / float64(seconds), float64(r.writtenKeys) / float64(seconds)
}

// GetLeader returns the leader of the region.
func (r *RegionInfo) GetLeader() *metapb.Peer {
return r.leader
Expand Down Expand Up @@ -893,6 +906,25 @@ func (r *RegionsInfo) GetStoreRegionSize(storeID uint64) int64 {
return r.GetStoreLeaderRegionSize(storeID) + r.GetStoreFollowerRegionSize(storeID) + r.GetStoreLearnerRegionSize(storeID)
}

// GetStoreLeaderWriteRate gets the total write rate of the store's leaders.
// NOTE(review): this indexes r.leaders directly; presumably the sub-tree's
// TotalWriteRate is nil-safe for unknown store IDs — confirm before relying
// on it for stores with no leaders.
func (r *RegionsInfo) GetStoreLeaderWriteRate(storeID uint64) (bytesRate, keysRate float64) {
	return r.leaders[storeID].TotalWriteRate()
}

// GetStoreWriteRate gets the total write rate of all region peers on the
// store: leaders, followers and learners combined.
func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate float64) {
	leaderBytes, leaderKeys := r.leaders[storeID].TotalWriteRate()
	followerBytes, followerKeys := r.followers[storeID].TotalWriteRate()
	learnerBytes, learnerKeys := r.learners[storeID].TotalWriteRate()
	bytesRate = leaderBytes + followerBytes + learnerBytes
	keysRate = leaderKeys + followerKeys + learnerKeys
	return
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
regions := make([]*metapb.Region, 0, r.regions.Len())
Expand Down
37 changes: 34 additions & 3 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,31 @@ func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
}
}

// TestRegionWriteRate checks that GetWriteRate only reports rates for
// report intervals inside the accepted window, and zero otherwise.
func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) {
	cases := []struct {
		bytes           uint64
		keys            uint64
		interval        uint64
		expectBytesRate float64
		expectKeysRate  float64
	}{
		{0, 0, 0, 0, 0},
		{10, 3, 0, 0, 0},
		{0, 0, 1, 0, 0},
		{10, 3, 1, 0, 0},
		{0, 0, 5, 0, 0},
		{10, 3, 5, 2, 0.6},
		{0, 0, 500, 0, 0},
		{10, 3, 500, 0, 0},
	}
	for _, tc := range cases {
		region := NewRegionInfo(&metapb.Region{Id: 100}, nil,
			SetWrittenBytes(tc.bytes), SetWrittenKeys(tc.keys), SetReportInterval(tc.interval))
		gotBytesRate, gotKeysRate := region.GetWriteRate()
		c.Assert(gotBytesRate, Equals, tc.expectBytesRate)
		c.Assert(gotKeysRate, Equals, tc.expectKeysRate)
	}
}

var _ = Suite(&testRegionMapSuite{})

type testRegionMapSuite struct{}
Expand Down Expand Up @@ -331,15 +356,21 @@ func (*testRegionKey) TestSetRegion(c *C) {
c.Assert(regions.GetRegion(18), IsNil)

// Test update keys and size of region.
region = region.Clone()
region.approximateKeys = 20
region.approximateSize = 30
region = region.Clone(
SetApproximateKeys(20),
SetApproximateSize(30),
SetWrittenBytes(40),
SetWrittenKeys(10),
SetReportInterval(5))
regions.SetRegion(region)
checkRegions(c, regions)
c.Assert(regions.tree.length(), Equals, 96)
c.Assert(len(regions.GetRegions()), Equals, 96)
c.Assert(regions.GetRegion(201), NotNil)
c.Assert(regions.tree.TotalSize(), Equals, int64(30))
bytesRate, keysRate := regions.tree.TotalWriteRate()
c.Assert(bytesRate, Equals, float64(8))
c.Assert(keysRate, Equals, float64(2))
}

func (*testRegionKey) TestShouldRemoveFromSubTree(c *C) {
Expand Down
35 changes: 31 additions & 4 deletions server/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ const (
type regionTree struct {
	tree *btree.BTree
	// Statistics, kept in sync as regions are inserted, replaced and removed.
	totalSize           int64   // sum of approximateSize over all regions in the tree
	totalWriteBytesRate float64 // sum of per-region written-bytes rates (GetWriteRate)
	totalWriteKeysRate  float64 // sum of per-region written-keys rates (GetWriteRate)
}

func newRegionTree() *regionTree {
return &regionTree{
tree: btree.New(defaultBTreeDegree),
totalSize: 0,
tree: btree.New(defaultBTreeDegree),
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
}
}

Expand Down Expand Up @@ -100,15 +104,21 @@ func (t *regionTree) getOverlaps(region *RegionInfo) []*RegionInfo {
func (t *regionTree) update(item *regionItem) []*RegionInfo {
region := item.region
t.totalSize += region.approximateSize
overlaps := t.getOverlaps(region)
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate

overlaps := t.getOverlaps(region)
for _, old := range overlaps {
log.Debug("overlapping region",
zap.Uint64("region-id", old.GetID()),
logutil.ZapRedactStringer("delete-region", RegionToHexMeta(old.GetMeta())),
logutil.ZapRedactStringer("update-region", RegionToHexMeta(region.GetMeta())))
t.tree.Delete(&regionItem{old})
t.totalSize -= old.approximateSize
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
}

t.tree.ReplaceOrInsert(item)
Expand All @@ -118,7 +128,14 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo {
// updateStat adjusts the tree's aggregate statistics when regionItem.region
// is replaced in place: the new region's size and write rates are added and
// the origin's are subtracted.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
	newBytesRate, newKeysRate := region.GetWriteRate()
	oldBytesRate, oldKeysRate := origin.GetWriteRate()

	t.totalSize += region.approximateSize
	t.totalSize -= origin.approximateSize
	t.totalWriteBytesRate += newBytesRate
	t.totalWriteBytesRate -= oldBytesRate
	t.totalWriteKeysRate += newKeysRate
	t.totalWriteKeysRate -= oldKeysRate
}

// remove removes a region if the region is in the tree.
Expand All @@ -134,6 +151,9 @@ func (t *regionTree) remove(region *RegionInfo) btree.Item {
}

t.totalSize -= region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate

return t.tree.Delete(result)
}
Expand Down Expand Up @@ -297,6 +317,13 @@ func (t *regionTree) TotalSize() int64 {
return t.totalSize
}

// TotalWriteRate returns the aggregated write rates (bytes/s, keys/s) of
// every region in the tree, or zeros when the tree is empty.
func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) {
	if t.length() > 0 {
		return t.totalWriteBytesRate, t.totalWriteKeysRate
	}
	return 0, 0
}

// Seed the package-level math/rand source once at startup with the
// current time.
func init() {
	rand.Seed(time.Now().UnixNano())
}
7 changes: 7 additions & 0 deletions server/statistics/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
StoreDiskReadRate
StoreDiskWriteRate

StoreRegionsWriteBytes // Same as StoreWriteBytes, but it is counted by RegionHeartbeat.
StoreRegionsWriteKeys // Same as StoreWriteKeys, but it is counted by RegionHeartbeat.

StoreStatCount
)

Expand All @@ -84,6 +87,10 @@ func (k StoreStatKind) String() string {
return "store_disk_read_rate"
case StoreDiskWriteRate:
return "store_disk_write_rate"
case StoreRegionsWriteBytes:
return "store_regions_write_bytes"
case StoreRegionsWriteKeys:
return "store_regions_write_keys"
}

return "unknown StoreStatKind"
Expand Down
Loading

0 comments on commit 78460ca

Please sign in to comment.