txn: fix the resolved txn status cache for pessimistic txn #21689

Merged · 4 commits · Dec 14, 2020
72 changes: 71 additions & 1 deletion session/pessimistic_test.go
@@ -23,8 +23,10 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
@@ -548,7 +550,7 @@ func (s *testPessimisticSuite) TestAsyncRollBackNoWait(c *C) {
// Even though the async rollback for a pessimistic lock may later roll back the locked keys
// when fetching the ts from PD fails, txn correctness should still be ensured.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/ExecStmtGetTsError", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(100)"), IsNil)
tk.MustExec("begin pessimistic")
tk.MustExec("select * from tk where c1 > 0 for update nowait")
tk2.MustExec("begin pessimistic")
@@ -1833,3 +1835,71 @@ func (s *testPessimisticSuite) TestAmendForUniqueIndex(c *C) {
tk.MustExec("commit")
tk2.MustExec("admin check table t")
}

func (s *testPessimisticSuite) TestResolveStalePessimisticPrimaryLock(c *C) {
Contributor

It's complex. I want to confirm: does it fail with the always-cached version?

Contributor Author
@cfzjywxk Dec 14, 2020

I've tested it without the cache change; it fails with an error reported by the admin check statement. It can be treated as a reproducing case in the unit tests. I think we also need to add cases in ticase.

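// beforeCommitSecondaries="skip" makes the committer return before committing
// secondary keys, so only the primary key gets committed; AsyncRollBackSleep
// with a value >= 10000 makes the async pessimistic rollback return without
// rolling back, leaving stale pessimistic locks for the resolver to handle.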
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries", "return(\"skip\")"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(20000)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep"), IsNil)
}()
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test")
tk.MustExec("create database test")
tk.MustExec("use test")
tk2.MustExec("use test")
tk3.MustExec("use test")

tk3.MustExec("drop table if exists t1")
tk3.MustExec("create table t1(c1 int key, c2 int, c3 int, unique key uk(c2), key k1(c3), key k2(c2, c3));")
tk3.MustExec("insert into t1 values(1, 1, 1);")
tk3.MustExec("insert into t1 values(2, 2, 2);")
tk3.MustExec("insert into t1 values(3, 3, 3);")
tk3.MustExec("insert into t1 values(101, 101, 101);")
tk3.MustExec("insert into t1 values(201, 201, 201);")
tk3.MustExec("insert into t1 values(301, 301, 301);")
tk3.MustExec("insert into t1 values(401, 401, 401);")
tk3.MustExec("insert into t1 values(402, 402, 402);")
tk3.MustExec("insert into t1 values(501, 501, 501);")
tbl, err := domain.GetDomain(tk3.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
tblID := tbl.Meta().ID
ukIdxID := tbl.Indices()[0].Meta().ID
k1IdxID := tbl.Indices()[1].Meta().ID
k2IdxID := tbl.Indices()[2].Meta().ID
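// Split the table and each index into multiple regions so the transaction's
// keys span several regions; this presumably exercises the multi-region
// commit and resolve-lock paths that the fix touches.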
s.cluster.SplitTable(s.mvccStore, tblID, 8)
s.cluster.SplitIndex(s.mvccStore, tblID, ukIdxID, 8)
s.cluster.SplitIndex(s.mvccStore, tblID, k1IdxID, 8)
s.cluster.SplitIndex(s.mvccStore, tblID, k2IdxID, 8)

tk.MustExec("set innodb_lock_wait_timeout = 1")
tk.MustExec("begin pessimistic")
tk3.MustExec("begin pessimistic")
tk3.MustQuery("select * from t1 where c1 = 501 for update nowait").Check(testkit.Rows("501 501 501"))
err = tk.ExecToErr("update t1 set c1 = c1 + 10, c2 = c2 + 10;")
c.Assert(err, NotNil)
tk3.MustExec("rollback")

tk2.MustExec("begin pessimistic")
tk2.MustExec("delete from t1 where c1 = 1")
tk2.MustExec("commit")

// tk will get abort error.
err = tk.ExecToErr("update t1 set c1 = c1 + 10, c2 = c2 + 10 where c1 in(1)")
c.Assert(err, NotNil)

tk.MustExec("update t1 set c1 = c1 + 10, c2 = c2 + 10 where c1 > 1;")
tk.MustExec("commit")

tk2.MustExec("begin pessimistic")
tk2.MustExec("update t1 set c3 = c3 + 7 where c1 in (3, 101, 201, 301, 401, 402, 501)")
tk2.MustExec("commit")

tk.MustExec("rollback")
tk2.MustExec("rollback")
tk3.MustExec("rollback")

c.Assert(tk2.ExecToErr("admin check table t1"), IsNil)
}
10 changes: 10 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
@@ -1151,6 +1151,10 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

// If the lock has already expired, clean it up.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
logutil.BgLogger().Info("rollback expired lock and write rollback record",
zap.Stringer("primary key", kv.Key(primaryKey)),
zap.Uint64("lock startTS", dec.lock.startTS),
zap.Stringer("lock op", dec.lock.op))
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
err = errors.Trace(err)
return
@@ -1333,6 +1337,12 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

if len(startKey) > 0 {
Contributor

What's this?

Contributor Author

Temporarily working around an mvcc leveldb problem; without this, resolving locks does not work for split regions.
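// Workaround (see the thread above): widen the resolve range to the full
// keyspace so the scan below visits locks in every split region.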

startKey = []byte{}
}
if len(endKey) > 0 {
endKey = []byte{}
}
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
if err != nil {
12 changes: 12 additions & 0 deletions store/tikv/2pc.go
@@ -652,6 +652,16 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
// by test suites.
secondaryBo := NewBackofferWithVars(context.Background(), CommitMaxBackoff, c.txn.vars)
go func() {
failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
if s, ok := v.(string); !ok {
logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys",
zap.Uint64("connID", c.connID), zap.Uint64("startTS", c.startTS))
time.Sleep(2 * time.Second)
} else if s == "skip" {
failpoint.Return()
}
})
Contributor

How about making this failpoint more general (e.g. like the following), so that we can either disable it with return("skip") or delay it with sleep(1000)?

failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
	if s, ok := v.(string); !ok {
	} else if s == "skip" {
		failpoint.Return()
	}
})
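For reference, a minimal sketch of how a test could drive both modes of the generalized failpoint, assuming failpoint's built-in sleep action; the usage below is illustrative and not part of this diff:

fp := "github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries"
// Disable committing secondary keys entirely: the injected callback sees the
// string "skip" and returns before doActionOnBatches runs.
c.Assert(failpoint.Enable(fp, `return("skip")`), IsNil)
defer func() { c.Assert(failpoint.Disable(fp), IsNil) }()
// Alternatively, delay the failpoint site by one second using failpoint's
// built-in sleep action instead of a return value:
//   c.Assert(failpoint.Enable(fp, "sleep(1000)"), IsNil)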


e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
logutil.BgLogger().Debug("2PC async doActionOnBatches",
@@ -1218,8 +1228,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}
logutil.Logger(bo.ctx).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Stringer("primaryKey", kv.Key(c.primaryKey)),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("commitTS", c.commitTS),
zap.Uint64("forUpdateTS", c.forUpdateTS),
zap.Strings("keys", hexBatchKeys(batch.mutations.keys)))
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
@@ -629,7 +629,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

25 changes: 20 additions & 5 deletions store/tikv/lock_resolver.go
@@ -225,7 +225,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
tikvLockResolverCountWithExpired.Inc()

// Using currentTS = math.MaxUint64 means rolling back the txn, no matter whether the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, l)
if err != nil {
return false, err
}
@@ -430,7 +430,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true)
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, nil)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
@@ -455,7 +455,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
time.Sleep(100 * time.Millisecond)
})
for {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, l)
if err == nil {
return status, nil
}
@@ -507,7 +507,8 @@ func (e txnNotFoundErr) Error() string {

// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64,
rollbackIfNotExist bool, lockInfo *Lock) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
@@ -576,7 +577,21 @@ }
}

status.commitTS = cmdResp.CommitVersion
lr.saveResolved(txnID, status)
// If the transaction is still valid with a TTL greater than zero, do nothing.
// If its status is certain:
//   If the transaction is already committed, the result can be cached.
//   Otherwise:
//     If l.LockType is the pessimistic lock type:
//       - if its primary lock is pessimistic too, the check-txn-status result should not be cached.
//       - if its primary lock is the prewrite lock type, the result could be cached (TODO).
//     If l.LockType is the prewrite lock type:
//       - always cache the check-txn-status result.
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
if status.ttl == 0 {
if status.IsCommitted() || (lockInfo != nil && lockInfo.LockType != kvrpcpb.Op_PessimisticLock) {
lr.saveResolved(txnID, status)
}
}
}
return status, nil
}
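Read as a predicate, the new caching rule amounts to the following minimal restatement (a sketch; the helper name is illustrative, not code from this diff):

// shouldCacheTxnStatus restates the rule above: cache a definitive status only
// when the txn committed, or when the lock that triggered the check is not a
// pessimistic lock (a prewrite lock's primary key never changes, so a status
// cached by txnID stays valid).
func shouldCacheTxnStatus(status TxnStatus, lockInfo *Lock) bool {
	if status.ttl != 0 {
		return false // txn still alive; nothing definitive to cache
	}
	if status.IsCommitted() {
		return true
	}
	return lockInfo != nil && lockInfo.LockType != kvrpcpb.Op_PessimisticLock
}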
8 changes: 4 additions & 4 deletions store/tikv/lock_test.go
@@ -284,7 +284,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
resolver := newLockResolver(s.store)
// Call getTxnStatus to check the lock status.
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, nil)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
@@ -306,15 +306,15 @@
// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, commitTS)
@@ -342,7 +342,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
resolver := newLockResolver(s.store)

// Call getTxnStatus for the TxnNotFound case.
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false)
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, nil)
c.Assert(err, NotNil)
_, ok := errors.Cause(err).(txnNotFoundErr)
c.Assert(ok, IsTrue)
12 changes: 10 additions & 2 deletions store/tikv/txn.go
@@ -561,8 +561,16 @@ func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
failpoint.Inject("AsyncRollBackSleep", func() {
time.Sleep(100 * time.Millisecond)
failpoint.Inject("AsyncRollBackSleep", func(sleepTimeMS failpoint.Value) {
if tmp, ok := sleepTimeMS.(int); ok {
if tmp < 10000 {
logutil.Logger(ctx).Info("[failpoint] sleep before trigger asyncPessimisticRollback", zap.Int("sleep ms", tmp))
time.Sleep(time.Duration(tmp) * time.Millisecond)
} else {
logutil.Logger(ctx).Info("[failpoint] async rollback return")
failpoint.Return()
}
}
})
err := committer.pessimisticRollbackMutations(NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, txn.vars), CommitterMutations{keys: keys})
if err != nil {
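The failpoint value now selects the behavior: a value below 10000 sleeps that many milliseconds before the rollback proceeds, while 10000 or more returns early and skips the rollback entirely. Both modes are used by the tests in this PR (values taken from the diff above):

// Delay the async pessimistic rollback by 100ms (TestAsyncRollBackNoWait).
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(100)"), IsNil)
// Skip the rollback entirely, leaving stale pessimistic locks behind
// (TestResolveStalePessimisticPrimaryLock).
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(20000)"), IsNil)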