From 9ab0bbf9a012aaf8ae2d3e1eee2a321e03a0d5c1 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Tue, 15 Dec 2020 17:49:55 +0800 Subject: [PATCH] txn: fix the resolved txn status cache for pessimistic txn(#21689) (#21706) --- session/pessimistic_test.go | 68 +++++++++++++++++++++++- store/mockstore/mocktikv/mvcc_leveldb.go | 11 ++++ store/tikv/2pc.go | 15 +++++- store/tikv/2pc_test.go | 2 +- store/tikv/lock_resolver.go | 25 ++++++--- store/tikv/lock_test.go | 2 +- store/tikv/txn.go | 11 ++++ 7 files changed, 124 insertions(+), 10 deletions(-) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index ea62a78a6dd66..7215dec840c51 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -22,6 +22,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" @@ -36,7 +37,7 @@ import ( "github.com/pingcap/tidb/util/testleak" ) -var _ = Suite(&testPessimisticSuite{}) +var _ = SerialSuites(&testPessimisticSuite{}) type testPessimisticSuite struct { cluster *mocktikv.Cluster @@ -779,3 +780,68 @@ func (s *testPessimisticSuite) TestInsertDupKeyAfterLock(c *C) { err = tk.ExecToErr("commit") c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue) } + +func (s *testPessimisticSuite) TestResolveStalePessimisticPrimaryLock(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries", "return(\"skip\")"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(20000)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep"), IsNil) + }() + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop database if exists test") + tk.MustExec("create database test") + tk.MustExec("use test") + tk2.MustExec("use test") + tk3.MustExec("use test") + + tk3.MustExec("drop table if exists t1") + tk3.MustExec("create table t1(c1 int key, c2 int, c3 int, unique key uk(c2), key k1(c3), key k2(c2, c3));") + tk3.MustExec("insert into t1 values(1, 1, 1);") + tk3.MustExec("insert into t1 values(2, 2, 2);") + tk3.MustExec("insert into t1 values(3, 3, 3);") + tk3.MustExec("insert into t1 values(101, 101, 101);") + tk3.MustExec("insert into t1 values(201, 201, 201);") + tk3.MustExec("insert into t1 values(301, 301, 301);") + tk3.MustExec("insert into t1 values(401, 401, 401);") + tk3.MustExec("insert into t1 values(402, 402, 402);") + tk3.MustExec("insert into t1 values(501, 501, 501);") + tbl, err := domain.GetDomain(tk3.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + tblID := tbl.Meta().ID + ukIdxID := tbl.Indices()[0].Meta().ID + k1IdxID := tbl.Indices()[1].Meta().ID + k2IdxID := tbl.Indices()[2].Meta().ID + s.cluster.SplitTable(s.mvccStore, tblID, 8) + s.cluster.SplitIndex(s.mvccStore, tblID, ukIdxID, 8) + s.cluster.SplitIndex(s.mvccStore, tblID, k1IdxID, 8) + s.cluster.SplitIndex(s.mvccStore, tblID, k2IdxID, 8) + + tk.MustExec("set innodb_lock_wait_timeout = 1") + tk.MustExec("begin pessimistic") + tk3.MustExec("begin pessimistic") + tk3.MustQuery("select * from t1 where c1 = 501 for update").Check(testkit.Rows("501 501 501")) + 
err = tk.ExecToErr("update t1 set c1 = c1 + 10, c2 = c2 + 10;") + c.Assert(err, NotNil) + tk3.MustExec("rollback") + + tk2.MustExec("begin pessimistic") + tk2.MustExec("delete from t1 where c1 = 1") + tk2.MustExec("commit") + + tk.MustExec("update t1 set c1 = c1 + 10, c2 = c2 + 10 where c1 > 1;") + tk.MustExec("commit") + + tk2.MustExec("set innodb_lock_wait_timeout = 1") + tk2.MustExec("begin pessimistic") + tk2.MustExec("update t1 set c3 = c3 + 7 where c1 in (3, 101, 201, 301, 401, 402, 501)") + tk2.MustExec("commit") + + tk.MustExec("rollback") + tk2.MustExec("rollback") + tk3.MustExec("rollback") + + c.Assert(tk2.ExecToErr("admin check table t1"), IsNil) +} diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 444ee1d3b7cd7..ac76782885522 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -16,6 +16,7 @@ package mocktikv import ( "bytes" "context" + "encoding/hex" "math" "sync" @@ -999,6 +1000,10 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { if ok && dec.lock.startTS == startTS { // If the lock has already outdated, clean up it. if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { + logutil.Logger(context.Background()).Info("rollback expired lock and write rollback record", + zap.String("key", hex.EncodeToString(key)), + zap.Uint64("lock startTS", dec.lock.startTS), + zap.Stringer("lock op", dec.lock.op)) if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { return err } @@ -1125,6 +1130,12 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error { mvcc.mu.Lock() defer mvcc.mu.Unlock() + if len(startKey) > 0 { + startKey = []byte{} + } + if len(endKey) > 0 { + endKey = []byte{} + } iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) defer iter.Release() diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f6d298b2b49c7..56d9b0da870d6 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -16,6 +16,7 @@ package tikv import ( "bytes" "context" + "encoding/hex" "fmt" "math" "sync" @@ -456,6 +457,15 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA // by test suites. secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff) go func() { + failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) { + if s, ok := v.(string); !ok { + logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys", + zap.Uint64("connID", c.connID), zap.Uint64("startTS", c.startTS)) + time.Sleep(2 * time.Second) + } else if s == "skip" { + failpoint.Return() + } + }) e := c.doActionOnBatches(secondaryBo, action, batches) if e != nil { logutil.Logger(context.Background()).Debug("2PC async doActionOnBatches", @@ -1020,7 +1030,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch // There must be a serious bug somewhere. logutil.Logger(context.Background()).Error("2PC failed commit key after primary key committed", zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) + zap.String("primaryKey", hex.EncodeToString(c.primaryKey)), + zap.Uint64("txnStartTS", c.startTS), + zap.Uint64("forUpdateTS", c.forUpdateTS), + zap.Uint64("commitTS", c.commitTS)) return errors.Trace(err) } // The transaction maybe rolled back by concurrent transactions. 
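
For context on what the new beforeCommitSecondaries hook interrupts: doActionOnKeys commits the primary batch synchronously, then hands the remaining batches to a background goroutine, and the failpoint fires at the start of that goroutine so a test can delay the secondary commit or skip it outright ("skip"), leaving stale locks behind an already-committed primary. Below is a minimal sketch of that shape only; the batch type, commitBatch helper and commitKeys function are hypothetical stand-ins, not the real twoPhaseCommitter API.

package main

import (
	"fmt"
	"time"
)

type batch struct{ keys []string }

// commitBatch stands in for sending a commit request for one batch of keys.
func commitBatch(b batch) error {
	fmt.Println("commit", b.keys)
	return nil
}

// commitKeys commits the primary batch first, then hands the secondaries to a
// goroutine. The patched doActionOnKeys injects "beforeCommitSecondaries" at
// the start of that goroutine: a non-string value sleeps 2s, the string "skip"
// returns early and never commits the secondaries.
func commitKeys(batches []batch) error {
	if len(batches) == 0 {
		return nil
	}
	if err := commitBatch(batches[0]); err != nil { // primary must succeed first
		return err
	}
	go func() {
		// <-- the failpoint would fire here in the patched code
		for _, b := range batches[1:] {
			if err := commitBatch(b); err != nil {
				fmt.Println("async commit of secondaries failed:", err) // only logged in the real code
			}
		}
	}()
	return nil
}

func main() {
	_ = commitKeys([]batch{{keys: []string{"pk"}}, {keys: []string{"k1", "k2"}}})
	time.Sleep(100 * time.Millisecond) // give the toy goroutine time to finish
}
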
diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 35f842fe4c38c..4480d14bbd1bb 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -556,7 +556,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackoffer(context.Background(), getMaxBackoff) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS, nil) c.Assert(err, IsNil) c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index a7e5f2d43351a..43dbddba9b266 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -214,7 +214,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, l) if err != nil { return false, errors.Trace(err) } @@ -377,7 +377,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, e if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, currentTS) + return lr.getTxnStatus(bo, txnID, primary, currentTS, nil) } func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) { @@ -386,17 +386,17 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction. if l.TTL == 0 { - return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, l) } currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) if err != nil { return TxnStatus{}, err } - return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS) + return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS, l) } -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64, lockInfo *Lock) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -453,7 +453,20 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte } else { tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() } - lr.saveResolved(txnID, status) + // If its status is certain: + // If transaction is already committed, the result could be cached. + // Otherwise: + // If l.LockType is pessimistic lock type: + // - if its primary lock is pessimistic too, the check txn status result should not be cached. + // - if its primary lock is prewrite lock type, the check txn status could be cached, todo. + // If l.lockType is prewrite lock type: + // - always cache the check txn status result. + // For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change. 
+ if status.ttl == 0 { + if status.IsCommitted() || (lockInfo != nil && lockInfo.LockType != kvrpcpb.Op_PessimisticLock) { + lr.saveResolved(txnID, status) + } + } return status, nil } } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 731b93b5f2e4b..deedf92d018e2 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -256,7 +256,7 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) { c.Assert(newTTL, Equals, uint64(666)) // The getTxnStatus API is confusing, it really means rollback! - status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), 0) + status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), 0, nil) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index d20627c130a6d..2e12b8fc59e30 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -478,6 +478,17 @@ func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) wg := new(sync.WaitGroup) wg.Add(1) go func() { + failpoint.Inject("AsyncRollBackSleep", func(sleepTimeMS failpoint.Value) { + if tmp, ok := sleepTimeMS.(int); ok { + if tmp < 10000 { + logutil.Logger(ctx).Info("[failpoint] sleep before trigger asyncPessimisticRollback", zap.Int("sleep ms", tmp)) + time.Sleep(time.Duration(tmp) * time.Millisecond) + } else { + logutil.Logger(ctx).Info("[failpoint] async rollback return") + failpoint.Return() + } + } + }) err := committer.pessimisticRollbackKeys(NewBackoffer(ctx, pessimisticRollbackMaxBackoff), keys) if err != nil { logutil.Logger(ctx).Warn("[kv] pessimisticRollback failed.", zap.Error(err))
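
The guarded call to saveResolved in getTxnStatus is the core of the fix: a rolled-back status observed through a pessimistic lock must not be cached, because the primary recorded in a pessimistic lock can still change before prewrite, while a committed status, or any status reached through a prewrite lock, is safe to reuse. The beforeCommitSecondaries and AsyncRollBackSleep failpoints give the new test a way to leave such stale locks behind so that other transactions have to resolve them. Below is a minimal sketch of the caching rule, with txnStatus, lock and shouldCacheResolved as illustrative stand-ins for the lock_resolver.go types rather than the actual implementation.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// Simplified stand-ins for TxnStatus and Lock in lock_resolver.go.
type txnStatus struct {
	ttl      uint64
	commitTS uint64
}

func (s txnStatus) isCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }

type lock struct {
	lockType kvrpcpb.Op
}

// shouldCacheResolved mirrors the condition added before saveResolved: only a
// definitely-finished status may be cached, and a rolled-back status seen via
// a pessimistic lock is excluded because that lock's primary may still change.
func shouldCacheResolved(status txnStatus, l *lock) bool {
	if status.ttl != 0 {
		return false // transaction still alive, nothing to cache yet
	}
	if status.isCommitted() {
		return true
	}
	return l != nil && l.lockType != kvrpcpb.Op_PessimisticLock
}

func main() {
	rolledBack := txnStatus{}                // ttl == 0, commitTS == 0
	committed := txnStatus{commitTS: 400000} // ttl == 0, commitTS > 0
	pessimistic := &lock{lockType: kvrpcpb.Op_PessimisticLock}
	prewrite := &lock{lockType: kvrpcpb.Op_Put}

	fmt.Println(shouldCacheResolved(rolledBack, pessimistic)) // false: do not poison the cache
	fmt.Println(shouldCacheResolved(rolledBack, prewrite))    // true
	fmt.Println(shouldCacheResolved(committed, pessimistic))  // true: a committed status is final
}
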