From b6e7bfa67733514440f6eacde9125a7d1e904171 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 13 Sep 2021 17:35:52 +0800 Subject: [PATCH 1/4] scheduler: allow empty region to be scheduled and use a separate tolerance config in scatter range scheduler Signed-off-by: lhy1024 --- pkg/mock/mockcluster/mockcluster.go | 4 ++++ server/cluster/cluster.go | 4 ++++ server/core/kind.go | 8 ++++++++ server/schedule/opt/healthy.go | 8 +++++++- server/schedule/opt/opts.go | 1 + server/schedule/range_cluster.go | 4 ++++ server/schedulers/balance_test.go | 27 +++++++++++++-------------- server/schedulers/scatter_range.go | 2 +- server/schedulers/utils.go | 10 +++++++++- 9 files changed, 51 insertions(+), 17 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 198d526cf6b..069ccbc07d6 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -69,6 +69,10 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { return clus } +func (mc *Cluster) GetClusterType() core.ClusterType { + return core.MockCluster +} + // GetOpts returns the cluster configuration. func (mc *Cluster) GetOpts() *config.PersistOptions { return mc.PersistOptions diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index f769d752660..b9da50db2ce 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -460,6 +460,10 @@ func (c *RaftCluster) SetStorage(s *core.Storage) { c.storage = s } +func (c *RaftCluster) GetClusterType() core.ClusterType { + return core.RaftCluster +} + // GetOpts returns cluster's configuration. 
func (c *RaftCluster) GetOpts() *config.PersistOptions { return c.opt diff --git a/server/core/kind.go b/server/core/kind.go index c28de0624c8..2e3e0a2dccd 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -129,3 +129,11 @@ func StringToKeyType(input string) KeyType { panic("invalid key type: " + input) } } + +type ClusterType int + +const ( + RaftCluster ClusterType = iota + RangeCluster + MockCluster +) diff --git a/server/schedule/opt/healthy.go b/server/schedule/opt/healthy.go index 420911a6557..7fdf192cd0d 100644 --- a/server/schedule/opt/healthy.go +++ b/server/schedule/opt/healthy.go @@ -53,7 +53,13 @@ func HealthAllowPending(cluster Cluster) func(*core.RegionInfo) bool { // AllowBalanceEmptyRegion returns a function that checks if a region is an empty region and can be balanced. func AllowBalanceEmptyRegion(cluster Cluster) func(*core.RegionInfo) bool { - return func(region *core.RegionInfo) bool { return IsEmptyRegionAllowBalance(cluster, region) } + switch cluster.GetClusterType() { + case core.RangeCluster: + // allow empty region to be scheduled in range cluster + return func(region *core.RegionInfo) bool { return true } + default: + return func(region *core.RegionInfo) bool { return IsEmptyRegionAllowBalance(cluster, region) } + } } // IsRegionReplicated checks if a region is fully replicated. 
When placement diff --git a/server/schedule/opt/opts.go b/server/schedule/opt/opts.go index c100b12b111..e98b071e11f 100644 --- a/server/schedule/opt/opts.go +++ b/server/schedule/opt/opts.go @@ -38,6 +38,7 @@ type Cluster interface { statistics.RegionStatInformer statistics.StoreStatInformer + GetClusterType() core.ClusterType GetOpts() *config.PersistOptions AllocID() (uint64, error) FitRegion(*core.RegionInfo) *placement.RegionFit diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index 9d51b3e4a20..6855003f8d5 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -40,6 +40,10 @@ func GenRangeCluster(cluster opt.Cluster, startKey, endKey []byte) *RangeCluster } } +func (r *RangeCluster) GetClusterType() core.ClusterType { + return core.RangeCluster +} + func (r *RangeCluster) updateStoreInfo(s *core.StoreInfo) *core.StoreInfo { id := s.GetID() diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 62f1b93ccc9..3dcb5153ddb 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -1053,28 +1053,29 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) { c.Assert(mb.IsScheduleAllowed(tc), IsFalse) } -var _ = Suite(&testScatterRangeLeaderSuite{}) +var _ = Suite(&testScatterRangeSuite{}) -type testScatterRangeLeaderSuite struct { +type testScatterRangeSuite struct { ctx context.Context cancel context.CancelFunc } -func (s *testScatterRangeLeaderSuite) SetUpSuite(c *C) { +func (s *testScatterRangeSuite) SetUpSuite(c *C) { s.ctx, s.cancel = context.WithCancel(context.Background()) } -func (s *testScatterRangeLeaderSuite) TearDownSuite(c *C) { +func (s *testScatterRangeSuite) TearDownSuite(c *C) { s.cancel() } -func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { +func (s *testScatterRangeSuite) TestBalance(c *C) { opt := config.NewTestOptions() // TODO: enable palcementrules opt.SetPlacementRuleEnabled(false) tc := 
mockcluster.NewCluster(s.ctx, opt) tc.DisableFeature(versioninfo.JointConsensus) - tc.SetTolerantSizeRatio(2.5) + // range cluster use a special tolerant ratio, cluster opt take no impact + tc.SetTolerantSizeRatio(10000) // Add stores 1,2,3,4,5. tc.AddRegionStore(1, 0) tc.AddRegionStore(2, 0) @@ -1099,17 +1100,16 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { }) id += 4 } - // empty case + // empty region case regions[49].EndKey = []byte("") for _, meta := range regions { leader := rand.Intn(4) % 3 regionInfo := core.NewRegionInfo( meta, meta.Peers[leader], - core.SetApproximateKeys(96), - core.SetApproximateSize(96), + core.SetApproximateKeys(1), + core.SetApproximateSize(1), ) - tc.Regions.SetRegion(regionInfo) } for i := 0; i < 100; i++ { @@ -1133,7 +1133,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { } } -func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) { +func (s *testScatterRangeSuite) TestBalanceLeaderLimit(c *C) { opt := config.NewTestOptions() opt.SetPlacementRuleEnabled(false) tc := mockcluster.NewCluster(s.ctx, opt) @@ -1164,7 +1164,6 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) { id += 4 } - // empty case regions[49].EndKey = []byte("") for _, meta := range regions { leader := rand.Intn(4) % 3 @@ -1209,7 +1208,7 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) { c.Check(maxLeaderCount-minLeaderCount, Greater, 10) } -func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) { +func (s *testScatterRangeSuite) TestConcurrencyUpdateConfig(c *C) { opt := config.NewTestOptions() tc := mockcluster.NewCluster(s.ctx, opt) oc := schedule.NewOperatorController(s.ctx, nil, nil) @@ -1235,7 +1234,7 @@ func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) { ch <- struct{}{} } -func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) { +func (s *testScatterRangeSuite) TestBalanceWhenRegionNotHeartbeat(c *C) { opt := 
config.NewTestOptions() tc := mockcluster.NewCluster(s.ctx, opt) // Add stores 1,2,3. diff --git a/server/schedulers/scatter_range.go b/server/schedulers/scatter_range.go index 867b62635d2..e5e9b066807 100644 --- a/server/schedulers/scatter_range.go +++ b/server/schedulers/scatter_range.go @@ -216,7 +216,7 @@ func (l *scatterRangeScheduler) Schedule(cluster opt.Cluster) []*operator.Operat schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc() // isolate a new cluster according to the key range c := schedule.GenRangeCluster(cluster, l.config.GetStartKey(), l.config.GetEndKey()) - c.SetTolerantSizeRatio(2) + c.SetTolerantSizeRatio(1) if l.allowBalanceLeader(cluster) { ops := l.balanceLeader.Schedule(c) if len(ops) > 0 { diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 65e80bb45ca..1c5c4b151e7 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -25,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/schedule" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/statistics" @@ -148,7 +149,14 @@ func (p *balancePlan) getTolerantResource() int64 { } func adjustTolerantRatio(cluster opt.Cluster, kind core.ScheduleKind) float64 { - tolerantSizeRatio := cluster.GetOpts().GetTolerantSizeRatio() + var tolerantSizeRatio float64 + switch cluster.GetClusterType() { + case core.RangeCluster: + // range cluster use a separate configuration + tolerantSizeRatio = cluster.(*schedule.RangeCluster).GetTolerantSizeRatio() + default: + tolerantSizeRatio = cluster.GetOpts().GetTolerantSizeRatio() + } if kind.Resource == core.LeaderKind && kind.Policy == core.ByCount { if tolerantSizeRatio == 0 { return leaderTolerantSizeRatio From 0ef1e7ef4fc63610a61313015363095374a0f3b9 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 13 Sep 2021 17:44:35 +0800 Subject: [PATCH 2/4] fix lint 
Signed-off-by: lhy1024 --- pkg/mock/mockcluster/mockcluster.go | 1 + server/cluster/cluster.go | 1 + server/core/kind.go | 4 ++++ server/schedule/range_cluster.go | 1 + server/schedulers/scatter_range.go | 2 +- 5 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 069ccbc07d6..a96aec60fe9 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -69,6 +69,7 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { return clus } +// GetClusterType returns the cluster type func (mc *Cluster) GetClusterType() core.ClusterType { return core.MockCluster } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b9da50db2ce..e0d2359ebc5 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -460,6 +460,7 @@ func (c *RaftCluster) SetStorage(s *core.Storage) { c.storage = s } +// GetClusterType returns the cluster type func (c *RaftCluster) GetClusterType() core.ClusterType { return core.RaftCluster } diff --git a/server/core/kind.go b/server/core/kind.go index 2e3e0a2dccd..639ca3f1d52 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -130,10 +130,14 @@ func StringToKeyType(input string) KeyType { } } +// ClusterType means the type of cluster type ClusterType int const ( + // RaftCluster means the ClusterType of RaftCluster RaftCluster ClusterType = iota + // RangeCluster means the ClusterType of RangeCluster RangeCluster + // MockCluster means the ClusterType of MockCluster MockCluster ) diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index 6855003f8d5..6f37a6ad3a3 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -40,6 +40,7 @@ func GenRangeCluster(cluster opt.Cluster, startKey, endKey []byte) *RangeCluster } } +// GetClusterType returns the cluster type func (r *RangeCluster) GetClusterType() core.ClusterType { 
return core.RangeCluster } diff --git a/server/schedulers/scatter_range.go b/server/schedulers/scatter_range.go index e5e9b066807..867b62635d2 100644 --- a/server/schedulers/scatter_range.go +++ b/server/schedulers/scatter_range.go @@ -216,7 +216,7 @@ func (l *scatterRangeScheduler) Schedule(cluster opt.Cluster) []*operator.Operat schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc() // isolate a new cluster according to the key range c := schedule.GenRangeCluster(cluster, l.config.GetStartKey(), l.config.GetEndKey()) - c.SetTolerantSizeRatio(1) + c.SetTolerantSizeRatio(2) if l.allowBalanceLeader(cluster) { ops := l.balanceLeader.Schedule(c) if len(ops) > 0 { From 861f7689150689f79f91a9bde70a8f90e76cf4d5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 13 Sep 2021 19:10:01 +0800 Subject: [PATCH 3/4] address comments Signed-off-by: lhy1024 --- pkg/mock/mockcluster/mockcluster.go | 5 ----- server/cluster/cluster.go | 5 ----- server/core/kind.go | 11 ----------- server/schedule/opt/healthy.go | 8 +------- server/schedule/opt/opts.go | 1 - server/schedule/range_cluster.go | 5 ----- server/schedulers/balance_region.go | 19 +++++++++++++++---- server/schedulers/utils.go | 6 +++--- 8 files changed, 19 insertions(+), 41 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index a96aec60fe9..198d526cf6b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -69,11 +69,6 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { return clus } -// GetClusterType returns the cluster type -func (mc *Cluster) GetClusterType() core.ClusterType { - return core.MockCluster -} - // GetOpts returns the cluster configuration. 
func (mc *Cluster) GetOpts() *config.PersistOptions { return mc.PersistOptions diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e0d2359ebc5..f769d752660 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -460,11 +460,6 @@ func (c *RaftCluster) SetStorage(s *core.Storage) { c.storage = s } -// GetClusterType returns the cluster type -func (c *RaftCluster) GetClusterType() core.ClusterType { - return core.RaftCluster -} - // GetOpts returns cluster's configuration. func (c *RaftCluster) GetOpts() *config.PersistOptions { return c.opt diff --git a/server/core/kind.go b/server/core/kind.go index 639ca3f1d52..517514f293e 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -130,14 +130,3 @@ func StringToKeyType(input string) KeyType { } } -// ClusterType means the type of cluster -type ClusterType int - -const ( - // RaftCluster means the ClusterType of RaftCluster - RaftCluster ClusterType = iota - // RangeCluster means the ClusterType of RangeCluster - RangeCluster - // MockCluster means the ClusterType of MockCluster - MockCluster -) diff --git a/server/schedule/opt/healthy.go b/server/schedule/opt/healthy.go index 7fdf192cd0d..420911a6557 100644 --- a/server/schedule/opt/healthy.go +++ b/server/schedule/opt/healthy.go @@ -53,13 +53,7 @@ func HealthAllowPending(cluster Cluster) func(*core.RegionInfo) bool { // AllowBalanceEmptyRegion returns a function that checks if a region is an empty region and can be balanced. 
func AllowBalanceEmptyRegion(cluster Cluster) func(*core.RegionInfo) bool { - switch cluster.GetClusterType() { - case core.RangeCluster: - // allow empty region to be scheduled in range cluster - return func(region *core.RegionInfo) bool { return true } - default: - return func(region *core.RegionInfo) bool { return IsEmptyRegionAllowBalance(cluster, region) } - } + return func(region *core.RegionInfo) bool { return IsEmptyRegionAllowBalance(cluster, region) } } // IsRegionReplicated checks if a region is fully replicated. When placement diff --git a/server/schedule/opt/opts.go b/server/schedule/opt/opts.go index e98b071e11f..c100b12b111 100644 --- a/server/schedule/opt/opts.go +++ b/server/schedule/opt/opts.go @@ -38,7 +38,6 @@ type Cluster interface { statistics.RegionStatInformer statistics.StoreStatInformer - GetClusterType() core.ClusterType GetOpts() *config.PersistOptions AllocID() (uint64, error) FitRegion(*core.RegionInfo) *placement.RegionFit diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index 6f37a6ad3a3..9d51b3e4a20 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -40,11 +40,6 @@ func GenRangeCluster(cluster opt.Cluster, startKey, endKey []byte) *RangeCluster } } -// GetClusterType returns the cluster type -func (r *RangeCluster) GetClusterType() core.ClusterType { - return core.RangeCluster -} - func (r *RangeCluster) updateStoreInfo(s *core.StoreInfo) *core.StoreInfo { id := s.GetID() diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 27e039f8b21..f1ad5b38b56 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -149,23 +149,34 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera return stores[i].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), iOp) > stores[j].RegionScore(opts.GetRegionScoreFormulaVersion(), 
opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), jOp) }) + + var allowBalanceEmptyRegion func(*core.RegionInfo) bool + + switch cluster.(type) { + case *schedule.RangeCluster: + // allow empty region to be scheduled in range cluster + allowBalanceEmptyRegion = func(region *core.RegionInfo) bool { return true } + default: + allowBalanceEmptyRegion = opt.AllowBalanceEmptyRegion(cluster) + } + for _, plan.source = range stores { for i := 0; i < balanceRegionRetryLimit; i++ { schedulerCounter.WithLabelValues(s.GetName(), "total").Inc() // Priority pick the region that has a pending peer. // Pending region may means the disk is overload, remove the pending region firstly. - plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster)) + plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) if plan.region == nil { // Then pick the region that has a follower in the source store. - plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster)) + plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) } if plan.region == nil { // Then pick the region has the leader in the source store. - plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster)) + plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) } if plan.region == nil { // Finally pick learner. 
- plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster)) + plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion) } if plan.region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc() diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 1c5c4b151e7..458d733658b 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -150,10 +150,10 @@ func (p *balancePlan) getTolerantResource() int64 { func adjustTolerantRatio(cluster opt.Cluster, kind core.ScheduleKind) float64 { var tolerantSizeRatio float64 - switch cluster.GetClusterType() { - case core.RangeCluster: + switch c := cluster.(type) { + case *schedule.RangeCluster: // range cluster use a separate configuration - tolerantSizeRatio = cluster.(*schedule.RangeCluster).GetTolerantSizeRatio() + tolerantSizeRatio = c.GetTolerantSizeRatio() default: tolerantSizeRatio = cluster.GetOpts().GetTolerantSizeRatio() } From e592319744098695804f32f3ab5dc741f0258eeb Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 13 Sep 2021 19:11:42 +0800 Subject: [PATCH 4/4] fix lint Signed-off-by: lhy1024 --- server/core/kind.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/core/kind.go b/server/core/kind.go index 517514f293e..c28de0624c8 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -129,4 +129,3 @@ func StringToKeyType(input string) KeyType { panic("invalid key type: " + input) } } -