scheduler: consider evict leader when calc expect (#3967)
* update

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
lhy1024 and ti-chi-bot authored Aug 12, 2021
1 parent bf39734 commit 11c55c2
Showing 6 changed files with 54 additions and 24 deletions.
10 changes: 10 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -441,6 +441,16 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
mc.PutStore(newStore)
}

// SetStoreEvictLeader sets whether the given store evicts leaders.
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
} else {
mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
}
}

// UpdateStoreRegionWeight updates store region weight.
func (mc *Cluster) UpdateStoreRegionWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
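A brief usage note: this helper is exercised in the hot region test later in this diff. The fragment below sketches that usage; apart from SetStoreEvictLeader itself, the calls mirror mock-cluster setup that already appears in this commit, and the store ID is illustrative.

    tc := mockcluster.NewCluster(ctx, opt)
    tc.SetHotRegionCacheHitsThreshold(0)
    // Store 5 should receive no leader load in expectation calculations.
    tc.SetStoreEvictLeader(5, true)
    // ... run the scheduler under test ...
    // Re-enable leader transfer once the evict-leader scenario is done.
    tc.SetStoreEvictLeader(5, false)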
2 changes: 1 addition & 1 deletion server/schedulers/hot_region.go
@@ -71,7 +71,7 @@ const (
var schedulePeerPr = 0.66

// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
var pendingAmpFactor = 8.0
var pendingAmpFactor = 2.0

type hotScheduler struct {
name string
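For intuition about the lowered constant: the sketch below shows, in simplified and self-contained form, how an amplification factor on pending influence throttles scheduling as two stores' loads converge. The formula and names are illustrative only, not PD's actual implementation.

    package main

    import "fmt"

    // pendingAmpFactor amplifies pending influence: the closer two stores'
    // loads are, the more a pending operator counts against further scheduling.
    var pendingAmpFactor = 2.0

    // allowSchedule is an illustrative check, not PD's implementation: a move
    // from src to dst is allowed only while the load gap still exceeds the
    // amplified pending influence.
    func allowSchedule(srcLoad, dstLoad, pendingInfluence float64) bool {
    	gap := srcLoad - dstLoad
    	return gap > pendingAmpFactor*pendingInfluence
    }

    func main() {
    	// With the old factor of 8.0 this move would be rejected (20 > 40 is
    	// false); with 2.0 it is allowed (20 > 10).
    	fmt.Println(allowSchedule(100, 80, 5)) // true
    }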
2 changes: 1 addition & 1 deletion server/schedulers/hot_region_config.go
@@ -36,7 +36,7 @@ const (
// KeyPriority indicates hot-region-scheduler prefer key dim
KeyPriority = "key"
// QueryPriority indicates hot-region-scheduler prefer query dim
QueryPriority = "qps"
QueryPriority = "query"

// Scheduling has a bigger impact on TiFlash, so it needs to be corrected in configuration items
// In the default config, the TiKV difference is 1.05*1.05-1 = 0.1025, and the TiFlash difference is 1.15*1.15-1 = 0.3225
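The user-visible keyword for the query dimension becomes "query" instead of "qps". The self-contained sketch below illustrates validating a priority list against these constants; the helper and its error message are illustrative, not the scheduler's real validation code.

    package main

    import (
    	"fmt"
    	"strings"
    )

    // Priority keywords accepted by the hot region scheduler after this change.
    const (
    	BytePriority  = "byte"
    	KeyPriority   = "key"
    	QueryPriority = "query" // previously "qps"
    )

    // validatePriorities is an illustrative helper: it rejects unknown
    // keywords such as the old "qps" spelling.
    func validatePriorities(priorities []string) error {
    	valid := []string{BytePriority, KeyPriority, QueryPriority}
    	for _, p := range priorities {
    		switch p {
    		case BytePriority, KeyPriority, QueryPriority:
    		default:
    			return fmt.Errorf("invalid priority %q, expect one of %s",
    				p, strings.Join(valid, ", "))
    		}
    	}
    	return nil
    }

    func main() {
    	fmt.Println(validatePriorities([]string{"query", "byte"})) // <nil>
    	fmt.Println(validatePriorities([]string{"qps", "byte"}))   // invalid priority "qps", ...
    }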
28 changes: 15 additions & 13 deletions server/schedulers/hot_region_test.go
@@ -452,7 +452,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) {
//| 2 | 4.5MB |
//| 3 | 4.5MB |
//| 4 | 6MB |
//| 5 | 0MB |
//| 5 | 0MB(Evict)|
//| 6 | 0MB |
//| 7 | n/a (Down)|
//| 8 | n/a | <- TiFlash is always 0.
@@ -465,6 +465,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) {
3: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
4: 6 * MB * statistics.StoreHeartBeatReportInterval,
}
tc.SetStoreEvictLeader(5, true)
tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0
for i := aliveTiKVStartID; i <= aliveTiKVLastID; i++ {
tikvBytesSum += float64(storesBytes[i]) / 10
@@ -478,13 +479,14 @@ }
}
{ // Check the load expect
aliveTiKVCount := float64(aliveTiKVLastID - aliveTiKVStartID + 1)
allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader
aliveTiFlashCount := float64(aliveTiFlashLastID - aliveTiFlashStartID + 1)
tc.ObserveRegionsStats()
c.Assert(len(hb.Schedule(tc)) == 0, IsFalse)
c.Assert(
loadsEqual(
hb.stLoadInfos[writeLeader][1].LoadPred.Expect.Loads,
[]float64{hotRegionBytesSum / aliveTiKVCount, hotRegionKeysSum / aliveTiKVCount, tikvQuerySum / aliveTiKVCount}),
[]float64{hotRegionBytesSum / allowLeaderTiKVCount, hotRegionKeysSum / allowLeaderTiKVCount, tikvQuerySum / allowLeaderTiKVCount}),
IsTrue)
c.Assert(tikvQuerySum != hotRegionQuerySum, IsTrue)
c.Assert(
@@ -549,7 +551,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithQuery(c *C) {
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{QueryPriority, BytePriority}

tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
@@ -1905,9 +1907,9 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
{statistics.ByteDim, statistics.KeyDim},
})
// config error value
hb.(*hotScheduler).conf.ReadPriorities = []string{"hahaha"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"hahaha", "byte"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte", "key"}
hb.(*hotScheduler).conf.ReadPriorities = []string{"error"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"error", BytePriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{QueryPriority, BytePriority, KeyPriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.QueryDim, statistics.ByteDim},
{statistics.KeyDim, statistics.ByteDim},
@@ -1921,18 +1923,18 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
{statistics.ByteDim, statistics.KeyDim},
})
// config byte and key
hb.(*hotScheduler).conf.ReadPriorities = []string{"key", "byte"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"byte", "key"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"key", "byte"}
hb.(*hotScheduler).conf.ReadPriorities = []string{KeyPriority, BytePriority}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{BytePriority, KeyPriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.KeyDim, statistics.ByteDim},
{statistics.ByteDim, statistics.KeyDim},
{statistics.KeyDim, statistics.ByteDim},
})
// config query in low version
hb.(*hotScheduler).conf.ReadPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.ReadPriorities = []string{QueryPriority, BytePriority}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{QueryPriority, BytePriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{QueryPriority, BytePriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.ByteDim, statistics.KeyDim},
{statistics.KeyDim, statistics.ByteDim},
@@ -1941,7 +1943,7 @@
// config error value
hb.(*hotScheduler).conf.ReadPriorities = []string{"error", "error"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte", "key"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{QueryPriority, BytePriority, KeyPriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.ByteDim, statistics.KeyDim},
{statistics.KeyDim, statistics.ByteDim},
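To make the test's new expectation concrete: once store 5 evicts leaders, the hot region byte/key/query sums are divided by the number of stores allowed to hold leaders rather than by every alive TiKV store. A self-contained sketch with illustrative numbers (not the test's exact values):

    package main

    import "fmt"

    func main() {
    	// Four alive TiKV stores, one of which (store 5) has evict leader enabled.
    	aliveTiKVCount := 4.0
    	allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 cannot hold leaders

    	hotRegionBytesSum := 15.0 * 1024 * 1024 // 15 MB/s of hot write bytes

    	// Before this change the expectation was spread over every alive store;
    	// now it is spread only over stores that may actually take leaders.
    	oldExpect := hotRegionBytesSum / aliveTiKVCount
    	newExpect := hotRegionBytesSum / allowLeaderTiKVCount
    	fmt.Printf("old expect: %.2f MB/s, new expect: %.2f MB/s\n",
    		oldExpect/1024/1024, newExpect/1024/1024) // 3.75 vs 5.00
    }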
32 changes: 25 additions & 7 deletions server/schedulers/utils.go
@@ -507,7 +507,7 @@ type storeCollector interface {
// Engine returns the type of Store.
Engine() string
// Filter determines whether the Store needs to be handled by itself.
Filter(info *storeSummaryInfo) bool
Filter(info *storeSummaryInfo, kind core.ResourceKind) bool
// GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind.
GetLoads(storeLoads, peerLoadSum []float64, rwTy rwType, kind core.ResourceKind) (loads []float64)
}
@@ -522,8 +522,17 @@ func (c tikvCollector) Engine() string {
return core.EngineTiKV
}

func (c tikvCollector) Filter(info *storeSummaryInfo) bool {
return !info.IsTiFlash
func (c tikvCollector) Filter(info *storeSummaryInfo, kind core.ResourceKind) bool {
if info.IsTiFlash {
return false
}
switch kind {
case core.LeaderKind:
return info.Store.AllowLeaderTransfer()
case core.RegionKind:
return true
}
return false
}

func (c tikvCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy rwType, kind core.ResourceKind) (loads []float64) {
@@ -564,8 +573,14 @@ func (c tiflashCollector) Engine() string {
return core.EngineTiFlash
}

func (c tiflashCollector) Filter(info *storeSummaryInfo) bool {
return info.IsTiFlash
func (c tiflashCollector) Filter(info *storeSummaryInfo, kind core.ResourceKind) bool {
switch kind {
case core.LeaderKind:
return false
case core.RegionKind:
return info.IsTiFlash
}
return false
}

func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy rwType, kind core.ResourceKind) (loads []float64) {
@@ -640,12 +655,11 @@ func summaryStoresLoadByEngine(
allStoreCount := 0
allHotPeersCount := 0

// Stores without byte rate statistics is not available to schedule.
for _, info := range storeInfos {
store := info.Store
id := store.GetID()
storeLoads, ok := storesLoads[id]
if !ok || !collector.Filter(info) {
if !ok || !collector.Filter(info, kind) {
continue
}

@@ -691,6 +705,10 @@ })
})
}

if allStoreCount == 0 {
return loadDetail
}

expectCount := float64(allHotPeersCount) / float64(allStoreCount)
expectLoads := make([]float64, len(allStoreLoadSum))
for i := range expectLoads {
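Taken together, the kind-aware Filter and the new allStoreCount guard mean that leader-load expectations only count stores that can actually take leaders, and that an empty candidate set no longer divides by zero. Below is a self-contained sketch of that idea; the store struct and function are illustrative simplifications, not PD's types.

    package main

    import "fmt"

    type store struct {
    	id                  uint64
    	isTiFlash           bool
    	allowLeaderTransfer bool // false while an evict-leader scheduler pauses the store
    	hotPeers            int
    }

    // expectHotPeersForLeaders mirrors the idea in summaryStoresLoadByEngine:
    // only TiKV stores that may hold leaders contribute to the leader
    // expectation, and an empty candidate set yields no expectation instead
    // of dividing by zero.
    func expectHotPeersForLeaders(stores []store) (float64, bool) {
    	count, peers := 0, 0
    	for _, s := range stores {
    		if s.isTiFlash || !s.allowLeaderTransfer {
    			continue // filtered out, as tikvCollector.Filter does for LeaderKind
    		}
    		count++
    		peers += s.hotPeers
    	}
    	if count == 0 {
    		return 0, false // early return, matching the new allStoreCount == 0 guard
    	}
    	return float64(peers) / float64(count), true
    }

    func main() {
    	stores := []store{
    		{id: 1, allowLeaderTransfer: true, hotPeers: 4},
    		{id: 2, allowLeaderTransfer: true, hotPeers: 2},
    		{id: 5, allowLeaderTransfer: false, hotPeers: 3}, // evict leader: excluded
    		{id: 8, isTiFlash: true, hotPeers: 6},            // TiFlash: excluded
    	}
    	expect, ok := expectHotPeersForLeaders(stores)
    	fmt.Println(expect, ok) // 3 true
    }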
4 changes: 2 additions & 2 deletions tests/pdctl/scheduler/scheduler_test.go
@@ -270,7 +270,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
"minor-dec-ratio": 0.99,
"src-tolerance-ratio": 1.05,
"dst-tolerance-ratio": 1.05,
"read-priorities": []interface{}{"qps", "byte"},
"read-priorities": []interface{}{"query", "byte"},
"write-leader-priorities": []interface{}{"key", "byte"},
"write-peer-priorities": []interface{}{"byte", "key"},
"strict-picking-store": "true",
@@ -316,7 +316,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)
// cannot set query as write-peer-priorities
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "qps,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)

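For completeness, a hypothetical pd-ctl check in the same style as the test above, showing that the renamed "query" keyword is accepted for read-priorities. It reuses the test's mustExec, expected1, and conf1 helpers and is a sketch, not part of this commit.

    // Hypothetical: "query" is a valid keyword for read-priorities after the rename.
    mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler",
    	"set", "read-priorities", "query,byte"}, nil)
    expected1["read-priorities"] = []interface{}{"query", "byte"}
    mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
    c.Assert(conf1, DeepEquals, expected1)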
