diff --git a/pkg/ddl/callback.go b/pkg/ddl/callback.go
index 7ce8b4d5e0382..8787b08d306b5 100644
--- a/pkg/ddl/callback.go
+++ b/pkg/ddl/callback.go
@@ -60,9 +60,9 @@ type Callback interface {
 	// OnWatched is called after watching owner is completed.
 	OnWatched(ctx context.Context)
 	// OnGetJobBefore is called before getting job.
-	OnGetJobBefore(jobType string)
+	OnGetJobBefore()
 	// OnGetJobAfter is called after getting job.
-	OnGetJobAfter(jobType string, job *model.Job)
+	OnGetJobAfter(job *model.Job)
 }
 
 // BaseCallback implements Callback.OnChanged interface.
@@ -100,12 +100,12 @@ func (*BaseCallback) OnWatched(_ context.Context) {
 }
 
 // OnGetJobBefore implements Callback.OnGetJobBefore interface.
-func (*BaseCallback) OnGetJobBefore(_ string) {
+func (*BaseCallback) OnGetJobBefore() {
 	// Nothing to do.
 }
 
 // OnGetJobAfter implements Callback.OnGetJobAfter interface.
-func (*BaseCallback) OnGetJobAfter(_ string, _ *model.Job) {
+func (*BaseCallback) OnGetJobAfter(_ *model.Job) {
 	// Nothing to do.
 }
diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go
index 2302b8dc2823d..05c26ebf41530 100644
--- a/pkg/ddl/column.go
+++ b/pkg/ddl/column.go
@@ -752,6 +752,7 @@ func (w *worker) doModifyColumnTypeWithData(
 			return ver, errors.Trace(err)
 		}
 		job.SchemaState = model.StateWriteOnly
+		failpoint.InjectCall("afterModifyColumnStateDeleteOnly", job.ID)
 	case model.StateWriteOnly:
 		// write only -> reorganization
 		updateChangingObjState(changingCol, changingIdxs, model.StateWriteReorganization)
diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go
index 5c85ff5311878..d54c6582b54c6 100644
--- a/pkg/ddl/ddl_running_jobs.go
+++ b/pkg/ddl/ddl_running_jobs.go
@@ -233,11 +233,23 @@ func (j *runningJobs) addRunning(jobID int64, involves []model.InvolvingSchemaIn
 	}
 }
 
+func (j *runningJobs) finishOrPendJob(jobID int64, involves []model.InvolvingSchemaInfo, moveToPending bool) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+	j.removeRunningWithoutLock(jobID, involves)
+	if moveToPending {
+		j.addPendingWithoutLock(involves)
+	}
+}
+
 // removeRunning can be concurrently called with add and checkRunnable.
 func (j *runningJobs) removeRunning(jobID int64, involves []model.InvolvingSchemaInfo) {
 	j.mu.Lock()
 	defer j.mu.Unlock()
+	j.removeRunningWithoutLock(jobID, involves)
+}
 
+func (j *runningJobs) removeRunningWithoutLock(jobID int64, involves []model.InvolvingSchemaInfo) {
 	if intest.InTest {
 		if _, ok := j.ids[jobID]; !ok {
 			panic(fmt.Sprintf("job %d is not running", jobID))
@@ -296,6 +308,10 @@ func (j *runningJobs) addPending(involves []model.InvolvingSchemaInfo) {
 	j.mu.Lock()
 	defer j.mu.Unlock()
+	j.addPendingWithoutLock(involves)
+}
+
+func (j *runningJobs) addPendingWithoutLock(involves []model.InvolvingSchemaInfo) {
 	for _, info := range involves {
 		if info.Database != model.InvolvingNone {
 			if _, ok := j.pending.schemas[info.Database]; !ok {
diff --git a/pkg/ddl/ddl_worker_test.go b/pkg/ddl/ddl_worker_test.go
index 32ee893dafa9f..ed41a881b1e38 100644
--- a/pkg/ddl/ddl_worker_test.go
+++ b/pkg/ddl/ddl_worker_test.go
@@ -135,7 +135,7 @@ func TestParallelDDL(t *testing.T) {
 	}
 
 	once1 := sync.Once{}
-	tc.OnGetJobBeforeExported = func(string) {
+	tc.OnGetJobBeforeExported = func() {
 		once1.Do(func() {
 			for {
 				tk := testkit.NewTestKit(t, store)
diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go
index c25ffc26d4922..ab9cb8ac97139 100644
--- a/pkg/ddl/job_table.go
+++ b/pkg/ddl/job_table.go
@@ -185,51 +185,47 @@ func (s *jobScheduler) close() {
 }
 
 // getJob reads tidb_ddl_job and returns the first runnable DDL job.
-func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error) {
+func (s *jobScheduler) getJob(se *sess.Session) (*model.Job, bool, error) {
 	defer s.runningJobs.resetAllPending()
-	not := "not"
-	label := "get_job_general"
-	if tp == jobTypeReorg {
-		not = ""
-		label = "get_job_reorg"
-	}
-	const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
+	const getJobSQL = `select job_meta, processing, reorg from mysql.tidb_ddl_job where job_id in
 		(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
-		and %s reorg %s order by processing desc, job_id`
+		%s order by processing desc, job_id`
 	var excludedJobIDs string
 	if ids := s.runningJobs.allIDs(); len(ids) > 0 {
 		excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids)
 	}
-	sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs)
-	rows, err := se.Execute(context.Background(), sql, label)
+	sql := fmt.Sprintf(getJobSQL, excludedJobIDs)
+	rows, err := se.Execute(context.Background(), sql, "get_job")
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, false, errors.Trace(err)
 	}
 	for _, row := range rows {
 		jobBinary := row.GetBytes(0)
 		isJobProcessing := row.GetInt64(1) == 1
+		isReorg := row.GetInt64(2) != 0
 
 		job := model.Job{}
 		err = job.Decode(jobBinary)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, isReorg, errors.Trace(err)
 		}
 
+		involving := job.GetInvolvingSchemaInfo()
 		isRunnable, err := s.processJobDuringUpgrade(se, &job)
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, isReorg, errors.Trace(err)
 		}
 		if !isRunnable {
+			s.runningJobs.addPending(involving)
 			continue
 		}
 
 		// The job has already been picked up, just return to continue it.
 		if isJobProcessing {
-			return &job, nil
+			return &job, isReorg, nil
 		}
 
-		involving := job.GetInvolvingSchemaInfo()
 		if !s.runningJobs.checkRunnable(job.ID, involving) {
 			s.runningJobs.addPending(involving)
 			continue
@@ -241,11 +237,11 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
 				zap.Error(err),
 				zap.Stringer("job", &job))
 			s.runningJobs.addPending(involving)
-			return nil, errors.Trace(err)
+			return nil, isReorg, errors.Trace(err)
 		}
-		return &job, nil
+		return &job, isReorg, nil
 	}
-	return nil, nil
+	return nil, false, nil
 }
 
 func hasSysDB(job *model.Job) bool {
@@ -394,8 +390,7 @@ func (s *jobScheduler) startDispatch() error {
 			continue
 		}
 		failpoint.InjectCall("beforeAllLoadDDLJobAndRun")
-		s.loadDDLJobAndRun(se, s.generalDDLWorkerPool, jobTypeGeneral)
-		s.loadDDLJobAndRun(se, s.reorgWorkerPool, jobTypeReorg)
+		s.loadDDLJobAndRun(se)
 	}
 }
@@ -436,30 +431,32 @@ func (s *jobScheduler) checkAndUpdateClusterState(needUpdate bool) error {
 	return nil
 }
 
-func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session, pool *workerPool, tp jobType) {
-	wk, err := pool.get()
-	if err != nil || wk == nil {
-		logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
-		return
-	}
-
+func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session) {
 	s.mu.RLock()
-	s.mu.hook.OnGetJobBefore(pool.tp().String())
+	s.mu.hook.OnGetJobBefore()
 	s.mu.RUnlock()
 
 	startTime := time.Now()
-	job, err := s.getJob(se, tp)
+	job, isReorg, err := s.getJob(se)
 	if job == nil || err != nil {
 		if err != nil {
-			wk.jobLogger(job).Warn("get job met error", zap.Duration("take time", time.Since(startTime)), zap.Error(err))
+			logutil.DDLLogger().Warn("get job met error", zap.Duration("take time", time.Since(startTime)), zap.Error(err))
 		}
-		pool.put(wk)
 		return
 	}
 	s.mu.RLock()
-	s.mu.hook.OnGetJobAfter(pool.tp().String(), job)
+	s.mu.hook.OnGetJobAfter(job)
 	s.mu.RUnlock()
 
+	pool := s.generalDDLWorkerPool
+	if isReorg {
+		pool = s.reorgWorkerPool
+	}
+	wk, err := pool.get()
+	if err != nil || wk == nil {
+		logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
+		return
+	}
 	s.delivery2Worker(wk, pool, job)
 }
 
@@ -526,10 +523,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
 	jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
 	s.runningJobs.addRunning(jobID, involvedSchemaInfos)
 	metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
-	s.wg.RunWithLog(func() {
+	s.wg.Run(func() {
 		defer func() {
+			r := recover()
+			if r != nil {
+				logutil.DDLLogger().Error("panic in delivery2Worker", zap.Any("recover", r), zap.Stack("stack"))
+			}
 			failpoint.InjectCall("afterDelivery2Worker", job)
-			s.runningJobs.removeRunning(jobID, involvedSchemaInfos)
+			// Because there is a gap between `allIDs()` and `checkRunnable()`,
+			// we append the unfinished job to pending atomically to prevent `getJob()`
+			// from choosing another runnable job that involves the same schema object.
+			moveRunningJobsToPending := r != nil || (job != nil && !job.IsFinished())
+			s.runningJobs.finishOrPendJob(jobID, involvedSchemaInfos, moveRunningJobsToPending)
 			asyncNotify(s.ddlJobNotifyCh)
 			metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
 			pool.put(wk)
diff --git a/pkg/ddl/job_table_test.go b/pkg/ddl/job_table_test.go
index e09e0c08c9288..4de21498a1dfe 100644
--- a/pkg/ddl/job_table_test.go
+++ b/pkg/ddl/job_table_test.go
@@ -65,7 +65,7 @@ func TestDDLScheduling(t *testing.T) {
 	var wg util.WaitGroupWrapper
 	wg.Add(1)
 	var once sync.Once
-	hook.OnGetJobBeforeExported = func(jobType string) {
+	hook.OnGetJobBeforeExported = func() {
 		once.Do(func() {
 			for i, job := range ddlJobs {
 				wg.Run(func() {
@@ -91,7 +91,7 @@ func TestDDLScheduling(t *testing.T) {
 	}
 
 	record := make([]int64, 0, 16)
-	hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
+	hook.OnGetJobAfterExported = func(job *model.Job) {
 		// record the job schedule order
 		record = append(record, job.ID)
 	}
diff --git a/pkg/ddl/tests/adminpause/BUILD.bazel b/pkg/ddl/tests/adminpause/BUILD.bazel
index 34034000e12f4..3650c5b1c36c2 100644
--- a/pkg/ddl/tests/adminpause/BUILD.bazel
+++ b/pkg/ddl/tests/adminpause/BUILD.bazel
@@ -29,7 +29,7 @@ go_test(
     ],
     embed = [":adminpause"],
     flaky = True,
-    shard_count = 14,
+    shard_count = 15,
     deps = [
         "//pkg/config",
         "//pkg/ddl",
diff --git a/pkg/ddl/tests/adminpause/pause_resume_test.go b/pkg/ddl/tests/adminpause/pause_resume_test.go
index 95d7ee22bad8c..b5fb7b50c98b3 100644
--- a/pkg/ddl/tests/adminpause/pause_resume_test.go
+++ b/pkg/ddl/tests/adminpause/pause_resume_test.go
@@ -20,7 +20,9 @@ import (
 	"strconv"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/pingcap/failpoint"
 	testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil"
 	"github.com/pingcap/tidb/pkg/ddl/util/callback"
 	"github.com/pingcap/tidb/pkg/domain"
@@ -123,7 +125,7 @@ func pauseResumeAndCancel(t *testing.T, stmtKit *testkit.TestKit, adminCommandKi
 	var isCancelled = false
 	var cancelResult []sqlexec.RecordSet
 	var cancelErr error
-	var cancelFunc = func(jobType string) {
+	var cancelFunc = func() {
 		adminCommandMutex.Lock()
 		defer adminCommandMutex.Unlock()
 		if isPaused && isResumed && !isCancelled {
@@ -354,3 +356,60 @@ func TestPauseResumeCancelAndRerunPartitionTableStmt(t *testing.T) {
 
 	Logger.Info("TestPauseResumeCancelAndRerunPartitionTableStmt: all cases finished.")
all cases finished.") } + +func TestPauseJobDependency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 1);") + + afterPause := make(chan struct{}) + afterAddCol := make(chan struct{}) + startAddCol := make(chan struct{}) + var ( + modifyJobID int64 + errModCol error + errAddCol error + ) + once := sync.Once{} + failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterModifyColumnStateDeleteOnly", func(jobID int64) { + once.Do(func() { + modifyJobID = jobID + tk2.MustExec(fmt.Sprintf("admin pause ddl jobs %d", jobID)) + afterPause <- struct{}{} + }) + }) + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + // Will stuck because the job is paused. + errModCol = tk.ExecToErr("alter table t modify column b tinyint;") + }() + go func() { + defer wg.Done() + <-afterPause + // This should be blocked because they handle the same table. + startAddCol <- struct{}{} + errAddCol = tk2.ExecToErr("alter table t add column c int;") + afterAddCol <- struct{}{} + }() + <-startAddCol + select { + case <-afterAddCol: + t.Logf("add column DDL on same table should be blocked") + t.FailNow() + case <-time.After(3 * time.Second): + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use test") + tk3.MustExec(fmt.Sprintf("admin resume ddl jobs %d", modifyJobID)) + <-afterAddCol + } + wg.Wait() + require.NoError(t, errModCol) + require.NoError(t, errAddCol) +} diff --git a/pkg/ddl/util/callback/callback.go b/pkg/ddl/util/callback/callback.go index 457e91544b66a..7e21b3584ed9a 100644 --- a/pkg/ddl/util/callback/callback.go +++ b/pkg/ddl/util/callback/callback.go @@ -55,8 +55,8 @@ type TestDDLCallback struct { onJobUpdated func(*model.Job) OnJobUpdatedExported atomic.Pointer[func(*model.Job)] onWatched func(ctx context.Context) - OnGetJobBeforeExported func(string) - OnGetJobAfterExported func(string, *model.Job) + OnGetJobBeforeExported func() + OnGetJobAfterExported func(*model.Job) OnJobSchemaStateChanged func(int64) OnUpdateReorgInfoExported func(job *model.Job, pid int64) @@ -146,21 +146,21 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) { } // OnGetJobBefore implements Callback.OnGetJobBefore interface. -func (tc *TestDDLCallback) OnGetJobBefore(jobType string) { +func (tc *TestDDLCallback) OnGetJobBefore() { if tc.OnGetJobBeforeExported != nil { - tc.OnGetJobBeforeExported(jobType) + tc.OnGetJobBeforeExported() return } - tc.BaseCallback.OnGetJobBefore(jobType) + tc.BaseCallback.OnGetJobBefore() } // OnGetJobAfter implements Callback.OnGetJobAfter interface. -func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) { +func (tc *TestDDLCallback) OnGetJobAfter(job *model.Job) { if tc.OnGetJobAfterExported != nil { - tc.OnGetJobAfterExported(jobType, job) + tc.OnGetJobAfterExported(job) return } - tc.BaseCallback.OnGetJobAfter(jobType, job) + tc.BaseCallback.OnGetJobAfter(job) } // Clone copies the callback and take its reference