Skip to content

Commit

Permalink
ddl: consider paused job when check runnable (#54419) (#54431)
Browse files Browse the repository at this point in the history
ref #53246, close #54383
  • Loading branch information
ti-chi-bot authored Jul 4, 2024
1 parent dc816da commit 1b84f38
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 52 deletions.
8 changes: 4 additions & 4 deletions pkg/ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ type Callback interface {
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
// OnGetJobBefore is called before getting job.
OnGetJobBefore(jobType string)
OnGetJobBefore()
// OnGetJobAfter is called after getting job.
OnGetJobAfter(jobType string, job *model.Job)
OnGetJobAfter(job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
Expand Down Expand Up @@ -100,12 +100,12 @@ func (*BaseCallback) OnWatched(_ context.Context) {
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (*BaseCallback) OnGetJobBefore(_ string) {
func (*BaseCallback) OnGetJobBefore() {
// Nothing to do.
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (*BaseCallback) OnGetJobAfter(_ string, _ *model.Job) {
func (*BaseCallback) OnGetJobAfter(_ *model.Job) {
// Nothing to do.
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
failpoint.InjectCall("afterModifyColumnStateDeleteOnly", job.ID)
case model.StateWriteOnly:
// write only -> reorganization
updateChangingObjState(changingCol, changingIdxs, model.StateWriteReorganization)
Expand Down
16 changes: 16 additions & 0 deletions pkg/ddl/ddl_running_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,23 @@ func (j *runningJobs) addRunning(jobID int64, involves []model.InvolvingSchemaIn
}
}

func (j *runningJobs) finishOrPendJob(jobID int64, involves []model.InvolvingSchemaInfo, moveToPending bool) {
j.mu.Lock()
defer j.mu.Unlock()
j.removeRunningWithoutLock(jobID, involves)
if moveToPending {
j.addPendingWithoutLock(involves)
}
}

// removeRunning can be concurrently called with add and checkRunnable.
func (j *runningJobs) removeRunning(jobID int64, involves []model.InvolvingSchemaInfo) {
j.mu.Lock()
defer j.mu.Unlock()
j.removeRunningWithoutLock(jobID, involves)
}

func (j *runningJobs) removeRunningWithoutLock(jobID int64, involves []model.InvolvingSchemaInfo) {
if intest.InTest {
if _, ok := j.ids[jobID]; !ok {
panic(fmt.Sprintf("job %d is not running", jobID))
Expand Down Expand Up @@ -296,6 +308,10 @@ func (j *runningJobs) addPending(involves []model.InvolvingSchemaInfo) {
j.mu.Lock()
defer j.mu.Unlock()

j.addPendingWithoutLock(involves)
}

func (j *runningJobs) addPendingWithoutLock(involves []model.InvolvingSchemaInfo) {
for _, info := range involves {
if info.Database != model.InvolvingNone {
if _, ok := j.pending.schemas[info.Database]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestParallelDDL(t *testing.T) {
}

once1 := sync.Once{}
tc.OnGetJobBeforeExported = func(string) {
tc.OnGetJobBeforeExported = func() {
once1.Do(func() {
for {
tk := testkit.NewTestKit(t, store)
Expand Down
75 changes: 40 additions & 35 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,51 +185,47 @@ func (s *jobScheduler) close() {
}

// getJob reads tidb_ddl_job and returns the first runnable DDL job.
func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error) {
func (s *jobScheduler) getJob(se *sess.Session) (*model.Job, bool, error) {
defer s.runningJobs.resetAllPending()

not := "not"
label := "get_job_general"
if tp == jobTypeReorg {
not = ""
label = "get_job_reorg"
}
const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
const getJobSQL = `select job_meta, processing, reorg from mysql.tidb_ddl_job where job_id in
(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
and %s reorg %s order by processing desc, job_id`
%s order by processing desc, job_id`
var excludedJobIDs string
if ids := s.runningJobs.allIDs(); len(ids) > 0 {
excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids)
}
sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs)
rows, err := se.Execute(context.Background(), sql, label)
sql := fmt.Sprintf(getJobSQL, excludedJobIDs)
rows, err := se.Execute(context.Background(), sql, "get_job")
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
for _, row := range rows {
jobBinary := row.GetBytes(0)
isJobProcessing := row.GetInt64(1) == 1
isReorg := row.GetInt64(2) != 0

job := model.Job{}
err = job.Decode(jobBinary)
if err != nil {
return nil, errors.Trace(err)
return nil, isReorg, errors.Trace(err)
}

involving := job.GetInvolvingSchemaInfo()
isRunnable, err := s.processJobDuringUpgrade(se, &job)
if err != nil {
return nil, errors.Trace(err)
return nil, isReorg, errors.Trace(err)
}
if !isRunnable {
s.runningJobs.addPending(involving)
continue
}

// The job has already been picked up, just return to continue it.
if isJobProcessing {
return &job, nil
return &job, isReorg, nil
}

involving := job.GetInvolvingSchemaInfo()
if !s.runningJobs.checkRunnable(job.ID, involving) {
s.runningJobs.addPending(involving)
continue
Expand All @@ -241,11 +237,11 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
zap.Error(err),
zap.Stringer("job", &job))
s.runningJobs.addPending(involving)
return nil, errors.Trace(err)
return nil, isReorg, errors.Trace(err)
}
return &job, nil
return &job, isReorg, nil
}
return nil, nil
return nil, false, nil
}

func hasSysDB(job *model.Job) bool {
Expand Down Expand Up @@ -394,8 +390,7 @@ func (s *jobScheduler) startDispatch() error {
continue
}
failpoint.InjectCall("beforeAllLoadDDLJobAndRun")
s.loadDDLJobAndRun(se, s.generalDDLWorkerPool, jobTypeGeneral)
s.loadDDLJobAndRun(se, s.reorgWorkerPool, jobTypeReorg)
s.loadDDLJobAndRun(se)
}
}

Expand Down Expand Up @@ -436,30 +431,32 @@ func (s *jobScheduler) checkAndUpdateClusterState(needUpdate bool) error {
return nil
}

func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session, pool *workerPool, tp jobType) {
wk, err := pool.get()
if err != nil || wk == nil {
logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
return
}

func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session) {
s.mu.RLock()
s.mu.hook.OnGetJobBefore(pool.tp().String())
s.mu.hook.OnGetJobBefore()
s.mu.RUnlock()

startTime := time.Now()
job, err := s.getJob(se, tp)
job, isReorg, err := s.getJob(se)
if job == nil || err != nil {
if err != nil {
wk.jobLogger(job).Warn("get job met error", zap.Duration("take time", time.Since(startTime)), zap.Error(err))
logutil.DDLLogger().Warn("get job met error", zap.Duration("take time", time.Since(startTime)), zap.Error(err))
}
pool.put(wk)
return
}
s.mu.RLock()
s.mu.hook.OnGetJobAfter(pool.tp().String(), job)
s.mu.hook.OnGetJobAfter(job)
s.mu.RUnlock()

pool := s.generalDDLWorkerPool
if isReorg {
pool = s.reorgWorkerPool
}
wk, err := pool.get()
if err != nil || wk == nil {
logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
return
}
s.delivery2Worker(wk, pool, job)
}

Expand Down Expand Up @@ -526,10 +523,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
s.runningJobs.addRunning(jobID, involvedSchemaInfos)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
s.wg.RunWithLog(func() {
s.wg.Run(func() {
defer func() {
r := recover()
if r != nil {
logutil.DDLLogger().Error("panic in delivery2Worker", zap.Any("recover", r), zap.Stack("stack"))
}
failpoint.InjectCall("afterDelivery2Worker", job)
s.runningJobs.removeRunning(jobID, involvedSchemaInfos)
// Because there is a gap between `allIDs()` and `checkRunnable()`,
// we append unfinished job to pending atomically to prevent `getJob()`
// chosing another runnable job that involves the same schema object.
moveRunningJobsToPending := r != nil || (job != nil && !job.IsFinished())
s.runningJobs.finishOrPendJob(jobID, involvedSchemaInfos, moveRunningJobsToPending)
asyncNotify(s.ddlJobNotifyCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
pool.put(wk)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDDLScheduling(t *testing.T) {
var wg util.WaitGroupWrapper
wg.Add(1)
var once sync.Once
hook.OnGetJobBeforeExported = func(jobType string) {
hook.OnGetJobBeforeExported = func() {
once.Do(func() {
for i, job := range ddlJobs {
wg.Run(func() {
Expand All @@ -91,7 +91,7 @@ func TestDDLScheduling(t *testing.T) {
}

record := make([]int64, 0, 16)
hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
hook.OnGetJobAfterExported = func(job *model.Job) {
// record the job schedule order
record = append(record, job.ID)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/tests/adminpause/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ go_test(
],
embed = [":adminpause"],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
61 changes: 60 additions & 1 deletion pkg/ddl/tests/adminpause/pause_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
Expand Down Expand Up @@ -123,7 +125,7 @@ func pauseResumeAndCancel(t *testing.T, stmtKit *testkit.TestKit, adminCommandKi
var isCancelled = false
var cancelResult []sqlexec.RecordSet
var cancelErr error
var cancelFunc = func(jobType string) {
var cancelFunc = func() {
adminCommandMutex.Lock()
defer adminCommandMutex.Unlock()
if isPaused && isResumed && !isCancelled {
Expand Down Expand Up @@ -354,3 +356,60 @@ func TestPauseResumeCancelAndRerunPartitionTableStmt(t *testing.T) {

Logger.Info("TestPauseResumeCancelAndRerunPartitionTableStmt: all cases finished.")
}

func TestPauseJobDependency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")

afterPause := make(chan struct{})
afterAddCol := make(chan struct{})
startAddCol := make(chan struct{})
var (
modifyJobID int64
errModCol error
errAddCol error
)
once := sync.Once{}
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterModifyColumnStateDeleteOnly", func(jobID int64) {
once.Do(func() {
modifyJobID = jobID
tk2.MustExec(fmt.Sprintf("admin pause ddl jobs %d", jobID))
afterPause <- struct{}{}
})
})
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
// Will stuck because the job is paused.
errModCol = tk.ExecToErr("alter table t modify column b tinyint;")
}()
go func() {
defer wg.Done()
<-afterPause
// This should be blocked because they handle the same table.
startAddCol <- struct{}{}
errAddCol = tk2.ExecToErr("alter table t add column c int;")
afterAddCol <- struct{}{}
}()
<-startAddCol
select {
case <-afterAddCol:
t.Logf("add column DDL on same table should be blocked")
t.FailNow()
case <-time.After(3 * time.Second):
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")
tk3.MustExec(fmt.Sprintf("admin resume ddl jobs %d", modifyJobID))
<-afterAddCol
}
wg.Wait()
require.NoError(t, errModCol)
require.NoError(t, errAddCol)
}
16 changes: 8 additions & 8 deletions pkg/ddl/util/callback/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type TestDDLCallback struct {
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnGetJobBeforeExported func()
OnGetJobAfterExported func(*model.Job)
OnJobSchemaStateChanged func(int64)

OnUpdateReorgInfoExported func(job *model.Job, pid int64)
Expand Down Expand Up @@ -146,21 +146,21 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
func (tc *TestDDLCallback) OnGetJobBefore() {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
tc.OnGetJobBeforeExported()
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
tc.BaseCallback.OnGetJobBefore()
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
func (tc *TestDDLCallback) OnGetJobAfter(job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
tc.OnGetJobAfterExported(job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
tc.BaseCallback.OnGetJobAfter(job)
}

// Clone copies the callback and take its reference
Expand Down

0 comments on commit 1b84f38

Please sign in to comment.