From 1a91fc5c4cf98444116a973685ddc0c5a03aa323 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 6 Aug 2020 11:29:17 +0800 Subject: [PATCH] transaction: fix union select for update race (#19006) (#19022) * transaction: fix LockKeys race * do not update delta for lock keys * fix more race * fix another race Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Co-authored-by: Evan Zhou --- executor/executor.go | 9 +-------- kv/kv.go | 2 +- session/pessimistic_test.go | 12 ++++++++++++ session/session.go | 2 +- sessionctx/stmtctx/stmtctx.go | 17 +++++++++-------- sessionctx/variable/session.go | 2 +- store/tikv/txn.go | 8 +++----- 7 files changed, 28 insertions(+), 24 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 77805cc455915..89ef7a4b19351 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -921,13 +921,6 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { lockWaitTime = kv.LockNoWait } - if len(e.keys) > 0 { - // This operation is only for schema validator check. - for id := range e.tblID2Handle { - e.ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) - } - } - return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...) } @@ -952,7 +945,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx { func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { sctx := se.GetSessionVars().StmtCtx if !sctx.InUpdateStmt && !sctx.InDeleteStmt { - se.GetSessionVars().TxnCtx.ForUpdate = true + atomic.StoreUint32(&se.GetSessionVars().TxnCtx.ForUpdate, 1) } // Lock keys only once when finished fetching all results. txn, err := se.Txn(true) diff --git a/kv/kv.go b/kv/kv.go index da0751d691fae..07af8ec6903d5 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -222,7 +222,7 @@ type LockCtx struct { LockWaitTime int64 WaitStartTime time.Time PessimisticLockWaited *int32 - LockKeysDuration *time.Duration + LockKeysDuration *int64 LockKeysCount *int32 ReturnValues bool Values map[string]ReturnedValue diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 19996a9a15d0f..7e9c5d6dc01be 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -1535,3 +1535,15 @@ func (s *testPessimisticSuite) TestInsertDupKeyAfterLockBatchPointGet(c *C) { err = tk.ExecToErr("commit") c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) } + +func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(id int, v int, k int, primary key (id), key kk(k))") + tk.MustExec("insert into t select 1, 1, 1") + tk.MustExec("begin pessimistic") + tk.MustQuery("(select * from t where id between 0 and 1 for update) union all (select * from t where id between 0 and 1 for update)") + tk.MustExec("update t set k = 2 where k = 1") + tk.MustExec("commit") + tk.MustExec("admin check table t") +} diff --git a/session/session.go b/session/session.go index 25b1e88f83062..c4eee97dcd477 100644 --- a/session/session.go +++ b/session/session.go @@ -624,7 +624,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { connID := s.sessionVars.ConnectionID s.sessionVars.RetryInfo.Retrying = true - if s.sessionVars.TxnCtx.ForUpdate { + if atomic.LoadUint32(&s.sessionVars.TxnCtx.ForUpdate) == 1 { err = ErrForUpdateCantRetry.GenWithStackByArgs(connID) return err } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 5f02704985ff0..fcfb774b66095 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -146,10 +146,10 @@ type StatementContext struct { planNormalized string planDigest string Tables []TableEntry - PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" - lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time + PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" + lockWaitStartTime int64 // LockWaitStartTime stores the pessimistic lock wait start time PessimisticLockWaited int32 - LockKeysDuration time.Duration + LockKeysDuration int64 LockKeysCount int32 TblInfo2UnionScan map[*model.TableInfo]bool TaskID uint64 // unique ID for an execution of a statement @@ -488,7 +488,7 @@ func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails { var details execdetails.ExecDetails sc.mu.Lock() details = sc.mu.execDetails - details.LockKeysDuration = sc.LockKeysDuration + details.LockKeysDuration = time.Duration(atomic.LoadInt64(&sc.LockKeysDuration)) sc.mu.Unlock() return details } @@ -629,11 +629,12 @@ func (sc *StatementContext) SetFlagsFromPBFlag(flags uint64) { // GetLockWaitStartTime returns the statement pessimistic lock wait start time func (sc *StatementContext) GetLockWaitStartTime() time.Time { - if sc.lockWaitStartTime == nil { - curTime := time.Now() - sc.lockWaitStartTime = &curTime + startTime := atomic.LoadInt64(&sc.lockWaitStartTime) + if startTime == 0 { + startTime = time.Now().UnixNano() + atomic.StoreInt64(&sc.lockWaitStartTime, startTime) } - return *sc.lockWaitStartTime + return time.Unix(0, startTime) } //CopTasksDetails collects some useful information of cop-tasks during execution. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index c7f3072770a1c..a9cd2dbd0b918 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -153,11 +153,11 @@ type TransactionContext struct { // CreateTime For metrics. CreateTime time.Time StatementCount int - ForUpdate bool CouldRetry bool IsPessimistic bool Isolation string LockExpire uint32 + ForUpdate uint32 } // AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock. diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 0aa4466dde148..70fa2494ae378 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -344,6 +344,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { // lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error { + txn.mu.Lock() + defer txn.mu.Unlock() // Exclude keys that are already locked. var err error keys := make([][]byte, 0, len(keysInput)) @@ -352,7 +354,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput if lockCtx.PessimisticLockWaited != nil { if atomic.LoadInt32(lockCtx.PessimisticLockWaited) > 0 { timeWaited := time.Since(lockCtx.WaitStartTime) - *lockCtx.LockKeysDuration = timeWaited + atomic.StoreInt64(lockCtx.LockKeysDuration, int64(timeWaited)) metrics.TiKVPessimisticLockKeysDuration.Observe(timeWaited.Seconds()) } } @@ -361,7 +363,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput *lockCtx.LockKeysCount += int32(len(keys)) } }() - txn.mu.Lock() for _, key := range keysInput { // The value of lockedMap is only used by pessimistic transactions. valueExist, locked := txn.lockedMap[string(key)] @@ -385,7 +386,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true} } } - txn.mu.Unlock() if len(keys) == 0 { return nil } @@ -451,7 +451,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput txn.committer.ttlManager.run(txn.committer, lockCtx) } } - txn.mu.Lock() txn.lockKeys = append(txn.lockKeys, keys...) for _, key := range keys { // PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exists. @@ -465,7 +464,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput } } txn.dirty = true - txn.mu.Unlock() return nil }