*: make hot region scheduler configurable #1412

Merged: 4 commits, Jan 24, 2019
server/cluster_info.go (12 changes: 8 additions & 4 deletions)
@@ -276,14 +276,14 @@ func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo {
 func (c *clusterInfo) IsRegionHot(id uint64) bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.core.IsRegionHot(id, c.GetHotRegionLowThreshold())
+	return c.core.IsRegionHot(id, c.GetHotRegionCacheHitsThreshold())
 }
 
 // RandHotRegionFromStore randomly picks a hot region in specified store.
 func (c *clusterInfo) RandHotRegionFromStore(store uint64, kind schedule.FlowKind) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
+	r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionCacheHitsThreshold())
 	if r == nil {
 		return nil
 	}
@@ -636,6 +636,10 @@ func (c *clusterInfo) GetMergeScheduleLimit() uint64 {
 	return c.opt.GetMergeScheduleLimit(namespace.DefaultNamespace)
 }
 
+func (c *clusterInfo) GetHotRegionScheduleLimit() uint64 {
+	return c.opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
+}
+
 func (c *clusterInfo) GetTolerantSizeRatio() float64 {
 	return c.opt.GetTolerantSizeRatio()
 }
@@ -684,8 +688,8 @@ func (c *clusterInfo) GetLocationLabels() []string {
 	return c.opt.GetLocationLabels()
 }
 
-func (c *clusterInfo) GetHotRegionLowThreshold() int {
-	return c.opt.GetHotRegionLowThreshold()
+func (c *clusterInfo) GetHotRegionCacheHitsThreshold() int {
+	return c.opt.GetHotRegionCacheHitsThreshold()
 }
 
 func (c *clusterInfo) IsRaftLearnerEnabled() bool {
server/config.go (51 changes: 36 additions & 15 deletions)
@@ -456,6 +456,12 @@ type ScheduleConfig struct {
 	ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"`
 	// MergeScheduleLimit is the max coexist merge schedules.
 	MergeScheduleLimit uint64 `toml:"merge-schedule-limit,omitempty" json:"merge-schedule-limit"`
+	// HotRegionScheduleLimit is the max coexist hot region schedules.
+	HotRegionScheduleLimit uint64 `toml:"hot-region-schedule-limit,omitempty" json:"hot-region-schedule-limit"`
+	// HotRegionCacheHitsThreshold is the cache hits threshold of the hot region.
+	// If the number of times a region hits the hot cache is greater than this
+	// threshold, it is considered a hot region.
+	HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold,omitempty" json:"hot-region-cache-hits-threshold"`
 	// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
 	TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
 	//
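
For reference, the two new options as they would appear in a PD configuration file. This is a minimal sketch based on the toml tags above, assuming the usual [schedule] section of PD's config.toml, with both keys set to the defaults this PR introduces:

[schedule]
# Max number of coexisting hot-region schedules (default: 2).
hot-region-schedule-limit = 2
# A region whose hot-cache hit count passes this threshold is treated as hot (default: 3).
hot-region-cache-hits-threshold = 3

Both fields carry omitempty toml tags and are backfilled by adjust() below, so omitting them yields the same defaults. They should also be adjustable at runtime through pd-ctl's config set subcommand (for example, config set hot-region-schedule-limit 4), though that is outside this diff.
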
@@ -511,6 +517,8 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
 		RegionScheduleLimit:         c.RegionScheduleLimit,
 		ReplicaScheduleLimit:        c.ReplicaScheduleLimit,
 		MergeScheduleLimit:          c.MergeScheduleLimit,
+		HotRegionScheduleLimit:      c.HotRegionScheduleLimit,
+		HotRegionCacheHitsThreshold: c.HotRegionCacheHitsThreshold,
 		TolerantSizeRatio:           c.TolerantSizeRatio,
 		LowSpaceRatio:               c.LowSpaceRatio,
 		HighSpaceRatio:              c.HighSpaceRatio,
@@ -526,21 +534,25 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
 }
 
 const (
-	defaultMaxReplicas          = 3
-	defaultMaxSnapshotCount     = 3
-	defaultMaxPendingPeerCount  = 16
-	defaultMaxMergeRegionSize   = 20
-	defaultMaxMergeRegionKeys   = 200000
-	defaultSplitMergeInterval   = 1 * time.Hour
-	defaultPatrolRegionInterval = 100 * time.Millisecond
-	defaultMaxStoreDownTime     = 30 * time.Minute
-	defaultLeaderScheduleLimit  = 4
-	defaultRegionScheduleLimit  = 4
-	defaultReplicaScheduleLimit = 8
-	defaultMergeScheduleLimit   = 8
-	defaultTolerantSizeRatio    = 5
-	defaultLowSpaceRatio        = 0.8
-	defaultHighSpaceRatio       = 0.6
+	defaultMaxReplicas            = 3
+	defaultMaxSnapshotCount       = 3
+	defaultMaxPendingPeerCount    = 16
+	defaultMaxMergeRegionSize     = 20
+	defaultMaxMergeRegionKeys     = 200000
+	defaultSplitMergeInterval     = 1 * time.Hour
+	defaultPatrolRegionInterval   = 100 * time.Millisecond
+	defaultMaxStoreDownTime       = 30 * time.Minute
+	defaultLeaderScheduleLimit    = 4
+	defaultRegionScheduleLimit    = 4
+	defaultReplicaScheduleLimit   = 8
+	defaultMergeScheduleLimit     = 8
+	defaultHotRegionScheduleLimit = 2
+	defaultTolerantSizeRatio      = 5
+	defaultLowSpaceRatio          = 0.8
+	defaultHighSpaceRatio         = 0.6
+	// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
+	// hot region.
+	defaultHotRegionCacheHitsThreshold = 3
 )
 
 func (c *ScheduleConfig) adjust(meta *configMetaData) error {
@@ -571,6 +583,12 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
 	if !meta.IsDefined("merge-schedule-limit") {
 		adjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit)
 	}
+	if !meta.IsDefined("hot-region-schedule-limit") {
+		adjustUint64(&c.HotRegionScheduleLimit, defaultHotRegionScheduleLimit)
+	}
+	if !meta.IsDefined("hot-region-cache-hits-threshold") {
+		adjustUint64(&c.HotRegionCacheHitsThreshold, defaultHotRegionCacheHitsThreshold)
+	}
 	if !meta.IsDefined("tolerant-size-ratio") {
 		adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
 	}
@@ -670,6 +688,8 @@ type NamespaceConfig struct {
 	ReplicaScheduleLimit uint64 `json:"replica-schedule-limit"`
 	// MergeScheduleLimit is the max coexist merge schedules.
 	MergeScheduleLimit uint64 `json:"merge-schedule-limit"`
+	// HotRegionScheduleLimit is the max coexist hot region schedules.
+	HotRegionScheduleLimit uint64 `json:"hot-region-schedule-limit"`
 	// MaxReplicas is the number of replicas for each region.
 	MaxReplicas uint64 `json:"max-replicas"`
 }
@@ -679,6 +699,7 @@ func (c *NamespaceConfig) adjust(opt *scheduleOption) {
 	adjustUint64(&c.RegionScheduleLimit, opt.GetRegionScheduleLimit(namespace.DefaultNamespace))
 	adjustUint64(&c.ReplicaScheduleLimit, opt.GetReplicaScheduleLimit(namespace.DefaultNamespace))
 	adjustUint64(&c.MergeScheduleLimit, opt.GetMergeScheduleLimit(namespace.DefaultNamespace))
+	adjustUint64(&c.HotRegionScheduleLimit, opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace))
 	adjustUint64(&c.MaxReplicas, uint64(opt.GetMaxReplicas(namespace.DefaultNamespace)))
 }
server/option.go (16 changes: 14 additions & 2 deletions)
@@ -132,6 +132,13 @@ func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 {
 	return o.load().MergeScheduleLimit
 }
 
+func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 {
+	if n, ok := o.ns[name]; ok {
+		return n.GetHotRegionScheduleLimit()
+	}
+	return o.load().HotRegionScheduleLimit
+}
+
 func (o *scheduleOption) GetTolerantSizeRatio() float64 {
 	return o.load().TolerantSizeRatio
 }
@@ -341,8 +348,8 @@ func (o *scheduleOption) adjustScheduleCfg(persistentCfg *Config) {
 	o.store(scheduleCfg)
 }
 
-func (o *scheduleOption) GetHotRegionLowThreshold() int {
-	return schedule.HotRegionLowThreshold
+func (o *scheduleOption) GetHotRegionCacheHitsThreshold() int {
+	return int(o.load().HotRegionCacheHitsThreshold)
 }
 
 func (o *scheduleOption) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
@@ -437,3 +444,8 @@ func (n *namespaceOption) GetReplicaScheduleLimit() uint64 {
 func (n *namespaceOption) GetMergeScheduleLimit() uint64 {
 	return n.load().MergeScheduleLimit
 }
+
+// GetHotRegionScheduleLimit returns the limit for hot region schedule.
+func (n *namespaceOption) GetHotRegionScheduleLimit() uint64 {
+	return n.load().HotRegionScheduleLimit
+}
server/schedule/basic_cluster.go (5 changes: 0 additions & 5 deletions)
@@ -17,11 +17,6 @@ import (
 	"github.com/pingcap/pd/server/core"
 )
 
-var (
-	// HotRegionLowThreshold is the low threadshold of hot region
-	HotRegionLowThreshold = 3
-)
-
 const (
 	// RegionHeartBeatReportInterval is the heartbeat report interval of a region
 	RegionHeartBeatReportInterval = 60
server/schedule/mockcluster.go (56 changes: 35 additions & 21 deletions)
@@ -60,12 +60,12 @@ func (mc *MockCluster) LoadRegion(regionID uint64, followerIds ...uint64) {
 
 // IsRegionHot checks if the region is hot
 func (mc *MockCluster) IsRegionHot(id uint64) bool {
-	return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionLowThreshold())
+	return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionCacheHitsThreshold())
 }
 
 // RandHotRegionFromStore random picks a hot region in specify store.
 func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo {
-	r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionLowThreshold())
+	r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
 	if r == nil {
 		return nil
 	}
@@ -399,6 +399,11 @@ func (mc *MockCluster) GetMergeScheduleLimit() uint64 {
 	return mc.MockSchedulerOptions.GetMergeScheduleLimit(namespace.DefaultNamespace)
 }
 
+// GetHotRegionScheduleLimit mocks method.
+func (mc *MockCluster) GetHotRegionScheduleLimit() uint64 {
+	return mc.MockSchedulerOptions.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
+}
+
 // GetMaxReplicas mocks method.
 func (mc *MockCluster) GetMaxReplicas() int {
 	return mc.MockSchedulerOptions.GetMaxReplicas(namespace.DefaultNamespace)
@@ -417,20 +422,22 @@ func (mc *MockCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabe
 }
 
 const (
-	defaultMaxReplicas          = 3
-	defaultMaxSnapshotCount     = 3
-	defaultMaxPendingPeerCount  = 16
-	defaultMaxMergeRegionSize   = 0
-	defaultMaxMergeRegionKeys   = 0
-	defaultSplitMergeInterval   = 0
-	defaultMaxStoreDownTime     = 30 * time.Minute
-	defaultLeaderScheduleLimit  = 4
-	defaultRegionScheduleLimit  = 4
-	defaultReplicaScheduleLimit = 8
-	defaultMergeScheduleLimit   = 8
-	defaultTolerantSizeRatio    = 2.5
-	defaultLowSpaceRatio        = 0.8
-	defaultHighSpaceRatio       = 0.6
+	defaultMaxReplicas                 = 3
+	defaultMaxSnapshotCount            = 3
+	defaultMaxPendingPeerCount         = 16
+	defaultMaxMergeRegionSize          = 0
+	defaultMaxMergeRegionKeys          = 0
+	defaultSplitMergeInterval          = 0
+	defaultMaxStoreDownTime            = 30 * time.Minute
+	defaultLeaderScheduleLimit         = 4
+	defaultRegionScheduleLimit         = 4
+	defaultReplicaScheduleLimit        = 8
+	defaultMergeScheduleLimit          = 8
+	defaultHotRegionScheduleLimit      = 2
+	defaultTolerantSizeRatio           = 2.5
+	defaultLowSpaceRatio               = 0.8
+	defaultHighSpaceRatio              = 0.6
+	defaultHotRegionCacheHitsThreshold = 3
 )
 
 // MockSchedulerOptions is a mock of SchedulerOptions
@@ -440,6 +447,7 @@ type MockSchedulerOptions struct {
 	LeaderScheduleLimit    uint64
 	ReplicaScheduleLimit   uint64
 	MergeScheduleLimit     uint64
+	HotRegionScheduleLimit uint64
 	MaxSnapshotCount       uint64
 	MaxPendingPeerCount    uint64
 	MaxMergeRegionSize     uint64
@@ -448,7 +456,7 @@
 	MaxStoreDownTime            time.Duration
 	MaxReplicas                 int
 	LocationLabels              []string
-	HotRegionLowThreshold       int
+	HotRegionCacheHitsThreshold int
 	TolerantSizeRatio           float64
 	LowSpaceRatio               float64
 	HighSpaceRatio              float64
@@ -469,13 +477,14 @@ func NewMockSchedulerOptions() *MockSchedulerOptions {
 	mso.LeaderScheduleLimit = defaultLeaderScheduleLimit
 	mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit
 	mso.MergeScheduleLimit = defaultMergeScheduleLimit
+	mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit
 	mso.MaxSnapshotCount = defaultMaxSnapshotCount
 	mso.MaxMergeRegionSize = defaultMaxMergeRegionSize
 	mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys
 	mso.SplitMergeInterval = defaultSplitMergeInterval
 	mso.MaxStoreDownTime = defaultMaxStoreDownTime
 	mso.MaxReplicas = defaultMaxReplicas
-	mso.HotRegionLowThreshold = HotRegionLowThreshold
+	mso.HotRegionCacheHitsThreshold = defaultHotRegionCacheHitsThreshold
 	mso.MaxPendingPeerCount = defaultMaxPendingPeerCount
 	mso.TolerantSizeRatio = defaultTolerantSizeRatio
 	mso.LowSpaceRatio = defaultLowSpaceRatio
@@ -503,6 +512,11 @@ func (mso *MockSchedulerOptions) GetMergeScheduleLimit(name string) uint64 {
 	return mso.MergeScheduleLimit
 }
 
+// GetHotRegionScheduleLimit mock method
+func (mso *MockSchedulerOptions) GetHotRegionScheduleLimit(name string) uint64 {
+	return mso.HotRegionScheduleLimit
+}
+
 // GetMaxSnapshotCount mock method
 func (mso *MockSchedulerOptions) GetMaxSnapshotCount() uint64 {
 	return mso.MaxSnapshotCount
@@ -543,9 +557,9 @@ func (mso *MockSchedulerOptions) GetLocationLabels() []string {
 	return mso.LocationLabels
 }
 
-// GetHotRegionLowThreshold mock method
-func (mso *MockSchedulerOptions) GetHotRegionLowThreshold() int {
-	return mso.HotRegionLowThreshold
+// GetHotRegionCacheHitsThreshold mock method
+func (mso *MockSchedulerOptions) GetHotRegionCacheHitsThreshold() int {
+	return mso.HotRegionCacheHitsThreshold
 }
 
 // GetTolerantSizeRatio mock method
server/schedule/opts.go (3 changes: 2 additions & 1 deletion)
@@ -29,6 +29,7 @@ type Options interface {
 	GetRegionScheduleLimit() uint64
 	GetReplicaScheduleLimit() uint64
 	GetMergeScheduleLimit() uint64
+	GetHotRegionScheduleLimit() uint64
 
 	GetMaxSnapshotCount() uint64
 	GetMaxPendingPeerCount() uint64
@@ -40,7 +41,7 @@
 	GetMaxReplicas() int
 	GetLocationLabels() []string
 
-	GetHotRegionLowThreshold() int
+	GetHotRegionCacheHitsThreshold() int
 	GetTolerantSizeRatio() float64
 	GetLowSpaceRatio() float64
 	GetHighSpaceRatio() float64
server/schedulers/balance_test.go (4 changes: 2 additions & 2 deletions)
@@ -1098,7 +1098,7 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
 	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
 	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
 	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
-	opt.HotRegionLowThreshold = 0
+	opt.HotRegionCacheHitsThreshold = 0
 
 	// Will transfer a hot region from store 1, because the total count of peers
 	// which is hot for store 1 is more larger than other stores.
@@ -1193,7 +1193,7 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) {
 	tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
 	// lower than hot read flow rate, but higher than write flow rate
 	tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
-	opt.HotRegionLowThreshold = 0
+	opt.HotRegionCacheHitsThreshold = 0
 	c.Assert(tc.IsRegionHot(1), IsTrue)
 	c.Assert(tc.IsRegionHot(11), IsFalse)
 	// check randomly pick hot region
server/schedulers/hot_region.go (13 changes: 10 additions & 3 deletions)
Expand Up @@ -122,13 +122,20 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster schedule.Cluster)
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}

func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
nolouch marked this conversation as resolved.
Show resolved Hide resolved

func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < h.limit &&
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < h.limit &&
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit()
}
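
The new min helper makes the effective cap on in-flight hot-region operators the smaller of the scheduler's built-in limit (h.limit) and the cluster-wide hot-region-schedule-limit. A standalone sketch of that gating, using hypothetical counts and limits rather than values from the PR:

package main

import "fmt"

// min mirrors the helper added in hot_region.go above.
func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	schedulerLimit := uint64(4) // stands in for h.limit
	clusterLimit := uint64(2)   // hot-region-schedule-limit (default: 2)

	// allowBalanceLeader/allowBalanceRegion admit a new operator only while the
	// running OpHotRegion count stays below min(schedulerLimit, clusterLimit).
	for running := uint64(0); running <= 3; running++ {
		fmt.Printf("running=%d allowed=%v\n", running, running < min(schedulerLimit, clusterLimit))
	}
	// running=0 allowed=true
	// running=1 allowed=true
	// running=2 allowed=false
	// running=3 allowed=false
}

Lowering hot-region-schedule-limit therefore throttles this scheduler independently of its own limit, while raising it above h.limit has no effect.
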

@@ -205,7 +212,7 @@ func calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.Res
 		// HotDegree is the update times on the hot cache. If the heartbeat report
 		// the flow of the region exceeds the threshold, the scheduler will update the region in
 		// the hot cache and the hotdegree of the region will increase.
-		if r.HotDegree < cluster.GetHotRegionLowThreshold() {
+		if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() {
 			continue
 		}
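
The comment in this hunk states what the renamed option actually measures: HotDegree counts how many times a region has been updated in the hot cache, and calcScore skips regions whose degree is still below hot-region-cache-hits-threshold. A toy illustration of the filter, with a hypothetical stand-in for core.RegionStat rather than PD's real type:

package main

import "fmt"

// regionStat is a hypothetical stand-in for core.RegionStat.
type regionStat struct {
	RegionID  uint64
	HotDegree int // times the region has been updated in the hot cache
}

func main() {
	const hitsThreshold = 3 // hot-region-cache-hits-threshold (default: 3)
	items := []regionStat{
		{RegionID: 1, HotDegree: 5}, // hot long enough: scored
		{RegionID: 2, HotDegree: 2}, // still warming up: skipped
	}
	for _, r := range items {
		if r.HotDegree < hitsThreshold { // the same check calcScore performs
			continue
		}
		fmt.Printf("region %d participates in hot-region scoring\n", r.RegionID)
	}
}

Raising the threshold makes the scheduler more conservative about what counts as hot; setting it to 0, as the tests in this PR do, makes every cached region eligible immediately.
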
server/schedulers/scheduler_test.go (2 changes: 1 addition & 1 deletion)
@@ -311,7 +311,7 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
 	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
 	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
 	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
-	opt.HotRegionLowThreshold = 0
+	opt.HotRegionCacheHitsThreshold = 0
 
 	// try to get an operator
 	var op []*schedule.Operator
server/server.go (10 changes: 6 additions & 4 deletions)
@@ -572,10 +572,12 @@ func (s *Server) GetNamespaceConfig(name string) *NamespaceConfig {
 	}
 
 	cfg := &NamespaceConfig{
-		LeaderScheduleLimit:  s.scheduleOpt.GetLeaderScheduleLimit(name),
-		RegionScheduleLimit:  s.scheduleOpt.GetRegionScheduleLimit(name),
-		ReplicaScheduleLimit: s.scheduleOpt.GetReplicaScheduleLimit(name),
-		MaxReplicas:          uint64(s.scheduleOpt.GetMaxReplicas(name)),
+		LeaderScheduleLimit:    s.scheduleOpt.GetLeaderScheduleLimit(name),
+		RegionScheduleLimit:    s.scheduleOpt.GetRegionScheduleLimit(name),
+		ReplicaScheduleLimit:   s.scheduleOpt.GetReplicaScheduleLimit(name),
+		HotRegionScheduleLimit: s.scheduleOpt.GetHotRegionScheduleLimit(name),
+		MergeScheduleLimit:     s.scheduleOpt.GetMergeScheduleLimit(name),
+		MaxReplicas:            uint64(s.scheduleOpt.GetMaxReplicas(name)),
 	}
 
 	return cfg