diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 4397a6d025a9e..be8e3c84f82da 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -104,10 +104,11 @@ func (rs *batchCopResponse) RespTime() time.Duration { // if there is only 1 available store, then put the region to the related store // otherwise, use a greedy algorithm to put it into the store with highest weight func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask { - if len(originalTasks) <= 1 { + isMPP := mppStoreLastFailTime != nil + // for mpp, we still need to detect the store availability + if len(originalTasks) <= 1 && !isMPP { return originalTasks } - isMPP := mppStoreLastFailTime != nil cache := kvStore.GetRegionCache() storeTaskMap := make(map[uint64]*batchCopTask) // storeCandidateRegionMap stores all the possible store->region map. Its content is @@ -227,16 +228,28 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] } } } - if totalRemainingRegionNum == 0 { - return originalTasks - } - avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) - findNextStore := func(candidateStores []uint64) uint64 { - store := uint64(math.MaxUint64) - weightedRegionNum := math.MaxFloat64 - if candidateStores != nil { - for _, storeID := range candidateStores { + if totalRemainingRegionNum > 0 { + avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) + findNextStore := func(candidateStores []uint64) uint64 { + store := uint64(math.MaxUint64) + weightedRegionNum := math.MaxFloat64 + if candidateStores != nil { + for _, storeID := range candidateStores { + if _, validStore := storeCandidateRegionMap[storeID]; !validStore { + continue + } + num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) + if num < weightedRegionNum { + store = storeID + weightedRegionNum = num + } + } + if store != uint64(math.MaxUint64) { + return store + } + } + for storeID := range storeTaskMap { if _, validStore := storeCandidateRegionMap[storeID]; !validStore { continue } @@ -246,57 +259,44 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] weightedRegionNum = num } } - if store != uint64(math.MaxUint64) { - return store - } + return store } - for storeID := range storeTaskMap { - if _, validStore := storeCandidateRegionMap[storeID]; !validStore { - continue + + store := findNextStore(nil) + for totalRemainingRegionNum > 0 { + if store == uint64(math.MaxUint64) { + break } - num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) - if num < weightedRegionNum { - store = storeID - weightedRegionNum = num + var key string + var ri RegionInfo + for key, ri = range storeCandidateRegionMap[store] { + // get the first region + break } - } - return store - } - - store := findNextStore(nil) - for totalRemainingRegionNum > 0 { - if store == uint64(math.MaxUint64) { - break - } - var key string - var ri RegionInfo - for key, ri = range storeCandidateRegionMap[store] { - // get the first region - break - } - storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) - totalRemainingRegionNum-- - for _, id := range ri.AllStores { - if _, ok := storeCandidateRegionMap[id]; ok { - delete(storeCandidateRegionMap[id], key) - totalRegionCandidateNum-- - if len(storeCandidateRegionMap[id]) == 0 { - delete(storeCandidateRegionMap, id) + storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) + totalRemainingRegionNum-- + for _, id := range ri.AllStores { + if _, ok := storeCandidateRegionMap[id]; ok { + delete(storeCandidateRegionMap[id], key) + totalRegionCandidateNum-- + if len(storeCandidateRegionMap[id]) == 0 { + delete(storeCandidateRegionMap, id) + } } } + if totalRemainingRegionNum > 0 { + avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) + // it is not optimal because we only check the stores that affected by this region, in fact in order + // to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think + // check only the affected stores is more simple and will get a good enough result + store = findNextStore(ri.AllStores) + } } if totalRemainingRegionNum > 0 { - avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) - // it is not optimal because we only check the stores that affected by this region, in fact in order - // to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think - // check only the affected stores is more simple and will get a good enough result - store = findNextStore(ri.AllStores) + logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing") + return originalTasks } } - if totalRemainingRegionNum > 0 { - logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing") - return originalTasks - } var ret []*batchCopTask for _, task := range storeTaskMap {