Skip to content

Commit

Permalink
statistics: use []float64 to represent hot peer loads (#3591)
Browse files Browse the repository at this point in the history
* reformat

Signed-off-by: lhy1024 <admin@liudos.us>

* update

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* update trend

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* address comments

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot authored Apr 28, 2021
1 parent b07be86 commit 043a1cb
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 294 deletions.
11 changes: 5 additions & 6 deletions server/api/trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
if err != nil {
return nil, err
}

trendStores := make([]trendStore, 0, len(stores))
for _, store := range stores {
info := newStoreInfo(h.svr.GetScheduleConfig(), store)
Expand All @@ -141,21 +140,21 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
LastHeartbeatTS: info.Status.LastHeartbeatTS,
Uptime: info.Status.Uptime,
}
s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetID())
s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetID())
s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, statistics.RegionReadBytes, store.GetID())
s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, statistics.RegionWriteBytes, store.GetID())
trendStores = append(trendStores, s)
}
return trendStores, nil
}

func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID uint64) (storeFlow float64, regionFlows []float64) {
func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, regionStatKind statistics.RegionStatKind, storeID uint64) (storeFlow float64, regionFlows []float64) {
if stats == nil {
return
}
if stat, ok := stats[storeID]; ok {
storeFlow = stat.TotalBytesRate
storeFlow = stat.TotalLoads[regionStatKind]
for _, flow := range stat.Stats {
regionFlows = append(regionFlows, flow.GetByteRate())
regionFlows = append(regionFlows, flow.GetLoad(regionStatKind))
}
}
return
Expand Down
34 changes: 18 additions & 16 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ func (c *coordinator) stop() {
type hasHotStatus interface {
GetHotReadStatus() *statistics.StoreHotPeersInfos
GetHotWriteStatus() *statistics.StoreHotPeersInfos
GetWritePendingInfluence() map[uint64]schedulers.Influence
GetReadPendingInfluence() map[uint64]schedulers.Influence
GetWritePendingInfluence() map[uint64]*schedulers.Influence
GetReadPendingInfluence() map[uint64]*schedulers.Influence
}

func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos {
Expand Down Expand Up @@ -512,8 +512,8 @@ func (c *coordinator) collectHotSpotMetrics() {
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsPeer[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalLoads[statistics.RegionWriteBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalLoads[statistics.RegionWriteKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
Expand All @@ -523,20 +523,21 @@ func (c *coordinator) collectHotSpotMetrics() {

stat, ok = status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionWriteBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalLoads[statistics.RegionWriteKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}

infl := pendings[storeID]
// TODO: add to tidb-ansible after merging pending influence into operator influence.
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
if infl := pendings[storeID]; infl != nil {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
}
}

// Collects hot read region metrics.
Expand All @@ -548,19 +549,20 @@ func (c *coordinator) collectHotSpotMetrics() {
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionReadBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalLoads[statistics.RegionReadKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}

infl := pendings[storeID]
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
if infl := pendings[storeID]; infl != nil {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
}
}
}

Expand Down
Loading

0 comments on commit 043a1cb

Please sign in to comment.