Skip to content

Commit

Permalink
refactor function
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed Dec 14, 2020
1 parent 7a84a3f commit 60b80c5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

Expand Down
36 changes: 20 additions & 16 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
tikvLockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, l)
if err != nil {
return false, err
}
if l.LockType != kvrpcpb.Op_PessimisticLock && status.ttl == 0 {
lr.saveResolved(l.TxnID, status)
}

if status.ttl > 0 {
logutil.BgLogger().Error("BatchResolveLocks fail to clean locks, this result is not expected!")
Expand Down Expand Up @@ -433,7 +430,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true)
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, nil)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
Expand All @@ -458,17 +455,8 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
time.Sleep(100 * time.Millisecond)
})
for {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, l)
if err == nil {
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
// For prewrite locks, their primary keys should ALWAYS be the correct one.
if l.LockType != kvrpcpb.Op_PessimisticLock && status.ttl == 0 {
lr.saveResolved(l.TxnID, status)
}
return status, nil
}
// If the error is something other than txnNotFoundErr, throw the error (network
Expand Down Expand Up @@ -519,7 +507,8 @@ func (e txnNotFoundErr) Error() string {

// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64,
rollbackIfNotExist bool, lockInfo *Lock) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
Expand Down Expand Up @@ -588,6 +577,21 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
}

status.commitTS = cmdResp.CommitVersion
// If the transaction is still valid with ttl greater than zero, do nothing.
// If its status is certain:
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached, todo.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
if status.ttl == 0 {
if status.IsCommitted() || (lockInfo != nil && lockInfo.LockType != kvrpcpb.Op_PessimisticLock) {
lr.saveResolved(txnID, status)
}
}
}
return status, nil
}
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
resolver := newLockResolver(s.store)
// Call getTxnStatus to check the lock status.
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, nil)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
Expand All @@ -306,15 +306,15 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, commitTS)
Expand Down Expand Up @@ -342,7 +342,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
resolver := newLockResolver(s.store)

// Call getTxnStatus for the TxnNotFound case.
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false)
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, nil)
c.Assert(err, NotNil)
_, ok := errors.Cause(err).(txnNotFoundErr)
c.Assert(ok, IsTrue)
Expand Down

0 comments on commit 60b80c5

Please sign in to comment.