diff --git a/ddl/ddl.go b/ddl/ddl.go index 0318bb563a9af..cf9919893a4ea 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -153,6 +153,7 @@ const ( OnExistReplace jobRecordCapacity = 16 + jobOnceCapacity = 1000 ) var ( @@ -288,14 +289,14 @@ type waitSchemaSyncedController struct { mu sync.RWMutex job map[int64]struct{} - // true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job. - once *atomicutil.Bool + // Use to check if the DDL job is the first run on this owner. + onceMap map[int64]struct{} } func newWaitSchemaSyncedController() *waitSchemaSyncedController { return &waitSchemaSyncedController{ - job: make(map[int64]struct{}, jobRecordCapacity), - once: atomicutil.NewBool(true), + job: make(map[int64]struct{}, jobRecordCapacity), + onceMap: make(map[int64]struct{}, jobOnceCapacity), } } @@ -318,6 +319,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) { delete(w.job, job.ID) } +// maybeAlreadyRunOnce returns true means that the job may be the first run on this owner. +// Returns false means that the job must not be the first run on this owner. +func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool { + w.mu.Lock() + defer w.mu.Unlock() + _, ok := w.onceMap[id] + return ok +} + +func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) { + w.mu.Lock() + defer w.mu.Unlock() + if len(w.onceMap) > jobOnceCapacity { + // If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok. + w.onceMap = make(map[int64]struct{}, jobRecordCapacity) + } + w.onceMap[id] = struct{}{} +} + // ddlCtx is the context when we use worker to handle DDL jobs. type ddlCtx struct { ctx context.Context diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 3863f196f4aa4..080b04145cc26 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1165,14 +1165,14 @@ func toTError(err error) *terror.Error { return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error()) } -// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens, +// waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens, // we wait at most 2 * lease time(sessionTTL, 90 seconds). -func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) { +func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error { if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() { - return + return nil } if waitTime == 0 { - return + return nil } timeStart := time.Now() @@ -1183,16 +1183,19 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in if latestSchemaVersion == 0 { logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change") - return + return nil } err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion) if err != nil { logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err)) + if variable.EnableMDL.Load() { + return err + } if terror.ErrorEqual(err, context.DeadlineExceeded) { // If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker. // There is no need to use etcd to sync. The function returns directly. - return + return nil } } @@ -1200,12 +1203,13 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion) if err != nil { logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err)) - return + return err } logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)", zap.Int64("ver", latestSchemaVersion), zap.Duration("take time", time.Since(timeStart)), zap.String("job", job.String())) + return nil } // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL. @@ -1264,8 +1268,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error { } }) - waitSchemaChanged(d, waitTime, latestSchemaVersion, job) - return nil + return waitSchemaChanged(d, waitTime, latestSchemaVersion, job) } func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption { diff --git a/ddl/job_table.go b/ddl/job_table.go index 1f6ca6286057b..353fe508a0aeb 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -275,7 +275,7 @@ func (d *ddl) startDispatchLoop() { } if !d.isOwner() { isOnce = true - d.once.Store(true) + d.onceMap = make(map[int64]struct{}, jobOnceCapacity) time.Sleep(dispatchLoopWaitingDuration) continue } @@ -382,7 +382,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() }() // check if this ddl job is synced to all servers. - if !d.isSynced(job) || d.once.Load() { + if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) { if variable.EnableMDL.Load() { exist, version, err := checkMDLInfo(job.ID, d.sessPool) if err != nil { @@ -397,7 +397,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { if err != nil { return } - d.once.Store(false) + d.setAlreadyRunOnce(job.ID) cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) // Don't have a worker now. return @@ -411,7 +411,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { pool.put(wk) return } - d.once.Store(false) + d.setAlreadyRunOnce(job.ID) } } @@ -430,9 +430,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { }) // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. - // If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update + // If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update // the newest schema. - waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job) + err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job) + if err != nil { + // May be caused by server closing, shouldn't clean the MDL info. + logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err)) + return + } cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) d.synced(job)