diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index f0f35dde0b35e..c95717cffedff 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -21,6 +21,7 @@ import ( "encoding/json" "math" "sort" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -29,6 +30,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/ddl/ingest" + "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" diststorage "github.com/pingcap/tidb/pkg/disttask/framework/storage" @@ -37,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" + "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" @@ -256,6 +259,11 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) return subTaskMetas, nil } +const ( + scanRegionBackoffBase = 200 * time.Millisecond + scanRegionBackoffMax = 2 * time.Second +) + func generateNonPartitionPlan( d *ddl, tblInfo *model.TableInfo, @@ -279,38 +287,65 @@ func generateNonPartitionPlan( if err != nil { return nil, errors.Trace(err) } - regionCache := d.store.(helper.Storage).GetRegionCache() - recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey) - if err != nil { - return nil, err - } - regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud) subTaskMetas := make([][]byte, 0, 4) - sort.Slice(recordRegionMetas, func(i, j int) bool { - return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0 - }) - for i := 0; i < len(recordRegionMetas); i += regionBatch { - end := i + regionBatch - if end > len(recordRegionMetas) { - end = len(recordRegionMetas) - } - batch := recordRegionMetas[i:end] - subTaskMeta := &BackfillSubTaskMeta{ - RowStart: batch[0].StartKey(), - RowEnd: batch[len(batch)-1].EndKey(), + backoffer := backoff.NewExponential(scanRegionBackoffBase, 2, scanRegionBackoffMax) + err = handle.RunWithRetry(d.ctx, 8, backoffer, logutil.Logger(d.ctx), func(_ context.Context) (bool, error) { + regionCache := d.store.(helper.Storage).GetRegionCache() + recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey) + if err != nil { + return false, err } - if i == 0 { - subTaskMeta.RowStart = startKey + sort.Slice(recordRegionMetas, func(i, j int) bool { + return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0 + }) + + // Check if regions are continuous. + shouldRetry := false + cur := recordRegionMetas[0] + for _, m := range recordRegionMetas[1:] { + if !bytes.Equal(cur.EndKey(), m.StartKey()) { + shouldRetry = true + break + } + cur = m } - if end == len(recordRegionMetas) { - subTaskMeta.RowEnd = endKey + + if shouldRetry { + return true, nil } - metaBytes, err := json.Marshal(subTaskMeta) - if err != nil { - return nil, err + + regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud) + + for i := 0; i < len(recordRegionMetas); i += regionBatch { + end := i + regionBatch + if end > len(recordRegionMetas) { + end = len(recordRegionMetas) + } + batch := recordRegionMetas[i:end] + subTaskMeta := &BackfillSubTaskMeta{ + RowStart: batch[0].StartKey(), + RowEnd: batch[len(batch)-1].EndKey(), + } + if i == 0 { + subTaskMeta.RowStart = startKey + } + if end == len(recordRegionMetas) { + subTaskMeta.RowEnd = endKey + } + metaBytes, err := json.Marshal(subTaskMeta) + if err != nil { + return false, err + } + subTaskMetas = append(subTaskMetas, metaBytes) } - subTaskMetas = append(subTaskMetas, metaBytes) + return false, nil + }) + if err != nil { + return nil, errors.Trace(err) + } + if len(subTaskMetas) == 0 { + return nil, errors.Errorf("regions are not continuous") } return subTaskMetas, nil }