ddl, model: support for dist-reorg on partitioned tables (#41145)
close #41144
zimulala authored Feb 9, 2023
1 parent 7f88e73 commit a22d690
Showing 13 changed files with 715 additions and 270 deletions.
140 changes: 76 additions & 64 deletions ddl/backfilling.go
@@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
@@ -61,12 +62,14 @@ const (
typeAddIndexMergeTmpWorker backfillerType = 3

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
updateInstanceLease = 25 * time.Second
genTaskBatch = 4096
minGenTaskBatch = 1024
minDistTaskCnt = 32
retrySQLTimes = 10
InstanceLease = 1 * time.Minute
updateInstanceLease = 25 * time.Second
genTaskBatch = 4096
genPhysicalTableTaskBatch = 256
minGenTaskBatch = 1024
minGenPhysicalTableTaskBatch = 64
minDistTaskCnt = 64
retrySQLTimes = 10
)

// RetrySQLInterval is exported for test.
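
The const block above introduces a second, smaller pair of batch sizes (genPhysicalTableTaskBatch / minGenPhysicalTableTaskBatch) used when tasks are generated per physical table. A minimal sketch of how the two pairs might be chosen; the actual selection happens where splitJobContext is built, outside the hunks loaded here, and the helper name is assumed for illustration:

// pickBatchSizes is a hypothetical helper showing the intent of the new
// constants: when a reorg spans many physical tables (partitions of one
// logical table), each physical table gets a smaller task budget so the
// total backfill-job queue stays bounded.
func pickBatchSizes(isMultiPhyTbl bool) (batchSize, minBatchSize int) {
	if isMultiPhyTbl {
		return genPhysicalTableTaskBatch, minGenPhysicalTableTaskBatch
	}
	return genTaskBatch, minGenTaskBatch
}
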
@@ -89,15 +92,15 @@ func (bT backfillerType) String() string {

// BackfillJob is for a tidb_ddl_backfill table's record.
type BackfillJob struct {
ID int64
JobID int64
EleID int64
EleKey []byte
Tp backfillerType
State model.JobState
StoreID int64
InstanceID string
InstanceLease types.Time
ID int64
JobID int64
EleID int64
EleKey []byte
PhysicalTableID int64
Tp backfillerType
State model.JobState
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
@@ -252,14 +255,13 @@ type reorgBackfillTask struct {
physicalTable table.PhysicalTable

// TODO: Remove the following fields after removing the run function.
id int
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
jobID int64
sqlQuery string
priority int
id int
startKey kv.Key
endKey kv.Key
endInclude bool
jobID int64
sqlQuery string
priority int
}

func (r *reorgBackfillTask) getJobID() int64 {
@@ -278,7 +280,7 @@ func (r *reorgBackfillTask) excludedEndKey() kv.Key {
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
physicalID := strconv.FormatInt(r.physicalTable.GetPhysicalID(), 10)
startKey := hex.EncodeToString(r.startKey)
endKey := hex.EncodeToString(r.endKey)
rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
@@ -366,6 +368,9 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
rc := d.getReorgCtx(jobID)

isDistReorg := task.bfJob != nil
if isDistReorg {
w.initPartitionIndexInfo(task)
}
for {
// Give the job a chance to be canceled; if we don't check it here
// and there is a panic in bf.BackfillDataInTxn, we will never cancel the job.
@@ -437,6 +442,15 @@
return result
}

func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
if addIdxWorker, ok := w.backfiller.(*addIndexWorker); ok {
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
addIdxWorker.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
}
}
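
For distributed reorg a worker can receive backfill jobs that target different partitions of the same table, so the addIndexWorker's index object has to be rebound to the physical table ID carried by each job (index keys are prefixed with that ID). The same binding as a standalone sketch; the helper name is assumed, while the APIs are the ones used in the hunk above:

package ddl

import (
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/table/tables"
)

// rebindIndex mirrors initPartitionIndexInfo: it looks up the index metadata
// identified by the backfill job's element ID and binds it to one partition's
// physical table ID.
func rebindIndex(pt table.PartitionedTable, physicalTableID, eleID int64) table.Index {
	indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, eleID)
	return tables.NewIndex(physicalTableID, pt.Meta(), indexInfo)
}
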

func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) {
logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String()))
defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() {
@@ -668,7 +682,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
physicalTableID := reorgInfo.PhysicalTableID
var prefix kv.Key
if reorgInfo.mergingTmpIdx {
prefix = t.IndexPrefix()
@@ -679,15 +692,15 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID)
jobCtx := reorgInfo.d.jobContext(job.ID)
for i, keyRange := range kvRanges {
startKey := keyRange.StartKey
endKey := keyRange.EndKey
endK, err := getRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, keyRange.StartKey, endKey)
if err != nil {
logutil.BgLogger().Info("[ddl] get backfill range task, get reverse key failed", zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] get backfill range task, change end key",
logutil.BgLogger().Info("[ddl] get backfill range task, change end key", zap.Int64("pTbl", phyTbl.GetPhysicalID()),
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}
@@ -699,13 +712,12 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}

task := &reorgBackfillTask{
id: i,
jobID: reorgInfo.Job.ID,
physicalTableID: physicalTableID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
id: i,
jobID: job.ID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
batchTasks = append(batchTasks, task)
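
A short illustration of the endInclude rule used when building each task above: when getRangeEndKey shrinks a region boundary down to the key of the last existing row, that row still has to be processed by this task, so its end key becomes inclusive, and the final range is always inclusive. The helper below is hypothetical and only restates that condition:

// endIncludeFor returns true when the task's end key must be treated as
// inclusive: either the adjusted end key differs from the original region
// boundary, or this is the last range of the batch.
func endIncludeFor(adjustedEnd, regionEnd kv.Key, isLast bool) bool {
	return adjustedEnd.Cmp(regionEnd) != 0 || isLast
}
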
@@ -1108,8 +1120,8 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
return nil
}

func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool,
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error {
func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool,
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error {
bJobs = bJobs[:0]
instanceID := ""
if notDistTask {
@@ -1119,12 +1131,11 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
// TODO: Adjust the number of ranges (regions) for each task.
for _, task := range batchTasks {
bm := &model.BackfillMeta{
PhysicalTableID: reorgInfo.PhysicalTableID,
IsUnique: isUnique,
EndInclude: task.endInclude,
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
SQLMode: reorgInfo.ReorgMeta.SQLMode,
Location: reorgInfo.ReorgMeta.Location,
IsUnique: sJobCtx.isUnique,
EndInclude: task.endInclude,
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
SQLMode: reorgInfo.ReorgMeta.SQLMode,
Location: reorgInfo.ReorgMeta.Location,
JobMeta: &model.JobMeta{
SchemaID: reorgInfo.Job.SchemaID,
TableID: reorgInfo.Job.TableID,
@@ -1133,19 +1144,19 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
},
}
bj := &BackfillJob{
ID: *id,
JobID: reorgInfo.Job.ID,
EleID: reorgInfo.currElement.ID,
EleKey: reorgInfo.currElement.TypeKey,
Tp: bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
ID: sJobCtx.currBackfillJobID.Add(1),
JobID: reorgInfo.Job.ID,
EleID: reorgInfo.currElement.ID,
EleKey: reorgInfo.currElement.TypeKey,
PhysicalTableID: phyTblID,
Tp: sJobCtx.bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
}
*id++
bJobs = append(bJobs, bj)
}
if err := AddBackfillJobs(sess, bJobs); err != nil {
@@ -1154,29 +1165,30 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
return nil
}

func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool,
bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error {
endKey := reorgInfo.EndKey
isFirstOps := true
bJobs := make([]*BackfillJob, 0, genTaskBatch)
func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error {
isFirstOps := !sJobCtx.isMultiPhyTbl
batchSize := sJobCtx.batchSize
startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey
bJobs := make([]*BackfillJob, 0, batchSize)
for {
kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch)
kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize)
if err != nil {
return errors.Trace(err)
}
batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch)
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize)
if len(batchTasks) == 0 {
break
}
notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt)
if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil {
if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil {
return errors.Trace(err)
}
isFirstOps = false

remains := kvRanges[len(batchTasks):]
dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job")
logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
zap.Int64("physicalID", pTblMeta.PhyTblID),
zap.Int("batchTasksCnt", len(batchTasks)),
zap.Int("totalRegionCnt", len(kvRanges)),
zap.Int("remainRegionCnt", len(remains)),
@@ -1188,11 +1200,11 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo,
}

for {
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, pTblMeta.PhyTblID)
if err != nil {
return errors.Trace(err)
}
if bJobCnt < minGenTaskBatch {
if bJobCnt < sJobCtx.minBatchSize {
break
}
time.Sleep(RetrySQLInterval)
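
Several hunks above use two new types, splitJobContext and BackfillJobRangeMeta, whose definitions live in parts of the commit not loaded on this page. The following is a rough reconstruction from the call sites shown here; field names and types are inferred, so treat it as an assumption rather than the actual definitions:

package ddl

import (
	"sync/atomic"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/table"
)

// splitJobContext is inferred from sJobCtx.bfWorkerType, sJobCtx.isMultiPhyTbl,
// sJobCtx.isUnique, sJobCtx.batchSize, sJobCtx.minBatchSize and
// sJobCtx.currBackfillJobID.Add(1); the real atomic type may differ.
type splitJobContext struct {
	bfWorkerType      backfillerType
	isMultiPhyTbl     bool
	isUnique          bool
	batchSize         int
	minBatchSize      int
	currBackfillJobID atomic.Int64
}

// BackfillJobRangeMeta is inferred from pTblMeta.PhyTblID, pTblMeta.PhyTbl,
// pTblMeta.StartKey and pTblMeta.EndKey.
type BackfillJobRangeMeta struct {
	PhyTblID int64
	PhyTbl   table.PhysicalTable
	StartKey kv.Key
	EndKey   kv.Key
}
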
4 changes: 2 additions & 2 deletions ddl/constant.go
@@ -55,7 +55,7 @@ const (
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
@@ -74,7 +74,7 @@ const (
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
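
The two schema hunks above rename the store_id column of the backfill tables to ddl_physical_tid, matching the new BackfillJob.PhysicalTableID field. A sketch of the implied column mapping; the real row encoding lives in AddBackfillJobs, which is not among the loaded hunks, so the helper below is illustrative only:

// backfillJobVisibleColumns lists, in order, the values for the columns that
// appear in the hunk above; PhysicalTableID now fills the ddl_physical_tid
// slot that used to hold store_id.
func backfillJobVisibleColumns(bj *BackfillJob) []interface{} {
	return []interface{}{
		bj.JobID,           // ddl_job_id
		bj.EleID,           // ele_id
		bj.EleKey,          // ele_key
		bj.PhysicalTableID, // ddl_physical_tid (was store_id)
		int(bj.Tp),         // type
		bj.InstanceID,      // exec_id
		bj.InstanceLease,   // exec_lease
	}
}
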
3 changes: 3 additions & 0 deletions ddl/ddl_test.go
@@ -58,6 +58,9 @@ var JobNeedGCForTest = jobNeedGC
// NewSession is only used for test.
var NewSession = newSession

// GetJobWithoutPartition is only used for test.
const GetJobWithoutPartition = getJobWithoutPartition

// GetDDLCtx returns ddlCtx for test.
func GetDDLCtx(d DDL) *ddlCtx {
return d.(*ddl).ddlCtx
(Remaining 10 changed files not shown.)
