Skip to content

Commit

Permalink
*: make hot region scheduler configurable (#1412)
Browse files Browse the repository at this point in the history
* *: make hot region scheduler configurable

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and disksing committed Jan 24, 2019
1 parent d67c9ab commit 8d6c936
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 58 deletions.
12 changes: 8 additions & 4 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,14 @@ func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo {
func (c *clusterInfo) IsRegionHot(id uint64) bool {
c.RLock()
defer c.RUnlock()
return c.core.IsRegionHot(id, c.GetHotRegionLowThreshold())
return c.core.IsRegionHot(id, c.GetHotRegionCacheHitsThreshold())
}

// RandHotRegionFromStore randomly picks a hot region in specified store.
func (c *clusterInfo) RandHotRegionFromStore(store uint64, kind schedule.FlowKind) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionCacheHitsThreshold())
if r == nil {
return nil
}
Expand Down Expand Up @@ -636,6 +636,10 @@ func (c *clusterInfo) GetMergeScheduleLimit() uint64 {
return c.opt.GetMergeScheduleLimit(namespace.DefaultNamespace)
}

func (c *clusterInfo) GetHotRegionScheduleLimit() uint64 {
return c.opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
}

func (c *clusterInfo) GetTolerantSizeRatio() float64 {
return c.opt.GetTolerantSizeRatio()
}
Expand Down Expand Up @@ -684,8 +688,8 @@ func (c *clusterInfo) GetLocationLabels() []string {
return c.opt.GetLocationLabels()
}

func (c *clusterInfo) GetHotRegionLowThreshold() int {
return c.opt.GetHotRegionLowThreshold()
func (c *clusterInfo) GetHotRegionCacheHitsThreshold() int {
return c.opt.GetHotRegionCacheHitsThreshold()
}

func (c *clusterInfo) IsRaftLearnerEnabled() bool {
Expand Down
51 changes: 36 additions & 15 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,12 @@ type ScheduleConfig struct {
ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"`
// MergeScheduleLimit is the max coexist merge schedules.
MergeScheduleLimit uint64 `toml:"merge-schedule-limit,omitempty" json:"merge-schedule-limit"`
// HotRegionScheduleLimit is the max coexist hot region schedules.
HotRegionScheduleLimit uint64 `toml:"hot-region-schedule-limit,omitempty" json:"hot-region-schedule-limit"`
// HotRegionCacheHitThreshold is the cache hits threshold of the hot region.
// If the number of times a region hits the hot cache is greater than this
// threshold, it is considered a hot region.
HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold,omitempty" json:"hot-region-cache-hits-threshold"`
// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
//
Expand Down Expand Up @@ -511,6 +517,8 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
RegionScheduleLimit: c.RegionScheduleLimit,
ReplicaScheduleLimit: c.ReplicaScheduleLimit,
MergeScheduleLimit: c.MergeScheduleLimit,
HotRegionScheduleLimit: c.HotRegionScheduleLimit,
HotRegionCacheHitsThreshold: c.HotRegionCacheHitsThreshold,
TolerantSizeRatio: c.TolerantSizeRatio,
LowSpaceRatio: c.LowSpaceRatio,
HighSpaceRatio: c.HighSpaceRatio,
Expand All @@ -526,21 +534,25 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
}

const (
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultMergeScheduleLimit = 8
defaultTolerantSizeRatio = 5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultTolerantSizeRatio = 5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
// hot region.
defautHotRegionCacheHitsThreshold = 3
)

func (c *ScheduleConfig) adjust(meta *configMetaData) error {
Expand Down Expand Up @@ -571,6 +583,12 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("merge-schedule-limit") {
adjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit)
}
if !meta.IsDefined("hot-region-schedule-limit") {
adjustUint64(&c.HotRegionScheduleLimit, defaultHotRegionScheduleLimit)
}
if !meta.IsDefined("hot-region-cache-hits-threshold") {
adjustUint64(&c.HotRegionCacheHitsThreshold, defautHotRegionCacheHitsThreshold)
}
if !meta.IsDefined("tolerant-size-ratio") {
adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
}
Expand Down Expand Up @@ -670,6 +688,8 @@ type NamespaceConfig struct {
ReplicaScheduleLimit uint64 `json:"replica-schedule-limit"`
// MergeScheduleLimit is the max coexist merge schedules.
MergeScheduleLimit uint64 `json:"merge-schedule-limit"`
// HotRegionScheduleLimit is the max coexist hot region schedules.
HotRegionScheduleLimit uint64 `json:"hot-region-schedule-limit"`
// MaxReplicas is the number of replicas for each region.
MaxReplicas uint64 `json:"max-replicas"`
}
Expand All @@ -679,6 +699,7 @@ func (c *NamespaceConfig) adjust(opt *scheduleOption) {
adjustUint64(&c.RegionScheduleLimit, opt.GetRegionScheduleLimit(namespace.DefaultNamespace))
adjustUint64(&c.ReplicaScheduleLimit, opt.GetReplicaScheduleLimit(namespace.DefaultNamespace))
adjustUint64(&c.MergeScheduleLimit, opt.GetMergeScheduleLimit(namespace.DefaultNamespace))
adjustUint64(&c.HotRegionScheduleLimit, opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace))
adjustUint64(&c.MaxReplicas, uint64(opt.GetMaxReplicas(namespace.DefaultNamespace)))
}

Expand Down
16 changes: 14 additions & 2 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 {
return o.load().MergeScheduleLimit
}

func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 {
if n, ok := o.ns[name]; ok {
return n.GetHotRegionScheduleLimit()
}
return o.load().HotRegionScheduleLimit
}

func (o *scheduleOption) GetTolerantSizeRatio() float64 {
return o.load().TolerantSizeRatio
}
Expand Down Expand Up @@ -341,8 +348,8 @@ func (o *scheduleOption) adjustScheduleCfg(persistentCfg *Config) {
o.store(scheduleCfg)
}

func (o *scheduleOption) GetHotRegionLowThreshold() int {
return schedule.HotRegionLowThreshold
func (o *scheduleOption) GetHotRegionCacheHitsThreshold() int {
return int(o.load().HotRegionCacheHitsThreshold)
}

func (o *scheduleOption) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
Expand Down Expand Up @@ -437,3 +444,8 @@ func (n *namespaceOption) GetReplicaScheduleLimit() uint64 {
func (n *namespaceOption) GetMergeScheduleLimit() uint64 {
return n.load().MergeScheduleLimit
}

// GetHotRegionScheduleLimit returns the limit for hot region schedule.
func (n *namespaceOption) GetHotRegionScheduleLimit() uint64 {
return n.load().HotRegionScheduleLimit
}
5 changes: 0 additions & 5 deletions server/schedule/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ import (
"github.com/pingcap/pd/server/core"
)

var (
// HotRegionLowThreshold is the low threadshold of hot region
HotRegionLowThreshold = 3
)

const (
// RegionHeartBeatReportInterval is the heartbeat report interval of a region
RegionHeartBeatReportInterval = 60
Expand Down
56 changes: 35 additions & 21 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func (mc *MockCluster) LoadRegion(regionID uint64, followerIds ...uint64) {

// IsRegionHot checks if the region is hot
func (mc *MockCluster) IsRegionHot(id uint64) bool {
return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionLowThreshold())
return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionCacheHitsThreshold())
}

// RandHotRegionFromStore random picks a hot region in specify store.
func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo {
r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionLowThreshold())
r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
if r == nil {
return nil
}
Expand Down Expand Up @@ -399,6 +399,11 @@ func (mc *MockCluster) GetMergeScheduleLimit() uint64 {
return mc.MockSchedulerOptions.GetMergeScheduleLimit(namespace.DefaultNamespace)
}

// GetHotRegionScheduleLimit mocks method.
func (mc *MockCluster) GetHotRegionScheduleLimit() uint64 {
return mc.MockSchedulerOptions.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
}

// GetMaxReplicas mocks method.
func (mc *MockCluster) GetMaxReplicas() int {
return mc.MockSchedulerOptions.GetMaxReplicas(namespace.DefaultNamespace)
Expand All @@ -417,20 +422,22 @@ func (mc *MockCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabe
}

const (
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 0
defaultMaxMergeRegionKeys = 0
defaultSplitMergeInterval = 0
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultMergeScheduleLimit = 8
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 0
defaultMaxMergeRegionKeys = 0
defaultSplitMergeInterval = 0
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
defaultHotRegionCacheHitsThreshold = 3
)

// MockSchedulerOptions is a mock of SchedulerOptions
Expand All @@ -440,6 +447,7 @@ type MockSchedulerOptions struct {
LeaderScheduleLimit uint64
ReplicaScheduleLimit uint64
MergeScheduleLimit uint64
HotRegionScheduleLimit uint64
MaxSnapshotCount uint64
MaxPendingPeerCount uint64
MaxMergeRegionSize uint64
Expand All @@ -448,7 +456,7 @@ type MockSchedulerOptions struct {
MaxStoreDownTime time.Duration
MaxReplicas int
LocationLabels []string
HotRegionLowThreshold int
HotRegionCacheHitsThreshold int
TolerantSizeRatio float64
LowSpaceRatio float64
HighSpaceRatio float64
Expand All @@ -469,13 +477,14 @@ func NewMockSchedulerOptions() *MockSchedulerOptions {
mso.LeaderScheduleLimit = defaultLeaderScheduleLimit
mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit
mso.MergeScheduleLimit = defaultMergeScheduleLimit
mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit
mso.MaxSnapshotCount = defaultMaxSnapshotCount
mso.MaxMergeRegionSize = defaultMaxMergeRegionSize
mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys
mso.SplitMergeInterval = defaultSplitMergeInterval
mso.MaxStoreDownTime = defaultMaxStoreDownTime
mso.MaxReplicas = defaultMaxReplicas
mso.HotRegionLowThreshold = HotRegionLowThreshold
mso.HotRegionCacheHitsThreshold = defaultHotRegionCacheHitsThreshold
mso.MaxPendingPeerCount = defaultMaxPendingPeerCount
mso.TolerantSizeRatio = defaultTolerantSizeRatio
mso.LowSpaceRatio = defaultLowSpaceRatio
Expand Down Expand Up @@ -503,6 +512,11 @@ func (mso *MockSchedulerOptions) GetMergeScheduleLimit(name string) uint64 {
return mso.MergeScheduleLimit
}

// GetHotRegionScheduleLimit mock method
func (mso *MockSchedulerOptions) GetHotRegionScheduleLimit(name string) uint64 {
return mso.HotRegionScheduleLimit
}

// GetMaxSnapshotCount mock method
func (mso *MockSchedulerOptions) GetMaxSnapshotCount() uint64 {
return mso.MaxSnapshotCount
Expand Down Expand Up @@ -543,9 +557,9 @@ func (mso *MockSchedulerOptions) GetLocationLabels() []string {
return mso.LocationLabels
}

// GetHotRegionLowThreshold mock method
func (mso *MockSchedulerOptions) GetHotRegionLowThreshold() int {
return mso.HotRegionLowThreshold
// GetHotRegionCacheHitsThreshold mock method
func (mso *MockSchedulerOptions) GetHotRegionCacheHitsThreshold() int {
return mso.HotRegionCacheHitsThreshold
}

// GetTolerantSizeRatio mock method
Expand Down
3 changes: 2 additions & 1 deletion server/schedule/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Options interface {
GetRegionScheduleLimit() uint64
GetReplicaScheduleLimit() uint64
GetMergeScheduleLimit() uint64
GetHotRegionScheduleLimit() uint64

GetMaxSnapshotCount() uint64
GetMaxPendingPeerCount() uint64
Expand All @@ -40,7 +41,7 @@ type Options interface {
GetMaxReplicas() int
GetLocationLabels() []string

GetHotRegionLowThreshold() int
GetHotRegionCacheHitsThreshold() int
GetTolerantSizeRatio() float64
GetLowSpaceRatio() float64
GetHighSpaceRatio() float64
Expand Down
4 changes: 2 additions & 2 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
opt.HotRegionLowThreshold = 0
opt.HotRegionCacheHitsThreshold = 0

// Will transfer a hot region from store 1, because the total count of peers
// which is hot for store 1 is more larger than other stores.
Expand Down Expand Up @@ -1193,7 +1193,7 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) {
tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
// lower than hot read flow rate, but higher than write flow rate
tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
opt.HotRegionLowThreshold = 0
opt.HotRegionCacheHitsThreshold = 0
c.Assert(tc.IsRegionHot(1), IsTrue)
c.Assert(tc.IsRegionHot(11), IsFalse)
// check randomly pick hot region
Expand Down
13 changes: 10 additions & 3 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,20 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster schedule.Cluster)
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}

func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}

func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < h.limit &&
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < h.limit &&
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit()
}

Expand Down Expand Up @@ -205,7 +212,7 @@ func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.Res
// HotDegree is the update times on the hot cache. If the heartbeat report
// the flow of the region exceeds the threshold, the scheduler will update the region in
// the hot cache and the hotdegree of the region will increase.
if r.HotDegree < cluster.GetHotRegionLowThreshold() {
if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
opt.HotRegionLowThreshold = 0
opt.HotRegionCacheHitsThreshold = 0

// try to get an operator
var op []*schedule.Operator
Expand Down
10 changes: 6 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,12 @@ func (s *Server) GetNamespaceConfig(name string) *NamespaceConfig {
}

cfg := &NamespaceConfig{
LeaderScheduleLimit: s.scheduleOpt.GetLeaderScheduleLimit(name),
RegionScheduleLimit: s.scheduleOpt.GetRegionScheduleLimit(name),
ReplicaScheduleLimit: s.scheduleOpt.GetReplicaScheduleLimit(name),
MaxReplicas: uint64(s.scheduleOpt.GetMaxReplicas(name)),
LeaderScheduleLimit: s.scheduleOpt.GetLeaderScheduleLimit(name),
RegionScheduleLimit: s.scheduleOpt.GetRegionScheduleLimit(name),
ReplicaScheduleLimit: s.scheduleOpt.GetReplicaScheduleLimit(name),
HotRegionScheduleLimit: s.scheduleOpt.GetHotRegionScheduleLimit(name),
MergeScheduleLimit: s.scheduleOpt.GetMergeScheduleLimit(name),
MaxReplicas: uint64(s.scheduleOpt.GetMaxReplicas(name)),
}

return cfg
Expand Down
Loading

0 comments on commit 8d6c936

Please sign in to comment.