diff --git a/ddl/ddl.go b/ddl/ddl.go
index 0318bb563a9af..cf9919893a4ea 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -153,6 +153,7 @@ const (
 	OnExistReplace
 
 	jobRecordCapacity = 16
+	jobOnceCapacity   = 1000
 )
 
 var (
@@ -288,14 +289,14 @@ type waitSchemaSyncedController struct {
 	mu  sync.RWMutex
 	job map[int64]struct{}
 
-	// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
-	once *atomicutil.Bool
+	// onceMap records the jobs that have already gone through the first-run schema-synced check on this owner.
+	onceMap map[int64]struct{}
 }
 
 func newWaitSchemaSyncedController() *waitSchemaSyncedController {
 	return &waitSchemaSyncedController{
-		job:  make(map[int64]struct{}, jobRecordCapacity),
-		once: atomicutil.NewBool(true),
+		job:     make(map[int64]struct{}, jobRecordCapacity),
+		onceMap: make(map[int64]struct{}, jobOnceCapacity),
 	}
 }
 
@@ -318,6 +319,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
 	delete(w.job, job.ID)
 }
 
+// maybeAlreadyRunOnce returns true if this owner has already run the job at least once, so the
+// schema-synced check can be skipped. Returning false means the job may still be on its first
+// run on this owner (entries can also be dropped when onceMap is reset).
+func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	_, ok := w.onceMap[id]
+	return ok
+}
+
+func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if len(w.onceMap) > jobOnceCapacity {
+		// If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok.
+		w.onceMap = make(map[int64]struct{}, jobRecordCapacity)
+	}
+	w.onceMap[id] = struct{}{}
+}
+
 // ddlCtx is the context when we use worker to handle DDL jobs.
 type ddlCtx struct {
 	ctx          context.Context
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 30782fa694985..6ae8951a4543b 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -818,17 +818,23 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
 	}
 	w.registerSync(job)
 
-	if runJobErr != nil {
+	if runJobErr != nil && !dbterror.ErrPausedDDLJob.Equal(runJobErr) {
 		// Omit the ErrPausedDDLJob
-		if !dbterror.ErrPausedDDLJob.Equal(runJobErr) {
+		w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.",
+			zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
+		// In test and job is cancelling we can ignore the sleep.
+		if !(intest.InTest && job.IsCancelling()) {
 			// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
 			// which may act like a deadlock.
-			logutil.Logger(w.logCtx).Info("[ddl] run DDL job failed, sleeps a while then retries it.",
-				zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
-		}
-		// In test and job is cancelling we can ignore the sleep
-		if !(intest.InTest && job.IsCancelling()) {
 			time.Sleep(GetWaitTimeWhenErrorOccurred())
 		}
 	}
@@ -1165,14 +1171,14 @@ func toTError(err error) *terror.Error {
 	return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
 }
 
-// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
+// waitSchemaChanged waits for the completion of updating all servers' schema or for the MDL to be synced. In order to make sure that happens,
 // we wait at most 2 * lease time(sessionTTL, 90 seconds).
-func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
+func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
 	if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
-		return
+		return nil
 	}
 	if waitTime == 0 {
-		return
+		return nil
 	}
 
 	timeStart := time.Now()
@@ -1182,30 +1188,48 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in
 	}()
 	if latestSchemaVersion == 0 {
-		logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
-		return
+		logutil.Logger(d.ctx).Info("schema version doesn't change", zap.String("category", "ddl"))
+		return nil
 	}
 
 	err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
 	if err != nil {
-		logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+		logutil.Logger(d.ctx).Info("update latest schema version failed", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+		if variable.EnableMDL.Load() {
+			return err
+		}
 		if terror.ErrorEqual(err, context.DeadlineExceeded) {
 			// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
 			// There is no need to use etcd to sync. The function returns directly.
-			return
+			return nil
 		}
 	}
 
 	// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
 	err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
 	if err != nil {
-		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
-		return
+		logutil.Logger(d.ctx).Info("wait latest schema version encounter error", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+		return err
 	}
 	logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
 		zap.Int64("ver", latestSchemaVersion),
 		zap.Duration("take time", time.Since(timeStart)),
 		zap.String("job", job.String()))
+	return nil
 }
 
 // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
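waitSchemaChanged now returns the sync error instead of swallowing it, so the caller can decide whether it is safe to clean the MDL info and mark the job as synced. Below is a minimal, self-contained sketch of that caller contract; `waitSchemaChangedStub` and `finishStep` are illustrative stand-ins, not TiDB functions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitSchemaChangedStub stands in for the patched waitSchemaChanged: it reports failure
// (e.g. the server is closing) instead of silently returning.
func waitSchemaChangedStub(ctx context.Context, waitTime time.Duration) error {
	select {
	case <-ctx.Done():
		return errors.New("wait schema synced: " + ctx.Err().Error())
	case <-time.After(waitTime):
		return nil // all servers acknowledged the new schema version (or the MDL was synced)
	}
}

// finishStep mirrors the caller pattern this change enables: the MDL info is only
// cleaned when the wait really succeeded.
func finishStep(ctx context.Context, jobID int64) {
	if err := waitSchemaChangedStub(ctx, 10*time.Millisecond); err != nil {
		fmt.Printf("job %d: keep MDL info for the next owner: %v\n", jobID, err)
		return
	}
	fmt.Printf("job %d: all servers synced, MDL info can be cleaned\n", jobID)
}

func main() {
	finishStep(context.Background(), 1) // succeeds

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the server shutting down mid-wait
	finishStep(ctx, 2) // fails: MDL info is kept instead of being cleaned early
}
```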
@@ -1264,8 +1288,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
 		}
 	})
 
-	waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
-	return nil
+	return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
 }
 
 func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 1f6ca6286057b..353fe508a0aeb 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -275,7 +275,7 @@ func (d *ddl) startDispatchLoop() {
 		}
 		if !d.isOwner() {
 			isOnce = true
-			d.once.Store(true)
+			d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
 			time.Sleep(dispatchLoopWaitingDuration)
 			continue
 		}
@@ -382,7 +382,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 		metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
 	}()
 	// check if this ddl job is synced to all servers.
-	if !d.isSynced(job) || d.once.Load() {
+	if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
 		if variable.EnableMDL.Load() {
 			exist, version, err := checkMDLInfo(job.ID, d.sessPool)
 			if err != nil {
@@ -397,7 +397,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 			if err != nil {
 				return
 			}
-			d.once.Store(false)
+			d.setAlreadyRunOnce(job.ID)
 			cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
 			// Don't have a worker now.
 			return
@@ -411,7 +411,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 				pool.put(wk)
 				return
 			}
-			d.once.Store(false)
+			d.setAlreadyRunOnce(job.ID)
 		}
 	}
 
@@ -430,9 +430,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 		})
 
 		// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
-		// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
+		// If the job is done or still running or rolling back, we will wait 2 * lease time or until the MDL is synced to guarantee that other servers update
 		// the newest schema.
-		waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
+		err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
+		if err != nil {
+			// May be caused by the server closing; shouldn't clean the MDL info.
+ logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err)) + return + } cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) d.synced(job) diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 194fa5e41f55b..794e24cb1441e 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -445,6 +445,112 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) { checkDDLJobExecSucc(t, seLatestV, jobID) } +<<<<<<< HEAD +======= +func TestUpgradeVersionForResumeJob(t *testing.T) { + store, dom := session.CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + seV := session.CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1)) + session.UnsetStoreBootstrapped(store.UUID()) + ver, err := session.GetBootstrapVersion(seV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion-1, ver) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/mockResumeAllJobsFailed", `return(true)`)) + defer failpoint.Disable("github.com/pingcap/tidb/session/mockResumeAllJobsFailed") + + // Add a paused DDL job before upgrade. + session.MustExec(t, seV, "create table test.upgrade_tbl(a int, b int)") + session.MustExec(t, seV, "create table test.upgrade_tbl1(a int, b int)") + ch := make(chan struct{}) + hook := &callback.TestDDLCallback{} + var jobID int64 + doOnce := true + hook.OnGetJobBeforeExported = func(str string) { + if jobID == 0 || !doOnce { + return + } + + for i := 0; i < 50; i++ { + sql := fmt.Sprintf("admin show ddl jobs where job_id=%d or job_id=%d", jobID, jobID+1) + se := session.CreateSessionAndSetID(t, store) + rows, err := execute(context.Background(), se, sql) + require.NoError(t, err) + if len(rows) == 2 { + doOnce = false + break + } + time.Sleep(100 * time.Millisecond) + } + } + wg := sync.WaitGroup{} + wg.Add(1) + times := 0 + hook.OnGetJobAfterExported = func(tp string, job *model.Job) { + if job.SchemaState == model.StateWriteOnly && times == 0 { + ch <- struct{}{} + jobID = job.ID + times = 1 + } + if job.ID == jobID && job.State == model.JobStateDone && job.SchemaState == model.StatePublic { + wg.Done() + } + } + + dom.DDL().SetHook(hook) + go func() { + // This "add index" job will be paused when upgrading. + _, _ = execute(context.Background(), seV, "alter table test.upgrade_tbl add index idx(a)") + }() + + <-ch + dom.Close() + // Make sure upgrade is successful. + startUpgrade(store) + domLatestV, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domLatestV.Close() + domLatestV.DDL().SetHook(hook) + finishUpgrade(store) + seLatestV := session.CreateSessionAndSetID(t, store) + // Add a new DDL (an "add index" job uses a different table than the previous DDL job) to the DDL table. + session.MustExec(t, seLatestV, "alter table test.upgrade_tbl1 add index idx2(a)") + ver, err = session.GetBootstrapVersion(seLatestV) + require.NoError(t, err) + require.Equal(t, session.CurrentBootstrapVersion, ver) + + wg.Wait() + // Make sure the second add index operation is successful. 
+ sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id=%d or job_id=%d order by job_id", jobID, jobID+1) + rows, err := execute(context.Background(), seLatestV, sql) + require.NoError(t, err) + require.Len(t, rows, 2) + var idxFinishTS uint64 + for i, row := range rows { + jobBinary := row.GetBytes(0) + runJob := model.Job{} + err := runJob.Decode(jobBinary) + require.NoError(t, err) + require.True(t, strings.Contains(runJob.TableName, "upgrade_tbl")) + require.Equal(t, model.JobStateSynced.String(), runJob.State.String()) + if i == 0 { + idxFinishTS = runJob.BinlogInfo.FinishedTS + } else { + require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS) + } + } +} + +>>>>>>> 89fb7adfa99 (ddl: fix a bug that MDL may progress unexpectedly or block forever (#46921)) func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)