Skip to content

Commit

Permalink
ddl: fix a bug that MDL may progress unexpectedly or block forever (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 13, 2023
1 parent cd6c09e commit 2a3255d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 19 deletions.
28 changes: 24 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ const (
OnExistReplace

jobRecordCapacity = 16
jobOnceCapacity = 1000
)

var (
Expand Down Expand Up @@ -288,14 +289,14 @@ type waitSchemaSyncedController struct {
mu sync.RWMutex
job map[int64]struct{}

// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
once *atomicutil.Bool
// Used to check whether the DDL job is running for the first time on this owner.
onceMap map[int64]struct{}
}

func newWaitSchemaSyncedController() *waitSchemaSyncedController {
return &waitSchemaSyncedController{
job: make(map[int64]struct{}, jobRecordCapacity),
once: atomicutil.NewBool(true),
job: make(map[int64]struct{}, jobRecordCapacity),
onceMap: make(map[int64]struct{}, jobOnceCapacity),
}
}

Expand All @@ -318,6 +319,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
delete(w.job, job.ID)
}

// maybeAlreadyRunOnce reports whether the job with the given id is currently recorded
// in onceMap, i.e. whether setAlreadyRunOnce has been called for it and the record
// has not been dropped since.
//
// A true result means the job has already run on this owner. A false result means the
// job may be on its first run here: either it was never recorded, or onceMap was
// replaced after it was recorded, so the caller must treat it as a possible first run.
func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
	// The lookup only reads the map, so the shared lock is sufficient.
	w.mu.RLock()
	defer w.mu.RUnlock()
	_, ok := w.onceMap[id]
	return ok
}

// setAlreadyRunOnce records that the job with the given id has run on this owner.
//
// onceMap is bounded: when it already holds more than jobOnceCapacity entries it is
// replaced by a fresh, empty map before id is inserted. Jobs dropped this way are no
// longer reported by maybeAlreadyRunOnce, so they may need to check schema synced
// again, which is acceptable.
func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.onceMap) > jobOnceCapacity {
		// Size the replacement with jobOnceCapacity, the bound this map is held to,
		// so it does not have to regrow from a small hint while refilling.
		w.onceMap = make(map[int64]struct{}, jobOnceCapacity)
	}
	w.onceMap[id] = struct{}{}
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
ctx context.Context
Expand Down
21 changes: 12 additions & 9 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,14 +1165,14 @@ func toTError(err error) *terror.Error {
return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
}

// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
// waitSchemaChanged waits until all servers have updated their schema or the MDL is synced. In order to make sure that happens,
// we wait at most 2 * lease time(sessionTTL, 90 seconds).
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
if waitTime == 0 {
return
return nil
}

timeStart := time.Now()
Expand All @@ -1183,29 +1183,33 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in

if latestSchemaVersion == 0 {
logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
return
return nil
}

err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
if variable.EnableMDL.Load() {
return err
}
if terror.ErrorEqual(err, context.DeadlineExceeded) {
// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
// There is no need to use etcd to sync. The function returns directly.
return
return nil
}
}

// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return
return err
}
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits to get the metadata lock of the latest version of this DDL.
Expand Down Expand Up @@ -1264,8 +1268,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
}
})

waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
return nil
return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
Expand Down
17 changes: 11 additions & 6 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (d *ddl) startDispatchLoop() {
}
if !d.isOwner() {
isOnce = true
d.once.Store(true)
d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
time.Sleep(dispatchLoopWaitingDuration)
continue
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
Expand All @@ -397,7 +397,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
// Don't have a worker now.
return
Expand All @@ -411,7 +411,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
pool.put(wk)
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
}
}

Expand All @@ -430,9 +430,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// If the job is done or still running or rolling back, we will wait 2 * lease time or until the MDL is synced to guarantee other servers to update
// the newest schema.
waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
if err != nil {
// May be caused by server closing, shouldn't clean the MDL info.
logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
d.synced(job)

Expand Down

0 comments on commit 2a3255d

Please sign in to comment.