Skip to content

Commit

Permalink
table: add new option DupKeyCheckMode for table mutations (#55194)
Browse files Browse the repository at this point in the history
ref #54397
  • Loading branch information
lcwangchao authored Aug 6, 2024
1 parent 9fee330 commit 1985662
Show file tree
Hide file tree
Showing 14 changed files with 288 additions and 32 deletions.
9 changes: 6 additions & 3 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,8 +1646,6 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [
}
idxRecords[w.recordIdx[i]].skip = found && idxRecords[w.recordIdx[i]].skip
}
// Constrains is already checked.
w.tblCtx.GetSessionVars().StmtCtx.BatchCheck = true
return nil
}

Expand Down Expand Up @@ -1841,7 +1839,12 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx
}

handle, err := w.indexes[i%len(w.indexes)].Create(
w.tblCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill)
w.tblCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData,
table.WithIgnoreAssertion,
table.FromBackfill,
// Constrains is already checked in batchCheckUniqueKey
table.DupKeyCheckSkip,
)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) {
// Index already exists, skip it.
Expand Down
8 changes: 5 additions & 3 deletions pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,6 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
return result, err
}

// Constrains is already checked.
e.Ctx().GetSessionVars().StmtCtx.BatchCheck = true
for _, row := range rows {
if row.skip {
continue
Expand All @@ -526,7 +524,11 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
return result, err
}

_, err = e.index.Create(e.Ctx().GetTableCtx(), txn, row.idxVals, row.handle, row.rsData, table.WithIgnoreAssertion)
_, err = e.index.Create(e.Ctx().GetTableCtx(), txn, row.idxVals, row.handle, row.rsData,
table.WithIgnoreAssertion,
// Constrains have already been checked.
table.DupKeyCheckSkip,
)
if err != nil {
return result, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if sizeHint > remain {
sizeHint = remain
}
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint)
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
} else {
err = e.addRecord(ctx, row)
err = e.addRecord(ctx, row, table.DupKeyCheckDefault)
}
if err != nil {
return err
Expand Down Expand Up @@ -282,7 +282,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
err := e.addRecord(ctx, newRows[i])
err := e.addRecord(ctx, newRows[i], table.DupKeyCheckDefault)
if err != nil {
return err
}
Expand Down
19 changes: 9 additions & 10 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,11 +1190,9 @@ func (e *InsertValues) handleDuplicateKey(ctx context.Context, txn kv.Transactio
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(
ctx context.Context, rows [][]types.Datum,
addRecord func(ctx context.Context, row []types.Datum) error,
addRecord func(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error,
replace bool,
) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.Ctx().GetSessionVars().StmtCtx.BatchCheck = true
defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
start := time.Now()
// Get keys need to be checked.
Expand Down Expand Up @@ -1307,7 +1305,8 @@ func (e *InsertValues) batchCheckAndInsert(
// it should be added to values map for the further row check.
// There may be duplicate keys inside the insert statement.
e.Ctx().GetSessionVars().StmtCtx.AddCopiedRows(1)
err = addRecord(ctx, rows[i])
// all the rows have been checked, so it is safe to use DupKeyCheckSkip
err = addRecord(ctx, rows[i], table.DupKeyCheckSkip)
if err != nil {
// throw warning when violate check constraint
if table.ErrCheckConstraintViolated.Equal(err) {
Expand Down Expand Up @@ -1405,21 +1404,21 @@ func (e *InsertValues) equalDatumsAsBinary(a []types.Datum, b []types.Datum) (bo
return true, nil
}

func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error {
return e.addRecordWithAutoIDHint(ctx, row, 0, dupKeyCheck)
}

func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int,
ctx context.Context, row []types.Datum, reserveAutoIDCount int, dupKeyCheck table.DupKeyCheckMode,
) (err error) {
vars := e.Ctx().GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
}
if reserveAutoIDCount > 0 {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount))
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount), dupKeyCheck)
} else {
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx))
_, err = e.Table.AddRecord(e.Ctx().GetTableCtx(), row, table.WithCtx(ctx), dupKeyCheck)
}
vars.PresumeKeyNotExists = false
if err != nil {
Expand All @@ -1429,7 +1428,7 @@ func (e *InsertValues) addRecordWithAutoIDHint(
if e.lastInsertID != 0 {
vars.SetLastInsertID(e.lastInsertID)
}
if !vars.StmtCtx.BatchCheck {
if dupKeyCheck != table.DupKeyCheckSkip {
for _, fkc := range e.fkChecks {
err = fkc.insertRowNeedToCheck(vars.StmtCtx, row)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,9 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
if sizeHint > remain {
sizeHint = remain
}
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint)
err = w.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault)
} else {
err = w.addRecord(ctx, row)
err = w.addRecord(ctx, row, table.DupKeyCheckDefault)
}
if err != nil {
return err
Expand All @@ -670,11 +670,11 @@ func (w *commitWorker) checkAndInsertOneBatch(ctx context.Context, rows [][]type
}
}

func (w *commitWorker) addRecordLD(ctx context.Context, row []types.Datum) error {
func (w *commitWorker) addRecordLD(ctx context.Context, row []types.Datum, dupKeyCheck table.DupKeyCheckMode) error {
if row == nil {
return nil
}
return w.addRecord(ctx, row)
return w.addRecord(ctx, row, dupKeyCheck)
}

// GetInfilePath get infile path.
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
err = e.addRecord(ctx, r.row)
err = e.addRecord(ctx, r.row, table.DupKeyCheckDefault)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64

// AddRecord adds a record into encoder
func (e *BaseKVEncoder) AddRecord(record []types.Datum) (kv.Handle, error) {
return e.table.AddRecord(e.SessionCtx.GetTableCtx(), record)
return e.table.AddRecord(e.SessionCtx.GetTableCtx(), record, table.DupKeyCheckSkip)
}

// TableAllocators returns the allocators of the table
Expand Down
1 change: 0 additions & 1 deletion pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
vars := variable.NewSessionVars(s)
vars.SkipUTF8Check = true
vars.StmtCtx.InInsertStmt = true
vars.StmtCtx.BatchCheck = true
vars.SQLMode = sqlMode

typeFlags := vars.StmtCtx.TypeFlags().
Expand Down
1 change: 0 additions & 1 deletion pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ type StatementContext struct {
contextutil.PlanCacheTracker
contextutil.RangeFallbackHandler

BatchCheck bool
IgnoreExplainIDSuffix bool
MultiSchemaInfo *model.MultiSchemaInfo
// If the select statement was like 'select * from t as of timestamp ...' or in a stale read transaction
Expand Down
34 changes: 33 additions & 1 deletion pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more b

// commonMutateOpt is the common options for mutating a table.
type commonMutateOpt struct {
Ctx context.Context
Ctx context.Context
DupKeyCheck DupKeyCheckMode
}

// AddRecordOpt contains the options will be used when adding a record.
Expand Down Expand Up @@ -222,6 +223,37 @@ func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.IsUpdate = true
}

// DupKeyCheckMode indicates how to check the duplicated key when adding/updating a record/index.
type DupKeyCheckMode uint8

const (
// DupKeyCheckDefault indicates using the default behavior.
// Currently, this means to use the return value `ctx.LazyCheckKeyNotExists()`.
// If the above method returns true, it will only check the duplicated key in the memory buffer,
// otherwise, it will also check the duplicated key in the storage.
// TODO: add `DupKeyCheckLazy` to indicate only checking the duplicated key in the memory buffer.
// After `DupKeyCheckLazy` added, `DupKeyCheckDefault` will be renamed to `DupKeyCheckInPlace` to force check
// the duplicated key in place.
DupKeyCheckDefault DupKeyCheckMode = iota
// DupKeyCheckSkip indicates skipping the duplicated key check.
DupKeyCheckSkip
)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (m DupKeyCheckMode) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.DupKeyCheck = m
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (m DupKeyCheckMode) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.DupKeyCheck = m
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (m DupKeyCheckMode) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
opt.DupKeyCheck = m
}

type columnAPI interface {
// Cols returns the columns of the table which is used in select, including hidden columns.
Cols() []*Column
Expand Down
3 changes: 2 additions & 1 deletion pkg/table/tables/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ go_test(
],
embed = [":tables"],
flaky = True,
shard_count = 31,
shard_count = 32,
deps = [
"//pkg/ddl",
"//pkg/domain",
"//pkg/errctx",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/lightning/backend/encode",
Expand Down
2 changes: 1 addition & 1 deletion pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
vars := sctx.GetSessionVars()
writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs()
skipCheck := vars.StmtCtx.BatchCheck
skipCheck := opt.DupKeyCheck == table.DupKeyCheckSkip
evalCtx := sctx.GetExprCtx().GetEvalCtx()
loc, ec := evalCtx.Location(), evalCtx.ErrCtx()
for _, value := range indexedValues {
Expand Down
4 changes: 2 additions & 2 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, r []types.Datum, opt *
}
key := t.RecordKey(recordID)
var setPresume bool
if !sctx.GetSessionVars().StmtCtx.BatchCheck {
if opt.DupKeyCheck != table.DupKeyCheckSkip {
if t.meta.TempTableType != model.TempTableNone {
// Always check key for temporary table because it does not write to TiKV
_, err = txn.Get(ctx, key)
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func genIndexKeyStrs(colVals []types.Datum) ([]string, error) {
func (t *TableCommon) addIndices(sctx table.MutateContext, recordID kv.Handle, r []types.Datum, txn kv.Transaction, opt *table.CreateIdxOpt) (kv.Handle, error) {
writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs()
indexVals := writeBufs.IndexValsBuf
skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck
skipCheck := opt.DupKeyCheck == table.DupKeyCheckSkip
for _, v := range t.Indices() {
if !IsIndexWritable(v) {
continue
Expand Down
Loading

0 comments on commit 1985662

Please sign in to comment.