Skip to content

Commit

Permalink
ddl: delay before changing column from null to not null (#23364)
Browse files Browse the repository at this point in the history
* ddl: delay before changing column from null to not null

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
Co-authored-by: Arenatlx <314806019@qq.com>
  • Loading branch information
3 people authored Mar 19, 2021
1 parent 3813da0 commit e79ac3d
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
11 changes: 9 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
}

if !needChangeColumnData(oldCol, jobParam.newCol) {
return w.doModifyColumn(t, job, dbInfo, tblInfo, jobParam.newCol, oldCol, jobParam.pos)
return w.doModifyColumn(d, t, job, dbInfo, tblInfo, jobParam.newCol, oldCol, jobParam.pos)
}

if jobParam.changingCol == nil {
Expand Down Expand Up @@ -1384,11 +1384,18 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind

// doModifyColumn updates the column information and reorders all columns. It does not support modifying column data.
func (w *worker) doModifyColumn(
t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) {
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)

// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
// We need to check after the flag is set
if d.lease > 0 && !noPreventNullFlag {
delayForAsyncCommit()
}

// Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values.
err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, newCol.Name, oldCol.Tp != newCol.Tp)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,17 @@ type RecoverInfo struct {
CurAutoRandID int64
}

// delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes.
// It should be called before any DDL that could break data consistency.
// This provides a safe window for async commit and 1PC to commit with an old schema.
func delayForAsyncCommit() {
cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
duration := cfg.SafeWindow + cfg.AllowedClockDrift
logutil.BgLogger().Info("sleep before DDL finishes to make async commit and 1PC safe",
zap.Duration("duration", duration))
time.Sleep(duration)
}

var (
// RunInGoTest is used to identify whether ddl in running in the test.
RunInGoTest bool
Expand Down
9 changes: 1 addition & 8 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -158,15 +157,9 @@ func (rc *reorgCtx) clean() {
}

func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error {
// Sleep for reorgDelay before doing reorganization.
// This provides a safe window for async commit and 1PC to commit with an old schema.
// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
if lease > 0 {
cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
reorgDelay := cfg.SafeWindow + cfg.AllowedClockDrift
logutil.BgLogger().Info("sleep before reorganization to make async commit safe",
zap.Duration("duration", reorgDelay))
time.Sleep(reorgDelay)
delayForAsyncCommit()
}

job := reorgInfo.Job
Expand Down
4 changes: 3 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
}

failpoint.Inject("beforePrewrite", nil)
if c.sessionID > 0 {
failpoint.Inject("beforePrewrite", nil)
}

c.prewriteStarted = true
var binlogChan <-chan BinlogWriteResult
Expand Down

0 comments on commit e79ac3d

Please sign in to comment.