Skip to content

Commit

Permalink
fix goleak
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Nov 30, 2021
1 parent a2936f9 commit 609774e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 54 deletions.
43 changes: 26 additions & 17 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *testLockSuite) TearDownTest() {
s.store.Close()
}

func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, commitPrimary bool, asyncCommit bool) (uint64, uint64) {
func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl uint64, commitPrimary bool, asyncCommit bool) (uint64, uint64) {
txn, err := s.store.Begin()
s.Nil(err)
if len(value) > 0 {
Expand All @@ -97,6 +97,7 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, com
tpc, err := txn.NewCommitter(0)
s.Nil(err)
tpc.SetPrimaryKey(primaryKey)
tpc.SetLockTTL(ttl)
if asyncCommit {
tpc.SetUseAsyncCommit()
}
Expand Down Expand Up @@ -133,11 +134,11 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) {

func (s *testLockSuite) prepareAlphabetLocks() {
s.putKV([]byte("c"), []byte("cc"))
s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), true, false)
s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), false, false)
s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), false, false)
s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), 3000, true, false)
s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), 3000, false, false)
s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), 3000, false, false)
s.putKV([]byte("bar"), []byte("bar"))
s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), true, false)
s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), 3000, true, false)
}

func (s *testLockSuite) TestScanLockResolveWithGet() {
Expand Down Expand Up @@ -208,7 +209,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet() {
func (s *testLockSuite) TestCleanLock() {
for ch := byte('a'); ch <= byte('z'); ch++ {
k := []byte{ch}
s.lockKey(k, k, k, k, false, false)
s.lockKey(k, k, k, k, 3000, false, false)
}
txn, err := s.store.Begin()
s.Nil(err)
Expand All @@ -227,13 +228,13 @@ func (s *testLockSuite) TestGetTxnStatus() {
s.True(status.IsCommitted())
s.Equal(status.CommitTS(), commitTS)

startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), true, false)
startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), 3000, true, false)
status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
s.Nil(err)
s.True(status.IsCommitted())
s.Equal(status.CommitTS(), commitTS)

startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), false, false)
startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), 3000, false, false)
status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
s.Nil(err)
s.False(status.IsCommitted())
Expand Down Expand Up @@ -903,13 +904,13 @@ func (s *testLockSuite) TestResolveLocksForRead() {
var locks []*txnlock.Lock

// commitTS < readStartTS
startTS, _ := s.lockKey([]byte("k1"), []byte("v1"), []byte("k11"), []byte("v11"), true, false)
startTS, _ := s.lockKey([]byte("k1"), []byte("v1"), []byte("k11"), []byte("v11"), 3000, true, false)
committedLocks = append(committedLocks, startTS)
lock := s.mustGetLock([]byte("k1"))
locks = append(locks, lock)

// rolled back
startTS, _ = s.lockKey([]byte("k2"), []byte("v2"), []byte("k22"), []byte("v22"), false, false)
startTS, _ = s.lockKey([]byte("k2"), []byte("v2"), []byte("k22"), []byte("v22"), 3000, false, false)
lock = s.mustGetLock([]byte("k22"))
err := s.store.NewLockResolver().ForceResolveLock(ctx, lock)
s.Nil(err)
Expand All @@ -918,28 +919,34 @@ func (s *testLockSuite) TestResolveLocksForRead() {
locks = append(locks, lock)

// pushed
startTS, _ = s.lockKey([]byte("k3"), []byte("v3"), []byte("k33"), []byte("v33"), false, false)
startTS, _ = s.lockKey([]byte("k3"), []byte("v3"), []byte("k33"), []byte("v33"), 3000, false, false)
resolvedLocks = append(resolvedLocks, startTS)
lock = s.mustGetLock([]byte("k3"))
locks = append(locks, lock)

// can't be pushed and isn't expired
_, _ = s.lockKey([]byte("k4"), []byte("v4"), []byte("k44"), []byte("v44"), false, true)
_, _ = s.lockKey([]byte("k4"), []byte("v4"), []byte("k44"), []byte("v44"), 3000, false, true)
lock = s.mustGetLock([]byte("k4"))
locks = append(locks, lock)

// can't be pushed but is expired
startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 0, false, true)
committedLocks = append(committedLocks, startTS)
lock = s.mustGetLock([]byte("k5"))
locks = append(locks, lock)

// commitTS > readStartTS
var readStartTS uint64
{
t, err := s.store.Begin()
s.Nil(err)
resolvedLocks = append(resolvedLocks, t.StartTS())
s.Nil(t.Set([]byte("k5"), []byte("v5")))
s.Nil(t.Set([]byte("k55"), []byte("v55")))
s.Nil(t.Set([]byte("k6"), []byte("v6")))
s.Nil(t.Set([]byte("k66"), []byte("v66")))
committer, err := t.NewCommitter(1)
s.Nil(err)
s.Nil(committer.PrewriteAllMutations(ctx))
committer.SetPrimaryKey([]byte("k55"))
committer.SetPrimaryKey([]byte("k66"))

readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
Expand All @@ -950,12 +957,14 @@ func (s *testLockSuite) TestResolveLocksForRead() {
committer.SetCommitTS(commitTS)
err = committer.CommitMutations(ctx)
s.Nil(err)
lock = s.mustGetLock([]byte("k5"))
lock = s.mustGetLock([]byte("k6"))
locks = append(locks, lock)
}

bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
msBeforeExpired, resolved, committed, err := s.store.NewLockResolver().ResolveLocksForRead(bo, readStartTS, locks, false)
lr := s.store.NewLockResolver()
defer lr.Close()
msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, false)
s.Nil(err)
s.Greater(msBeforeExpired, int64(0))
s.Equal(resolvedLocks, resolved)
Expand Down
95 changes: 60 additions & 35 deletions integration_tests/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s *testSnapshotFailSuite) TestResetSnapshotTS() {
s.Equal(val, []byte("y1"))
}

func (s *testSnapshotFailSuite) mustGetLock(key []byte) *txnkv.Lock {
func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock {
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
Expand All @@ -324,45 +324,70 @@ func (s *testSnapshotFailSuite) mustGetLock(key []byte) *txnkv.Lock {
s.Nil(err)
s.NotNil(resp.Resp)
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
s.NotNil(keyErr)
if keyErr == nil {
return nil
}
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
s.Nil(err)
if err != nil {
return nil
}
return lock
}

func (s *testSnapshotFailSuite) TestSnapshotUseResolveForRead() {
x := []byte("x_key_TestSnapshotUseResolveForRead")
y := []byte("y_key_TestSnapshotUseResolveForRead")
txn, err := s.store.Begin()
s.Nil(err)
s.Nil(txn.Set(x, []byte("x")))
s.Nil(txn.Set(y, []byte("y")))
ctx := context.Background()
committer, err := txn.NewCommitter(1)
s.Nil(err)
committer.SetLockTTL(3000)
s.Nil(committer.PrewriteAllMutations(ctx))
committer.SetCommitTS(committer.GetStartTS() + 1)
committer.CommitMutations(ctx)
s.Equal(committer.GetPrimaryKey(), x)
s.NotNil(s.mustGetLock(y))

s.Nil(failpoint.Enable("tikvclient/resolveLock", `sleep(1000)`))
defer s.Nil(failpoint.Disable("tikvclient/resolveLock"))

txn, err = s.store.Begin()
s.Nil(err)
snapshot := txn.GetSnapshot()
s.Nil(failpoint.Enable("tikvclient/resolveLock", "sleep(500)"))
s.Nil(failpoint.Enable("tikvclient/resolveAsyncResolveData", "sleep(500)"))
defer func() {
s.Nil(failpoint.Disable("tikvclient/resolveAsyncResolveData"))
s.Nil(failpoint.Disable("tikvclient/resolveLock"))
}()

start := time.Now()
val, err := snapshot.Get(ctx, y)
s.Nil(err)
s.Equal([]byte("y"), val)
s.Less(time.Since(start), 500*time.Millisecond)
for _, asyncCommit := range []bool{false, true} {
x := []byte("x_key_TestSnapshotUseResolveForRead")
y := []byte("y_key_TestSnapshotUseResolveForRead")
txn, err := s.store.Begin()
s.Nil(err)
s.Nil(txn.Set(x, []byte("x")))
s.Nil(txn.Set(y, []byte("y")))
txn.SetEnableAsyncCommit(asyncCommit)
ctx := context.Background()
committer, err := txn.NewCommitter(1)
s.Nil(err)
committer.SetLockTTL(3000)
s.Nil(committer.PrewriteAllMutations(ctx))
committer.SetCommitTS(committer.GetStartTS() + 1)
committer.CommitMutations(ctx)
s.Equal(committer.GetPrimaryKey(), x)
s.NotNil(s.getLock(y))

txn, err = s.store.Begin()
s.Nil(err)
snapshot := txn.GetSnapshot()
start := time.Now()
val, err := snapshot.Get(ctx, y)
s.Nil(err)
s.Equal([]byte("y"), val)
s.Less(time.Since(start), 200*time.Millisecond)
s.NotNil(s.getLock(y))

start = time.Now()
res, err := snapshot.BatchGet(ctx, [][]byte{y})
s.Nil(err)
s.Equal([]byte("y"), res[string(y)])
s.Less(time.Since(start), 500*time.Millisecond)
txn, err = s.store.Begin()
s.Nil(err)
snapshot = txn.GetSnapshot()
start = time.Now()
res, err := snapshot.BatchGet(ctx, [][]byte{y})
s.Nil(err)
s.Equal([]byte("y"), res[string(y)])
s.Less(time.Since(start), 200*time.Millisecond)
s.NotNil(s.getLock(y))

var lock *txnkv.Lock
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
lock = s.getLock(y)
if lock == nil {
break
}
}
s.Nil(lock, "failed to resolve lock timely")
}
}
1 change: 1 addition & 0 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func (s *KVStore) Close() error {

s.oracle.Close()
s.pdClient.Close()
s.lockResolver.Close()

if err := s.GetTiKVClient().Close(); err != nil {
return err
Expand Down
17 changes: 15 additions & 2 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type LockResolver struct {
testingKnobs struct {
meetLock func(locks []*Lock)
}

// LockResolver may have some goroutines resolving locks in the background.
// The Cancel function is to cancel these goroutines for passing goleak test.
asyncResolveCtx context.Context
asyncResolveCancel func()
}

// NewLockResolver creates a new LockResolver instance.
Expand All @@ -79,9 +84,15 @@ func NewLockResolver(store storage) *LockResolver {
}
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.recentResolved = list.New()
r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
return r
}

// Close cancels all background goroutines.
func (lr *LockResolver) Close() {
lr.asyncResolveCancel()
}

// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
type TxnStatus struct {
ttl uint64
Expand Down Expand Up @@ -359,7 +370,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
err = lr.resolvePessimisticLock(bo, l)
} else {
if forRead {
asyncBo := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff)
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
go func() {
// Pass an empty cleanRegions here to avoid data race and
// let `reqCollapse` deduplicate identical resolve requests.
Expand Down Expand Up @@ -745,6 +756,8 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK

// resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status.
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error {
util.EvalFailpoint("resolveAsyncResolveData")

keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil)
if err != nil {
return err
Expand Down Expand Up @@ -794,7 +807,7 @@ func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, sta

logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
if asyncResolveAll {
asyncBo := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff)
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
go func() {
err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData)
if err != nil {
Expand Down

0 comments on commit 609774e

Please sign in to comment.