scheduler: use pending amp in hot region scheduler #3926

Merged
merged 9 commits on Aug 7, 2021
Changes from 2 commits
62 changes: 47 additions & 15 deletions server/schedulers/hot_region.go
@@ -69,6 +69,7 @@ const (

// schedulePeerPr is the probability of scheduling the hot peer.
var schedulePeerPr = 0.66
var pendingAmpFactor = 8.0

type hotScheduler struct {
name string
@@ -631,14 +632,18 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
case bs.isForWriteLeader():
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}
return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStoreID, bs.best.dstStoreID, bs.infl, maxZombieDur)
}

func (bs *balanceSolver) isForWriteLeader() bool {
return bs.rwTy == write && bs.opTy == transferLeader
}

// filterSrcStores compares the min rate with ratio * expectation rate. If both dim rates are greater than
// their expectation * ratio, the store will be selected as a hot source store
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
@@ -868,39 +873,66 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLo
// calcProgressiveRank calculates `bs.cur.progressiveRank`.
// See the comments of `solution.progressiveRank` for more about progressive rank.
func (bs *balanceSolver) calcProgressiveRank() {
srcLd := bs.stLoadDetail[bs.cur.srcStoreID].LoadPred.min()
dstLd := bs.stLoadDetail[bs.cur.dstStoreID].LoadPred.max()
src := bs.stLoadDetail[bs.cur.srcStoreID].LoadPred
dst := bs.stLoadDetail[bs.cur.dstStoreID].LoadPred
bs.cur.progressiveRank = 0
srcLd := src.min()
dstLd := dst.max()
peer := bs.cur.srcPeerStat
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is what matters.
// Only consider key rate or query rate.

if bs.isForWriteLeader() {
if !bs.isTolerance(src, dst, bs.firstPriority) {
return
}
srcRate := srcLd.Loads[bs.firstPriority]
dstRate := dstLd.Loads[bs.firstPriority]
peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.firstPriority))
if srcRate-peerRate >= dstRate+peerRate {
rank = -1
bs.cur.progressiveRank = -1
}
} else {
firstPriorityDimHot, firstPriorityDecRatio, secondPriorityDimHot, secondPriorityDecRatio := bs.getHotDecRatioByPriorities(srcLd, dstLd, peer)
greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
switch {
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// If we are in this case, both dims will be more balanced; this is the best choice.
rank = -3
if !bs.isTolerance(src, dst, bs.firstPriority) || !bs.isTolerance(src, dst, bs.secondPriority) {
return
}
bs.cur.progressiveRank = -3
bs.firstPriorityIsBetter = true
bs.secondPriorityIsBetter = true
case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// If we are in this case, the first priority dim will not be worsened and the second priority dim will be more balanced.
rank = -2
if !bs.isTolerance(src, dst, bs.secondPriority) {
return
}
bs.cur.progressiveRank = -2
bs.secondPriorityIsBetter = true
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio:
// If we are in this case, the first priority dim will be more balanced; the second priority dim is ignored.
rank = -1
if !bs.isTolerance(src, dst, bs.firstPriority) {
return
}
bs.cur.progressiveRank = -1
bs.firstPriorityIsBetter = true
}
}
bs.cur.progressiveRank = rank
}

// isTolerance checks the source store and target store by comparing the difference value with pendingAmpFactor * pendingPeer.
// This slows hot region scheduling down, even to serialized execution, when the pending influence of every two stores is close.
func (bs *balanceSolver) isTolerance(src, dst *storeLoadPred, dim int) bool {
srcRate := src.Current.Loads[dim]
dstRate := dst.Current.Loads[dim]
if srcRate <= dstRate {
return false
}
pendingAmp := (1 + pendingAmpFactor*srcRate/(srcRate-dstRate))
srcPending := src.pending().Loads[dim]
dstPending := dst.pending().Loads[dim]
hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStoreID, 10), strconv.FormatUint(bs.cur.dstStoreID, 10)).Set(pendingAmp)
return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending
}
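
To get a feel for how pendingAmpFactor throttles scheduling as two stores converge, here is a minimal standalone sketch of the same arithmetic with made-up rates; the helper name and numbers are illustrative and not part of the patch:

package main

import "fmt"

// isTolerant mirrors the tolerance check above: the closer srcRate and dstRate
// are, the larger the amplification applied to pending influence, so
// near-balanced store pairs tolerate almost no in-flight load.
func isTolerant(srcRate, dstRate, srcPending, dstPending, ampFactor float64) bool {
	if srcRate <= dstRate {
		return false
	}
	pendingAmp := 1 + ampFactor*srcRate/(srcRate-dstRate)
	return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending
}

func main() {
	// Hypothetical byte rates and pending influence, with ampFactor = 8 as in pendingAmpFactor.
	fmt.Println(isTolerant(100, 50, 1, 0, 8)) // amp = 1+8*100/50 = 17;  100-17 > 50  -> true
	fmt.Println(isTolerant(100, 95, 1, 0, 8)) // amp = 1+8*100/5 = 161; 100-161 > 95 -> false
}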

func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) {
@@ -965,7 +997,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
if bs.cur.srcPeerStat != old.srcPeerStat {
// compare region

if bs.rwTy == write && bs.opTy == transferLeader {
if bs.isForWriteLeader() {
kind := getRegionStatKind(write, bs.firstPriority)
switch {
case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind):
@@ -1024,7 +1056,7 @@ func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int {
if st1 != st2 {
// compare source store
var lpCmp storeLPCmp
if bs.rwTy == write && bs.opTy == transferLeader {
if bs.isForWriteLeader() {
lpCmp = sliceLPCmp(
minLPCmp(negLoadCmp(sliceLoadCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
@@ -1059,7 +1091,7 @@ func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int {
if st1 != st2 {
// compare destination store
var lpCmp storeLPCmp
if bs.rwTy == write && bs.opTy == transferLeader {
if bs.isForWriteLeader() {
lpCmp = sliceLPCmp(
maxLPCmp(sliceLoadCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
7 changes: 4 additions & 3 deletions server/schedulers/hot_region_config.go
@@ -83,9 +83,10 @@ type hotRegionSchedulerConfig struct {
SrcToleranceRatio float64 `json:"src-tolerance-ratio"`
DstToleranceRatio float64 `json:"dst-tolerance-ratio"`
ReadPriorities []string `json:"read-priorities"`
WriteLeaderPriorities []string `json:"write-leader-priorities"`
WritePeerPriorities []string `json:"write-peer-priorities"`
StrictPickingStore bool `json:"strict-picking-store,string"`
// For the first priority of write leader, it is better to consider key rate or query rate rather than byte rate
WriteLeaderPriorities []string `json:"write-leader-priorities"`
WritePeerPriorities []string `json:"write-peer-priorities"`
StrictPickingStore bool `json:"strict-picking-store,string"`
}
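
The JSON tags above determine the shape of the scheduler config as exposed through the API. As a rough illustration, here is a trimmed, hypothetical mirror of only the fields shown in this hunk, with made-up values rather than the scheduler's real defaults; note that the ,string option makes strict-picking-store serialize as a quoted string:

package main

import (
	"encoding/json"
	"fmt"
)

// hotRegionConfSketch is a hypothetical stand-in for the fields shown above,
// only to make the JSON shape of the tags concrete.
type hotRegionConfSketch struct {
	SrcToleranceRatio     float64  `json:"src-tolerance-ratio"`
	DstToleranceRatio     float64  `json:"dst-tolerance-ratio"`
	ReadPriorities        []string `json:"read-priorities"`
	WriteLeaderPriorities []string `json:"write-leader-priorities"`
	WritePeerPriorities   []string `json:"write-peer-priorities"`
	StrictPickingStore    bool     `json:"strict-picking-store,string"`
}

func main() {
	out, _ := json.MarshalIndent(hotRegionConfSketch{
		SrcToleranceRatio:     1.05,
		DstToleranceRatio:     1.05,
		ReadPriorities:        []string{"byte", "key"},
		WriteLeaderPriorities: []string{"key", "byte"},
		WritePeerPriorities:   []string{"byte", "key"},
		StrictPickingStore:    true,
	}, "", "  ")
	fmt.Println(string(out)) // strict-picking-store comes out as the string "true", not a bare bool
}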

func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
38 changes: 32 additions & 6 deletions server/schedulers/hot_region_test.go
@@ -528,6 +528,11 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
old := pendingAmpFactor
pendingAmpFactor = 0.0
defer func() {
pendingAmpFactor = old
}()
for i := 0; i < 2; i++ {
// 0: byte rate
// 1: key rate
@@ -854,7 +859,11 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
hb.(*hotScheduler).conf.MinorDecRatio = 1
hb.(*hotScheduler).conf.DstToleranceRatio = 1
hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority}

old := pendingAmpFactor
pendingAmpFactor = 0.0
defer func() {
pendingAmpFactor = old
}()
for i := 0; i < 2; i++ {
// 0: byte rate
// 1: key rate
@@ -908,11 +917,16 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) {
testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4)
// After move-peer, store byte/key rate (min, max): (6.6, 7.1) | 6.1 | 6 | (5, 5.5)

pendingAmpFactor = old
ops := hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
pendingAmpFactor = 0.0

op2 := hb.Schedule(tc)[0]
testutil.CheckTransferPeer(c, op2, operator.OpHotRegion, 1, 4)
// After move-peer, store byte/key rate (min, max): (6.1, 7.1) | 6.1 | 6 | (5, 6)

ops := hb.Schedule(tc)
ops = hb.Schedule(tc)
c.Logf("%v", ops)
c.Assert(ops, HasLen, 0)
}
@@ -1397,10 +1411,19 @@ func (s *testInfluenceSerialSuite) TestInfluenceByRWType(c *C) {
c.Assert(nearlyAbout(pendingInfluence[4].Loads[statistics.RegionReadKeys], 0.5*MB), IsTrue)
c.Assert(nearlyAbout(pendingInfluence[4].Loads[statistics.RegionReadBytes], 0.5*MB), IsTrue)

// Considering pending amp, there should be nine regions or more.
addRegionInfo(tc, write, []testRegionInfo{
{2, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{3, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{4, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{5, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{6, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{7, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{8, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{9, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{10, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{11, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
{12, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
})
addRegionInfo(tc, read, []testRegionInfo{
{2, []uint64{1, 2, 3}, 0.7 * MB, 0.7 * MB},
@@ -1546,6 +1569,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5)
hb.(*hotScheduler).clearPendingInfluence()
}

func (s *testHotSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -1573,17 +1597,19 @@ func (s *testHotSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) {
{4, []uint64{2, 1, 3}, 10 * MB, 0 * MB},
{5, []uint64{3, 2, 1}, 0 * MB, 10 * MB},
})
old := schedulePeerPr
schedulePeerPr = 0.0
old1, old2 := schedulePeerPr, pendingAmpFactor
schedulePeerPr, pendingAmpFactor = 0.0, 0.0
defer func() {
schedulePeerPr, pendingAmpFactor = old1, old2
}()
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{KeyPriority, BytePriority}
ops := hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 2)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{BytePriority, KeyPriority}
ops = hb.Schedule(tc)
c.Assert(len(ops), Equals, 1)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 3)
schedulePeerPr = old
}

func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
9 changes: 9 additions & 0 deletions server/schedulers/metrics.go
@@ -111,6 +111,14 @@ var scatterRangeRegionCounter = prometheus.NewCounterVec(
Help: "Counter of scatter range region scheduler.",
}, []string{"type", "store"})

var hotPendingStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "hot_pending",
Help: "Counter of direction of balance related schedulers.",
}, []string{"type", "source", "target"})

func init() {
prometheus.MustRegister(schedulerCounter)
prometheus.MustRegister(schedulerStatus)
@@ -124,4 +132,5 @@ func init() {
prometheus.MustRegister(scatterRangeRegionCounter)
prometheus.MustRegister(opInfluenceStatus)
prometheus.MustRegister(tolerantResourceStatus)
prometheus.MustRegister(hotPendingStatus)
}
12 changes: 12 additions & 0 deletions server/schedulers/utils.go
@@ -326,6 +326,18 @@ func (lp *storeLoadPred) max() *storeLoad {
return maxLoad(&lp.Current, &lp.Future)
}

func (lp *storeLoadPred) pending() *storeLoad {
mx, mn := lp.max(), lp.min()
loads := make([]float64, len(mx.Loads))
for i := range loads {
loads[i] = mx.Loads[i] - mn.Loads[i]
}
return &storeLoad{
Loads: loads,
Count: mx.Count,
}
}
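
As a quick sanity check on pending(): it is the per-dimension gap between the max and min of Current and Future, i.e. the absolute load attributed to not-yet-finished operators. A minimal standalone sketch with simplified stand-in types (not the real storeLoad/storeLoadPred):

package main

import "fmt"

// Simplified stand-ins, just to show the arithmetic of pending().
type load struct{ Loads []float64 }
type loadPred struct{ Current, Future load }

// pendingOf mirrors pending() above: per-dim |Current - Future|.
func pendingOf(lp loadPred) []float64 {
	out := make([]float64, len(lp.Current.Loads))
	for i := range out {
		hi, lo := lp.Current.Loads[i], lp.Future.Loads[i]
		if lo > hi {
			hi, lo = lo, hi
		}
		out[i] = hi - lo
	}
	return out
}

func main() {
	// e.g. an in-flight operator lowers the predicted byte rate from 10 to 7
	// and raises the key rate from 2 to 2.5.
	lp := loadPred{Current: load{Loads: []float64{10, 2}}, Future: load{Loads: []float64{7, 2.5}}}
	fmt.Println(pendingOf(lp)) // [3 0.5]
}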

func (lp *storeLoadPred) diff() *storeLoad {
mx, mn := lp.max(), lp.min()
loads := make([]float64, len(mx.Loads))