From cb2397b92353554c9bc446030fb09e2bd261a4fb Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 3 Jul 2019 17:56:27 +0800 Subject: [PATCH 1/3] not consider store limit when executing merge Signed-off-by: Ryan Leung --- server/checker/merge_checker_test.go | 36 +++++++++++++++++++++++----- server/schedule/operator.go | 4 ++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/server/checker/merge_checker_test.go b/server/checker/merge_checker_test.go index e2a9c47d2e9..dee8c030b00 100644 --- a/server/checker/merge_checker_test.go +++ b/server/checker/merge_checker_test.go @@ -14,17 +14,23 @@ package checker import ( + "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/mock/mockcluster" + "github.com/pingcap/pd/pkg/mock/mockhbstream" "github.com/pingcap/pd/pkg/mock/mockoption" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" ) +func TestChecker(t *testing.T) { + TestingT(t) +} + var _ = Suite(&testMergeCheckerSuite{}) type testMergeCheckerSuite struct { @@ -152,10 +158,10 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { ops := s.mc.Check(s.regions[2]) s.checkSteps(c, ops[0], []schedule.OperatorStep{ schedule.TransferLeader{FromStore: 6, ToStore: 5}, - schedule.AddLearner{ToStore: 1, PeerID: 1}, + schedule.AddLightLearner{ToStore: 1, PeerID: 1}, schedule.PromoteLearner{ToStore: 1, PeerID: 1}, schedule.RemovePeer{FromStore: 2}, - schedule.AddLearner{ToStore: 4, PeerID: 2}, + schedule.AddLightLearner{ToStore: 4, PeerID: 2}, schedule.PromoteLearner{ToStore: 4, PeerID: 2}, schedule.RemovePeer{FromStore: 6}, schedule.MergeRegion{ @@ -185,7 +191,7 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { s.cluster.PutRegion(s.regions[2]) ops = s.mc.Check(s.regions[2]) s.checkSteps(c, ops[0], []schedule.OperatorStep{ - schedule.AddLearner{ToStore: 4, PeerID: 3}, + schedule.AddLightLearner{ToStore: 4, PeerID: 3}, 
schedule.PromoteLearner{ToStore: 4, PeerID: 3}, schedule.RemovePeer{FromStore: 6}, schedule.MergeRegion{ @@ -234,13 +240,13 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { s.cluster.PutRegion(s.regions[2]) ops = s.mc.Check(s.regions[2]) s.checkSteps(c, ops[0], []schedule.OperatorStep{ - schedule.AddLearner{ToStore: 1, PeerID: 4}, + schedule.AddLightLearner{ToStore: 1, PeerID: 4}, schedule.PromoteLearner{ToStore: 1, PeerID: 4}, schedule.RemovePeer{FromStore: 3}, - schedule.AddLearner{ToStore: 4, PeerID: 5}, + schedule.AddLightLearner{ToStore: 4, PeerID: 5}, schedule.PromoteLearner{ToStore: 4, PeerID: 5}, schedule.RemovePeer{FromStore: 6}, - schedule.AddLearner{ToStore: 5, PeerID: 6}, + schedule.AddLightLearner{ToStore: 5, PeerID: 6}, schedule.PromoteLearner{ToStore: 5, PeerID: 6}, schedule.TransferLeader{FromStore: 2, ToStore: 1}, schedule.RemovePeer{FromStore: 2}, @@ -258,3 +264,21 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { }, }) } + +func (s *testMergeCheckerSuite) TestStorelimit(c *C) { + oc := schedule.NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream()) + s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour + s.cluster.ScheduleOptions.StoreBalanceRate = 0.0 + s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{ + {Id: 109, StoreId: 2}, + {Id: 110, StoreId: 3}, + {Id: 111, StoreId: 6}, + }), core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2})) + s.cluster.PutRegion(s.regions[2]) + ops := s.mc.Check(s.regions[2]) + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } + c.Assert(oc.AddOperator(ops...), IsTrue) +} diff --git a/server/schedule/operator.go b/server/schedule/operator.go index dd3714aee9a..17cdb1304e6 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -749,11 +749,11 @@ func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.Regio } if cluster.IsRaftLearnerEnabled() { addSteps = append(addSteps, - 
AddLearner{ToStore: storeID, PeerID: peer.Id}, + AddLightLearner{ToStore: storeID, PeerID: peer.Id}, PromoteLearner{ToStore: storeID, PeerID: peer.Id}, ) } else { - addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id}) + addSteps = append(addSteps, AddLightPeer{ToStore: storeID, PeerID: peer.Id}) } toAdds = append(toAdds, addSteps) From 66e1d75ad4a698582f959eccd88608476f23bae8 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 4 Jul 2019 13:05:57 +0800 Subject: [PATCH 2/3] set influence according to region size Signed-off-by: Ryan Leung --- server/checker/merge_checker_test.go | 53 +++++++++++++++++++--------- server/schedule/operator.go | 26 ++++++++++---- server/schedule/operator_test.go | 34 +++++++++--------- 3 files changed, 74 insertions(+), 39 deletions(-) diff --git a/server/checker/merge_checker_test.go b/server/checker/merge_checker_test.go index dee8c030b00..ab43f642750 100644 --- a/server/checker/merge_checker_test.go +++ b/server/checker/merge_checker_test.go @@ -158,10 +158,10 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { ops := s.mc.Check(s.regions[2]) s.checkSteps(c, ops[0], []schedule.OperatorStep{ schedule.TransferLeader{FromStore: 6, ToStore: 5}, - schedule.AddLightLearner{ToStore: 1, PeerID: 1}, + schedule.AddLearner{ToStore: 1, PeerID: 1}, schedule.PromoteLearner{ToStore: 1, PeerID: 1}, schedule.RemovePeer{FromStore: 2}, - schedule.AddLightLearner{ToStore: 4, PeerID: 2}, + schedule.AddLearner{ToStore: 4, PeerID: 2}, schedule.PromoteLearner{ToStore: 4, PeerID: 2}, schedule.RemovePeer{FromStore: 6}, schedule.MergeRegion{ @@ -191,7 +191,7 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { s.cluster.PutRegion(s.regions[2]) ops = s.mc.Check(s.regions[2]) s.checkSteps(c, ops[0], []schedule.OperatorStep{ - schedule.AddLightLearner{ToStore: 4, PeerID: 3}, + schedule.AddLearner{ToStore: 4, PeerID: 3}, schedule.PromoteLearner{ToStore: 4, PeerID: 3}, schedule.RemovePeer{FromStore: 6}, schedule.MergeRegion{ @@ -240,13 
+240,13 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { s.cluster.PutRegion(s.regions[2]) ops = s.mc.Check(s.regions[2]) s.checkSteps(c, ops[0], []schedule.OperatorStep{ - schedule.AddLightLearner{ToStore: 1, PeerID: 4}, + schedule.AddLearner{ToStore: 1, PeerID: 4}, schedule.PromoteLearner{ToStore: 1, PeerID: 4}, schedule.RemovePeer{FromStore: 3}, - schedule.AddLightLearner{ToStore: 4, PeerID: 5}, + schedule.AddLearner{ToStore: 4, PeerID: 5}, schedule.PromoteLearner{ToStore: 4, PeerID: 5}, schedule.RemovePeer{FromStore: 6}, - schedule.AddLightLearner{ToStore: 5, PeerID: 6}, + schedule.AddLearner{ToStore: 5, PeerID: 6}, schedule.PromoteLearner{ToStore: 5, PeerID: 6}, schedule.TransferLeader{FromStore: 2, ToStore: 1}, schedule.RemovePeer{FromStore: 2}, @@ -268,17 +268,38 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { func (s *testMergeCheckerSuite) TestStorelimit(c *C) { oc := schedule.NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream()) s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour - s.cluster.ScheduleOptions.StoreBalanceRate = 0.0 - s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{ - {Id: 109, StoreId: 2}, - {Id: 110, StoreId: 3}, - {Id: 111, StoreId: 6}, - }), core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2})) + s.cluster.ScheduleOptions.StoreBalanceRate = 60 + s.regions[2] = s.regions[2].Clone( + core.SetPeers([]*metapb.Peer{ + {Id: 109, StoreId: 2}, + {Id: 110, StoreId: 3}, + {Id: 111, StoreId: 6}, + }), + core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}), + ) s.cluster.PutRegion(s.regions[2]) ops := s.mc.Check(s.regions[2]) - c.Assert(oc.AddOperator(ops...), IsTrue) - for _, op := range ops { - oc.RemoveOperator(op) + c.Assert(ops, NotNil) + // The size of Region is less or equal than 1MB. 
+ for i := 0; i < 50; i++ { + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } + } + s.regions[2] = s.regions[2].Clone( + core.SetApproximateSize(2), + core.SetApproximateKeys(2), + ) + s.cluster.PutRegion(s.regions[2]) + ops = s.mc.Check(s.regions[2]) + c.Assert(ops, NotNil) + // The size of Region is more than 1MB but no more than 20MB. + for i := 0; i < 5; i++ { + c.Assert(oc.AddOperator(ops...), IsTrue) + for _, op := range ops { + oc.RemoveOperator(op) + } } - c.Assert(oc.AddOperator(ops...), IsTrue) + c.Assert(oc.AddOperator(ops...), IsFalse) } diff --git a/server/schedule/operator.go b/server/schedule/operator.go index 17cdb1304e6..7d69feb6a51 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -39,6 +39,10 @@ const ( RegionOperatorWaitTime = 10 * time.Minute // RegionInfluence represents the influence of a operator step, which is used by ratelimit. RegionInfluence int64 = 1000 + // smallRegionInfluence represents the influence of a operator step, which is used by ratelimit. + smallRegionInfluence int64 = 200 + // smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it. + smallRegionThreshold int64 = 20 ) // OperatorStep describes the basic scheduling steps that can not be subdivided. 
@@ -98,9 +102,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool { func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(ap.ToStore) - to.RegionSize += region.GetApproximateSize() + regionSize := region.GetApproximateSize() + to.RegionSize += regionSize to.RegionCount++ - to.StepCost += RegionInfluence + if regionSize > smallRegionThreshold { + to.StepCost += RegionInfluence + } else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize { + to.StepCost += smallRegionInfluence + } } // AddLearner is an OperatorStep that adds a region learner peer. @@ -128,9 +137,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(al.ToStore) - to.RegionSize += region.GetApproximateSize() + regionSize := region.GetApproximateSize() + to.RegionSize += regionSize to.RegionCount++ - to.StepCost += RegionInfluence + if regionSize > smallRegionThreshold { + to.StepCost += RegionInfluence + } else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize { + to.StepCost += smallRegionInfluence + } } // PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter. 
@@ -749,11 +763,11 @@ func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.Regio } if cluster.IsRaftLearnerEnabled() { addSteps = append(addSteps, - AddLightLearner{ToStore: storeID, PeerID: peer.Id}, + AddLearner{ToStore: storeID, PeerID: peer.Id}, PromoteLearner{ToStore: storeID, PeerID: peer.Id}, ) } else { - addSteps = append(addSteps, AddLightPeer{ToStore: storeID, PeerID: peer.Id}) + addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id}) } toAdds = append(toAdds, addSteps) diff --git a/server/schedule/operator_test.go b/server/schedule/operator_test.go index ebed4e45682..674c985d364 100644 --- a/server/schedule/operator_test.go +++ b/server/schedule/operator_test.go @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe leader = peer } } - regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50)) return regionInfo } @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) { c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ LeaderSize: 0, LeaderCount: 0, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -1, RegionSize: 0, RegionCount: 0, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) RemovePeer{FromStore: 1}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -1, - RegionSize: -10, + RegionSize: -50, RegionCount: -1, StepCost: 0, }) c.Assert(*storeOpInfluence[2], 
DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) MergeRegion{IsPassive: false}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -1, - RegionSize: -10, + RegionSize: -50, RegionCount: -1, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 1, StepCost: 1000, }) MergeRegion{IsPassive: true}.Influence(opInfluence, region) c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{ - LeaderSize: -10, + LeaderSize: -50, LeaderCount: -2, - RegionSize: -10, + RegionSize: -50, RegionCount: -2, StepCost: 0, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ - LeaderSize: 10, + LeaderSize: 50, LeaderCount: 1, - RegionSize: 10, + RegionSize: 50, RegionCount: 0, StepCost: 1000, }) From f01f45d357bf8b1723875aab333de4cdac66191b Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 4 Jul 2019 15:59:11 +0800 Subject: [PATCH 3/3] fix race problems Signed-off-by: Ryan Leung --- server/cluster.go | 2 +- server/schedule/operator.go | 3 ++- server/statistics/store.go | 19 +++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/server/cluster.go b/server/cluster.go index 12f359d900f..96887ab00dc 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI // UpdateStoreLabels updates a store's location labels. 
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error { c.RLock() - defer c.RUnlock() store := c.cachedCluster.GetStore(storeID) + c.RUnlock() if store == nil { return errors.Errorf("invalid store ID %d, not found", storeID) } diff --git a/server/schedule/operator.go b/server/schedule/operator.go index 7d69feb6a51..c1f65a17964 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -39,7 +39,8 @@ const ( RegionOperatorWaitTime = 10 * time.Minute // RegionInfluence represents the influence of a operator step, which is used by ratelimit. RegionInfluence int64 = 1000 - // smallRegionInfluence represents the influence of a operator step, which is used by ratelimit. + // smallRegionInfluence represents the influence of a operator step + // when the region size is smaller than smallRegionThreshold, which is used by ratelimit. smallRegionInfluence int64 = 200 // smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it. smallRegionThreshold int64 = 20 diff --git a/server/statistics/store.go b/server/statistics/store.go index 32b324081a1..53fa77d0f5e 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -22,6 +22,7 @@ import ( // StoresStats is a cache hold hot regions. type StoresStats struct { + sync.RWMutex rollingStoresStats map[uint64]*RollingStoreStats bytesReadRate float64 bytesWriteRate float64 @@ -36,26 +37,36 @@ func NewStoresStats() *StoresStats { // CreateRollingStoreStats creates RollingStoreStats with a given store ID. func (s *StoresStats) CreateRollingStoreStats(storeID uint64) { + s.Lock() + defer s.Unlock() s.rollingStoresStats[storeID] = newRollingStoreStats() } // RemoveRollingStoreStats removes RollingStoreStats with a given store ID. 
func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) { + s.Lock() + defer s.Unlock() delete(s.rollingStoresStats, storeID) } // GetRollingStoreStats gets RollingStoreStats with a given store ID. func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats { + s.RLock() + defer s.RUnlock() return s.rollingStoresStats[storeID] } // Observe records the current store status with a given store. func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) { + s.RLock() + defer s.RUnlock() s.rollingStoresStats[storeID].Observe(stats) } // UpdateTotalBytesRate updates the total bytes write rate and read rate. func (s *StoresStats) UpdateTotalBytesRate(stores *core.StoresInfo) { + s.RLock() + defer s.RUnlock() var totalBytesWriteRate float64 var totalBytesReadRate float64 var writeRate, readRate float64 @@ -83,6 +94,8 @@ func (s *StoresStats) TotalBytesReadRate() float64 { // GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo. func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { writeRate, _ := stats.GetBytesRate() @@ -93,6 +106,8 @@ func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 { // GetStoresBytesReadStat returns the bytes read stat of all StoreInfo. func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { _, readRate := stats.GetBytesRate() @@ -103,6 +118,8 @@ func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 { // GetStoresKeysWriteStat returns the keys write stat of all StoreInfo. 
func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { res[storeID] = uint64(stats.GetKeysWriteRate()) @@ -112,6 +129,8 @@ func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 { // GetStoresKeysReadStat returns the bytes read stat of all StoreInfo. func (s *StoresStats) GetStoresKeysReadStat() map[uint64]uint64 { + s.RLock() + defer s.RUnlock() res := make(map[uint64]uint64, len(s.rollingStoresStats)) for storeID, stats := range s.rollingStoresStats { res[storeID] = uint64(stats.GetKeysReadRate())