Skip to content

Commit

Permalink
txn: fix the resolved txn status cache for pessimistic txn(#21689) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored Dec 15, 2020
1 parent dbe814c commit 9ab0bbf
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 10 deletions.
68 changes: 67 additions & 1 deletion session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
Expand All @@ -36,7 +37,7 @@ import (
"github.com/pingcap/tidb/util/testleak"
)

var _ = Suite(&testPessimisticSuite{})
var _ = SerialSuites(&testPessimisticSuite{})

type testPessimisticSuite struct {
cluster *mocktikv.Cluster
Expand Down Expand Up @@ -779,3 +780,68 @@ func (s *testPessimisticSuite) TestInsertDupKeyAfterLock(c *C) {
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}

// TestResolveStalePessimisticPrimaryLock is a regression test for resolving
// locks left behind by a pessimistic transaction whose statement failed and
// whose cleanup never ran. It finishes by running "admin check table" to
// verify that the table data and its indexes are still consistent.
//
// Two failpoints shape the scenario:
//   - beforeCommitSecondaries = "skip": the background goroutine that handles
//     the secondary key batches returns without processing them.
//   - AsyncRollBackSleep = 20000: values >= 10000 make the asynchronous
//     pessimistic rollback goroutine return without rolling anything back.
//
// Failpoints are process-global, so each one is paired with its own deferred
// Disable registered right after the Enable succeeds. That way a failing
// assertion on the second Enable (or on the first Disable) cannot leave the
// other failpoint active for later tests.
func (s *testPessimisticSuite) TestResolveStalePessimisticPrimaryLock(c *C) {
	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries", "return(\"skip\")"), IsNil)
	defer func() {
		c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries"), IsNil)
	}()
	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(20000)"), IsNil)
	defer func() {
		c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep"), IsNil)
	}()

	// Three independent sessions on the same store, all using database "test".
	tk := testkit.NewTestKitWithInit(c, s.store)
	tk2 := testkit.NewTestKitWithInit(c, s.store)
	tk3 := testkit.NewTestKitWithInit(c, s.store)
	tk.MustExec("drop database if exists test")
	tk.MustExec("create database test")
	tk.MustExec("use test")
	tk2.MustExec("use test")
	tk3.MustExec("use test")

	// t1 has a primary key, a unique index and two ordinary indexes, so every
	// row change touches several keys.
	tk3.MustExec("drop table if exists t1")
	tk3.MustExec("create table t1(c1 int key, c2 int, c3 int, unique key uk(c2), key k1(c3), key k2(c2, c3));")
	tk3.MustExec("insert into t1 values(1, 1, 1);")
	tk3.MustExec("insert into t1 values(2, 2, 2);")
	tk3.MustExec("insert into t1 values(3, 3, 3);")
	tk3.MustExec("insert into t1 values(101, 101, 101);")
	tk3.MustExec("insert into t1 values(201, 201, 201);")
	tk3.MustExec("insert into t1 values(301, 301, 301);")
	tk3.MustExec("insert into t1 values(401, 401, 401);")
	tk3.MustExec("insert into t1 values(402, 402, 402);")
	tk3.MustExec("insert into t1 values(501, 501, 501);")

	// Split the table and each of its three indexes in the mock cluster
	// (count argument 8), so the keys are spread over multiple regions.
	tbl, err := domain.GetDomain(tk3.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
	c.Assert(err, IsNil)
	tblID := tbl.Meta().ID
	ukIdxID := tbl.Indices()[0].Meta().ID
	k1IdxID := tbl.Indices()[1].Meta().ID
	k2IdxID := tbl.Indices()[2].Meta().ID
	s.cluster.SplitTable(s.mvccStore, tblID, 8)
	s.cluster.SplitIndex(s.mvccStore, tblID, ukIdxID, 8)
	s.cluster.SplitIndex(s.mvccStore, tblID, k1IdxID, 8)
	s.cluster.SplitIndex(s.mvccStore, tblID, k2IdxID, 8)

	// tk3 holds a pessimistic lock on row 501, so tk's full-table update is
	// expected to fail; the test only asserts that an error is returned.
	// NOTE(review): presumably the error is a lock-wait timeout (timeout is
	// set to 1) and, with the async rollback failpoint active, the locks tk
	// already acquired in this statement are left in place - confirm.
	tk.MustExec("set innodb_lock_wait_timeout = 1")
	tk.MustExec("begin pessimistic")
	tk3.MustExec("begin pessimistic")
	tk3.MustQuery("select * from t1 where c1 = 501 for update").Check(testkit.Rows("501 501 501"))
	err = tk.ExecToErr("update t1 set c1 = c1 + 10, c2 = c2 + 10;")
	c.Assert(err, NotNil)
	tk3.MustExec("rollback")

	// A second transaction deletes row 1 and commits while tk is still open.
	tk2.MustExec("begin pessimistic")
	tk2.MustExec("delete from t1 where c1 = 1")
	tk2.MustExec("commit")

	// tk retries on the remaining rows (c1 > 1) and commits. Because of the
	// beforeCommitSecondaries failpoint the secondary keys are not processed
	// by the background goroutine.
	tk.MustExec("update t1 set c1 = c1 + 10, c2 = c2 + 10 where c1 > 1;")
	tk.MustExec("commit")

	// tk2 now writes to the table again and must succeed, which requires it
	// to deal with whatever locks tk left behind.
	tk2.MustExec("set innodb_lock_wait_timeout = 1")
	tk2.MustExec("begin pessimistic")
	tk2.MustExec("update t1 set c3 = c3 + 7 where c1 in (3, 101, 201, 301, 401, 402, 501)")
	tk2.MustExec("commit")

	// Make sure no session is left inside a transaction.
	tk.MustExec("rollback")
	tk2.MustExec("rollback")
	tk3.MustExec("rollback")

	// Final consistency check of row data against all indexes.
	c.Assert(tk2.ExecToErr("admin check table t1"), IsNil)
}
11 changes: 11 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mocktikv
import (
"bytes"
"context"
"encoding/hex"
"math"
"sync"

Expand Down Expand Up @@ -999,6 +1000,10 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
if ok && dec.lock.startTS == startTS {
// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
logutil.Logger(context.Background()).Info("rollback expired lock and write rollback record",
zap.String("key", hex.EncodeToString(key)),
zap.Uint64("lock startTS", dec.lock.startTS),
zap.Stringer("lock op", dec.lock.op))
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
return err
}
Expand Down Expand Up @@ -1125,6 +1130,12 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()
if len(startKey) > 0 {
startKey = []byte{}
}
if len(endKey) > 0 {
endKey = []byte{}
}

iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
Expand Down
15 changes: 14 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -456,6 +457,15 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
// by test suites.
secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
go func() {
failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
if s, ok := v.(string); !ok {
logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys",
zap.Uint64("connID", c.connID), zap.Uint64("startTS", c.startTS))
time.Sleep(2 * time.Second)
} else if s == "skip" {
failpoint.Return()
}
})
e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
logutil.Logger(context.Background()).Debug("2PC async doActionOnBatches",
Expand Down Expand Up @@ -1020,7 +1030,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
// There must be a serious bug somewhere.
logutil.Logger(context.Background()).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
zap.String("primaryKey", hex.EncodeToString(c.primaryKey)),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("forUpdateTS", c.forUpdateTS),
zap.Uint64("commitTS", c.commitTS))
return errors.Trace(err)
}
// The transaction maybe rolled back by concurrent transactions.
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

Expand Down
25 changes: 19 additions & 6 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}

status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, l)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -377,7 +377,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, e
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, currentTS)
return lr.getTxnStatus(bo, txnID, primary, currentTS, nil)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) {
Expand All @@ -386,17 +386,17 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus,
// In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call
// getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction.
if l.TTL == 0 {
return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, l)
}

currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS)
return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS, l)
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64, lockInfo *Lock) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
Expand Down Expand Up @@ -453,7 +453,20 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
} else {
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
}
lr.saveResolved(txnID, status)
// If its status is certain:
// If transaction is already committed, the result could be cached.
// Otherwise:
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached, todo.
// If l.lockType is prewrite lock type:
// - always cache the check txn status result.
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
if status.ttl == 0 {
if status.IsCommitted() || (lockInfo != nil && lockInfo.LockType != kvrpcpb.Op_PessimisticLock) {
lr.saveResolved(txnID, status)
}
}
return status, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
c.Assert(newTTL, Equals, uint64(666))

// The getTxnStatus API is confusing, it really means rollback!
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), 0)
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), 0, nil)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
Expand Down
11 changes: 11 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,17 @@ func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
failpoint.Inject("AsyncRollBackSleep", func(sleepTimeMS failpoint.Value) {
if tmp, ok := sleepTimeMS.(int); ok {
if tmp < 10000 {
logutil.Logger(ctx).Info("[failpoint] sleep before trigger asyncPessimisticRollback", zap.Int("sleep ms", tmp))
time.Sleep(time.Duration(tmp) * time.Millisecond)
} else {
logutil.Logger(ctx).Info("[failpoint] async rollback return")
failpoint.Return()
}
}
})
err := committer.pessimisticRollbackKeys(NewBackoffer(ctx, pessimisticRollbackMaxBackoff), keys)
if err != nil {
logutil.Logger(ctx).Warn("[kv] pessimisticRollback failed.", zap.Error(err))
Expand Down

0 comments on commit 9ab0bbf

Please sign in to comment.