diff --git a/pkg/ddl/backfilling_dispatcher.go b/pkg/ddl/backfilling_dispatcher.go index 482541feb0dd9..3c2cc3b571b8a 100644 --- a/pkg/ddl/backfilling_dispatcher.go +++ b/pkg/ddl/backfilling_dispatcher.go @@ -21,6 +21,7 @@ import ( "encoding/json" "math" "sort" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher" + "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/kv" @@ -38,6 +40,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" + "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" @@ -262,6 +265,11 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) return subTaskMetas, nil } +const ( + scanRegionBackoffBase = 200 * time.Millisecond + scanRegionBackoffMax = 2 * time.Second +) + func generateNonPartitionPlan( d *ddl, tblInfo *model.TableInfo, job *model.Job, useCloud bool, instanceCnt int) (metas [][]byte, err error) { tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo) @@ -280,41 +288,69 @@ func generateNonPartitionPlan( if err != nil { return nil, errors.Trace(err) } - regionCache := d.store.(helper.Storage).GetRegionCache() - recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey) - if err != nil { - return nil, err - } - - regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud) subTaskMetas := make([][]byte, 0, 4) - sort.Slice(recordRegionMetas, func(i, j int) bool { - return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0 - }) - for i := 0; i < len(recordRegionMetas); i += regionBatch { - end := i + regionBatch - if end > len(recordRegionMetas) { - end = len(recordRegionMetas) - } - batch := recordRegionMetas[i:end] - subTaskMeta := &BackfillSubTaskMeta{ - SortedKVMeta: external.SortedKVMeta{ - StartKey: batch[0].StartKey(), - EndKey: batch[len(batch)-1].EndKey(), - }, + + backoffer := backoff.NewExponential(scanRegionBackoffBase, 2, scanRegionBackoffMax) + + err = handle.RunWithRetry(d.ctx, 8, backoffer, logutil.Logger(d.ctx), func(_ context.Context) (bool, error) { + regionCache := d.store.(helper.Storage).GetRegionCache() + recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey) + if err != nil { + return false, err } - if i == 0 { - subTaskMeta.StartKey = startKey + sort.Slice(recordRegionMetas, func(i, j int) bool { + return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0 + }) + // Check if regions are continuous. + shouldRetry := false + cur := recordRegionMetas[0] + for _, m := range recordRegionMetas[1:] { + if !bytes.Equal(cur.EndKey(), m.StartKey()) { + shouldRetry = true + break + } + cur = m } - if end == len(recordRegionMetas) { - subTaskMeta.EndKey = endKey + + if shouldRetry { + return true, nil } - metaBytes, err := json.Marshal(subTaskMeta) - if err != nil { - return nil, err + + regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud) + + for i := 0; i < len(recordRegionMetas); i += regionBatch { + end := i + regionBatch + if end > len(recordRegionMetas) { + end = len(recordRegionMetas) + } + batch := recordRegionMetas[i:end] + subTaskMeta := &BackfillSubTaskMeta{ + SortedKVMeta: external.SortedKVMeta{ + StartKey: batch[0].StartKey(), + EndKey: batch[len(batch)-1].EndKey(), + }, + } + if i == 0 { + subTaskMeta.StartKey = startKey + } + if end == len(recordRegionMetas) { + subTaskMeta.EndKey = endKey + } + metaBytes, err := json.Marshal(subTaskMeta) + if err != nil { + return false, err + } + subTaskMetas = append(subTaskMetas, metaBytes) } - subTaskMetas = append(subTaskMetas, metaBytes) + return false, nil + }) + + if err != nil { + return nil, errors.Trace(err) + } + if len(subTaskMetas) == 0 { + return nil, errors.Errorf("regions are not continuous") } return subTaskMetas, nil }