diff --git a/server/checker/merge_checker_test.go b/server/checker/merge_checker_test.go index e2a9c47d2e9..ab43f642750 100644 --- a/server/checker/merge_checker_test.go +++ b/server/checker/merge_checker_test.go @@ -14,17 +14,23 @@ package checker import ( + "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/mock/mockcluster" + "github.com/pingcap/pd/pkg/mock/mockhbstream" "github.com/pingcap/pd/pkg/mock/mockoption" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" ) +func TestChecker(t *testing.T) { + TestingT(t) +} + var _ = Suite(&testMergeCheckerSuite{}) type testMergeCheckerSuite struct { @@ -258,3 +264,42 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { }, }) } + +func (s *testMergeCheckerSuite) TestStorelimit(c *C) { + oc := schedule.NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream()) + s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour + s.cluster.ScheduleOptions.StoreBalanceRate = 60 + s.regions[2] = s.regions[2].Clone( + core.SetPeers([]*metapb.Peer{ + {Id: 109, StoreId: 2}, + {Id: 110, StoreId: 3}, + {Id: 111, StoreId: 6}, + }), + core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}), + ) + s.cluster.PutRegion(s.regions[2]) + ops := s.mc.Check(s.regions[2]) + c.Assert(ops, NotNil) + // The size of Region is less or equal than 1MB. + for i := 0; i < 50; i++ { + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } + } + s.regions[2] = s.regions[2].Clone( + core.SetApproximateSize(2), + core.SetApproximateKeys(2), + ) + s.cluster.PutRegion(s.regions[2]) + ops = s.mc.Check(s.regions[2]) + c.Assert(ops, NotNil) + // The size of Region is more than 1MB but no more than 20MB. + for i := 0; i < 5; i++ { + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } + } + c.Assert(oc.AddOperator(ops...), IsFalse) +} diff --git a/server/cluster.go b/server/cluster.go index 12f359d900f..96887ab00dc 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI // UpdateStoreLabels updates a store's location labels. func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error { c.RLock() - defer c.RUnlock() store := c.cachedCluster.GetStore(storeID) + c.RUnlock() if store == nil { return errors.Errorf("invalid store ID %d, not found", storeID) } diff --git a/server/schedule/operator.go b/server/schedule/operator.go index dd3714aee9a..c1f65a17964 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -39,6 +39,11 @@ const ( RegionOperatorWaitTime = 10 * time.Minute // RegionInfluence represents the influence of a operator step, which is used by ratelimit. RegionInfluence int64 = 1000 + // smallRegionInfluence represents the influence of a operator step + // when the region size is smaller than smallRegionThreshold, which is used by ratelimit. + smallRegionInfluence int64 = 200 + // smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it. + smallRegionThreshold int64 = 20 ) // OperatorStep describes the basic scheduling steps that can not be subdivided. @@ -98,9 +103,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool { func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(ap.ToStore) - to.RegionSize += region.GetApproximateSize() + regionSize := region.GetApproximateSize() + to.RegionSize += regionSize to.RegionCount++ - to.StepCost += RegionInfluence + if regionSize > smallRegionThreshold { + to.StepCost += RegionInfluence + } else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize { + to.StepCost += smallRegionInfluence + } } // AddLearner is an OperatorStep that adds a region learner peer. @@ -128,9 +138,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(al.ToStore) - to.RegionSize += region.GetApproximateSize() + regionSize := region.GetApproximateSize() + to.RegionSize += regionSize to.RegionCount++ - to.StepCost += RegionInfluence + if regionSize > smallRegionThreshold { + to.StepCost += RegionInfluence + } else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize { + to.StepCost += smallRegionInfluence + } } // PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter. diff --git a/server/schedule/operator_test.go b/server/schedule/operator_test.go index ebed4e45682..674c985d364 100644 --- a/server/schedule/operator_test.go +++ b/server/schedule/operator_test.go @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe leader = peer } } - regionInfo := core.NewRegionInfo(®ion, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + regionInfo := core.NewRegionInfo(®ion, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50)) return regionInfo } @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) { c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ LeaderSize: 0, LeaderCount: 0, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -1, RegionSize: 0, RegionCount: 0, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) RemovePeer{FromStore: 1}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -1, - RegionSize: -10, + RegionSize: -50, RegionCount: -1, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) MergeRegion{IsPassive: false}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -1, - RegionSize: -10, + RegionSize: -50, RegionCount: -1, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) MergeRegion{IsPassive: true}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -2, - RegionSize: -10, + RegionSize: -50, RegionCount: -2, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 0, StepCost: 1000, }) diff --git a/server/statistics/store.go b/server/statistics/store.go index 32b324081a1..53fa77d0f5e 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -22,6 +22,7 @@ import ( // StoresStats is a cache hold hot regions. type StoresStats struct { + sync.RWMutex rollingStoresStats map[uint64]*RollingStoreStats bytesReadRate float64 bytesWriteRate float64 @@ -36,26 +37,36 @@ func NewStoresStats() *StoresStats { // CreateRollingStoreStats creates RollingStoreStats with a given store ID. func (s *StoresStats) CreateRollingStoreStats(storeID uint64) { + s.Lock() + defer s.Unlock() s.rollingStoresStats[storeID] = newRollingStoreStats() } // RemoveRollingStoreStats removes RollingStoreStats with a given store ID. func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) { + s.Lock() + defer s.Unlock() delete(s.rollingStoresStats, storeID) } // GetRollingStoreStats gets RollingStoreStats with a given store ID. func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats { + s.RLock() + defer s.RUnlock() return s.rollingStoresStats[storeID] } // Observe records the current store status with a given store. func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) { + s.RLock() + defer s.RUnlock() s.rollingStoresStats[storeID].Observe(stats) } // UpdateTotalBytesRate updates the total bytes write rate and read rate. func (s *StoresStats) UpdateTotalBytesRate(stores *core.StoresInfo) { + s.RLock() + defer s.RUnlock() var totalBytesWriteRate float64 var totalBytesReadRate float64 var writeRate, readRate float64 @@ -83,6 +94,8 @@ func (s *StoresStats) TotalBytesReadRate() float64 { // GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo. func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { writeRate, _ := stats.GetBytesRate() @@ -93,6 +106,8 @@ func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { // GetStoresBytesReadStat returns the bytes read stat of all StoreInfo. func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { _, readRate := stats.GetBytesRate() @@ -103,6 +118,8 @@ func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 { // GetStoresKeysWriteStat returns the keys write stat of all StoreInfo. func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { res[storeID] = uint64(stats.GetKeysWriteRate()) @@ -112,6 +129,8 @@ func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 { // GetStoresKeysReadStat returns the bytes read stat of all StoreInfo. func (s *StoresStats) GetStoresKeysReadStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { res[storeID] = uint64(stats.GetKeysReadRate())