From 60b80c5b398ffd050b4fc8c88c840aaf9c10b646 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 14 Dec 2020 15:32:32 +0800 Subject: [PATCH] refactor function --- store/tikv/2pc_test.go | 2 +- store/tikv/lock_resolver.go | 36 ++++++++++++++++++++---------------- store/tikv/lock_test.go | 8 ++++---- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index da87a42551ef4..00b509e6c920e 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -629,7 +629,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true, nil) c.Assert(err, IsNil) c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 1c5d0673eb7b2..c221608d79ce5 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -225,13 +225,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, l) if err != nil { return false, err } - if l.LockType != kvrpcpb.Op_PessimisticLock && status.ttl == 0 { - lr.saveResolved(l.TxnID, status) - } if status.ttl > 0 { logutil.BgLogger().Error("BatchResolveLocks fail to clean locks, this result is not expected!") @@ -433,7 +430,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, nil) } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { @@ -458,17 +455,8 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart time.Sleep(100 * time.Millisecond) }) for { - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist) + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, l) if err == nil { - // If l.LockType is pessimistic lock type: - // - if its primary lock is pessimistic too, the check txn status result should not be cached. - // - if its primary lock is prewrite lock type, the check txn status could be cached. - // If l.lockType is prewrite lock type: - // - always cache the check txn status result. - // For prewrite locks, their primary keys should ALWAYS be the correct one. - if l.LockType != kvrpcpb.Op_PessimisticLock && status.ttl == 0 { - lr.saveResolved(l.TxnID, status) - } return status, nil } // If the error is something other than txnNotFoundErr, throw the error (network @@ -519,7 +507,8 @@ func (e txnNotFoundErr) Error() string { // getTxnStatus sends the CheckTxnStatus request to the TiKV server. // When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error. -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, + rollbackIfNotExist bool, lockInfo *Lock) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -588,6 +577,21 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte } status.commitTS = cmdResp.CommitVersion + // If the transaction is still valid with ttl greater than zero, do nothing. + // If its status is certain: + // If transaction is already committed, the result could be cached. + // Otherwise: + // If l.LockType is pessimistic lock type: + // - if its primary lock is pessimistic too, the check txn status result should not be cached. + // - if its primary lock is prewrite lock type, the check txn status could be cached, todo. + // If l.lockType is prewrite lock type: + // - always cache the check txn status result. + // For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change. + if status.ttl == 0 { + if status.IsCommitted() || (lockInfo != nil && lockInfo.LockType != kvrpcpb.Op_PessimisticLock) { + lr.saveResolved(txnID, status) + } + } } return status, nil } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index a8d217254ca3d..38dd0399e05be 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -284,7 +284,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil) resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. - status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) + status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, nil) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -306,7 +306,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true) + status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, nil) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) @@ -314,7 +314,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true) + status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, nil) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) @@ -342,7 +342,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { resolver := newLockResolver(s.store) // Call getTxnStatus for the TxnNotFound case. - _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) + _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, nil) c.Assert(err, NotNil) _, ok := errors.Cause(err).(txnNotFoundErr) c.Assert(ok, IsTrue)