From a948ec8b06fac2e78788adf4e87ee3768150acb5 Mon Sep 17 00:00:00 2001
From: wjHuang
Date: Wed, 20 Sep 2023 14:33:14 +0800
Subject: [PATCH] ddl: cherrypick some PRs to fix MDL bug (#47079)

close pingcap/tidb#46920
---
 ddl/ddl.go           | 28 ++++++++++++++++++++++++----
 ddl/ddl_worker.go    | 30 ++++++++++++++++--------------
 ddl/job_table.go     | 17 +++++++++++------
 ddl/syncer/syncer.go |  6 ++++--
 4 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/ddl/ddl.go b/ddl/ddl.go
index a7412cb047412..4d9ca95813aba 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -152,6 +152,7 @@ const (
 	OnExistReplace

 	jobRecordCapacity = 16
+	jobOnceCapacity   = 1000
 )

 var (
@@ -286,14 +287,14 @@ type waitSchemaSyncedController struct {
 	mu  sync.RWMutex
 	job map[int64]struct{}

-	// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
-	once *atomicutil.Bool
+	// onceMap records the DDL jobs that have already run once on this owner, used to tell whether a job is on its first run here.
+	onceMap map[int64]struct{}
 }

 func newWaitSchemaSyncedController() *waitSchemaSyncedController {
 	return &waitSchemaSyncedController{
-		job:  make(map[int64]struct{}, jobRecordCapacity),
-		once: atomicutil.NewBool(true),
+		job:     make(map[int64]struct{}, jobRecordCapacity),
+		onceMap: make(map[int64]struct{}, jobOnceCapacity),
 	}
 }

@@ -316,6 +317,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
 	delete(w.job, job.ID)
 }

+// maybeAlreadyRunOnce returns true if the job has already run once on this owner.
+// Returning false means the job may still be on its first run here, since onceMap can be reset.
+func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	_, ok := w.onceMap[id]
+	return ok
+}
+
+func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if len(w.onceMap) > jobOnceCapacity {
+		// If the map is too large, we reset it. The evicted jobs may need to check schema synced again, but that's OK.
+		w.onceMap = make(map[int64]struct{}, jobOnceCapacity)
+	}
+	w.onceMap[id] = struct{}{}
+}
+
 // ddlCtx is the context when we use worker to handle DDL jobs.
 type ddlCtx struct {
 	ctx          context.Context
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index b5cd5c9c06c6a..07b49da7dbe7a 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -1052,8 +1052,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
 			// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
 			// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
 			// the newest schema.
-			waitSchemaChanged(context.Background(), d, waitTime, schemaVer, job)
-
+			_ = waitSchemaChanged(d, waitTime, schemaVer, job)
 			if RunInGoTest {
 				// d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously.
 				d.mu.RLock()
@@ -1355,14 +1354,14 @@ func toTError(err error) *terror.Error {
 	return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
 }

-// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
+// waitSchemaChanged waits for all servers' schemas to be updated or for the MDL to be synced. In order to make sure that happens,
 // we wait at most 2 * lease time(sessionTTL, 90 seconds).
-func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
+func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
 	if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
-		return
+		return nil
 	}
 	if waitTime == 0 {
-		return
+		return nil
 	}

 	timeStart := time.Now()
@@ -1373,29 +1372,33 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
 	if latestSchemaVersion == 0 {
 		logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
-		return
+		return nil
 	}

-	err = d.schemaSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion)
+	err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
 	if err != nil {
 		logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+		if variable.EnableMDL.Load() {
+			return err
+		}
 		if terror.ErrorEqual(err, context.DeadlineExceeded) {
 			// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
 			// There is no need to use etcd to sync. The function returns directly.
-			return
+			return nil
 		}
 	}

 	// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
-	err = d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
+	err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
 	if err != nil {
 		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
-		return
+		return err
 	}
 	logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
 		zap.Int64("ver", latestSchemaVersion),
 		zap.Duration("take time", time.Since(timeStart)),
 		zap.String("job", job.String()))
+	return nil
 }

 // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
@@ -1412,7 +1415,7 @@ func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64
 	timeStart := time.Now()
 	// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
-	err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
+	err := d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
 	if err != nil {
 		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
 		return err
 	}
@@ -1454,8 +1457,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
 		}
 	})

-	waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
-	return nil
+	return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
 }

 func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 268228df82035..3534d70a31c96 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -174,7 +174,7 @@ func (d *ddl) startDispatchLoop() {
 			return
 		}
 		if !variable.EnableConcurrentDDL.Load() || !d.isOwner() || d.waiting.Load() {
-			d.once.Store(true)
+			d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
 			time.Sleep(time.Second)
 			continue
 		}
@@ -234,7 +234,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 			metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
 		}()
 		// check if this ddl job is synced to all servers.
-		if !d.isSynced(job) || d.once.Load() {
+		if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
 			if variable.EnableMDL.Load() {
 				exist, version, err := checkMDLInfo(job.ID, d.sessPool)
 				if err != nil {
@@ -249,7 +249,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 				if err != nil {
 					return
 				}
-				d.once.Store(false)
+				d.setAlreadyRunOnce(job.ID)
 				cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
 				// Don't have a worker now.
 				return
@@ -263,7 +263,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 				pool.put(wk)
 				return
 			}
-			d.once.Store(false)
+			d.setAlreadyRunOnce(job.ID)
 		}
 	}

@@ -282,9 +282,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 		})

 		// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
-		// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
+		// If the job is done or still running or rolling back, we will wait 2 * lease time or until the MDL is synced, to guarantee that other servers update to
 		// the newest schema.
-		waitSchemaChanged(context.Background(), d.ddlCtx, d.lease*2, schemaVer, job)
+		err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
+		if err != nil {
+			// This may be caused by the server closing, so we shouldn't clean the MDL info.
+			logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
+			return
+		}
 		cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
 		d.synced(job)
diff --git a/ddl/syncer/syncer.go b/ddl/syncer/syncer.go
index c46e744e3ba1f..7a9cf9ff63804 100644
--- a/ddl/syncer/syncer.go
+++ b/ddl/syncer/syncer.go
@@ -310,13 +310,15 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
 		if variable.EnableMDL.Load() {
 			for _, kv := range resp.Kvs {
 				key := string(kv.Key)
+				tidbIDInResp := key[strings.LastIndex(key, "/")+1:]
 				ver, err := strconv.Atoi(string(kv.Value))
 				if err != nil {
 					logutil.BgLogger().Info("[ddl] syncer check all versions, convert value to int failed, continue checking.", zap.String("ddl", string(kv.Key)), zap.String("value", string(kv.Value)), zap.Error(err))
 					succ = false
 					break
 				}
-				if int64(ver) < latestVer {
+				// We also check that the TiDB ID is still in updatedMap, in case deleting the etcd key failed and that TiDB server is down.
+				if int64(ver) < latestVer && updatedMap[tidbIDInResp] != "" {
 					if notMatchVerCnt%intervalCnt == 0 {
 						logutil.BgLogger().Info("[ddl] syncer check all versions, someone is not synced, continue checking",
 							zap.String("ddl", string(kv.Key)), zap.Int("currentVer", ver), zap.Int64("latestVer", latestVer))
 					}
 					succ = false
@@ -325,7 +327,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
 					notMatchVerCnt++
 					break
 				}
-				delete(updatedMap, key[strings.LastIndex(key, "/")+1:])
+				delete(updatedMap, tidbIDInResp)
 			}
 			if len(updatedMap) > 0 {
 				succ = false
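
Reviewer note: the ddl.go change replaces the owner-wide `once` flag (which one job could flip for all others) with per-job tracking. Below is a minimal, self-contained sketch of that pattern; the onceTracker name and the main demo are illustrative, not TiDB code.

package main

import (
	"fmt"
	"sync"
)

// jobOnceCapacity bounds the tracker size, mirroring the constant in the patch.
const jobOnceCapacity = 1000

// onceTracker records which DDL job IDs have already run on this owner.
// Unlike a single shared bool, a per-job map stays correct when several
// jobs are dispatched concurrently.
type onceTracker struct {
	mu      sync.RWMutex
	onceMap map[int64]struct{}
}

func newOnceTracker() *onceTracker {
	return &onceTracker{onceMap: make(map[int64]struct{}, jobOnceCapacity)}
}

// alreadyRunOnce reports whether job id is known to have run on this owner.
// A false result only means "maybe not": the map may have been reset.
func (t *onceTracker) alreadyRunOnce(id int64) bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	_, ok := t.onceMap[id]
	return ok
}

// setAlreadyRunOnce marks job id as having run, resetting the map when it
// grows too large; evicted jobs just re-check schema sync, which is safe.
func (t *onceTracker) setAlreadyRunOnce(id int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.onceMap) > jobOnceCapacity {
		t.onceMap = make(map[int64]struct{}, jobOnceCapacity)
	}
	t.onceMap[id] = struct{}{}
}

func main() {
	t := newOnceTracker()
	fmt.Println(t.alreadyRunOnce(42)) // false: first encounter, wait for schema sync
	t.setAlreadyRunOnce(42)
	fmt.Println(t.alreadyRunOnce(42)) // true: later runs of job 42 can skip the wait
}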
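Similarly, a sketch of the owner-side check after the syncer.go change: a stale version entry only blocks success when its server ID is still present in updatedMap, so a leftover etcd key from a downed TiDB (whose key deletion failed) no longer wedges the check. The key layout, server IDs, and function signature here are illustrative assumptions, simplified from the retry loop in OwnerCheckAllVersions.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// checkAllVersions mimics the owner-side loop: kvs maps an etcd key (ending
// in a server ID) to a reported schema version; updatedMap maps the IDs of
// servers we still expect to hear from to their etcd paths. A stale version
// only blocks success if its server is still in updatedMap, i.e. alive.
func checkAllVersions(kvs map[string]string, updatedMap map[string]string, latestVer int64) bool {
	for key, val := range kvs {
		serverID := key[strings.LastIndex(key, "/")+1:]
		ver, err := strconv.Atoi(val)
		if err != nil {
			return false // malformed value: report not-synced and retry later
		}
		if int64(ver) < latestVer && updatedMap[serverID] != "" {
			return false // a live server has not caught up yet
		}
		delete(updatedMap, serverID) // confirmed, or a dead server's leftover key
	}
	return len(updatedMap) == 0 // success only if every expected server reported
}

func main() {
	kvs := map[string]string{
		"/versions/srv-1": "10",
		"/versions/srv-2": "9", // leftover key of a server that went down
	}
	updated := map[string]string{"srv-1": "/versions/srv-1"}
	fmt.Println(checkAllVersions(kvs, updated, 10)) // true: stale srv-2 is ignored
}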