diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index bafe21648..89bead1c9 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -1472,52 +1472,81 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`)) s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`)) - s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`)) + s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`)) + s.NoError(failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`)) defer func() { s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback")) s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries")) s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit")) + s.NoError(failpoint.Disable("tikvclient/onRollback")) }() - k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3") + k1, k2, k3, k4 := []byte("k1"), []byte("k2"), []byte("k3"), []byte("k4") v2, v3 := []byte("v2"), []byte("v3") ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - txn, err := s.store.Begin() + txn1, err := s.store.Begin() s.NoError(err) - txn.SetPessimistic(true) + txn1.SetPessimistic(true) { // Produce write conflict on key k2 - txn2, err := s.store.Begin() + helperTxn, err := s.store.Begin() s.NoError(err) - s.NoError(txn2.Set(k2, []byte("v0"))) - s.NoError(txn2.Commit(ctx)) + s.NoError(helperTxn.Set(k2, []byte("v0"))) + s.NoError(helperTxn.Commit(ctx)) } - lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now()) - err = txn.LockKeys(ctx, lockCtx, k1, k2) + lockCtx := kv.NewLockCtx(txn1.StartTS(), 200, time.Now()) + err = txn1.LockKeys(ctx, lockCtx, k1, k2) s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err)) - // k1 has txn's stale pessimistic lock now. + // k1 has txn1's stale pessimistic lock now. forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) s.NoError(err) lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now()) - s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3)) + s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3)) - s.NoError(txn.Set(k2, v2)) - s.NoError(txn.Set(k3, v3)) - s.NoError(txn.Commit(ctx)) + s.NoError(txn1.Set(k2, v2)) + s.NoError(txn1.Set(k3, v3)) + s.NoError(txn1.Commit(ctx)) - // k3 has txn's stale prewrite lock now. + // k3 has txn1's stale prewrite lock now. - // Perform ScanLock - BatchResolveLock. + txn2, err := s.store.Begin() + txn2.SetPessimistic(true) + s.NoError(err) + lockCtx = kv.NewLockCtx(txn1.StartTS(), 200, time.Now()) + err = txn2.LockKeys(ctx, lockCtx, k4) + s.NoError(err) + s.NoError(txn2.Rollback()) + + // k4 has txn2's stale primary pessimistic lock now. currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + + remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) + s.NoError(err) + + s.Len(remainingLocks, 3) + s.Equal(remainingLocks[0].Key, k1) + s.Equal(remainingLocks[0].LockType, kvrpcpb.Op_PessimisticLock) + s.Equal(remainingLocks[1].Key, k3) + s.Equal(remainingLocks[1].LockType, kvrpcpb.Op_Put) + s.Equal(remainingLocks[2].Key, k4) + s.Equal(remainingLocks[2].LockType, kvrpcpb.Op_PessimisticLock) + s.Equal(remainingLocks[2].Primary, k4) + + // Perform ScanLock - BatchResolveLock. s.NoError(err) s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1)) + // Do ScanLock again to make sure no locks are left. + remainingLocks, err = s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) + s.NoError(err) + s.Empty(remainingLocks) + // Check data consistency readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) snapshot := s.store.GetSnapshot(readTS) diff --git a/tikv/test_probe.go b/tikv/test_probe.go index d470d6919..5971480f3 100644 --- a/tikv/test_probe.go +++ b/tikv/test_probe.go @@ -35,6 +35,7 @@ package tikv import ( + "bytes" "context" "github.com/pingcap/kvproto/pkg/metapb" @@ -114,6 +115,42 @@ func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, co return s.resolveLocks(ctx, safepoint, concurrency) } +func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxVersion uint64) ([]*txnlock.Lock, error) { + bo := NewGcResolveLockMaxBackoffer(ctx) + const limit = 1024 + + var result []*txnlock.Lock + +outerLoop: + for { + locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit) + if err != nil { + return nil, err + } + for _, l := range locks { + if bytes.Compare(endKey, l.Key) <= 0 { + // Finished scanning the given range. + break outerLoop + } + result = append(result, l) + } + + if len(locks) < limit { + if len(loc.EndKey) == 0 { + // Scanned to the very end. + break outerLoop + } + // The current region is completely scanned. + startKey = loc.EndKey + } else { + // The current region may still have more locks. + startKey = append(locks[len(locks)-1].Key, 0) + } + } + + return result, nil +} + // LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose. type LockResolverProbe struct { *txnlock.LockResolverProbe diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index e5067ce8a..f9873a33f 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -556,10 +556,27 @@ func (txn *KVTxn) Rollback() error { txn.CancelAggressiveLocking(context.Background()) } + // `skipPessimisticRollback` may be true only when set by failpoint in tests. + skipPessimisticRollback := false + if val, err := util.EvalFailpoint("onRollback"); err == nil { + if s, ok := val.(string); ok { + if s == "skipRollbackPessimisticLock" { + logutil.BgLogger().Info("[failpoint] injected skip pessimistic rollback on explicit rollback", + zap.Uint64("txnStartTS", txn.startTS)) + skipPessimisticRollback = true + } else { + panic(fmt.Sprintf("unknown instruction %s for failpoint \"onRollback\"", s)) + } + } + } + start := time.Now() // Clean up pessimistic lock. if txn.IsPessimistic() && txn.committer != nil { - err := txn.rollbackPessimisticLocks() + var err error + if !skipPessimisticRollback { + err = txn.rollbackPessimisticLocks() + } txn.committer.ttlManager.close() if err != nil { logutil.BgLogger().Error(err.Error()) diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 74caae652..1aa359eb5 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -245,6 +245,12 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo } metrics.LockResolverCountWithExpired.Inc() + // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l) + if err != nil { + return false, err + } + if l.LockType == kvrpcpb.Op_PessimisticLock { // BatchResolveLocks forces resolving the locks ignoring whether whey are expired. // For pessimistic locks, committing them makes no sense, but it won't affect transaction @@ -252,6 +258,9 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo // Pessimistic locks needs special handling logic because their primary may not point // to the real primary of that transaction, and their state cannot be put in `txnInfos`. // (see: https://github.com/pingcap/tidb/issues/42937). + // + // `resolvePessimisticLock` should be called after calling `getTxnStatus`. + // See: https://github.com/pingcap/tidb/issues/45134 err := lr.resolvePessimisticLock(bo, l) if err != nil { return false, err @@ -259,12 +268,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo continue } - // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l) - if err != nil { - return false, err - } - // If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock. // Then we need to check the secondary locks to determine the final status of the transaction. if status.primaryLock != nil && status.primaryLock.UseAsyncCommit { @@ -1172,6 +1175,8 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat } } +// resolvePessimisticLock handles pessimistic locks after checking txn status. +// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock. func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error { metrics.LockResolverCountWithResolveLocks.Inc() // The lock has been resolved by getTxnStatusFromLock.