diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77ac2758d9d..c5227cb3ee6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,10 @@
 + Fix the issue about the limit of the hot region [#1552](https://github.com/pingcap/pd/pull/1552)
 + Add an option about grpc gateway [#1596](https://github.com/pingcap/pd/pull/1596)
 + Add the missing schedule config items [#1601](https://github.com/pingcap/pd/pull/1601)
++ Fix the hot region scheduler to check the number of replicas before scheduling [#1609](https://github.com/pingcap/pd/pull/1609)
++ Set the influence of an operator according to the region size [#1613](https://github.com/pingcap/pd/pull/1613)
++ Increase the default limit of the hot region scheduler [#1616](https://github.com/pingcap/pd/pull/1616)
++ Fix the issue that pending peers were ignored when balancing regions [#1617](https://github.com/pingcap/pd/pull/1617)
 
 ## v3.0.0
diff --git a/conf/config.toml b/conf/config.toml
index 93288a6ce1d..37b5df0f3d4 100644
--- a/conf/config.toml
+++ b/conf/config.toml
@@ -67,6 +67,7 @@
 leader-schedule-limit = 4
 region-schedule-limit = 64
 replica-schedule-limit = 64
 merge-schedule-limit = 8
+hot-region-schedule-limit = 4
 #enable-one-way-merge = false
 #tolerant-size-ratio = 0.0
diff --git a/pkg/mock/mockoption/mockoption.go b/pkg/mock/mockoption/mockoption.go
index cd55f712ade..df2baf560dd 100644
--- a/pkg/mock/mockoption/mockoption.go
+++ b/pkg/mock/mockoption/mockoption.go
@@ -28,10 +28,10 @@ const (
 	defaultSplitMergeInterval     = 0
 	defaultMaxStoreDownTime       = 30 * time.Minute
 	defaultLeaderScheduleLimit    = 4
-	defaultRegionScheduleLimit    = 4
-	defaultReplicaScheduleLimit   = 8
+	defaultRegionScheduleLimit    = 64
+	defaultReplicaScheduleLimit   = 64
 	defaultMergeScheduleLimit     = 8
-	defaultHotRegionScheduleLimit = 2
+	defaultHotRegionScheduleLimit = 4
 	defaultStoreBalanceRate       = 60
 	defaultTolerantSizeRatio      = 2.5
 	defaultLowSpaceRatio          = 0.8
diff --git a/server/cluster.go b/server/cluster.go
index 12f359d900f..96887ab00dc 100644
--- a/server/cluster.go
+++ b/server/cluster.go
@@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI
 // UpdateStoreLabels updates a store's location labels.
 func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error {
 	c.RLock()
-	defer c.RUnlock()
 	store := c.cachedCluster.GetStore(storeID)
+	c.RUnlock()
 	if store == nil {
 		return errors.Errorf("invalid store ID %d, not found", storeID)
 	}
diff --git a/server/cluster_info.go b/server/cluster_info.go
index f8d0295540a..c40028b8d04 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -415,6 +415,13 @@ func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOpti
 	return c.core.RandFollowerRegion(storeID, opts...)
 }
 
+// RandPendingRegion returns a random region that has a pending peer on the store.
+func (c *clusterInfo) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
+	c.RLock()
+	defer c.RUnlock()
+	return c.core.RandPendingRegion(storeID, opts...)
+}
+
 // GetAverageRegionSize returns the average region approximate size.
 func (c *clusterInfo) GetAverageRegionSize() int64 {
 	c.RLock()
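The `UpdateStoreLabels` change above narrows the read-lock scope: the lock now guards only the map lookup rather than, via `defer`, the whole function, so later validation and write paths no longer run under the cluster read lock. A minimal sketch of the pattern (the `registry` type is illustrative, not PD's actual structure):

```go
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu     sync.RWMutex
	stores map[uint64]string
}

// lookup holds the read lock only for the map access; the caller is then
// free to do slow work (validation, RPCs, taking the write lock) afterwards.
func (r *registry) lookup(id uint64) (string, bool) {
	r.mu.RLock()
	s, ok := r.stores[id]
	r.mu.RUnlock() // release immediately instead of deferring to function exit
	return s, ok
}

func main() {
	r := &registry{stores: map[uint64]string{1: "store-1"}}
	if s, ok := r.lookup(1); ok {
		fmt.Println("found", s)
	}
}
```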
diff --git a/server/config.go b/server/config.go
index 2308c7b2f42..e351134b7ee 100644
--- a/server/config.go
+++ b/server/config.go
@@ -576,11 +576,11 @@ const (
 	defaultSplitMergeInterval     = 1 * time.Hour
 	defaultPatrolRegionInterval   = 100 * time.Millisecond
 	defaultMaxStoreDownTime       = 30 * time.Minute
-	defaultLeaderScheduleLimit    = 8
+	defaultLeaderScheduleLimit    = 4
 	defaultRegionScheduleLimit    = 64
 	defaultReplicaScheduleLimit   = 64
 	defaultMergeScheduleLimit     = 8
-	defaultHotRegionScheduleLimit = 2
+	defaultHotRegionScheduleLimit = 4
 	defaultStoreBalanceRate       = 15
 	defaultTolerantSizeRatio      = 0
 	defaultLowSpaceRatio          = 0.8
diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go
index fd0020aa656..af6109de4fc 100644
--- a/server/core/basic_cluster.go
+++ b/server/core/basic_cluster.go
@@ -99,6 +99,11 @@ func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *
 	return bc.Regions.RandLeaderRegion(storeID, opts...)
 }
 
+// RandPendingRegion returns a random region that has a pending peer on the store.
+func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
+	return bc.Regions.RandPendingRegion(storeID, opts...)
+}
+
 // GetAverageRegionSize returns the average region approximate size.
 func (bc *BasicCluster) GetAverageRegionSize() int64 {
 	return bc.Regions.GetAverageRegionSize()
diff --git a/server/core/region.go b/server/core/region.go
index 4e7e213070e..0a7097e998f 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -668,12 +668,17 @@ func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
 	return randRegion(r.regions, opts...)
 }
 
-// RandLeaderRegion get a store's leader region by random
+// RandPendingRegion randomly gets a store's region with a pending peer.
+func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
+	return randRegion(r.pendingPeers[storeID], opts...)
+}
+
+// RandLeaderRegion randomly gets a store's leader region.
 func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
 	return randRegion(r.leaders[storeID], opts...)
 }
 
-// RandFollowerRegion get a store's follower region by random
+// RandFollowerRegion randomly gets a store's follower region.
 func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
 	return randRegion(r.followers[storeID], opts...)
 }
diff --git a/server/core/region_option.go b/server/core/region_option.go
index bf772045ba0..99411b528bd 100644
--- a/server/core/region_option.go
+++ b/server/core/region_option.go
@@ -28,6 +28,13 @@ func HealthRegion() RegionOption {
 	}
 }
 
+// HealthRegionAllowPending checks if the region is healthy, while allowing pending peers.
+func HealthRegionAllowPending() RegionOption {
+	return func(region *RegionInfo) bool {
+		return len(region.downPeers) == 0 && len(region.learners) == 0
+	}
+}
+
 // RegionCreateOption used to create region.
 type RegionCreateOption func(region *RegionInfo)
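`RegionOption` is a plain predicate, so health policies compose naturally: `HealthRegion` rejects down peers, pending peers, and learners, while the new `HealthRegionAllowPending` drops only the pending-peer check, which is what lets the balancer deliberately select pending regions. A hedged sketch of the two filters side by side (types and field names simplified from `RegionInfo`):

```go
package main

import "fmt"

// region is a simplified stand-in for core.RegionInfo.
type region struct {
	downPeers, pendingPeers, learners int
}

type regionOption func(*region) bool

// healthRegion rejects any region with down peers, pending peers, or learners.
func healthRegion() regionOption {
	return func(r *region) bool {
		return r.downPeers == 0 && r.pendingPeers == 0 && r.learners == 0
	}
}

// healthRegionAllowPending is identical except that pending peers are
// tolerated, so a pending region can still be picked as a schedule source.
func healthRegionAllowPending() regionOption {
	return func(r *region) bool {
		return r.downPeers == 0 && r.learners == 0
	}
}

func main() {
	r := &region{pendingPeers: 1}
	fmt.Println(healthRegion()(r))             // false
	fmt.Println(healthRegionAllowPending()(r)) // true
}
```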
diff --git a/server/schedule/filters.go b/server/schedule/filters.go
index e2b7b1c979a..928110eccf8 100644
--- a/server/schedule/filters.go
+++ b/server/schedule/filters.go
@@ -415,8 +415,15 @@ func (f StoreStateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
 		return true
 	}
 
-	if f.MoveRegion && f.filterMoveRegion(opt, store) {
-		return true
+	if f.MoveRegion {
+		// Only target stores consider pending peers, because more pending peers usually means a slower disk.
+		if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
+			return true
+		}
+
+		if f.filterMoveRegion(opt, store) {
+			return true
+		}
 	}
 	return false
 }
@@ -430,9 +437,6 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b
 		return true
 	}
 
-	if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
-		return true
-	}
 	if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
 		uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
 		uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
diff --git a/server/schedule/merge_checker_test.go b/server/schedule/merge_checker_test.go
index 788c5977bf4..e60832f7e82 100644
--- a/server/schedule/merge_checker_test.go
+++ b/server/schedule/merge_checker_test.go
@@ -14,16 +14,22 @@
 package schedule
 
 import (
+	"testing"
 	"time"
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/pd/pkg/mock/mockcluster"
+	"github.com/pingcap/pd/pkg/mock/mockhbstream"
 	"github.com/pingcap/pd/pkg/mock/mockoption"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/namespace"
 )
 
+func TestChecker(t *testing.T) {
+	TestingT(t)
+}
+
 var _ = Suite(&testMergeCheckerSuite{})
 
 type testMergeCheckerSuite struct {
@@ -257,3 +263,42 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
 		},
 	})
 }
+
+func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
+	oc := NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream())
+	s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour
+	s.cluster.ScheduleOptions.StoreBalanceRate = 60
+	s.regions[2] = s.regions[2].Clone(
+		core.SetPeers([]*metapb.Peer{
+			{Id: 109, StoreId: 2},
+			{Id: 110, StoreId: 3},
+			{Id: 111, StoreId: 6},
+		}),
+		core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}),
+	)
+	s.cluster.PutRegion(s.regions[2])
+	ops := s.mc.Check(s.regions[2])
+	c.Assert(ops, NotNil)
+	// The region size is less than or equal to 1MB.
+	for i := 0; i < 50; i++ {
+		c.Assert(oc.AddOperator(ops...), IsTrue)
+		for _, op := range ops {
+			oc.RemoveOperator(op)
+		}
+	}
+	s.regions[2] = s.regions[2].Clone(
+		core.SetApproximateSize(2),
+		core.SetApproximateKeys(2),
+	)
+	s.cluster.PutRegion(s.regions[2])
+	ops = s.mc.Check(s.regions[2])
+	c.Assert(ops, NotNil)
+	// The region size is more than 1MB but not more than 20MB.
+	for i := 0; i < 5; i++ {
+		c.Assert(oc.AddOperator(ops...), IsTrue)
+		for _, op := range ops {
+			oc.RemoveOperator(op)
+		}
+	}
+	c.Assert(oc.AddOperator(ops...), IsFalse)
+}
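Moving the pending-peer check out of `filterMoveRegion` and into `FilterTarget` makes the filter asymmetric: only candidate *target* stores are rejected for having too many pending peers, while a store full of pending peers stays eligible as a *source*, since that is precisely the store the balancer now wants to drain. A small sketch of the asymmetry (the store/option types are simplified stand-ins):

```go
package main

import "fmt"

type store struct {
	id           uint64
	pendingPeers int
}

const maxPendingPeerCount = 16 // illustrative limit, mirrors GetMaxPendingPeerCount

// filterTarget rejects a store as a target: a pile of pending peers
// suggests a slow disk, so do not send more regions its way.
func filterTarget(s store) bool {
	return maxPendingPeerCount > 0 && s.pendingPeers > maxPendingPeerCount
}

// filterSource intentionally ignores pending peers: such a store should
// remain a valid source so its regions can be moved elsewhere.
func filterSource(s store) bool {
	return false
}

func main() {
	overloaded := store{id: 2, pendingPeers: 32}
	fmt.Println("rejected as target:", filterTarget(overloaded)) // true
	fmt.Println("rejected as source:", filterSource(overloaded)) // false
}
```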
diff --git a/server/schedule/operator.go b/server/schedule/operator.go
index 06af463aaf2..b1027a2c64b 100644
--- a/server/schedule/operator.go
+++ b/server/schedule/operator.go
@@ -39,6 +39,11 @@ const (
 	RegionOperatorWaitTime = 10 * time.Minute
 	// RegionInfluence represents the influence of an operator step, which is used by the rate limit.
 	RegionInfluence int64 = 1000
+	// smallRegionInfluence represents the influence of an operator step
+	// when the region size is smaller than smallRegionThreshold, which is used by the rate limit.
+	smallRegionInfluence int64 = 200
+	// smallRegionThreshold is the size below which a region is regarded as a small region.
+	smallRegionThreshold int64 = 20
 )
 
 // OperatorStep describes the basic scheduling steps that can not be subdivided.
@@ -98,9 +103,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
 func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
 	to := opInfluence.GetStoreInfluence(ap.ToStore)
 
-	to.RegionSize += region.GetApproximateSize()
+	regionSize := region.GetApproximateSize()
+	to.RegionSize += regionSize
 	to.RegionCount++
-	to.StepCost += RegionInfluence
+	if regionSize > smallRegionThreshold {
+		to.StepCost += RegionInfluence
+	} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
+		to.StepCost += smallRegionInfluence
+	}
 }
 
 // AddLearner is an OperatorStep that adds a region learner peer.
@@ -128,9 +138,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
 func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
 	to := opInfluence.GetStoreInfluence(al.ToStore)
 
-	to.RegionSize += region.GetApproximateSize()
+	regionSize := region.GetApproximateSize()
+	to.RegionSize += regionSize
 	to.RegionCount++
-	to.StepCost += RegionInfluence
+	if regionSize > smallRegionThreshold {
+		to.StepCost += RegionInfluence
+	} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
+		to.StepCost += smallRegionInfluence
+	}
 }
 
 // PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
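With this change, `AddPeer` and `AddLearner` charge the rate limiter by region size: regions above `smallRegionThreshold` (20MB) cost `RegionInfluence` (1000), smaller non-empty regions cost `smallRegionInfluence` (200), and empty regions cost nothing. A hedged restatement of the branch (the empty-region cutoff stands in for `core.EmptyRegionApproximateSize`, whose value is not shown in this diff):

```go
package main

import "fmt"

const (
	regionInfluence      int64 = 1000 // step cost for a normal region
	smallRegionInfluence int64 = 200  // step cost for a small region
	smallRegionThreshold int64 = 20   // MB; at or below this a region is "small"
	// emptyRegionSize is an assumed stand-in for core.EmptyRegionApproximateSize,
	// which is defined elsewhere in PD.
	emptyRegionSize int64 = 1
)

// stepCost mirrors the influence logic of AddPeer/AddLearner.
func stepCost(regionSizeMB int64) int64 {
	switch {
	case regionSizeMB > smallRegionThreshold:
		return regionInfluence
	case regionSizeMB > emptyRegionSize: // small but not empty
		return smallRegionInfluence
	default: // empty regions add no step cost
		return 0
	}
}

func main() {
	for _, size := range []int64{50, 20, 2, 1} {
		fmt.Printf("region %2dMB -> step cost %d\n", size, stepCost(size))
	}
}
```

This is why `TestStorelimit` above can schedule 50 merges of a tiny region but only 5 of a 2MB one under the same `StoreBalanceRate`: each small-region step spends a fifth of the tokens.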
diff --git a/server/schedule/operator_test.go b/server/schedule/operator_test.go
index ebed4e45682..674c985d364 100644
--- a/server/schedule/operator_test.go
+++ b/server/schedule/operator_test.go
@@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe
 			leader = peer
 		}
 	}
-	regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10))
+	regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50))
 	return regionInfo
 }
 
@@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
 	c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
 		LeaderSize:  0,
 		LeaderCount: 0,
-		RegionSize:  10,
+		RegionSize:  50,
 		RegionCount: 1,
 		StepCost:    1000,
 	})
 
 	TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
 	c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
-		LeaderSize:  -10,
+		LeaderSize:  -50,
 		LeaderCount: -1,
 		RegionSize:  0,
 		RegionCount: 0,
 		StepCost:    0,
 	})
 	c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
-		LeaderSize:  10,
+		LeaderSize:  50,
 		LeaderCount: 1,
-		RegionSize:  10,
+		RegionSize:  50,
 		RegionCount: 1,
 		StepCost:    1000,
 	})
 
 	RemovePeer{FromStore: 1}.Influence(opInfluence, region)
 	c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
-		LeaderSize:  -10,
+		LeaderSize:  -50,
 		LeaderCount: -1,
-		RegionSize:  -10,
+		RegionSize:  -50,
 		RegionCount: -1,
 		StepCost:    0,
 	})
 	c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
-		LeaderSize:  10,
+		LeaderSize:  50,
 		LeaderCount: 1,
-		RegionSize:  10,
+		RegionSize:  50,
 		RegionCount: 1,
 		StepCost:    1000,
 	})
 
 	MergeRegion{IsPassive: false}.Influence(opInfluence, region)
 	c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
-		LeaderSize:  -10,
+		LeaderSize:  -50,
 		LeaderCount: -1,
-		RegionSize:  -10,
+		RegionSize:  -50,
 		RegionCount: -1,
 		StepCost:    0,
 	})
 	c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
-		LeaderSize:  10,
+		LeaderSize:  50,
 		LeaderCount: 1,
-		RegionSize:  10,
+		RegionSize:  50,
 		RegionCount: 1,
 		StepCost:    1000,
 	})
 
 	MergeRegion{IsPassive: true}.Influence(opInfluence, region)
 	c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
-		LeaderSize:  -10,
+		LeaderSize:  -50,
 		LeaderCount: -2,
-		RegionSize:  -10,
+		RegionSize:  -50,
 		RegionCount: -2,
 		StepCost:    0,
 	})
 	c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
-		LeaderSize:  10,
+		LeaderSize:  50,
 		LeaderCount: 1,
-		RegionSize:  10,
+		RegionSize:  50,
 		RegionCount: 0,
 		StepCost:    1000,
 	})
diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go
index 008f1914ee3..54d2bb9584e 100644
--- a/server/schedule/scheduler.go
+++ b/server/schedule/scheduler.go
@@ -25,22 +25,28 @@ import (
 	"go.uber.org/zap"
 )
 
-// Cluster provides an overview of a cluster's regions distribution.
-type Cluster interface {
+// RegionSetInformer provides access to a shared informer of regions.
+// TODO: move to core package
+type RegionSetInformer interface {
 	RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
 	RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
+	RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
 	GetAverageRegionSize() int64
 	GetStoreRegionCount(storeID uint64) int
+	GetRegion(id uint64) *core.RegionInfo
+	GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
+	ScanRegions(startKey []byte, limit int) []*core.RegionInfo
+}
 
+// Cluster provides an overview of a cluster's regions distribution.
+type Cluster interface {
+	RegionSetInformer
 	GetStores() []*core.StoreInfo
 	GetStore(id uint64) *core.StoreInfo
-	GetRegion(id uint64) *core.RegionInfo
+
 	GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
 	GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
 	GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
-	GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
-	ScanRegions(startKey []byte, limit int) []*core.RegionInfo
-
 	BlockStore(id uint64) error
 	UnblockStore(id uint64)
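Extracting `RegionSetInformer` and embedding it in `Cluster` is standard Go interface composition: read-only region access gets its own contract that other code (the TODO hints at the `core` package) can depend on without seeing the scheduling methods. A compressed sketch of the shape, with method sets abbreviated and all types hypothetical:

```go
package main

import "fmt"

type RegionInfo struct{ ID uint64 }

// RegionSetInformer is the narrow, read-only view of the region set.
type RegionSetInformer interface {
	GetRegion(id uint64) *RegionInfo
	RandPendingRegion(storeID uint64) *RegionInfo
}

// Cluster embeds the informer, so any Cluster is also a RegionSetInformer.
type Cluster interface {
	RegionSetInformer
	BlockStore(id uint64) error
}

// mockCluster satisfies both interfaces.
type mockCluster struct{ regions map[uint64]*RegionInfo }

func (m *mockCluster) GetRegion(id uint64) *RegionInfo              { return m.regions[id] }
func (m *mockCluster) RandPendingRegion(storeID uint64) *RegionInfo { return nil }
func (m *mockCluster) BlockStore(id uint64) error                   { return nil }

// inspect needs only the narrow interface, not the full Cluster.
func inspect(informer RegionSetInformer, id uint64) {
	fmt.Println(informer.GetRegion(id))
}

func main() {
	var c Cluster = &mockCluster{regions: map[uint64]*RegionInfo{1: {ID: 1}}}
	inspect(c, 1) // a Cluster passes wherever a RegionSetInformer is expected
}
```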
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 520da9ec834..9968d335ae6 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -94,10 +94,15 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
 	opInfluence := s.opController.GetOpInfluence(cluster)
 	var hasPotentialTarget bool
 	for i := 0; i < balanceRegionRetryLimit; i++ {
-		// Priority the region that has a follower in the source store.
-		region := cluster.RandFollowerRegion(sourceID, core.HealthRegion())
+		// First, pick a region that has a pending peer, since a pending peer
+		// may mean the disk is overloaded; move such regions away first.
+		region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending())
 		if region == nil {
-			// Then the region has the leader in the source store
+			// Then pick a region that has a follower in the source store.
+			region = cluster.RandFollowerRegion(sourceID, core.HealthRegion())
+		}
+		if region == nil {
+			// Finally, pick a region whose leader is in the source store.
 			region = cluster.RandLeaderRegion(sourceID, core.HealthRegion())
 		}
 		if region == nil {
diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go
index 6447643d28c..d45c8ac1274 100644
--- a/server/schedulers/balance_test.go
+++ b/server/schedulers/balance_test.go
@@ -488,6 +488,35 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
 	testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 3)
 }
 
+func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	tc := mockcluster.NewCluster(opt)
+	oc := schedule.NewOperatorController(nil, nil)
+
+	newTestReplication(opt, 3, "zone", "rack", "host")
+
+	sb, err := schedule.CreateScheduler("balance-region", oc)
+	c.Assert(err, IsNil)
+
+	// Store 1 has the largest region score, so the balancer tries to replace a peer in store 1.
+	tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
+	tc.AddLabelsStore(2, 7, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
+	tc.AddLabelsStore(3, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"})
+	// Store 4 has a smaller region score than store 1 and is a better placement than store 2.
+	tc.AddLabelsStore(4, 10, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
+
+	// Set the pending peer.
+	tc.AddLeaderRegion(1, 1, 2, 3)
+	tc.AddLeaderRegion(2, 1, 2, 3)
+	tc.AddLeaderRegion(3, 2, 1, 3)
+	region := tc.GetRegion(3)
+	region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(1)}))
+	tc.PutRegion(region)
+
+	c.Assert(sb.Schedule(tc)[0].RegionID(), Equals, uint64(3))
+	testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 4)
+}
+
 var _ = Suite(&testReplicaCheckerSuite{})
 
 type testReplicaCheckerSuite struct{}
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 2ad7002f8ff..f6163f72f61 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -288,7 +288,19 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 	for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
 		rs := storesStat[srcStoreID].RegionsStat[i]
 		srcRegion := cluster.GetRegion(rs.RegionID)
-		if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+		if srcRegion == nil {
+			schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
+			continue
+		}
+
+		if isRegionUnhealthy(srcRegion) {
+			schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
+			continue
+		}
+
+		if len(srcRegion.GetPeers()) != cluster.GetMaxReplicas() {
+			log.Debug("region has abnormal replica count", zap.String("scheduler", h.GetName()), zap.Uint64("region-id", srcRegion.GetID()))
+			schedulerCounter.WithLabelValues(h.GetName(), "abnormal_replica").Inc()
 			continue
 		}
 
@@ -345,7 +357,13 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 	for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
 		rs := storesStat[srcStoreID].RegionsStat[i]
 		srcRegion := cluster.GetRegion(rs.RegionID)
-		if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+		if srcRegion == nil {
+			schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
+			continue
+		}
+
+		if isRegionUnhealthy(srcRegion) {
+			schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
 			continue
 		}
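The balancer's selection order is now pending, then follower, then leader: pending peers hint at an overloaded disk, so those regions are evacuated first, and leader regions are moved only as a last resort since moving a leader also implies a leader transfer. A hedged sketch of that fallback chain (the `pickSource` helper and picker funcs are illustrative, not PD API):

```go
package main

import "fmt"

type region struct{ id uint64 }

// pickSource tries each picker in priority order and returns the first hit.
// Order matters: pending peers first (possible slow disk), then followers
// (cheapest to move), and leaders only as a last resort.
func pickSource(pickers ...func() *region) *region {
	for _, pick := range pickers {
		if r := pick(); r != nil {
			return r
		}
	}
	return nil
}

func main() {
	randPending := func() *region { return nil }          // no pending region on this store
	randFollower := func() *region { return &region{7} } // a follower region is available
	randLeader := func() *region { return &region{9} }

	r := pickSource(randPending, randFollower, randLeader)
	fmt.Println("selected region", r.id) // 7: follower chosen before leader
}
```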
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index 96748748ec8..951e167a2a8 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -361,6 +361,34 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
 	c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Not(Equals), 6)
 }
 
+var _ = Suite(&testHotRegionSchedulerSuite{})
+
+type testHotRegionSchedulerSuite struct{}
+
+func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	opt.LeaderScheduleLimit = 0
+	tc := mockcluster.NewCluster(opt)
+	hb, err := schedule.CreateScheduler("hot-read-region", schedule.NewOperatorController(nil, nil))
+	c.Assert(err, IsNil)
+
+	tc.AddRegionStore(1, 3)
+	tc.AddRegionStore(2, 2)
+	tc.AddRegionStore(3, 2)
+
+	// Report store read bytes.
+	tc.UpdateStorageReadBytes(1, 75*1024*1024)
+	tc.UpdateStorageReadBytes(2, 45*1024*1024)
+	tc.UpdateStorageReadBytes(3, 45*1024*1024)
+
+	tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2)
+	tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, 1, 3)
+	tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3)
+	opt.HotRegionCacheHitsThreshold = 0
+	c.Assert(tc.IsRegionHot(1), IsTrue)
+	c.Assert(hb.Schedule(tc), IsNil)
+}
+
 var _ = Suite(&testEvictLeaderSuite{})
 
 type testEvictLeaderSuite struct{}
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index 31dcf030e0e..37ae6ce7c6e 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -49,6 +49,10 @@ func minDuration(a, b time.Duration) time.Duration {
 	return b
 }
 
+func isRegionUnhealthy(region *core.RegionInfo) bool {
+	return len(region.GetDownPeers()) != 0 || len(region.GetLearners()) != 0
+}
+
 func shouldBalance(cluster schedule.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ResourceKind, opInfluence schedule.OpInfluence) bool {
 	// The reason we use max(regionSize, averageRegionSize) to check is:
 	// 1. prevent moving small regions between stores with close scores, leading to unnecessary balance.
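`isRegionUnhealthy` deliberately treats pending peers as healthy: a pending peer merely lags, while down peers and learners mean the replica set is not in its steady state. This is exactly what `TestIsRegionUnhealthy` below pins down. A self-contained restatement of the predicate and its truth table (types simplified from `core.RegionInfo`):

```go
package main

import "fmt"

// regionState is a simplified view of the peer sets on core.RegionInfo.
type regionState struct {
	downPeers, pendingPeers, learners int
}

// isRegionUnhealthy mirrors the new helper: down peers or learners make a
// region unhealthy; pending peers alone do not.
func isRegionUnhealthy(r regionState) bool {
	return r.downPeers != 0 || r.learners != 0
}

func main() {
	cases := []struct {
		name string
		r    regionState
	}{
		{"down peer", regionState{downPeers: 1}},       // true
		{"pending peer", regionState{pendingPeers: 1}}, // false
		{"learner", regionState{learners: 1}},          // true
		{"steady state", regionState{}},                // false
	}
	for _, c := range cases {
		fmt.Printf("%-12s -> %v\n", c.name, isRegionUnhealthy(c.r))
	}
}
```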
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/core" ) func TestSchedulers(t *testing.T) { @@ -45,3 +48,32 @@ func (s *testMinMaxSuite) TestMinDuration(c *C) { c.Assert(minDuration(time.Second, time.Minute), Equals, time.Second) c.Assert(minDuration(time.Second, time.Second), Equals, time.Second) } + +var _ = Suite(&testRegionUnhealthySuite{}) + +type testRegionUnhealthySuite struct{} + +func (s *testRegionUnhealthySuite) TestIsRegionUnhealthy(c *C) { + peers := make([]*metapb.Peer, 0, 3) + for i := uint64(0); i < 2; i++ { + p := &metapb.Peer{ + Id: i, + StoreId: i, + } + peers = append(peers, p) + } + peers = append(peers, &metapb.Peer{ + Id: 2, + StoreId: 2, + IsLearner: true, + }) + + r1 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0], core.WithDownPeers([]*pdpb.PeerStats{{Peer: peers[1]}})) + r2 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0], core.WithPendingPeers([]*metapb.Peer{peers[1]})) + r3 := core.NewRegionInfo(&metapb.Region{Peers: peers[:3]}, peers[0], core.WithLearners([]*metapb.Peer{peers[2]})) + r4 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0]) + c.Assert(isRegionUnhealthy(r1), IsTrue) + c.Assert(isRegionUnhealthy(r2), IsFalse) + c.Assert(isRegionUnhealthy(r3), IsTrue) + c.Assert(isRegionUnhealthy(r4), IsFalse) +} diff --git a/server/statistics/store.go b/server/statistics/store.go index 32b324081a1..53fa77d0f5e 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -22,6 +22,7 @@ import ( // StoresStats is a cache hold hot regions. type StoresStats struct { + sync.RWMutex rollingStoresStats map[uint64]*RollingStoreStats bytesReadRate float64 bytesWriteRate float64 @@ -36,26 +37,36 @@ func NewStoresStats() *StoresStats { // CreateRollingStoreStats creates RollingStoreStats with a given store ID. func (s *StoresStats) CreateRollingStoreStats(storeID uint64) { + s.Lock() + defer s.Unlock() s.rollingStoresStats[storeID] = newRollingStoreStats() } // RemoveRollingStoreStats removes RollingStoreStats with a given store ID. func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) { + s.Lock() + defer s.Unlock() delete(s.rollingStoresStats, storeID) } // GetRollingStoreStats gets RollingStoreStats with a given store ID. func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats { + s.RLock() + defer s.RUnlock() return s.rollingStoresStats[storeID] } // Observe records the current store status with a given store. func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) { + s.RLock() + defer s.RUnlock() s.rollingStoresStats[storeID].Observe(stats) } // UpdateTotalBytesRate updates the total bytes write rate and read rate. func (s *StoresStats) UpdateTotalBytesRate(stores *core.StoresInfo) { + s.RLock() + defer s.RUnlock() var totalBytesWriteRate float64 var totalBytesReadRate float64 var writeRate, readRate float64 @@ -83,6 +94,8 @@ func (s *StoresStats) TotalBytesReadRate() float64 { // GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo. func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { writeRate, _ := stats.GetBytesRate() @@ -93,6 +106,8 @@ func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { // GetStoresBytesReadStat returns the bytes read stat of all StoreInfo. 
@@ -93,6 +106,8 @@ func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 {
 
 // GetStoresBytesReadStat returns the bytes read stat of all StoreInfo.
 func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 {
+	s.RLock()
+	defer s.RUnlock()
 	res := make(map[uint64]uint64, len(s.rollingStoresStats))
 	for storeID, stats := range s.rollingStoresStats {
 		_, readRate := stats.GetBytesRate()
@@ -103,6 +118,8 @@ func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 {
 
 // GetStoresKeysWriteStat returns the keys write stat of all StoreInfo.
 func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 {
+	s.RLock()
+	defer s.RUnlock()
 	res := make(map[uint64]uint64, len(s.rollingStoresStats))
 	for storeID, stats := range s.rollingStoresStats {
 		res[storeID] = uint64(stats.GetKeysWriteRate())
@@ -112,6 +129,8 @@ func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 {
 
 // GetStoresKeysReadStat returns the keys read stat of all StoreInfo.
 func (s *StoresStats) GetStoresKeysReadStat() map[uint64]uint64 {
+	s.RLock()
+	defer s.RUnlock()
 	res := make(map[uint64]uint64, len(s.rollingStoresStats))
 	for storeID, stats := range s.rollingStoresStats {
 		res[storeID] = uint64(stats.GetKeysReadRate())