diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index b7e53a8a9..2317e6cf4 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -78,7 +78,7 @@ func (s *testLockSuite) TearDownTest() { s.store.Close() } -func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, commitPrimary bool, asyncCommit bool) (uint64, uint64) { +func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl uint64, commitPrimary bool, asyncCommit bool) (uint64, uint64) { txn, err := s.store.Begin() s.Nil(err) if len(value) > 0 { @@ -97,6 +97,7 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, com tpc, err := txn.NewCommitter(0) s.Nil(err) tpc.SetPrimaryKey(primaryKey) + tpc.SetLockTTL(ttl) if asyncCommit { tpc.SetUseAsyncCommit() } @@ -133,11 +134,11 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) { func (s *testLockSuite) prepareAlphabetLocks() { s.putKV([]byte("c"), []byte("cc")) - s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), true, false) - s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), false, false) - s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), false, false) + s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), 3000, true, false) + s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), 3000, false, false) + s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), 3000, false, false) s.putKV([]byte("bar"), []byte("bar")) - s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), true, false) + s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), 3000, true, false) } func (s *testLockSuite) TestScanLockResolveWithGet() { @@ -208,7 +209,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet() { func (s *testLockSuite) TestCleanLock() { for ch := byte('a'); ch <= byte('z'); ch++ { k := []byte{ch} - s.lockKey(k, k, k, k, false, false) + s.lockKey(k, k, k, k, 3000, false, false) } txn, err := s.store.Begin() s.Nil(err) @@ -227,13 +228,13 @@ func (s *testLockSuite) TestGetTxnStatus() { s.True(status.IsCommitted()) s.Equal(status.CommitTS(), commitTS) - startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), true, false) + startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), 3000, true, false) status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a")) s.Nil(err) s.True(status.IsCommitted()) s.Equal(status.CommitTS(), commitTS) - startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), false, false) + startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), 3000, false, false) status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a")) s.Nil(err) s.False(status.IsCommitted()) @@ -903,13 +904,13 @@ func (s *testLockSuite) TestResolveLocksForRead() { var locks []*txnlock.Lock // commitTS < readStartTS - startTS, _ := s.lockKey([]byte("k1"), []byte("v1"), []byte("k11"), []byte("v11"), true, false) + startTS, _ := s.lockKey([]byte("k1"), []byte("v1"), []byte("k11"), []byte("v11"), 3000, true, false) committedLocks = append(committedLocks, startTS) lock := s.mustGetLock([]byte("k1")) locks = append(locks, lock) // rolled back - startTS, _ = s.lockKey([]byte("k2"), []byte("v2"), []byte("k22"), []byte("v22"), false, false) + startTS, _ = s.lockKey([]byte("k2"), []byte("v2"), []byte("k22"), []byte("v22"), 3000, false, false) lock = s.mustGetLock([]byte("k22")) err := s.store.NewLockResolver().ForceResolveLock(ctx, lock) s.Nil(err) @@ -918,28 +919,34 @@ func (s *testLockSuite) TestResolveLocksForRead() { locks = append(locks, lock) // pushed - startTS, _ = s.lockKey([]byte("k3"), []byte("v3"), []byte("k33"), []byte("v33"), false, false) + startTS, _ = s.lockKey([]byte("k3"), []byte("v3"), []byte("k33"), []byte("v33"), 3000, false, false) resolvedLocks = append(resolvedLocks, startTS) lock = s.mustGetLock([]byte("k3")) locks = append(locks, lock) // can't be pushed and isn't expired - _, _ = s.lockKey([]byte("k4"), []byte("v4"), []byte("k44"), []byte("v44"), false, true) + _, _ = s.lockKey([]byte("k4"), []byte("v4"), []byte("k44"), []byte("v44"), 3000, false, true) lock = s.mustGetLock([]byte("k4")) locks = append(locks, lock) + // can't be pushed but is expired + startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 0, false, true) + committedLocks = append(committedLocks, startTS) + lock = s.mustGetLock([]byte("k5")) + locks = append(locks, lock) + // commitTS > readStartTS var readStartTS uint64 { t, err := s.store.Begin() s.Nil(err) resolvedLocks = append(resolvedLocks, t.StartTS()) - s.Nil(t.Set([]byte("k5"), []byte("v5"))) - s.Nil(t.Set([]byte("k55"), []byte("v55"))) + s.Nil(t.Set([]byte("k6"), []byte("v6"))) + s.Nil(t.Set([]byte("k66"), []byte("v66"))) committer, err := t.NewCommitter(1) s.Nil(err) s.Nil(committer.PrewriteAllMutations(ctx)) - committer.SetPrimaryKey([]byte("k55")) + committer.SetPrimaryKey([]byte("k66")) readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) s.Nil(err) @@ -950,12 +957,14 @@ func (s *testLockSuite) TestResolveLocksForRead() { committer.SetCommitTS(commitTS) err = committer.CommitMutations(ctx) s.Nil(err) - lock = s.mustGetLock([]byte("k5")) + lock = s.mustGetLock([]byte("k6")) locks = append(locks, lock) } bo := tikv.NewBackoffer(context.Background(), getMaxBackoff) - msBeforeExpired, resolved, committed, err := s.store.NewLockResolver().ResolveLocksForRead(bo, readStartTS, locks, false) + lr := s.store.NewLockResolver() + defer lr.Close() + msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, false) s.Nil(err) s.Greater(msBeforeExpired, int64(0)) s.Equal(resolvedLocks, resolved) diff --git a/integration_tests/snapshot_fail_test.go b/integration_tests/snapshot_fail_test.go index d3e7b3640..fa6745dcb 100644 --- a/integration_tests/snapshot_fail_test.go +++ b/integration_tests/snapshot_fail_test.go @@ -310,7 +310,7 @@ func (s *testSnapshotFailSuite) TestResetSnapshotTS() { s.Equal(val, []byte("y1")) } -func (s *testSnapshotFailSuite) mustGetLock(key []byte) *txnkv.Lock { +func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock { ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) @@ -324,45 +324,70 @@ func (s *testSnapshotFailSuite) mustGetLock(key []byte) *txnkv.Lock { s.Nil(err) s.NotNil(resp.Resp) keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() - s.NotNil(keyErr) + if keyErr == nil { + return nil + } lock, err := txnlock.ExtractLockFromKeyErr(keyErr) - s.Nil(err) + if err != nil { + return nil + } return lock } func (s *testSnapshotFailSuite) TestSnapshotUseResolveForRead() { - x := []byte("x_key_TestSnapshotUseResolveForRead") - y := []byte("y_key_TestSnapshotUseResolveForRead") - txn, err := s.store.Begin() - s.Nil(err) - s.Nil(txn.Set(x, []byte("x"))) - s.Nil(txn.Set(y, []byte("y"))) - ctx := context.Background() - committer, err := txn.NewCommitter(1) - s.Nil(err) - committer.SetLockTTL(3000) - s.Nil(committer.PrewriteAllMutations(ctx)) - committer.SetCommitTS(committer.GetStartTS() + 1) - committer.CommitMutations(ctx) - s.Equal(committer.GetPrimaryKey(), x) - s.NotNil(s.mustGetLock(y)) - - s.Nil(failpoint.Enable("tikvclient/resolveLock", `sleep(1000)`)) - defer s.Nil(failpoint.Disable("tikvclient/resolveLock")) - - txn, err = s.store.Begin() - s.Nil(err) - snapshot := txn.GetSnapshot() + s.Nil(failpoint.Enable("tikvclient/resolveLock", "sleep(500)")) + s.Nil(failpoint.Enable("tikvclient/resolveAsyncResolveData", "sleep(500)")) + defer func() { + s.Nil(failpoint.Disable("tikvclient/resolveAsyncResolveData")) + s.Nil(failpoint.Disable("tikvclient/resolveLock")) + }() - start := time.Now() - val, err := snapshot.Get(ctx, y) - s.Nil(err) - s.Equal([]byte("y"), val) - s.Less(time.Since(start), 500*time.Millisecond) + for _, asyncCommit := range []bool{false, true} { + x := []byte("x_key_TestSnapshotUseResolveForRead") + y := []byte("y_key_TestSnapshotUseResolveForRead") + txn, err := s.store.Begin() + s.Nil(err) + s.Nil(txn.Set(x, []byte("x"))) + s.Nil(txn.Set(y, []byte("y"))) + txn.SetEnableAsyncCommit(asyncCommit) + ctx := context.Background() + committer, err := txn.NewCommitter(1) + s.Nil(err) + committer.SetLockTTL(3000) + s.Nil(committer.PrewriteAllMutations(ctx)) + committer.SetCommitTS(committer.GetStartTS() + 1) + committer.CommitMutations(ctx) + s.Equal(committer.GetPrimaryKey(), x) + s.NotNil(s.getLock(y)) + + txn, err = s.store.Begin() + s.Nil(err) + snapshot := txn.GetSnapshot() + start := time.Now() + val, err := snapshot.Get(ctx, y) + s.Nil(err) + s.Equal([]byte("y"), val) + s.Less(time.Since(start), 200*time.Millisecond) + s.NotNil(s.getLock(y)) - start = time.Now() - res, err := snapshot.BatchGet(ctx, [][]byte{y}) - s.Nil(err) - s.Equal([]byte("y"), res[string(y)]) - s.Less(time.Since(start), 500*time.Millisecond) + txn, err = s.store.Begin() + s.Nil(err) + snapshot = txn.GetSnapshot() + start = time.Now() + res, err := snapshot.BatchGet(ctx, [][]byte{y}) + s.Nil(err) + s.Equal([]byte("y"), res[string(y)]) + s.Less(time.Since(start), 200*time.Millisecond) + s.NotNil(s.getLock(y)) + + var lock *txnkv.Lock + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + lock = s.getLock(y) + if lock == nil { + break + } + } + s.Nil(lock, "failed to resolve lock timely") + } } diff --git a/tikv/kv.go b/tikv/kv.go index aa7eae3aa..7af981a9d 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -330,6 +330,7 @@ func (s *KVStore) Close() error { s.oracle.Close() s.pdClient.Close() + s.lockResolver.Close() if err := s.GetTiKVClient().Close(); err != nil { return err diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 5dd9fcdbf..166da49bb 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -69,6 +69,11 @@ type LockResolver struct { testingKnobs struct { meetLock func(locks []*Lock) } + + // LockResolver may have some goroutines resolving locks in the background. + // The Cancel function is to cancel these goroutines for passing goleak test. + asyncResolveCtx context.Context + asyncResolveCancel func() } // NewLockResolver creates a new LockResolver instance. @@ -79,9 +84,15 @@ func NewLockResolver(store storage) *LockResolver { } r.mu.resolved = make(map[uint64]TxnStatus) r.mu.recentResolved = list.New() + r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background()) return r } +// Close cancels all background goroutines. +func (lr *LockResolver) Close() { + lr.asyncResolveCancel() +} + // TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback. type TxnStatus struct { ttl uint64 @@ -359,7 +370,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, err = lr.resolvePessimisticLock(bo, l) } else { if forRead { - asyncBo := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff) + asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff) go func() { // Pass an empty cleanRegions here to avoid data race and // let `reqCollapse` deduplicate identical resolve requests. @@ -745,6 +756,8 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK // resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status. func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error { + util.EvalFailpoint("resolveAsyncResolveData") + keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil) if err != nil { return err @@ -794,7 +807,7 @@ func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, sta logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS)) if asyncResolveAll { - asyncBo := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff) + asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff) go func() { err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData) if err != nil {