Skip to content

Commit

Permalink
store: update kvproto.CheckTxnStatus response (pingcap#13432)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and XiaTianliang committed Dec 21, 2019
1 parent 41512b8 commit 272d61a
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg=
github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ=
github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f h1:CJ1IdT7bPbIvyq2Of9VhC/fhEGh6+0ItdT1dPBv7x7I=
github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9EdmWqzpUxV+bgjB4lOxd3AFxqgoyzQ=
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
34 changes: 23 additions & 11 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,40 +661,52 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
}

func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)
startTS := uint64(5 << 18)
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", startTS, 666)

ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, action, err := s.store.CheckTxnStatus([]byte("pk"), startTS, startTS+100, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(666))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed)

s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30)
s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false)
ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(30))
c.Assert(commitTS, Equals, uint64(startTS+101))

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", startTS, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, startTS)

s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666)
s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_NoAction)

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false)
s.mustPrewriteWithTTLOK(c, putMutations("pk2", "val"), "pk2", startTS, 666)
currentTS := uint64(777 << 18)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_TTLExpireRollback)

// Cover the TxnNotFound case.
_, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
_, _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false)
c.Assert(err, NotNil)
notFound, ok := errors.Cause(err).(*ErrTxnNotFound)
c.Assert(ok, IsTrue)
c.Assert(notFound.StartTs, Equals, uint64(5))
c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound")

ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true)
c.Assert(err, IsNil)
c.Assert(ttl, Equals, uint64(0))
c.Assert(commitTS, Equals, uint64(0))
c.Assert(action, Equals, kvrpcpb.Action_LockNotExistRollback)

// Check the rollback tombstone blocks this prewrite which comes with a smaller startTS.
req := &kvrpcpb.PrewriteRequest{
Expand All @@ -710,7 +722,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) {
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// Push the minCommitTS
_, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
_, _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false)
c.Assert(err, IsNil)
err = s.store.Commit([][]byte{[]byte("x")}, 5, 10)
e, ok := errors.Cause(err).(*ErrCommitTSExpired)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type MVCCStore interface {
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
DeleteRange(startKey, endKey []byte) error
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error)
CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.Action, error)
Close() error
}

Expand Down
61 changes: 37 additions & 24 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,10 +1032,13 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// primaryKey + lockTS together could locate the primary lock.
// callerStartTS is the start ts of reader transaction.
// currentTS is the current ts, but it may be inaccurate. Just use it to check TTL.
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) {
func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64,
rollbackIfNotExist bool) (ttl uint64, commitTS uint64, action kvrpcpb.Action, err error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

action = kvrpcpb.Action_NoAction

startKey := mvccEncode(primaryKey, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
Expand All @@ -1046,9 +1049,11 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
dec := lockDecoder{
expectKey: primaryKey,
}
ok, err := dec.Decode(iter)
var ok bool
ok, err = dec.Decode(iter)
if err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == lockTS {
Expand All @@ -1058,17 +1063,20 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
err = errors.Trace(err)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil
}

// If this is a large transaction and the lock is active, push forward the minCommitTS.
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction.
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB).
if lock.minCommitTS > 0 {
action = kvrpcpb.Action_MinCommitTSPushed
// We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1
if lock.minCommitTS < callerStartTS+1 {
lock.minCommitTS = callerStartTS + 1
Expand All @@ -1081,33 +1089,36 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
}

writeKey := mvccEncode(primaryKey, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, 0, errors.Trace(err)
writeValue, err1 := lock.MarshalBinary()
if err1 != nil {
err = errors.Trace(err1)
return
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 = mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
}
}

return lock.ttl, 0, nil
return lock.ttl, 0, action, nil
}

// If current transaction's lock does not exist.
// If the commit info of the current transaction exists.
c, ok, err := getTxnCommitInfo(iter, primaryKey, lockTS)
if err != nil {
return 0, 0, errors.Trace(err)
c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS)
if err1 != nil {
err = errors.Trace(err1)
return
}
if ok {
// If current transaction is already committed.
if c.valueType != typeRollback {
return 0, c.commitTS, nil
return 0, c.commitTS, action, nil
}
// If current transaction is already rollback.
return 0, 0, nil
return 0, 0, kvrpcpb.Action_NoAction, nil
}
}

Expand All @@ -1120,16 +1131,18 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

if rollbackIfNotExist {
batch := &leveldb.Batch{}
if err := rollbackLock(batch, primaryKey, lockTS); err != nil {
return 0, 0, errors.Trace(err)
if err1 := rollbackLock(batch, primaryKey, lockTS); err1 != nil {
err = errors.Trace(err1)
return
}
if err := mvcc.db.Write(batch, nil); err != nil {
return 0, 0, errors.Trace(err)
if err1 := mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1)
return
}
return 0, 0, nil
return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil
}

return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
return 0, 0, action, &ErrTxnNotFound{kvrpcpb.TxnNotFound{
StartTs: lockTS,
PrimaryKey: primaryKey,
}}
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,11 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest)
panic("KvCheckTxnStatus: key not in region")
}
var resp kvrpcpb.CheckTxnStatusResponse
ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
ttl, commitTS, action, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist())
if err != nil {
resp.Error = convertToKeyError(err)
} else {
resp.LockTtl, resp.CommitVersion = ttl, commitTS
resp.LockTtl, resp.CommitVersion, resp.Action = ttl, commitTS, action
}
return &resp
}
Expand Down
4 changes: 3 additions & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve
type TxnStatus struct {
ttl uint64
commitTS uint64
action kvrpcpb.Action
}

// IsCommitted returns true if the txn's final status is Commit.
Expand Down Expand Up @@ -397,7 +398,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
return TxnStatus{l.TTL, 0}, nil
return TxnStatus{ttl: l.TTL}, nil
}

// Handle txnNotFound error.
Expand Down Expand Up @@ -482,6 +483,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
}
status.action = cmdResp.Action
if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
} else {
Expand Down
14 changes: 11 additions & 3 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.ttl, Greater, uint64(0), Commentf("action:%s", status.action))
}

func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
Expand Down Expand Up @@ -234,6 +234,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Check a committed txn.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -279,6 +280,8 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
oracle := s.store.GetOracle()
currentTS, err := oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
c.Assert(currentTS, Greater, txn.StartTS())

bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
resolver := newLockResolver(s.store)
// Call getTxnStatus to check the lock status.
Expand All @@ -287,6 +290,9 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(status.CommitTS(), Equals, uint64(0))
// TODO: It should be Action_MinCommitTSPushed if minCommitTS is set in the Prewrite request.
// Update here to kvrpcpb.Action_MinCommitTSPushed in the next PR.
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Test the ResolveLocks API
lock := s.mustGetLock(c, []byte("second"))
Expand All @@ -303,10 +309,11 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
// Then call getTxnStatus again and check the lock status.
currentTS, err = oracle.GetTimestamp(context.Background())
c.Assert(err, IsNil)
status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)

// Call getTxnStatus on a committed transaction.
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
Expand Down Expand Up @@ -366,14 +373,15 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
c.Assert(err, IsNil)
lock = &Lock{
Key: []byte("second"),
Primary: []byte("key"),
Primary: []byte("key_not_exist"),
TxnID: startTS,
TTL: 1000,
}
status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, uint64(0))
c.Assert(status.commitTS, Equals, uint64(0))
c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
Expand Down

0 comments on commit 272d61a

Please sign in to comment.