From f4b09ee89ebba7481468f90a54ca54ae29a35243 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Sat, 28 Mar 2020 19:57:11 +0800 Subject: [PATCH] schedulers: add some metrics for hotreigon (#2295) * schedulers: add some metrics for hotreigon Signed-off-by: nolouch --- server/cluster/cluster.go | 2 +- server/cluster/coordinator.go | 6 ++++++ server/schedulers/hot_region.go | 17 ++++++++++++++++- server/schedulers/metrics.go | 9 +++++++++ server/schedulers/utils.go | 1 + server/statistics/hot_regions_stat.go | 1 + server/statistics/store.go | 3 +++ tests/pdctl/hot/hot_test.go | 2 +- 8 files changed, 38 insertions(+), 3 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5f92e4844d9..eec1e2c07d4 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -44,7 +44,7 @@ import ( "go.uber.org/zap" ) -var backgroundJobInterval = time.Minute +var backgroundJobInterval = 10 * time.Second const ( clientTimeout = 3 * time.Second diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index bdbbd836b7d..d0c1702175d 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -402,18 +402,22 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok := status.AsPeer[storeID] if ok { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalBytesRate) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(0) } stat, ok = status.AsLeader[storeID] if ok { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalKeysRate) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0) } @@ -434,9 +438,11 @@ func (c *coordinator) collectHotSpotMetrics() { stat, ok := status.AsLeader[storeID] if ok { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalKeysRate) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count)) } else { hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0) + hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0) hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0) } diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 9bdf7d4bf31..8a1d1808d15 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -19,6 +19,7 @@ import ( "math/rand" "net/http" "sort" + "strconv" "sync" "time" @@ -70,7 +71,9 @@ const ( // HotWriteRegionType is hot write region scheduler type. HotWriteRegionType = "hot-write-region" - hotRegionLimitFactor = 0.75 + hotRegionLimitFactor = 0.75 + minHotScheduleInterval = time.Second + maxHotScheduleInterval = 20 * time.Second ) type hotScheduler struct { @@ -138,6 +141,13 @@ func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.conf.ServeHTTP(w, r) } +func (h *hotScheduler) GetMinInterval() time.Duration { + return minHotScheduleInterval +} +func (h *hotScheduler) GetNextInterval(interval time.Duration) time.Duration { + return intervalGrow(h.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth) +} + func (h *hotScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster) } @@ -922,6 +932,9 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) { operator.OpHotRegion, bs.cur.srcStoreID, dstPeer) + + op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-peer", strconv.FormatUint(bs.cur.srcStoreID, 10)+"-out")) + op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-peer", strconv.FormatUint(dstPeer.GetStoreId(), 10)+"-in")) case transferLeader: if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil { return nil, nil @@ -934,6 +947,8 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) { bs.cur.srcStoreID, bs.cur.dstStoreID, operator.OpHotRegion) + op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-leader", strconv.FormatUint(bs.cur.srcStoreID, 10)+"-out")) + op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-leader", strconv.FormatUint(bs.cur.dstStoreID, 10)+"-in")) } if err != nil { diff --git a/server/schedulers/metrics.go b/server/schedulers/metrics.go index fd2299543f8..768a5c1d92a 100644 --- a/server/schedulers/metrics.go +++ b/server/schedulers/metrics.go @@ -71,6 +71,14 @@ var balanceRegionCounter = prometheus.NewCounterVec( Help: "Counter of balance region scheduler.", }, []string{"type", "address", "store"}) +var balanceHotRegionCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "scheduler", + Name: "hot_region", + Help: "Counter of hot region scheduler.", + }, []string{"type", "store"}) + var balanceDirectionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", @@ -101,6 +109,7 @@ func init() { prometheus.MustRegister(hotPeerSummary) prometheus.MustRegister(balanceLeaderCounter) prometheus.MustRegister(balanceRegionCounter) + prometheus.MustRegister(balanceHotRegionCounter) prometheus.MustRegister(balanceDirectionCounter) prometheus.MustRegister(scatterRangeLeaderCounter) prometheus.MustRegister(scatterRangeRegionCounter) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 3fdaac73f86..b8f243cfda6 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -350,6 +350,7 @@ func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat { } return &statistics.HotPeersStat{ TotalBytesRate: li.LoadPred.Current.ByteRate, + TotalKeysRate: li.LoadPred.Current.KeyRate, Count: len(li.HotPeers), Stats: peers, } diff --git a/server/statistics/hot_regions_stat.go b/server/statistics/hot_regions_stat.go index f30ceaabb2b..74a54837819 100644 --- a/server/statistics/hot_regions_stat.go +++ b/server/statistics/hot_regions_stat.go @@ -16,6 +16,7 @@ package statistics // HotPeersStat records all hot regions statistics type HotPeersStat struct { TotalBytesRate float64 `json:"total_flow_bytes"` + TotalKeysRate float64 `json:"total_flow_keys"` Count int `json:"regions_count"` Stats []HotPeerStat `json:"statistics"` } diff --git a/server/statistics/store.go b/server/statistics/store.go index a420d9004e8..5267fe85f47 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -18,7 +18,9 @@ import ( "time" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" "github.com/pingcap/pd/v3/server/core" + "go.uber.org/zap" ) // StoresStats is a cache hold hot regions. @@ -298,6 +300,7 @@ func collect(records []*pdpb.RecordPair) float64 { func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) { statInterval := stats.GetInterval() interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp() + log.Debug("update store stats", zap.Uint64("key-write", stats.KeysWritten), zap.Uint64("bytes-write", stats.BytesWritten), zap.Duration("interval", time.Duration(interval)*time.Second), zap.Uint64("store-id", stats.GetStoreId())) r.Lock() defer r.Unlock() r.bytesWriteRate.Add(float64(stats.BytesWritten), time.Duration(interval)*time.Second) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 25cfc1c150c..5f230f000f8 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -115,7 +115,7 @@ func (s *hotTestSuite) TestHot(c *C) { hotReadRegionID, hotWriteRegionID, hotStoreId := uint64(3), uint64(2), uint64(1) pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreId, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval)) pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreId, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval)) - time.Sleep(3200 * time.Millisecond) + time.Sleep(5000 * time.Millisecond) testHot(hotReadRegionID, hotStoreId, "read") testHot(hotWriteRegionID, hotStoreId, "write") }