scheduler: use qps priority with multi priorities in hot region scheduler #3923

Merged · 11 commits · Aug 2, 2021
Changes from 9 commits
181 changes: 102 additions & 79 deletions server/schedulers/hot_region.go
@@ -219,7 +219,7 @@ func (h *hotScheduler) gcRegionPendings() {
}

// summaryStoresLoad Load information of all available stores.
// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store
// it will filter the hot peer and calculate the current and future stat(rate,count) for each store
func summaryStoresLoad(
stores []*core.StoreInfo,
storesLoads map[uint64][]float64,
@@ -228,12 +228,11 @@ func summaryStoresLoad(
rwTy rwType,
kind core.ResourceKind,
) map[uint64]*storeLoadDetail {
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count)
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(rate,count)
loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
allLoadSum := make([]float64, statistics.DimLen)
allCount := 0.0

// Stores without byte rate statistics is not available to schedule.
for _, store := range stores {
id := store.GetID()
storeLoads, ok := storesLoads[id]
@@ -303,7 +302,7 @@ }
}
}
storeLen := float64(len(storesLoads))
// store expectation byte/key rate and count for each store-load detail.
// store expectation rate and count for each store-load detail.
for id, detail := range loadDetail {
expectLoads := make([]float64, len(allLoadSum))
for i := range expectLoads {
@@ -414,14 +413,18 @@ type balanceSolver struct {

cur *solution

maxSrc *storeLoad
minDst *storeLoad
rankStep *storeLoad
firstPriority int
secondPriority int
maxSrc *storeLoad
minDst *storeLoad
rankStep *storeLoad

byteIsBetter bool
keyIsBetter bool
firstPriority int
Member

I think it's not very elegant. Once we introduce another dimension, we need to add more fields, e.g., thirdPriority, fourthPriority.

Contributor
@nolouch Jul 30, 2021

+1, maybe directly use an array? But it is also difficult to add a third or fourth priority even with an array.

Contributor

We only consider 2 dimensions when calculating the rank now, so introducing a new dimension won't change the logic here.

Contributor Author

Currently only two dimensions are considered; if a third dimension is introduced, the whole scheduling policy may need to be modified.
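
For illustration, a minimal sketch of the array-based layout discussed in this thread (all names are hypothetical; this is not code from the PR):

```go
package main

import "fmt"

// priorities holds dimension indices ordered by rank, so adding a third or
// fourth priority only grows the slice instead of adding new struct fields.
type priorities []int

// isSelected reports whether dim appears anywhere in the priority list.
func (p priorities) isSelected(dim int) bool {
	for _, d := range p {
		if d == dim {
			return true
		}
	}
	return false
}

func main() {
	const (
		byteDim = iota
		keyDim
		queryDim
	)
	p := priorities{queryDim, byteDim} // p[0] is the first priority
	fmt.Println(p.isSelected(keyDim))  // false
}
```

As noted above, the rank calculation still only reads two dimensions, so the slice would mainly tidy the struct rather than change the policy.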

secondPriority int
writeLeaderFirstPriority int
writeLeaderSecondPriority int
isSelectedDim func(int) bool

firstPriorityIsBetter bool
secondPriorityIsBetter bool
}

type solution struct {
@@ -465,7 +468,10 @@ func (bs *balanceSolver) init() {
maxCur = maxLoad(maxCur, &detail.LoadPred.Current)
}

rankStepRatios := []float64{bs.sche.conf.GetByteRankStepRatio(), bs.sche.conf.GetKeyRankStepRatio(), bs.sche.conf.GetQueryRateRankStepRatio()}
rankStepRatios := []float64{
statistics.ByteDim: bs.sche.conf.GetByteRankStepRatio(),
statistics.KeyDim: bs.sche.conf.GetKeyRankStepRatio(),
statistics.QueryDim: bs.sche.conf.GetQueryRateRankStepRatio()}
stepLoads := make([]float64, statistics.DimLen)
for i := range stepLoads {
stepLoads[i] = maxCur.Loads[i] * rankStepRatios[i]
@@ -474,8 +480,25 @@
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}
bs.firstPriority = stringToDim(bs.preferPriority()[0])
bs.secondPriority = stringToDim(bs.preferPriority()[1])

priorities := bs.sche.conf.ReadPriorities
if bs.rwTy == write {
priorities = bs.sche.conf.WritePriorities
}
bs.firstPriority = stringToDim(priorities[0])
bs.secondPriority = stringToDim(priorities[1])

if bs.rwTy == write {
bs.writeLeaderFirstPriority = statistics.KeyDim
if bs.firstPriority == statistics.QueryDim {
bs.writeLeaderFirstPriority = statistics.QueryDim
}
bs.writeLeaderSecondPriority = statistics.ByteDim
}

bs.isSelectedDim = func(dim int) bool {
return dim == bs.firstPriority || dim == bs.secondPriority
}
}
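
To make the wiring above concrete, here is a small self-contained sketch of how configured priority names map to dimension indices; the strings "qps" and "byte" and the config shape are assumptions for illustration:

```go
package main

import "fmt"

// Dimension indices, mirroring statistics.ByteDim/KeyDim/QueryDim.
const (
	ByteDim = iota
	KeyDim
	QueryDim
)

// stringToDim mirrors the mapping at the bottom of this diff; unknown
// names fall back to ByteDim.
func stringToDim(name string) int {
	switch name {
	case "key":
		return KeyDim
	case "qps":
		return QueryDim
	}
	return ByteDim
}

func main() {
	readPriorities := []string{"qps", "byte"} // assumed config value
	first := stringToDim(readPriorities[0])   // QueryDim
	second := stringToDim(readPriorities[1])  // ByteDim
	fmt.Println(first, second)                // 2 0
}
```

For write-leader scheduling, the init code above then pins the first priority to KeyDim (a CPU proxy) unless QueryDim was configured first, with ByteDim as the fixed second priority.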

func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
@@ -550,7 +573,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
return []*operator.Operator{op}
}

// filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than
// filterSrcStores compares the min rate with ratio * expectation rate; if both dim rates are greater than
// its expectation * ratio, the store will be selected as a hot source store
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
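
A hedged sketch of that selection rule (hypothetical names and signature, not this file's actual helpers):

```go
// A store qualifies as a hot source only if, in every selected dimension,
// its minimum observed rate exceeds the expected rate scaled by the ratio.
package hotregion

func isHotSource(minLoads, expectLoads []float64, ratio float64, isSelectedDim func(int) bool) bool {
	for i := range minLoads {
		if isSelectedDim(i) && minLoads[i] <= expectLoads[i]*ratio {
			return false
		}
	}
	return true
}
```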
@@ -755,7 +778,6 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
}
}

}
return ret
}
@@ -779,43 +801,38 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is what matters.
// Only consider about key rate.
srcKeyRate := srcLd.Loads[statistics.KeyDim]
dstKeyRate := dstLd.Loads[statistics.KeyDim]
peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, statistics.KeyDim))
if srcKeyRate-peerKeyRate >= dstKeyRate+peerKeyRate {
// Only consider key rate or query rate.
srcRate := srcLd.Loads[bs.writeLeaderFirstPriority]
dstRate := dstLd.Loads[bs.writeLeaderFirstPriority]
peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority))
if srcRate-peerRate >= dstRate+peerRate {
rank = -1
}
} else {
firstPriorityDimHot, firstPriorityDecRatio, secondPriorityDimHot, secondPriorityDecRatio := bs.getHotDecRatioByPriorities(srcLd, dstLd, peer)
greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
switch {
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// If belong to the case, both byte rate and key rate will be more balanced, the best choice.
// In this case, both dims will be more balanced; this is the best choice.
rank = -3
bs.byteIsBetter = true
bs.keyIsBetter = true
bs.firstPriorityIsBetter = true
bs.secondPriorityIsBetter = true
case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// If belong to the case, byte rate will be not worsened, key rate will be more balanced.
// In this case, the first priority dim will not be worsened, and the second priority dim will be more balanced.
rank = -2
bs.keyIsBetter = true
bs.secondPriorityIsBetter = true
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio:
// If belong to the case, byte rate will be more balanced, ignore the key rate.
// In this case, the first priority dim will be more balanced; ignore the second priority dim.
rank = -1
bs.byteIsBetter = true
bs.firstPriorityIsBetter = true
}
}
log.Debug("calcProgressiveRank",
zap.Uint64("region-id", bs.cur.region.GetID()),
zap.Uint64("from-store-id", bs.cur.srcStoreID),
zap.Uint64("to-store-id", bs.cur.dstStoreID),
zap.Int64("rank", rank))
bs.cur.progressiveRank = rank
}
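
Summarizing the branches above as a self-contained sketch; the 0.95/0.99 ratios are assumptions standing in for the GetGreatDecRatio/GetMinorGreatDecRatio defaults:

```go
// Hypothetical condensed form of the ranking rules in calcProgressiveRank.
package hotregion

func progressiveRank(firstHot bool, firstDec float64, secondHot bool, secondDec float64) int64 {
	const greatDecRatio, minorDecRatio = 0.95, 0.99 // assumed defaults
	switch {
	case firstHot && firstDec <= greatDecRatio && secondHot && secondDec <= greatDecRatio:
		return -3 // both dims become more balanced: the best choice
	case firstDec <= minorDecRatio && secondHot && secondDec <= greatDecRatio:
		return -2 // first dim not worsened, second dim more balanced
	case firstHot && firstDec <= greatDecRatio:
		return -1 // first dim more balanced, second dim ignored
	}
	return 0 // no improvement; the candidate is discarded
}
```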

func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) {
// we use DecRatio(Decline Ratio) to expect that the dst store's (key/byte) rate should still be less
// than the src store's (key/byte) rate after scheduling one peer.
// we use DecRatio(Decline Ratio) to expect that the dst store's rate should still be less
// than the src store's rate after scheduling one peer.
getSrcDecRate := func(a, b float64) float64 {
if a-b <= 0 {
return 1
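
For example, with src = 100, dst = 60, and peer = 10, the ratio is (60+10)/(100-10) ≈ 0.78; values at or below greatDecRatio mean the move leaves that dimension more balanced. A minimal sketch with the truncated getSrcDecRate folded in (hypothetical helper):

```go
// After moving one peer, the dst store's rate should still sit below the src's.
package hotregion

func decRatio(srcRate, dstRate, peerRate float64) float64 {
	srcAfter := srcRate - peerRate
	if srcAfter <= 0 {
		srcAfter = 1 // mirrors getSrcDecRate returning 1 when src would be exhausted
	}
	return (dstRate + peerRate) / srcAfter
}
```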
@@ -841,6 +858,8 @@ func (bs *balanceSolver) getMinRate(dim int) float64 {
return bs.sche.conf.GetMinHotKeyRate()
case statistics.ByteDim:
return bs.sche.conf.GetMinHotByteRate()
case statistics.QueryDim:
return bs.sche.conf.GetMinHotQueryRate()
}
return -1
}
@@ -874,31 +893,32 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
// compare region

if bs.rwTy == write && bs.opTy == transferLeader {
kind := getRegionStatKind(write, bs.writeLeaderFirstPriority)
switch {
case bs.cur.srcPeerStat.GetLoad(statistics.RegionWriteKeys) > old.srcPeerStat.GetLoad(statistics.RegionWriteKeys):
case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind):
return true
case bs.cur.srcPeerStat.GetLoad(statistics.RegionWriteKeys) < old.srcPeerStat.GetLoad(statistics.RegionWriteKeys):
case bs.cur.srcPeerStat.GetLoad(kind) < old.srcPeerStat.GetLoad(kind):
return false
}
} else {
firstCmp, secondCmp := bs.getRkCmpPriorities(old)
switch bs.cur.progressiveRank {
case -2: // greatDecRatio < byteDecRatio <= minorDecRatio && keyDecRatio <= greatDecRatio
case -2: // greatDecRatio < firstPriorityDecRatio <= minorDecRatio && secondPriorityDecRatio <= greatDecRatio
if secondCmp != 0 {
return secondCmp > 0
}
if firstCmp != 0 {
// prefer smaller byte rate, to reduce oscillation
// prefer smaller first priority rate, to reduce oscillation
return firstCmp < 0
}
case -3: // byteDecRatio <= greatDecRatio && keyDecRatio <= greatDecRatio
case -3: // firstPriorityDecRatio <= greatDecRatio && secondPriorityDecRatio <= greatDecRatio
if secondCmp != 0 {
return secondCmp > 0
}
fallthrough
case -1: // byteDecRatio <= greatDecRatio
case -1: // firstPriorityDecRatio <= greatDecRatio
if firstCmp != 0 {
// prefer region with larger byte rate, to converge faster
// prefer region with larger first priority rate, to converge faster
return firstCmp > 0
}
}
@@ -916,6 +936,8 @@ func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, second
return 100
case statistics.KeyDim:
return 10
case statistics.QueryDim:
return 10
}
return 100
}
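
These constants read as bucket widths for rank comparison: byte rates must differ by about 100 to rank differently, key and query rates by 10. A sketch of the step-bucket idea (semantics inferred from the stepRank usage elsewhere in this diff, not confirmed):

```go
// Rates landing in the same bucket of width `step` compare as equal,
// which damps oscillation from small load differences.
package hotregion

func stepCmp(a, b, step float64) int {
	ra, rb := int64(a/step), int64(b/step)
	switch {
	case ra < rb:
		return -1
	case ra > rb:
		return 1
	default:
		return 0
	}
}
```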
@@ -932,31 +954,24 @@ func (bs *balanceSolver) compareSrcStore(st1, st2 uint64) int {
if bs.rwTy == write && bs.opTy == transferLeader {
lpCmp = sliceLPCmp(
minLPCmp(negLoadCmp(sliceLoadCmp(
stLdRankCmp(stLdKeyRate, stepRank(bs.maxSrc.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])),
stLdRankCmp(stLdByteRate, stepRank(bs.maxSrc.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(bs.maxSrc.Loads[bs.writeLeaderFirstPriority], bs.rankStep.Loads[bs.writeLeaderFirstPriority])),
stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(bs.maxSrc.Loads[bs.writeLeaderSecondPriority], bs.rankStep.Loads[bs.writeLeaderSecondPriority])),
))),
diffCmp(sliceLoadCmp(
stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)),
stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.Loads[statistics.KeyDim])),
stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderFirstPriority])),
stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderSecondPriority])),
)),
)
} else {
f := func(dim int) func(ld *storeLoad) float64 {
switch dim {
case statistics.ByteDim:
return stLdByteRate
case statistics.KeyDim:
return stLdKeyRate
}
return stLdByteRate
}
lpCmp = sliceLPCmp(
minLPCmp(negLoadCmp(sliceLoadCmp(
stLdRankCmp(f(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
stLdRankCmp(f(bs.secondPriority), stepRank(bs.maxSrc.Loads[bs.secondPriority], bs.rankStep.Loads[bs.secondPriority])),
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
stLdRankCmp(stLdRate(bs.secondPriority), stepRank(bs.maxSrc.Loads[bs.secondPriority], bs.rankStep.Loads[bs.secondPriority])),
))),
diffCmp(stLdRankCmp(f(bs.firstPriority), stepRank(0, bs.rankStep.Loads[bs.firstPriority]))),
diffCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(0, bs.rankStep.Loads[bs.firstPriority])),
),
)
}
lp1 := bs.stLoadDetail[st1].LoadPred
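
This hunk's stLdRate replaces the per-dimension selectors (stLdByteRate, stLdKeyRate) the old code needed. A sketch of what such an accessor looks like (types abbreviated, exact signature assumed):

```go
// stLdRate returns a closure that reads one load dimension from a store load.
package hotregion

type storeLoad struct {
	Loads []float64
	Count float64
}

func stLdRate(dim int) func(ld *storeLoad) float64 {
	return func(ld *storeLoad) float64 {
		return ld.Loads[dim]
	}
}
```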
@@ -974,22 +989,22 @@ func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int {
if bs.rwTy == write && bs.opTy == transferLeader {
lpCmp = sliceLPCmp(
maxLPCmp(sliceLoadCmp(
stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])),
stLdRankCmp(stLdByteRate, stepRank(bs.minDst.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(bs.minDst.Loads[bs.writeLeaderFirstPriority], bs.rankStep.Loads[bs.writeLeaderFirstPriority])),
stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(bs.minDst.Loads[bs.writeLeaderSecondPriority], bs.rankStep.Loads[bs.writeLeaderSecondPriority])),
)),
diffCmp(sliceLoadCmp(
stLdRankCmp(stLdCount, stepRank(0, bs.rankStep.Count)),
stLdRankCmp(stLdKeyRate, stepRank(0, bs.rankStep.Loads[statistics.KeyDim])),
stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdRate(bs.writeLeaderFirstPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderFirstPriority])),
stLdRankCmp(stLdRate(bs.writeLeaderSecondPriority), stepRank(0, bs.rankStep.Loads[bs.writeLeaderSecondPriority])),
)))
} else {
lpCmp = sliceLPCmp(
maxLPCmp(sliceLoadCmp(
stLdRankCmp(stLdByteRate, stepRank(bs.minDst.Loads[statistics.ByteDim], bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdKeyRate, stepRank(bs.minDst.Loads[statistics.KeyDim], bs.rankStep.Loads[statistics.KeyDim])),
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
stLdRankCmp(stLdRate(bs.secondPriority), stepRank(bs.minDst.Loads[bs.secondPriority], bs.rankStep.Loads[bs.secondPriority])),
)),
diffCmp(
stLdRankCmp(stLdByteRate, stepRank(0, bs.rankStep.Loads[statistics.ByteDim])),
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(0, bs.rankStep.Loads[bs.firstPriority])),
),
)
}
@@ -1061,6 +1076,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
return nil, nil
}
desc := "transfer-hot-" + bs.rwTy.String() + "-leader"
typ = "transfer-leader"
sourceLabel = strconv.FormatUint(bs.cur.srcStoreID, 10)
targetLabel = strconv.FormatUint(bs.cur.dstStoreID, 10)
op, err = operator.CreateTransferLeaderOperator(
Expand All @@ -1079,12 +1095,12 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence
}

dim := ""
if bs.byteIsBetter && bs.keyIsBetter {
dim = "both"
} else if bs.byteIsBetter {
dim = "byte"
} else if bs.keyIsBetter {
dim = "key"
if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter {
dim = "all"
} else if bs.firstPriorityIsBetter {
dim = dimToString(bs.firstPriority)
} else if bs.secondPriorityIsBetter {
dim = dimToString(bs.secondPriority)
}

op.SetPriorityLevel(core.HighPriority)
@@ -1251,20 +1267,27 @@ func getRegionStatKind(rwTy rwType, dim int) statistics.RegionStatKind {
return 0
}

func (bs *balanceSolver) preferPriority() [2]string {
priorities := bs.sche.conf.WritePriorities
if bs.rwTy == read {
priorities = bs.sche.conf.ReadPriorities
}
return [2]string{priorities[0], priorities[1]}
}

func stringToDim(name string) int {
switch name {
case BytePriority:
return statistics.ByteDim
case KeyPriority:
return statistics.KeyDim
case QueryPriority:
return statistics.QueryDim
}
return statistics.ByteDim
}

func dimToString(dim int) string {
switch dim {
case statistics.ByteDim:
return BytePriority
case statistics.KeyDim:
return KeyPriority
case statistics.QueryDim:
return QueryPriority
default:
return ""
}
}