Skip to content

Commit

Permalink
ddl, disttask: add scan-region check before dispatch subtasks (#51242)
Browse files Browse the repository at this point in the history
close #50895
  • Loading branch information
tangenta authored Feb 23, 2024
1 parent a84d57c commit 1d68ada
Showing 1 changed file with 61 additions and 26 deletions.
87 changes: 61 additions & 26 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"math"
"sort"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
diststorage "github.com/pingcap/tidb/pkg/disttask/framework/storage"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
Expand Down Expand Up @@ -256,6 +259,11 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error)
return subTaskMetas, nil
}

// Backoff bounds used when generateNonPartitionPlan retries loading the
// regions of the table's key range. Both values are handed to
// backoff.NewExponential together with a literal 2 (presumably the growth
// factor between attempts — confirm against the backoff package).
const (
// scanRegionBackoffBase is the first duration passed to the exponential backoffer (200ms).
scanRegionBackoffBase = 200 * time.Millisecond
// scanRegionBackoffMax is the last duration passed to the exponential backoffer (2s).
scanRegionBackoffMax = 2 * time.Second
)

func generateNonPartitionPlan(
d *ddl,
tblInfo *model.TableInfo,
Expand All @@ -279,38 +287,65 @@ func generateNonPartitionPlan(
if err != nil {
return nil, errors.Trace(err)
}
regionCache := d.store.(helper.Storage).GetRegionCache()
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
if err != nil {
return nil, err
}
regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)

subTaskMetas := make([][]byte, 0, 4)
sort.Slice(recordRegionMetas, func(i, j int) bool {
return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
})
for i := 0; i < len(recordRegionMetas); i += regionBatch {
end := i + regionBatch
if end > len(recordRegionMetas) {
end = len(recordRegionMetas)
}
batch := recordRegionMetas[i:end]
subTaskMeta := &BackfillSubTaskMeta{
RowStart: batch[0].StartKey(),
RowEnd: batch[len(batch)-1].EndKey(),
backoffer := backoff.NewExponential(scanRegionBackoffBase, 2, scanRegionBackoffMax)
err = handle.RunWithRetry(d.ctx, 8, backoffer, logutil.Logger(d.ctx), func(_ context.Context) (bool, error) {
regionCache := d.store.(helper.Storage).GetRegionCache()
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey)
if err != nil {
return false, err
}
if i == 0 {
subTaskMeta.RowStart = startKey
sort.Slice(recordRegionMetas, func(i, j int) bool {
return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
})

// Check if regions are continuous.
shouldRetry := false
cur := recordRegionMetas[0]
for _, m := range recordRegionMetas[1:] {
if !bytes.Equal(cur.EndKey(), m.StartKey()) {
shouldRetry = true
break
}
cur = m
}
if end == len(recordRegionMetas) {
subTaskMeta.RowEnd = endKey

if shouldRetry {
return true, nil
}
metaBytes, err := json.Marshal(subTaskMeta)
if err != nil {
return nil, err

regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)

for i := 0; i < len(recordRegionMetas); i += regionBatch {
end := i + regionBatch
if end > len(recordRegionMetas) {
end = len(recordRegionMetas)
}
batch := recordRegionMetas[i:end]
subTaskMeta := &BackfillSubTaskMeta{
RowStart: batch[0].StartKey(),
RowEnd: batch[len(batch)-1].EndKey(),
}
if i == 0 {
subTaskMeta.RowStart = startKey
}
if end == len(recordRegionMetas) {
subTaskMeta.RowEnd = endKey
}
metaBytes, err := json.Marshal(subTaskMeta)
if err != nil {
return false, err
}
subTaskMetas = append(subTaskMetas, metaBytes)
}
subTaskMetas = append(subTaskMetas, metaBytes)
return false, nil
})
if err != nil {
return nil, errors.Trace(err)
}
if len(subTaskMetas) == 0 {
return nil, errors.Errorf("regions are not continuous")
}
return subTaskMetas, nil
}
Expand Down

0 comments on commit 1d68ada

Please sign in to comment.