diff --git a/executor/adapter.go b/executor/adapter.go index faf894816c702..31a8c8d20150f 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -351,7 +351,7 @@ func IsFastPlan(p plannercore.Plan) bool { } // Exec builds an Executor from a plan. If the Executor doesn't return result, -// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns +// like the INSERT, UPDATE statements, it executes in this function. If the Executor returns // result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method. func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { defer func() { @@ -708,7 +708,10 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys) seVars := sctx.GetSessionVars() keys = filterLockTableKeys(seVars.StmtCtx, keys) - lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout, len(keys)) + lockCtx, err := newLockCtx(sctx, seVars.LockWaitTimeout, len(keys)) + if err != nil { + return err + } var lockKeyStats *util.LockKeysDetails ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats) startLocking := time.Now() @@ -730,43 +733,18 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { } } -// UpdateForUpdateTS updates the ForUpdateTS, if newForUpdateTS is 0, it obtain a new TS from PD. -func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error { - txn, err := seCtx.Txn(false) - if err != nil { - return err - } - if !txn.Valid() { - return errors.Trace(kv.ErrInvalidTxn) - } - - // The Oracle serializable isolation is actually SI in pessimistic mode. - // Do not update ForUpdateTS when the user is using the Serializable isolation level. - // It can be used temporarily on the few occasions when an Oracle-like isolation level is needed. - // Support for this does not mean that TiDB supports serializable isolation of MySQL. - // tidb_skip_isolation_level_check should still be disabled by default. - if seCtx.GetSessionVars().IsIsolation(ast.Serializable) { - return nil - } - if newForUpdateTS == 0 { - // Because the ForUpdateTS is used for the snapshot for reading data in DML. - // We can avoid allocating a global TSO here to speed it up by using the local TSO. - version, err := seCtx.GetStore().CurrentVersion(seCtx.GetSessionVars().TxnCtx.TxnScope) - if err != nil { - return err - } - newForUpdateTS = version.Ver - } - seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS) - txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS()) - return nil -} - // handlePessimisticLockError updates TS and rebuild executor if the err is write conflict. 
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error) (_ Executor, err error) { if lockErr == nil { return nil, nil } + failpoint.Inject("assertPessimisticLockErr", func() { + if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) { + sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errWriteConflict") + } else if terror.ErrorEqual(kv.ErrKeyExists, lockErr) { + sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errDuplicateKey") + } + }) defer func() { if _, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok { @@ -774,7 +752,8 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error } }() - action, err := sessiontxn.GetTxnManager(a.Ctx).OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr) + txnManager := sessiontxn.GetTxnManager(a.Ctx) + action, err := txnManager.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr) if err != nil { return nil, err } @@ -789,10 +768,17 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error a.retryCount++ a.retryStartTime = time.Now() - err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(ctx) + err = txnManager.OnStmtRetry(ctx) if err != nil { return nil, err } + + // Without this line of code, the result will still be correct. But it ensures that the ts used by the for update read + // is determined, which is beneficial for testing. + if _, err = txnManager.GetStmtForUpdateTS(); err != nil { + return nil, err + } + breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError) e, err := a.buildExecutor() diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 0ce745d172e4a..b5eb68a8b12de 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -56,7 +56,6 @@ type BatchPointGetExec struct { singlePart bool partTblID int64 idxVals [][]types.Datum - startTS uint64 readReplicaScope string isStaleness bool snapshotTS uint64 @@ -97,13 +96,9 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() { // Open implements the Executor interface. func (e *BatchPointGetExec) Open(context.Context) error { - e.snapshotTS = e.startTS sessVars := e.ctx.GetSessionVars() txnCtx := sessVars.TxnCtx stmtCtx := sessVars.StmtCtx - if e.lock { - e.snapshotTS = txnCtx.GetForUpdateTS() - } txn, err := e.ctx.Txn(false) if err != nil { return err @@ -111,8 +106,8 @@ func (e *BatchPointGetExec) Open(context.Context) error { e.txn = txn var snapshot kv.Snapshot if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == e.snapshotTS { - // We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS. - // The snapshot may contains cache that can reduce RPC call. + // We can safely reuse the transaction snapshot if snapshotTS is equal to forUpdateTS. + // The snapshot may contain cached data that can reduce RPC calls. snapshot = txn.GetSnapshot() } else { snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS) @@ -540,13 +535,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } // LockKeys locks the keys for pessimistic transaction.
-func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error { - txnCtx := seCtx.GetSessionVars().TxnCtx - lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime, len(keys)) +func LockKeys(ctx context.Context, sctx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error { + txnCtx := sctx.GetSessionVars().TxnCtx + lctx, err := newLockCtx(sctx, lockWaitTime, len(keys)) + if err != nil { + return err + } if txnCtx.IsPessimistic { lctx.InitReturnValues(len(keys)) } - err := doLockKeys(ctx, seCtx, lctx, keys...) + err = doLockKeys(ctx, sctx, lctx, keys...) if err != nil { return err } diff --git a/executor/builder.go b/executor/builder.go index 8a44c09aaf033..73a4995696de0 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -657,11 +657,11 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor defer func() { b.inSelectLockStmt = false }() } b.hasLock = true - if b.err = b.updateForUpdateTSIfNeeded(v.Children()[0]); b.err != nil { + + // Build 'select for update' using the 'for update' ts. + if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil { return nil } - // Build 'select for update' using the 'for update' ts. - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() src := b.build(v.Children()[0]) if b.err != nil { @@ -865,14 +865,11 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor { func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { b.inInsertStmt = true - if v.SelectPlan != nil { - // Try to update the forUpdateTS for insert/replace into select statements. - // Set the selectPlan parameter to nil to make it always update the forUpdateTS. - if b.err = b.updateForUpdateTSIfNeeded(nil); b.err != nil { - return nil - } + + if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil { + return nil } - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() + selectExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -2116,10 +2113,11 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { } } } - if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil { + + if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil { return nil } - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() + selExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -2173,10 +2171,11 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { for _, info := range v.TblColPosInfos { tblID2table[info.TblID], _ = b.is.TableByID(info.TblID) } - if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil { + + if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil { return nil } - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() + selExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -2192,34 +2191,6 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { return deleteExec } -// updateForUpdateTSIfNeeded updates the ForUpdateTS for a pessimistic transaction if needed. -// PointGet executor will get conflict error if the ForUpdateTS is older than the latest commitTS, -// so we don't need to update now for better latency. 
-func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.PhysicalPlan) error { - txnCtx := b.ctx.GetSessionVars().TxnCtx - if !txnCtx.IsPessimistic { - return nil - } - if _, ok := selectPlan.(*plannercore.PointGetPlan); ok { - return nil - } - // Activate the invalid txn, use the txn startTS as newForUpdateTS - txn, err := b.ctx.Txn(false) - if err != nil { - return err - } - if !txn.Valid() { - _, err := b.ctx.Txn(true) - if err != nil { - return err - } - return nil - } - // GetStmtForUpdateTS will auto update the for update ts if necessary - _, err = sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS() - return err -} - func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask { job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O} _, offset := timeutil.Zone(b.ctx.GetSessionVars().Location()) @@ -4663,18 +4634,26 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan return nil } - startTS, err := b.getSnapshotTS() + if plan.Lock && !b.inSelectLockStmt { + b.inSelectLockStmt = true + defer func() { + b.inSelectLockStmt = false + }() + } + + snapshotTS, err := b.getSnapshotTS() if err != nil { b.err = err return nil } + decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo) e := &BatchPointGetExec{ baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()), tblInfo: plan.TblInfo, idxInfo: plan.IndexInfo, rowDecoder: decoder, - startTS: startTS, + snapshotTS: snapshotTS, readReplicaScope: b.readReplicaScope, isStaleness: b.isStaleness, keepOrder: plan.KeepOrder, @@ -4687,9 +4666,11 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan partTblID: plan.PartTblID, columns: plan.Columns, } + if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable { - e.cacheTable = b.getCacheTable(plan.TblInfo, startTS) + e.cacheTable = b.getCacheTable(plan.TblInfo, snapshotTS) } + if plan.TblInfo.TempTableType != model.TempTableNone { // Temporary table should not do any lock operations e.lock = false @@ -4699,6 +4680,7 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan if e.lock { b.hasLock = true } + var capacity int if plan.IndexInfo != nil && !isCommonHandleRead(plan.TblInfo, plan.IndexInfo) { e.idxVals = plan.IndexValues diff --git a/executor/executor.go b/executor/executor.go index aa71929859898..d2b726f24adbb 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -1042,12 +1043,20 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { for id := range e.tblID2Handle { e.updateDeltaForTableID(id) } - - return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime, len(e.keys)), e.keys...) + lockCtx, err := newLockCtx(e.ctx, lockWaitTime, len(e.keys)) + if err != nil { + return err + } + return doLockKeys(ctx, e.ctx, lockCtx, e.keys...) 
} -func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64, numKeys int) *tikvstore.LockCtx { - lockCtx := tikvstore.NewLockCtx(seVars.TxnCtx.GetForUpdateTS(), lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime()) +func newLockCtx(sctx sessionctx.Context, lockWaitTime int64, numKeys int) (*tikvstore.LockCtx, error) { + seVars := sctx.GetSessionVars() + forUpdateTS, err := sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS() + if err != nil { + return nil, err + } + lockCtx := tikvstore.NewLockCtx(forUpdateTS, lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime()) lockCtx.Killed = &seVars.Killed lockCtx.PessimisticLockWaited = &seVars.StmtCtx.PessimisticLockWaited lockCtx.LockKeysDuration = &seVars.StmtCtx.LockKeysDuration @@ -1082,7 +1091,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64, numKeys int) * if lockCtx.ForUpdateTS > 0 && seVars.AssertionLevel != variable.AssertionLevelOff { lockCtx.InitCheckExistence(numKeys) } - return lockCtx + return lockCtx, nil } // doLockKeys is the main entry for pessimistic lock keys diff --git a/executor/point_get.go b/executor/point_get.go index f33ba20b5dd5a..1b4d6666663b5 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -49,11 +49,19 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { return nil } - startTS, err := b.getSnapshotTS() + if p.Lock && !b.inSelectLockStmt { + b.inSelectLockStmt = true + defer func() { + b.inSelectLockStmt = false + }() + } + + snapshotTS, err := b.getSnapshotTS() if err != nil { b.err = err return nil } + e := &PointGetExecutor{ baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ID()), readReplicaScope: b.readReplicaScope, @@ -61,14 +69,17 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { } if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable { - e.cacheTable = b.getCacheTable(p.TblInfo, startTS) + e.cacheTable = b.getCacheTable(p.TblInfo, snapshotTS) } + e.base().initCap = 1 e.base().maxChunkSize = 1 - e.Init(p, startTS) + e.Init(p, snapshotTS) + if e.lock { b.hasLock = true } + return e } @@ -83,7 +94,7 @@ type PointGetExecutor struct { idxKey kv.Key handleVal []byte idxVals []types.Datum - startTS uint64 + snapshotTS uint64 readReplicaScope string isStaleness bool txn kv.Transaction @@ -106,13 +117,13 @@ type PointGetExecutor struct { } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field -func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) { +func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, snapshotTS uint64) { decoder := NewRowDecoder(e.ctx, p.Schema(), p.TblInfo) e.tblInfo = p.TblInfo e.handle = p.Handle e.idxInfo = p.IndexInfo e.idxVals = p.IndexValues - e.startTS = startTs + e.snapshotTS = snapshotTS e.done = false if e.tblInfo.TempTableType == model.TempTableNone { e.lock = p.Lock @@ -142,10 +153,7 @@ func (e *PointGetExecutor) buildVirtualColumnInfo() { // Open implements the Executor interface. 
func (e *PointGetExecutor) Open(context.Context) error { txnCtx := e.ctx.GetSessionVars().TxnCtx - snapshotTS := e.startTS - if e.lock { - snapshotTS = txnCtx.GetForUpdateTS() - } + snapshotTS := e.snapshotTS var err error e.txn, err = e.ctx.Txn(false) if err != nil { @@ -381,9 +389,12 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro } if e.lock { seVars := e.ctx.GetSessionVars() - lockCtx := newLockCtx(seVars, e.lockWaitTime, 1) + lockCtx, err := newLockCtx(e.ctx, e.lockWaitTime, 1) + if err != nil { + return err + } lockCtx.InitReturnValues(1) - err := doLockKeys(ctx, e.ctx, lockCtx, key) + err = doLockKeys(ctx, e.ctx, lockCtx, key) if err != nil { return err } diff --git a/executor/trace_test.go b/executor/trace_test.go index f8e8e91ddebd7..9b448670cc39a 100644 --- a/executor/trace_test.go +++ b/executor/trace_test.go @@ -33,7 +33,7 @@ func TestTraceExec(t *testing.T) { require.GreaterOrEqual(t, len(rows), 1) // +---------------------------+-----------------+------------+ - // | operation | startTS | duration | + // | operation | snapshotTS | duration | // +---------------------------+-----------------+------------+ // | session.getTxnFuture | 22:08:38.247834 | 78.909µs | // | ├─session.Execute | 22:08:38.247829 | 1.478487ms | diff --git a/sessiontxn/failpoint.go b/sessiontxn/failpoint.go index d33984649b371..b41be21165908 100644 --- a/sessiontxn/failpoint.go +++ b/sessiontxn/failpoint.go @@ -43,6 +43,10 @@ var BreakPointBeforeExecutorFirstRun = "beforeExecutorFirstRun" // Only for test var BreakPointOnStmtRetryAfterLockError = "lockErrorAndThenOnStmtRetryCalled" +// AssertLockErr is used to record the lock errors we encountered +// Only for test +var AssertLockErr stringutil.StringerStr = "assertLockError" + // RecordAssert is used only for test func RecordAssert(sctx sessionctx.Context, name string, value interface{}) { records, ok := sctx.Value(AssertRecordsKey).(map[string]interface{}) @@ -94,6 +98,20 @@ func AssertTxnManagerReadTS(sctx sessionctx.Context, expected uint64) { } } +// AddAssertEntranceForLockError is used only for test +func AddAssertEntranceForLockError(sctx sessionctx.Context, name string) { + records, ok := sctx.Value(AssertLockErr).(map[string]int) + if !ok { + records = make(map[string]int) + sctx.SetValue(AssertLockErr, records) + } + if v, ok := records[name]; ok { + records[name] = v + 1 + } else { + records[name] = 1 + } +} + // ExecTestHook is used only for test. It consumes hookKey in session wait do what it gets from it. 
func ExecTestHook(sctx sessionctx.Context, hookKey fmt.Stringer) { c := sctx.Value(hookKey) diff --git a/sessiontxn/isolation/readcommitted.go b/sessiontxn/isolation/readcommitted.go index 8a409000d7049..5fb316b59f8bf 100644 --- a/sessiontxn/isolation/readcommitted.go +++ b/sessiontxn/isolation/readcommitted.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/terror" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" @@ -31,20 +32,15 @@ import ( ) type stmtState struct { - stmtTS uint64 - stmtTSFuture oracle.Future - stmtUseStartTS bool - onNextRetryOrStmt func() error + stmtTS uint64 + stmtTSFuture oracle.Future + stmtUseStartTS bool } func (s *stmtState) prepareStmt(useStartTS bool) error { - onNextStmt := s.onNextRetryOrStmt *s = stmtState{ stmtUseStartTS: useStartTS, } - if onNextStmt != nil { - return onNextStmt() - } return nil } @@ -52,7 +48,9 @@ func (s *stmtState) prepareStmt(useStartTS bool) error { type PessimisticRCTxnContextProvider struct { baseTxnContextProvider stmtState - availableRCCheckTS uint64 + latestOracleTS uint64 + // latestOracleTSValid shows whether we have already fetched a ts from pd and whether the ts we fetched is still valid. + latestOracleTSValid bool } // NewPessimisticRCTxnContextProvider returns a new PessimisticRCTxnContextProvider @@ -65,12 +63,14 @@ func NewPessimisticRCTxnContextProvider(sctx sessionctx.Context, causalConsisten txnCtx.IsPessimistic = true txnCtx.Isolation = ast.ReadCommitted }, - onTxnActive: func(txn kv.Transaction) { - txn.SetOption(kv.Pessimistic, true) - }, }, } + provider.onTxnActive = func(txn kv.Transaction) { + txn.SetOption(kv.Pessimistic, true) + provider.latestOracleTS = txn.StartTS() + provider.latestOracleTSValid = true + } provider.getStmtReadTSFunc = provider.getStmtTS provider.getStmtForUpdateTSFunc = provider.getStmtTS return provider @@ -86,9 +86,6 @@ func (p *PessimisticRCTxnContextProvider) OnStmtStart(ctx context.Context) error // OnStmtErrorForNextAction is the hook that should be called when a new statement get an error func (p *PessimisticRCTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) { - // Invalid rc check for next statement or retry when error occurs - p.availableRCCheckTS = 0 - switch point { case sessiontxn.StmtErrAfterQuery: return p.handleAfterQueryError(err) @@ -117,15 +114,30 @@ func (p *PessimisticRCTxnContextProvider) prepareStmtTS() { switch { case p.stmtUseStartTS: stmtTSFuture = sessiontxn.FuncFuture(p.getTxnStartTS) - case p.availableRCCheckTS != 0 && sessVars.StmtCtx.RCCheckTS: - stmtTSFuture = sessiontxn.ConstantFuture(p.availableRCCheckTS) + case p.latestOracleTSValid && sessVars.StmtCtx.RCCheckTS: + stmtTSFuture = sessiontxn.ConstantFuture(p.latestOracleTS) default: - stmtTSFuture = sessiontxn.NewOracleFuture(p.ctx, p.sctx, sessVars.TxnCtx.TxnScope) + stmtTSFuture = p.getOracleFuture() } p.stmtTSFuture = stmtTSFuture } +func (p *PessimisticRCTxnContextProvider) getOracleFuture() sessiontxn.FuncFuture { + txnCtx := p.sctx.GetSessionVars().TxnCtx + future := sessiontxn.NewOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope) + return func() (ts uint64, err error) { + if ts, err = future.Wait(); err != nil { + return + } + txnCtx.SetForUpdateTS(ts) + ts = txnCtx.GetForUpdateTS() + p.latestOracleTS = ts + 
p.latestOracleTSValid = true + return + } +} + func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) { if p.stmtTS != 0 { return p.stmtTS, nil @@ -141,13 +153,8 @@ func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) { return 0, err } - // forUpdateTS should exactly equal to the read ts - txnCtx := p.sctx.GetSessionVars().TxnCtx - txnCtx.SetForUpdateTS(ts) txn.SetOption(kv.SnapshotTS, ts) - p.stmtTS = ts - p.availableRCCheckTS = ts return } @@ -155,16 +162,18 @@ func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) { // At this point the query will be retried from the beginning. func (p *PessimisticRCTxnContextProvider) handleAfterQueryError(queryErr error) (sessiontxn.StmtErrorAction, error) { sessVars := p.sctx.GetSessionVars() - if sessVars.StmtCtx.RCCheckTS && errors.ErrorEqual(queryErr, kv.ErrWriteConflict) { - logutil.Logger(p.ctx).Info("RC read with ts checking has failed, retry RC read", - zap.String("sql", sessVars.StmtCtx.OriginalSQL)) - return sessiontxn.RetryReady() + if !errors.ErrorEqual(queryErr, kv.ErrWriteConflict) || !sessVars.StmtCtx.RCCheckTS { + return sessiontxn.NoIdea() } - return sessiontxn.NoIdea() + p.latestOracleTSValid = false + logutil.Logger(p.ctx).Info("RC read with ts checking has failed, retry RC read", + zap.String("sql", sessVars.StmtCtx.OriginalSQL)) + return sessiontxn.RetryReady() } func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) { + p.latestOracleTSValid = false txnCtx := p.sctx.GetSessionVars().TxnCtx retryable := false if deadlock, ok := errors.Cause(lockErr).(*tikverr.ErrDeadlock); ok && deadlock.IsRetryable { @@ -182,16 +191,9 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockEr retryable = true } - // force refresh ts in next retry or statement when lock error occurs - p.onNextRetryOrStmt = func() error { - _, err := p.getStmtTS() - return err - } - if retryable { return sessiontxn.RetryReady() } - return sessiontxn.ErrorAction(lockErr) } @@ -207,3 +209,31 @@ func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error { p.prepareStmtTS() return nil } + +// AdviseOptimizeWithPlan in RC covers far fewer cases than pessimistic repeatable read does. +// We only optimize an insert operator that has no select plan, in which case we do not fetch the latest ts immediately. +// We only update the ts if a write conflict occurs.
+func (p *PessimisticRCTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) (err error) { + if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() { + return nil + } + + if p.stmtUseStartTS || !p.latestOracleTSValid { + return nil + } + + plan, ok := val.(plannercore.Plan) + if !ok { + return nil + } + + if execute, ok := plan.(*plannercore.Execute); ok { + plan = execute.Plan + } + + if v, ok := plan.(*plannercore.Insert); ok && v.SelectPlan == nil { + p.stmtTSFuture = sessiontxn.ConstantFuture(p.latestOracleTS) + } + + return nil +} diff --git a/sessiontxn/isolation/readcommitted_test.go b/sessiontxn/isolation/readcommitted_test.go index d867cc689ca28..5c747eba4fa2c 100644 --- a/sessiontxn/isolation/readcommitted_test.go +++ b/sessiontxn/isolation/readcommitted_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/executor" @@ -53,13 +54,13 @@ func TestPessimisticRCTxnContextProviderRCCheck(t *testing.T) { require.NoError(t, err) forUpdateStmt := stmts[0] - compareTS := getOracleTS(t, se) + compareTS := se.GetSessionVars().TxnCtx.StartTS // first ts should request from tso require.NoError(t, executor.ResetContextOfStmt(se, readOnlyStmt)) require.NoError(t, provider.OnStmtStart(context.TODO())) ts, err := provider.GetStmtReadTS() require.NoError(t, err) - require.Greater(t, ts, compareTS) + require.Equal(t, ts, compareTS) rcCheckTS := ts // second ts should reuse first ts @@ -101,14 +102,11 @@ func TestPessimisticRCTxnContextProviderRCCheck(t *testing.T) { nextAction, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterQuery, errors.New("err")) require.NoError(t, err) require.Equal(t, sessiontxn.StmtActionNoIdea, nextAction) - compareTS = getOracleTS(t, se) - require.Greater(t, compareTS, rcCheckTS) require.NoError(t, executor.ResetContextOfStmt(se, readOnlyStmt)) require.NoError(t, provider.OnStmtStart(context.TODO())) ts, err = provider.GetStmtReadTS() require.NoError(t, err) - require.Greater(t, ts, compareTS) - rcCheckTS = ts + require.Equal(t, rcCheckTS, ts) // `StmtErrAfterPessimisticLock` will still disable rc check require.NoError(t, executor.ResetContextOfStmt(se, readOnlyStmt)) @@ -381,6 +379,88 @@ func TestTidbSnapshotVarInRC(t *testing.T) { } } +func TestConflictErrorsInRC(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertPessimisticLockErr", "return")) + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + se := tk.Session() + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("create table t (id int primary key, v int)") + + tk.MustExec("set tx_isolation='READ-COMMITTED'") + + // Test for insert + tk.MustExec("begin pessimistic") + tk2.MustExec("insert into t values (1, 2)") + se.SetValue(sessiontxn.AssertLockErr, nil) + _, err := tk.Exec("insert into t values (1, 1), (2, 2)") + require.Error(t, err) + records, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + for _, name := range errorsInInsert { + require.Equal(t, records[name], 1) + } + + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("rollback") + + // Test for delete + tk.MustExec("truncate t") + tk.MustExec("insert into t values (1, 1), (2, 2)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("insert into t values (3, 1)") + + 
se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("delete from t where v = 1") + _, ok = se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.False(t, ok) + tk.MustQuery("select * from t").Check(testkit.Rows("2 2")) + tk.MustExec("commit") + + // Unlike RR, in RC we always fetch the latest ts, so a write conflict will not happen + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set id = 1 where id = 2") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("delete from t where id = 1") + _, ok = se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.False(t, ok) + tk.MustQuery("select * from t for update").Check(testkit.Rows()) + + tk.MustExec("rollback") + + // Test for update + tk.MustExec("truncate t") + tk.MustExec("insert into t values (1, 1), (2, 2)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 10") + + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("update t set v = v + 10") + _, ok = se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.False(t, ok) + tk.MustQuery("select * from t").Check(testkit.Rows("1 21", "2 22")) + tk.MustExec("commit") + + // Unlike RR, in RC we always fetch the latest ts, so a write conflict will not happen + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 10 where id = 1") + tk.MustExec("update t set v = v + 10 where id = 1") + _, ok = se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.False(t, ok) + tk.MustQuery("select * from t for update").Check(testkit.Rows("1 41", "2 22")) + + tk.MustExec("rollback") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr")) +} + func activeRCTxnAssert(t *testing.T, sctx sessionctx.Context, inTxn bool) *txnAssert[*isolation.PessimisticRCTxnContextProvider] { return &txnAssert[*isolation.PessimisticRCTxnContextProvider]{ sctx: sctx, diff --git a/sessiontxn/isolation/repeatable_read.go b/sessiontxn/isolation/repeatable_read.go index ef678f40c614c..571d2754be9a3 100644 --- a/sessiontxn/isolation/repeatable_read.go +++ b/sessiontxn/isolation/repeatable_read.go @@ -35,10 +35,11 @@ type PessimisticRRTxnContextProvider struct { baseTxnContextProvider // Used for ForUpdateRead statement - forUpdateTS uint64 + forUpdateTS uint64 + latestForUpdateTS uint64 // It may decide whether to update forUpdateTs when calling provider's getForUpdateTs // See more details in the comments of optimizeWithPlan - followingOperatorIsPointGetForUpdate bool + optimizeForNotFetchingLatestTS bool } // NewPessimisticRRTxnContextProvider returns a new PessimisticRRTxnContextProvider @@ -73,7 +74,7 @@ func (p *PessimisticRRTxnContextProvider) getForUpdateTs() (ts uint64, err error) return 0, err } - if p.followingOperatorIsPointGetForUpdate { + if p.optimizeForNotFetchingLatestTS { p.forUpdateTS = p.sctx.GetSessionVars().TxnCtx.GetForUpdateTS() return p.forUpdateTS, nil } @@ -114,7 +115,8 @@ func (p *PessimisticRRTxnContextProvider) updateForUpdateTS() (err error) { } sctx.GetSessionVars().TxnCtx.SetForUpdateTS(version.Ver) - txn.SetOption(kv.SnapshotTS, sctx.GetSessionVars().TxnCtx.GetForUpdateTS()) + p.latestForUpdateTS = version.Ver + txn.SetOption(kv.SnapshotTS, version.Ver) return nil } @@ -126,7 +128,7 @@ func (p *PessimisticRRTxnContextProvider) OnStmtStart(ctx context.Context) error } p.forUpdateTS = 0 - p.followingOperatorIsPointGetForUpdate = false + p.optimizeForNotFetchingLatestTS = false return nil } @@ -137,15 +139,14 @@ func (p *PessimisticRRTxnContextProvider)
OnStmtRetry(ctx context.Context) (err return err } - txnCtxForUpdateTS := p.sctx.GetSessionVars().TxnCtx.GetForUpdateTS() // If TxnCtx.forUpdateTS is updated in OnStmtErrorForNextAction, we assign the value to the provider - if txnCtxForUpdateTS > p.forUpdateTS { - p.forUpdateTS = txnCtxForUpdateTS + if p.latestForUpdateTS > p.forUpdateTS { + p.forUpdateTS = p.latestForUpdateTS } else { p.forUpdateTS = 0 } - p.followingOperatorIsPointGetForUpdate = false + p.optimizeForNotFetchingLatestTS = false return nil } @@ -165,6 +166,8 @@ func (p *PessimisticRRTxnContextProvider) OnStmtErrorForNextAction(point session // We expect that the data that the point get acquires has not been changed. // Benefit: Save the cost of acquiring ts from PD. // Drawbacks: If the data has been changed since the ts we used, we need to retry. +// One exception is the insert operation: when it has no select plan, we do not fetch the latest ts immediately, and only update +// the ts if a write conflict occurs. func (p *PessimisticRRTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) (err error) { if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() { return nil @@ -179,24 +182,44 @@ func (p *PessimisticRRTxnContextProvider) AdviseOptimizeWithPlan(val interface{} plan = execute.Plan } - mayOptimizeForPointGet := false - if v, ok := plan.(*plannercore.PhysicalLock); ok { - if _, ok := v.Children()[0].(*plannercore.PointGetPlan); ok { - mayOptimizeForPointGet = true - } - } else if v, ok := plan.(*plannercore.Update); ok { - if _, ok := v.SelectPlan.(*plannercore.PointGetPlan); ok { - mayOptimizeForPointGet = true + p.optimizeForNotFetchingLatestTS = notNeedGetLatestTSFromPD(plan, false) + + return nil +} + +// notNeedGetLatestTSFromPD recursively checks whether the plan can skip fetching the latest ts from PD +// Note: For point get and batch point get (call it plan), if one of the ancestor nodes is update/delete/physicalLock, +// we should check whether plan.Lock is true or false. See the comments on the PointGetPlan case below. +// inLockOrWriteStmt = true means one of the ancestor nodes is update/delete/physicalLock. +func notNeedGetLatestTSFromPD(plan plannercore.Plan, inLockOrWriteStmt bool) bool { + switch v := plan.(type) { + case *plannercore.PointGetPlan: + // We do not optimize the point get / batch point get if plan.lock = false and inLockOrWriteStmt = true. + // Theoretically, plan.lock should be true if the flag is true. But due to the bug described in Issue35524, + // plan.lock can be false even when inLockOrWriteStmt is true. In that case, the optimization here could lead to different results, + // which cannot be accepted because AdviseOptimizeWithPlan must not change results.
+ return !inLockOrWriteStmt || v.Lock + case *plannercore.BatchPointGetPlan: + return !inLockOrWriteStmt || v.Lock + case plannercore.PhysicalPlan: + if len(v.Children()) == 0 { + return false } - } else if v, ok := plan.(*plannercore.Delete); ok { - if _, ok := v.SelectPlan.(*plannercore.PointGetPlan); ok { - mayOptimizeForPointGet = true + _, isPhysicalLock := v.(*plannercore.PhysicalLock) + for _, p := range v.Children() { + if !notNeedGetLatestTSFromPD(p, isPhysicalLock || inLockOrWriteStmt) { + return false + } } + return true + case *plannercore.Update: + return notNeedGetLatestTSFromPD(v.SelectPlan, true) + case *plannercore.Delete: + return notNeedGetLatestTSFromPD(v.SelectPlan, true) + case *plannercore.Insert: + return v.SelectPlan == nil } - - p.followingOperatorIsPointGetForUpdate = mayOptimizeForPointGet - - return nil + return false } func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) { diff --git a/sessiontxn/isolation/repeatable_read_test.go b/sessiontxn/isolation/repeatable_read_test.go index dfeed73e9af33..c60c1c3da560d 100644 --- a/sessiontxn/isolation/repeatable_read_test.go +++ b/sessiontxn/isolation/repeatable_read_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/executor" @@ -344,60 +345,89 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) { tk.MustExec("insert into t values (1,1), (2,2)") se := tk.Session() provider := initializeRepeatableReadProvider(t, tk, true) - forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() + lastFetchedForUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() txnManager := sessiontxn.GetTxnManager(se) - require.NoError(t, txnManager.OnStmtStart(context.TODO())) - stmt, err := parser.New().ParseOneStmt("delete from t where id = 1", "", "") - require.NoError(t, err) - compareTs := getOracleTS(t, se) - compiler := executor.Compiler{Ctx: se} - execStmt, err := compiler.Compile(context.TODO(), stmt) - require.NoError(t, err) - err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan) - require.NoError(t, err) - ts, err := provider.GetStmtForUpdateTS() - require.NoError(t, err) - require.Greater(t, compareTs, ts) - require.Equal(t, ts, forUpdateTS) + type testStruct struct { + sql string + shouldOptimize bool + } - require.NoError(t, txnManager.OnStmtStart(context.TODO())) - stmt, err = parser.New().ParseOneStmt("update t set v = v + 10 where id = 1", "", "") - require.NoError(t, err) - compiler = executor.Compiler{Ctx: se} - execStmt, err = compiler.Compile(context.TODO(), stmt) - require.NoError(t, err) - err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan) - require.NoError(t, err) - ts, err = provider.GetStmtForUpdateTS() - require.NoError(t, err) - require.Equal(t, ts, forUpdateTS) + cases := []testStruct{ + { + "delete from t where id = 1", + true, + }, + { + "update t set v = v + 10 where id = 1", + true, + }, + { + "select * from (select * from t where id = 1 for update) as t1 for update", + true, + }, + { + "select * from t where id = 1 for update", + true, + }, + { + "select * from t where id = 1 or id = 2 for update", + true, + }, + { + "select * from t for update", + false, + }, + } - require.NoError(t, txnManager.OnStmtStart(context.TODO())) - stmt, err = parser.New().ParseOneStmt("select * from (select * from t where id = 1 for update) as t1 for update", "", "") - require.NoError(t, err) - 
compiler = executor.Compiler{Ctx: se} - execStmt, err = compiler.Compile(context.TODO(), stmt) - require.NoError(t, err) - err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan) - require.NoError(t, err) - ts, err = provider.GetStmtForUpdateTS() - require.NoError(t, err) - require.Equal(t, ts, forUpdateTS) + var stmt ast.StmtNode + var err error + var execStmt *executor.ExecStmt + var compiler executor.Compiler + var ts, compareTS uint64 + var action sessiontxn.StmtErrorAction - // Now, test for one that does not use the optimization - require.NoError(t, txnManager.OnStmtStart(context.TODO())) - stmt, err = parser.New().ParseOneStmt("select * from t for update", "", "") - compareTs = getOracleTS(t, se) - require.NoError(t, err) - compiler = executor.Compiler{Ctx: se} - execStmt, err = compiler.Compile(context.TODO(), stmt) - require.NoError(t, err) - err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan) - require.NoError(t, err) - ts, err = provider.GetStmtForUpdateTS() - require.NoError(t, err) - require.Greater(t, ts, compareTs) + for _, c := range cases { + compareTS = getOracleTS(t, se) + + require.NoError(t, txnManager.OnStmtStart(context.TODO())) + stmt, err = parser.New().ParseOneStmt(c.sql, "", "") + require.NoError(t, err) + + err = provider.OnStmtStart(context.TODO()) + require.NoError(t, err) + + compiler = executor.Compiler{Ctx: se} + execStmt, err = compiler.Compile(context.TODO(), stmt) + require.NoError(t, err) + + err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan) + require.NoError(t, err) + + ts, err = provider.GetStmtForUpdateTS() + require.NoError(t, err) + + if c.shouldOptimize { + require.Greater(t, compareTS, ts) + require.Equal(t, ts, lastFetchedForUpdateTS) + } else { + require.Greater(t, ts, compareTS) + } + + // retry + if c.shouldOptimize { + action, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, kv.ErrWriteConflict) + require.NoError(t, err) + require.Equal(t, sessiontxn.StmtActionRetryReady, action) + err = provider.OnStmtRetry(context.TODO()) + require.NoError(t, err) + ts, err = provider.GetStmtForUpdateTS() + require.NoError(t, err) + require.Greater(t, ts, compareTS) + + lastFetchedForUpdateTS = ts + } + } // Test use startTS after optimize when autocommit=0 activeAssert := activePessimisticRRAssert(t, tk.Session(), true) @@ -415,7 +445,7 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) { require.Equal(t, tk.Session().GetSessionVars().TxnCtx.StartTS, ts) // Test still fetch for update ts after optimize when autocommit=0 - compareTs = getOracleTS(t, se) + compareTS = getOracleTS(t, se) activeAssert = activePessimisticRRAssert(t, tk.Session(), true) provider = initializeRepeatableReadProvider(t, tk, false) require.NoError(t, txnManager.OnStmtStart(context.TODO())) @@ -427,7 +457,179 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) { require.NoError(t, err) ts, err = provider.GetStmtForUpdateTS() require.NoError(t, err) - require.Greater(t, ts, compareTs) + require.Greater(t, ts, compareTS) +} + +var errorsInInsert = []string{ + "errWriteConflict", + "errDuplicateKey", +} + +func TestConflictErrorInInsertInRR(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertPessimisticLockErr", "return")) + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + se := tk.Session() + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("create table t (id int primary key, 
v int)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("insert into t values (1, 2)") + se.SetValue(sessiontxn.AssertLockErr, nil) + _, err := tk.Exec("insert into t values (1, 1), (2, 2)") + require.Error(t, err) + records, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + for _, name := range errorsInInsert { + require.Equal(t, records[name], 1) + } + + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("rollback") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr")) +} + +func TestConflictErrorInPointGetForUpdateInRR(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertPessimisticLockErr", "return")) + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + se := tk.Session() + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("create table t (id int primary key, v int)") + tk.MustExec("insert into t values (1, 1), (2, 2)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 10 where id = 1") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustQuery("select * from t where id = 1 for update").Check(testkit.Rows("1 11")) + records, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + require.Equal(t, records["errWriteConflict"], 1) + tk.MustExec("commit") + + // batch point get + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 10 where id = 1") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustQuery("select * from t where id = 1 or id = 2 for update").Check(testkit.Rows("1 21", "2 2")) + records, ok = se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + require.Equal(t, records["errWriteConflict"], 1) + tk.MustExec("commit") + + tk.MustExec("rollback") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr")) +} + +// Delete should get the latest ts and thus does not incur write conflict +func TestConflictErrorInDeleteInRR(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertPessimisticLockErr", "return")) + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + se := tk.Session() + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("create table t (id int primary key, v int)") + tk.MustExec("insert into t values (1, 1), (2, 2)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("insert into t values (3, 1)") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("delete from t where v = 1") + _, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.False(t, ok) + tk.MustQuery("select * from t").Check(testkit.Rows("2 2")) + tk.MustExec("commit") + + tk.MustExec("begin pessimistic") + // However, if sub select in delete is point get, we will incur one write conflict + tk2.MustExec("update t set id = 1 where id = 2") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("delete from t where id = 1") + + records, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + require.Equal(t, records["errWriteConflict"], 1) + tk.MustQuery("select * from t for update").Check(testkit.Rows()) + + tk.MustExec("rollback") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr")) +} 
+ +func TestConflictErrorInUpdateInRR(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertPessimisticLockErr", "return")) + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + se := tk.Session() + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("create table t (id int primary key, v int)") + tk.MustExec("insert into t values (1, 1), (2, 2)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 10") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustExec("update t set v = v + 10") + _, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.False(t, ok) + tk.MustQuery("select * from t").Check(testkit.Rows("1 21", "2 22")) + tk.MustExec("commit") + + tk.MustExec("begin pessimistic") + // However, if the sub select plan is point get, we should incur one write conflict + tk2.MustExec("update t set v = v + 10 where id = 1") + tk.MustExec("update t set v = v + 10 where id = 1") + records, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + require.Equal(t, records["errWriteConflict"], 1) + tk.MustQuery("select * from t for update").Check(testkit.Rows("1 41", "2 22")) + + tk.MustExec("rollback") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr")) +} + +func TestConflictErrorInOtherQueryContainingPointGet(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertPessimisticLockErr", "return")) + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + se := tk.Session() + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk2.MustExec("use test") + tk.MustExec("create table t (id int primary key, v int)") + tk.MustExec("insert into t values (1, 1)") + + tk.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 10 where id = 1") + se.SetValue(sessiontxn.AssertLockErr, nil) + tk.MustQuery("select * from t where id=1 and v > 1 for update").Check(testkit.Rows("1 11")) + records, ok := se.Value(sessiontxn.AssertLockErr).(map[string]int) + require.True(t, ok) + require.Equal(t, records["errWriteConflict"], 1) + + tk.MustExec("rollback") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr")) } func activePessimisticRRAssert(t *testing.T, sctx sessionctx.Context, diff --git a/sessiontxn/txn_context_test.go b/sessiontxn/txn_context_test.go index d3e1f32124575..e247a14c86f2b 100644 --- a/sessiontxn/txn_context_test.go +++ b/sessiontxn/txn_context_test.go @@ -445,7 +445,7 @@ func TestTxnContextForHistoricalRead(t *testing.T) { }) doWithCheckPath(t, se, normalPathRecords, func() { - tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11")) + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) }) tk.MustExec("rollback")