disttask: define TaskType/TaskState/Step instead of string/int64 #47611

Merged · 13 commits · Oct 16, 2023
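
The hunks below replace bare string / int64 values with named types from the disttask framework's proto package. The type definitions themselves are not part of this excerpt; the following is a minimal sketch of what they presumably look like, with the constant values treated as assumptions rather than facts taken from this diff:

    // Sketch only: mirrors the shape of the disttask proto package assumed by
    // the diffs below, not the literal upstream source.
    package proto

    // TaskType is the type of a distributed task (e.g. backfill), replacing raw strings.
    type TaskType string

    // TaskState is the lifecycle state of a task, also replacing raw strings.
    type TaskState string

    // Step is the stage a task is currently executing, replacing raw int64 values.
    type Step int64

    const (
        // Backfill replaces the removed ddl.BackfillTaskType constant.
        Backfill TaskType = "backfill"

        // The StepInit/StepOne values are assumptions; only the names appear in the diffs.
        StepInit Step = -1
        StepOne  Step = 1
    )
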
ddl/backfilling_dispatcher.go (2 additions, 2 deletions)
@@ -70,7 +70,7 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(
 	ctx context.Context,
 	taskHandle dispatcher.TaskHandle,
 	gTask *proto.Task,
-	step int64,
+	step proto.Step,
 ) (taskMeta [][]byte, err error) {
 	var gTaskMeta BackfillGlobalMeta
 	if err := json.Unmarshal(gTask.Meta, &gTaskMeta); err != nil {
@@ -109,7 +109,7 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(
 func (*backfillingDispatcherExt) GetNextStep(
 	taskHandle dispatcher.TaskHandle,
 	task *proto.Task,
-) int64 {
+) proto.Step {
 	switch task.Step {
 	case proto.StepInit:
 		return proto.StepOne
ddl/backfilling_dispatcher_test.go (4 additions, 4 deletions)
@@ -44,7 +44,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 		"PARTITION p1 VALUES LESS THAN (100),\n" +
 		"PARTITION p2 VALUES LESS THAN (1000),\n" +
 		"PARTITION p3 VALUES LESS THAN MAXVALUE\n);")
-	gTask := createAddIndexGlobalTask(t, dom, "test", "tp1", ddl.BackfillTaskType)
+	gTask := createAddIndexGlobalTask(t, dom, "test", "tp1", proto.Backfill)
 	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp1"))
 	require.NoError(t, err)
 	tblInfo := tbl.Meta()
@@ -87,7 +87,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 	/// 2. test non partition table.
 	// 2.1 empty table
 	tk.MustExec("create table t1(id int primary key, v int)")
-	gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.BackfillTaskType)
+	gTask = createAddIndexGlobalTask(t, dom, "test", "t1", proto.Backfill)
 	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(metas))
@@ -97,7 +97,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 	tk.MustExec("insert into t2 values (), (), (), (), (), ()")
 	tk.MustExec("insert into t2 values (), (), (), (), (), ()")
 	tk.MustExec("insert into t2 values (), (), (), (), (), ()")
-	gTask = createAddIndexGlobalTask(t, dom, "test", "t2", ddl.BackfillTaskType)
+	gTask = createAddIndexGlobalTask(t, dom, "test", "t2", proto.Backfill)
 	// 2.2.1 stepInit
 	gTask.Step = dsp.GetNextStep(nil, gTask)
 	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
@@ -118,7 +118,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 	require.Equal(t, 0, len(metas))
 }
 
-func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType string) *proto.Task {
+func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType proto.TaskType) *proto.Task {
 	db, ok := dom.InfoSchema().SchemaByName(model.NewCIStr(dbName))
 	require.True(t, ok)
 	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName))
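
One practical effect of changing the helper's taskType parameter from string to proto.TaskType: untyped string literals still convert implicitly, but a plain string variable now needs an explicit conversion, so call sites tend to spell out the named constant. A small self-contained illustration, where a local TaskType stands in for proto.TaskType and none of the identifiers come from the diff:

    package main

    import "fmt"

    // TaskType stands in for proto.TaskType: a defined type whose underlying type is string.
    type TaskType string

    const Backfill TaskType = "backfill"

    func createTask(taskType TaskType) { fmt.Println("task type:", taskType) }

    func main() {
        createTask(Backfill)   // ok: already a TaskType
        createTask("backfill") // ok: untyped constant converts implicitly

        raw := "backfill" // raw has type string
        // createTask(raw)        // compile error: cannot use raw (string) as TaskType
        createTask(TaskType(raw)) // string-typed values need an explicit, visible conversion
    }
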
ddl/backfilling_dist_scheduler.go (1 addition, 4 deletions)
@@ -54,7 +54,7 @@ type BackfillSubTaskMeta struct {
 
 // NewBackfillSubtaskExecutor creates a new backfill subtask executor.
 func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
-	bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) {
+	bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.SubtaskExecutor, error) {
 	bgm := &BackfillGlobalMeta{}
 	err := json.Unmarshal(taskMeta, bgm)
 	if err != nil {
@@ -96,9 +96,6 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
 	}
 }
 
-// BackfillTaskType is the type of backfill task.
-const BackfillTaskType = "backfill"
-
 type backfillDistScheduler struct {
 	*scheduler.BaseScheduler
 	d *ddl
ddl/ddl.go (3 additions, 3 deletions)
@@ -692,7 +692,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 		ddlJobCh: make(chan struct{}, 100),
 	}
 
-	scheduler.RegisterTaskType(BackfillTaskType,
+	scheduler.RegisterTaskType(proto.Backfill,
 		func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler {
 			return newBackfillDistScheduler(ctx, id, task, taskTable, d)
 		}, scheduler.WithSummary,
@@ -702,11 +702,11 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 	if err != nil {
 		logutil.BgLogger().Warn("NewBackfillingDispatcherExt failed", zap.String("category", "ddl"), zap.Error(err))
 	} else {
-		dispatcher.RegisterDispatcherFactory(BackfillTaskType,
+		dispatcher.RegisterDispatcherFactory(proto.Backfill,
			func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
				return newLitBackfillDispatcher(ctx, taskMgr, serverID, task, backFillDsp)
			})
-		dispatcher.RegisterDispatcherCleanUpFactory(BackfillTaskType, newBackfillCleanUpS3)
+		dispatcher.RegisterDispatcherCleanUpFactory(proto.Backfill, newBackfillCleanUpS3)
 	}
 
 	// Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
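
The registration calls above now key the scheduler and dispatcher factories by proto.Backfill rather than by a package-local string constant. A rough sketch of what a factory registry keyed by the named type can look like; the real scheduler and dispatcher packages are more involved, and every identifier here is illustrative:

    package main

    import "fmt"

    type TaskType string

    const Backfill TaskType = "backfill"

    // schedulerFactory is a stand-in for the real factory signatures used by
    // scheduler.RegisterTaskType / dispatcher.RegisterDispatcherFactory.
    type schedulerFactory func(taskID int64) string

    // Keying the map by TaskType instead of string means registration and lookup
    // both go through the same named constants rather than ad-hoc strings.
    var factories = map[TaskType]schedulerFactory{}

    func RegisterTaskType(tp TaskType, f schedulerFactory) { factories[tp] = f }

    func main() {
        RegisterTaskType(Backfill, func(taskID int64) string {
            return fmt.Sprintf("backfill scheduler for task %d", taskID)
        })
        if f, ok := factories[Backfill]; ok {
            fmt.Println(f(1))
        }
    }
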
ddl/ddl_worker.go (23 additions, 21 deletions)
@@ -382,32 +382,34 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 		startTS = txn.StartTS()
 		return nil
 	})
-	if err == nil {
-		jobTasks := make([]*model.Job, 0, len(tasks))
-		for i, task := range tasks {
-			job := task.job
-			job.Version = currentVersion
-			job.StartTS = startTS
-			job.ID = ids[i]
-			setJobStateToQueueing(job)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
-			if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) {
-				if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
-					logutil.BgLogger().Warn("pause user DDL by system failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err))
-					task.cacheErr = err
-					continue
-				}
-				logutil.BgLogger().Info("pause user DDL by system successful", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job))
-			}
+	jobTasks := make([]*model.Job, 0, len(tasks))
+	for i, task := range tasks {
+		job := task.job
+		job.Version = currentVersion
+		job.StartTS = startTS
+		job.ID = ids[i]
+		setJobStateToQueueing(job)
 
-			jobTasks = append(jobTasks, job)
-			injectModifyJobArgFailPoint(job)
+		if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) {
+			if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
+				logutil.BgLogger().Warn("pause user DDL by system failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err))
+				task.cacheErr = err
+				continue
+			}
+			logutil.BgLogger().Info("pause user DDL by system successful", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job))
 		}
 
-		se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
-		err = insertDDLJobs2Table(sess.NewSession(se), true, jobTasks...)
+		jobTasks = append(jobTasks, job)
+		injectModifyJobArgFailPoint(job)
 	}
-	return errors.Trace(err)
+
+	se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+
+	return errors.Trace(insertDDLJobs2Table(sess.NewSession(se), true, jobTasks...))
 }
 
 func injectFailPointForGetJob(job *model.Job) {
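
Aside from the type work, this hunk flattens the old `if err == nil { ... }` wrapper around the batch-insert logic into an early return, which is why most of the body re-indents. A compact standalone sketch of the control-flow change only; the function and error values are invented for illustration:

    package main

    import (
        "errors"
        "fmt"
    )

    // addBatch mimics the new shape of addBatchDDLJobs2Table: fail fast on the
    // transaction error, keep the main path at the top indentation level, and
    // return the insert error directly instead of threading it through `err`.
    func addBatch(txnErr error, insert func() error) error {
        if txnErr != nil {
            return txnErr // previously: the whole body was wrapped in `if err == nil { ... }`
        }
        // ... build the job batch, pause conflicting jobs during upgrade, etc. ...
        return insert()
    }

    func main() {
        fmt.Println(addBatch(nil, func() error { return nil }))
        fmt.Println(addBatch(errors.New("allocate ids failed"), nil))
    }
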
ddl/index.go (1 addition, 1 deletion)
@@ -2049,7 +2049,7 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
 		return errors.New("do not support merge index")
 	}
 
-	taskType := BackfillTaskType
+	taskType := proto.Backfill
 	taskKey := fmt.Sprintf("ddl/%s/%d", taskType, reorgInfo.Job.ID)
 	g, ctx := errgroup.WithContext(context.Background())
 	done := make(chan struct{})
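
Since proto.TaskType has string as its underlying type, the `%s` verb in the task-key format above keeps producing the same keys after the switch from the string constant. A tiny standalone check, with the job ID invented for the example:

    package main

    import "fmt"

    type TaskType string

    const Backfill TaskType = "backfill"

    func main() {
        jobID := int64(123) // illustrative job ID, not from the diff
        // %s formats a defined string type by its value, so the key shape is unchanged.
        taskKey := fmt.Sprintf("ddl/%s/%d", Backfill, jobID)
        fmt.Println(taskKey) // prints: ddl/backfill/123
    }
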