This is an automated cherry-pick of pingcap#46921 (ddl: fix a bug that MDL may progress unexpectedly or block forever).
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
wjhuang2016 authored and ti-chi-bot committed Sep 18, 2023
1 parent 38fb0eb commit 1aebd8b
Showing 4 changed files with 173 additions and 19 deletions.
28 changes: 24 additions & 4 deletions ddl/ddl.go
@@ -153,6 +153,7 @@ const (
OnExistReplace

jobRecordCapacity = 16
jobOnceCapacity = 1000
)

var (
@@ -288,14 +289,14 @@ type waitSchemaSyncedController struct {
mu sync.RWMutex
job map[int64]struct{}

// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
once *atomicutil.Bool
// Used to check whether this is the first time a DDL job runs on this owner.
onceMap map[int64]struct{}
}

func newWaitSchemaSyncedController() *waitSchemaSyncedController {
return &waitSchemaSyncedController{
job: make(map[int64]struct{}, jobRecordCapacity),
once: atomicutil.NewBool(true),
job: make(map[int64]struct{}, jobRecordCapacity),
onceMap: make(map[int64]struct{}, jobOnceCapacity),
}
}

@@ -318,6 +319,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
delete(w.job, job.ID)
}

// maybeAlreadyRunOnce returns true if this job has already been handled at least once on this owner,
// so the first-run schema-sync wait can be skipped. Returns false if the job may be on its first run
// here; the onceMap is reset when it grows too large, so such jobs simply re-check that the schema is synced.
func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.onceMap[id]
return ok
}

func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
w.mu.Lock()
defer w.mu.Unlock()
if len(w.onceMap) > jobOnceCapacity {
// If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok.
w.onceMap = make(map[int64]struct{}, jobRecordCapacity)
}
w.onceMap[id] = struct{}{}
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
ctx context.Context
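Editor's note on the change above: the owner-wide once flag is replaced by a per-job onceMap, so the "wait for schema sync on first handling" bookkeeping is tracked per job ID rather than once per owner election. Below is a minimal standalone sketch of that bookkeeping, simplified from this diff; the onceTracker name and the main driver are illustrative, not part of the commit (the real logic lives on waitSchemaSyncedController).

package main

import (
	"fmt"
	"sync"
)

const jobOnceCapacity = 1000

// onceTracker is an illustrative extraction of the onceMap fields on
// waitSchemaSyncedController: an entry for a job ID means this owner has
// already handled that job at least once since being elected.
type onceTracker struct {
	mu      sync.Mutex
	onceMap map[int64]struct{}
}

func newOnceTracker() *onceTracker {
	return &onceTracker{onceMap: make(map[int64]struct{}, jobOnceCapacity)}
}

func (t *onceTracker) maybeAlreadyRunOnce(id int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.onceMap[id]
	return ok
}

func (t *onceTracker) setAlreadyRunOnce(id int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.onceMap) > jobOnceCapacity {
		// Reset when the map grows too large; affected jobs merely re-check
		// schema sync on their next run, which is safe.
		t.onceMap = make(map[int64]struct{}, jobOnceCapacity)
	}
	t.onceMap[id] = struct{}{}
}

func main() {
	t := newOnceTracker()
	fmt.Println(t.maybeAlreadyRunOnce(1)) // false: may be the first run on this owner
	t.setAlreadyRunOnce(1)
	fmt.Println(t.maybeAlreadyRunOnce(1)) // true: first-run sync wait can be skipped
}

Since setAlreadyRunOnce is only called after the owner finishes the first-run schema-sync wait for a job, a true result safely skips the wait; the occasional reset only costs an extra re-check.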
41 changes: 32 additions & 9 deletions ddl/ddl_worker.go
@@ -818,17 +818,23 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
}
w.registerSync(job)

if runJobErr != nil {
if runJobErr != nil && !dbterror.ErrPausedDDLJob.Equal(runJobErr) {
// Omit the ErrPausedDDLJob
if !dbterror.ErrPausedDDLJob.Equal(runJobErr) {
w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
// In tests, when the job is cancelling, we can skip the sleep.
if !(intest.InTest && job.IsCancelling()) {
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
// which may act like a deadlock.
time.Sleep(GetWaitTimeWhenErrorOccurred())
}
}
@@ -1165,14 +1171,14 @@ func toTError(err error) *terror.Error {
return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
}

// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
// waitSchemaChanged waits until all servers' schemas are updated or the MDL is synced. In order to make sure that happens,
// we wait at most 2 * lease time (sessionTTL, 90 seconds).
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
if waitTime == 0 {
return
return nil
}

timeStart := time.Now()
@@ -1182,30 +1188,48 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in
}()

if latestSchemaVersion == 0 {
logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
return nil
}

err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
if variable.EnableMDL.Load() {
return err
}
if terror.ErrorEqual(err, context.DeadlineExceeded) {
// If err is context.DeadlineExceeded, it means waitTime (2 * lease) has elapsed, so all the schemas have been synced by the ticker.
// There is no need to use etcd to sync. The function returns directly.
return
return nil
}
}

// OwnerCheckAllVersions returns only when all TiDB schemas are synced (excluding isolated TiDB instances).
err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return err
}
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits to acquire the metadata lock for the latest version of this DDL.
@@ -1264,8 +1288,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
}
})

waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
return nil
return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
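The functional core of this file's change is the new signature: waitSchemaChanged used to swallow synchronization errors, so its caller went on to clean the MDL info even when no sync had actually happened, which is one way MDL could progress unexpectedly. A toy sketch of the new contract follows; everything except the error-propagation shape (which is taken from the diff) is a hypothetical stand-in.

package main

import (
	"errors"
	"fmt"
)

// errSyncFailed stands in for an error from OwnerUpdateGlobalVersion or
// OwnerCheckAllVersions in the real syncer.
var errSyncFailed = errors.New("wait latest schema version failed")

// waitSchemaChanged now reports failures to its caller instead of only
// logging them.
func waitSchemaChanged(fail bool) error {
	if fail {
		return errSyncFailed
	}
	return nil
}

// deliver mirrors the call site in delivery2worker: the MDL info is cleaned
// only after a successful wait.
func deliver(fail bool) {
	if err := waitSchemaChanged(fail); err != nil {
		// Per the diff's comment: the failure may just be the server
		// closing, so the MDL info must not be cleaned here.
		fmt.Println("wait latest schema version error:", err)
		return
	}
	fmt.Println("cleanMDLInfo + mark job synced")
}

func main() {
	deliver(false) // prints: cleanMDLInfo + mark job synced
	deliver(true)  // prints: wait latest schema version error: ...
}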
17 changes: 11 additions & 6 deletions ddl/job_table.go
@@ -275,7 +275,7 @@ func (d *ddl) startDispatchLoop() {
}
if !d.isOwner() {
isOnce = true
d.once.Store(true)
d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
time.Sleep(dispatchLoopWaitingDuration)
continue
}
@@ -382,7 +382,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
@@ -397,7 +397,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
// Don't have a worker now.
return
@@ -411,7 +411,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
pool.put(wk)
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
}
}

@@ -430,9 +430,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// If the job is done or still running or rolling back, we will wait 2 * lease time or until the MDL is synced to guarantee other servers to update
// the newest schema.
waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
if err != nil {
// May be caused by the server closing; in that case we shouldn't clean the MDL info.
logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
d.synced(job)

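Tying the two halves together: delivery2worker now gates the first-run wait per job, and jobs that have not started yet skip it entirely, since a not-yet-started job cannot have published a schema version that other servers might lag behind on. A compact sketch of the added condition; the boolean parameters are stand-ins for job.NotStarted(), d.isSynced(job), and d.maybeAlreadyRunOnce(job.ID).

package main

import "fmt"

// needWaitSync mirrors the gating condition added in delivery2worker:
// !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)).
func needWaitSync(notStarted, synced, ranOnce bool) bool {
	// A job that has not started yet has published no schema version for
	// other servers to catch up on, so it skips the wait entirely.
	return !notStarted && (!synced || !ranOnce)
}

func main() {
	fmt.Println(needWaitSync(true, false, false))  // false: brand-new job runs immediately
	fmt.Println(needWaitSync(false, false, false)) // true: wait for schema sync / MDL first
	fmt.Println(needWaitSync(false, true, true))   // false: synced and already handled once here
}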
106 changes: 106 additions & 0 deletions session/bootstraptest/bootstrap_upgrade_test.go
@@ -445,6 +445,112 @@ func TestUpgradeVersionForSystemPausedJob(t *testing.T) {
checkDDLJobExecSucc(t, seLatestV, jobID)
}

func TestUpgradeVersionForResumeJob(t *testing.T) {
store, dom := session.CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

seV := session.CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
err = m.FinishBootstrap(session.CurrentBootstrapVersion - 1)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
session.MustExec(t, seV, fmt.Sprintf("update mysql.tidb set variable_value='%d' where variable_name='tidb_server_version'", session.CurrentBootstrapVersion-1))
session.UnsetStoreBootstrapped(store.UUID())
ver, err := session.GetBootstrapVersion(seV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion-1, ver)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/mockResumeAllJobsFailed", `return(true)`))
defer failpoint.Disable("github.com/pingcap/tidb/session/mockResumeAllJobsFailed")

// Add a paused DDL job before upgrade.
session.MustExec(t, seV, "create table test.upgrade_tbl(a int, b int)")
session.MustExec(t, seV, "create table test.upgrade_tbl1(a int, b int)")
ch := make(chan struct{})
hook := &callback.TestDDLCallback{}
var jobID int64
doOnce := true
hook.OnGetJobBeforeExported = func(str string) {
if jobID == 0 || !doOnce {
return
}

for i := 0; i < 50; i++ {
sql := fmt.Sprintf("admin show ddl jobs where job_id=%d or job_id=%d", jobID, jobID+1)
se := session.CreateSessionAndSetID(t, store)
rows, err := execute(context.Background(), se, sql)
require.NoError(t, err)
if len(rows) == 2 {
doOnce = false
break
}
time.Sleep(100 * time.Millisecond)
}
}
wg := sync.WaitGroup{}
wg.Add(1)
times := 0
hook.OnGetJobAfterExported = func(tp string, job *model.Job) {
if job.SchemaState == model.StateWriteOnly && times == 0 {
ch <- struct{}{}
jobID = job.ID
times = 1
}
if job.ID == jobID && job.State == model.JobStateDone && job.SchemaState == model.StatePublic {
wg.Done()
}
}

dom.DDL().SetHook(hook)
go func() {
// This "add index" job will be paused when upgrading.
_, _ = execute(context.Background(), seV, "alter table test.upgrade_tbl add index idx(a)")
}()

<-ch
dom.Close()
// Make sure upgrade is successful.
startUpgrade(store)
domLatestV, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domLatestV.Close()
domLatestV.DDL().SetHook(hook)
finishUpgrade(store)
seLatestV := session.CreateSessionAndSetID(t, store)
// Add a new DDL job (an "add index" on a different table than the previous job) to the DDL table.
session.MustExec(t, seLatestV, "alter table test.upgrade_tbl1 add index idx2(a)")
ver, err = session.GetBootstrapVersion(seLatestV)
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion, ver)

wg.Wait()
// Make sure the second add index operation is successful.
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id=%d or job_id=%d order by job_id", jobID, jobID+1)
rows, err := execute(context.Background(), seLatestV, sql)
require.NoError(t, err)
require.Len(t, rows, 2)
var idxFinishTS uint64
for i, row := range rows {
jobBinary := row.GetBytes(0)
runJob := model.Job{}
err := runJob.Decode(jobBinary)
require.NoError(t, err)
require.True(t, strings.Contains(runJob.TableName, "upgrade_tbl"))
require.Equal(t, model.JobStateSynced.String(), runJob.State.String())
if i == 0 {
idxFinishTS = runJob.BinlogInfo.FinishedTS
} else {
require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
}
}
}

func execute(ctx context.Context, s sessionctx.Context, query string) ([]chunk.Row, error) {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
rs, err := s.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)
