Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

copr: Fix bug that mpp node availability detect does not work in some corner cases (#28201) #28287

Merged
merged 6 commits into from
Dec 17, 2021
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 53 additions & 53 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) <= 1 {
isMPP := mppStoreLastFailTime != nil
// For MPP, we still need to detect store availability, even when there is at most one task.
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
Expand Down Expand Up @@ -223,16 +224,28 @@ func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTas
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if totalRemainingRegionNum > 0 {
avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
Expand All @@ -242,57 +255,44 @@ func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTas
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
return store
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
}
return store
}

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// This is not optimal because we only check the stores that are affected by this region. In fact, in order
// to find the store with the lowest weightedRegionNum, all stores should be checked, but I think
// checking only the affected stores is simpler and will get a good enough result.
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// This is not optimal because we only check the stores that are affected by this region. In fact, in order
// to find the store with the lowest weightedRegionNum, all stores should be checked, but I think
// checking only the affected stores is simpler and will get a good enough result.
store = findNextStore(ri.AllStores)
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
Expand Down