Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: update kvrpc.Cleanup proto and change its behaviour #12212 #12417

Merged
merged 6 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
Expand All @@ -14,7 +13,6 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
Expand Down Expand Up @@ -290,7 +288,6 @@ golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGm
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU=
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
s.mustPutOK(c, "test", "test2", 5, 8)

// simulate `getTxnStatus` for txn 2.
err := s.store.Cleanup([]byte("test"), 2)
err := s.store.Cleanup([]byte("test"), 2, math.MaxUint64)
c.Assert(err, IsNil)
req = &kvrpcpb.PrewriteRequest{
Mutations: putMutations("test", "test3"),
Expand Down Expand Up @@ -614,7 +614,7 @@ func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
c.Assert(ttl, Greater, uint64(300))

// The lock has already been clean up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
c.Assert(s.store.Cleanup([]byte("pk"), 5, 0), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ type MVCCStore interface {
Prewrite(req *kvrpcpb.PrewriteRequest) []error
Commit(keys [][]byte, startTS, commitTS uint64) error
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
Cleanup(key []byte, startTS, currentTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
Expand Down
67 changes: 64 additions & 3 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/goleveldb/leveldb/util"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/deadlock"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -727,6 +728,12 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
}
if ok {
if dec.lock.startTS != startTS {
if isPessimisticLock {
// NOTE: A special handling.
// When pessimistic txn prewrite meets lock, set the TTL = 0 means
// telling TiDB to rollback the transaction **unconditionly**.
dec.lock.ttl = 0
}
return dec.lock.lockErr(mutation.Key)
}
if dec.lock.op != kvrpcpb.Op_PessimisticLock {
Expand Down Expand Up @@ -954,19 +961,73 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal
}

// Cleanup implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
// Cleanup API is deprecated, use CheckTxnStatus instead.
func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
mvcc.mu.Lock()
defer func() {
mvcc.mu.Unlock()
mvcc.deadlockDetector.CleanUp(startTS)
}()

batch := &leveldb.Batch{}
err := rollbackKey(mvcc.db, batch, key, startTS)
startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return err
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == startTS {

// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
return err
}
return mvcc.db.Write(batch, nil)
}

// Otherwise, return a locked error with the TTL information.
return dec.lock.lockErr(key)
}

// If current transaction's lock does not exist.
// If the commit information of the current transaction exist.
c, ok, err := getTxnCommitInfo(iter, key, startTS)
if err != nil {
return errors.Trace(err)
}
if ok {
// If the current transaction has already committed.
if c.valueType != typeRollback {
return ErrAlreadyCommitted(c.commitTS)
}
// If the current transaction has already rollbacked.
return nil
}
}

// If current transaction is not prewritted before.
value := mvccValue{
valueType: typeRollback,
startTS: startTS,
commitTS: startTS,
}
writeKey := mvccEncode(key, startTS)
writeValue, err := value.MarshalBinary()
if err != nil {
return errors.Trace(err)
}
return mvcc.db.Write(batch, nil)
batch.Put(writeKey, writeValue)
return nil
}

// TxnHeartBeat implements the MVCCStore interface.
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
panic("KvCleanup: key not in region")
}
var resp kvrpcpb.CleanupResponse
err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion())
err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion(), req.GetCurrentTs())
if err != nil {
if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok {
resp.CommitVersion = uint64(commitTS)
Expand Down
140 changes: 101 additions & 39 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,17 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve
return s.lockResolver, nil
}

// TxnStatus represents a txn's final status. It should be Commit or Rollback.
type TxnStatus uint64
// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
type TxnStatus struct {
ttl uint64
commitTS uint64
}

// IsCommitted returns true if the txn's final status is Commit.
func (s TxnStatus) IsCommitted() bool { return s > 0 }
func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }

// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
func (s TxnStatus) CommitTS() uint64 { return uint64(s) }
func (s TxnStatus) CommitTS() uint64 { return uint64(s.commitTS) }

// By default, locks after 3000ms is considered unusual (the client created the
// lock might be dead). Other client may cleanup this kind of lock.
Expand Down Expand Up @@ -171,15 +174,16 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {
return s, ok
}

// BatchResolveLocks resolve locks in a batch
// BatchResolveLocks resolve locks in a batch.
// Used it in gcworker only!
func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}

tikvLockResolverCountWithBatchResolve.Inc()

var expiredLocks []*Lock
expiredLocks := make([]*Lock, 0, len(locks))
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
tikvLockResolverCountWithExpired.Inc()
Expand All @@ -202,11 +206,11 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}

status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
if err != nil {
return false, errors.Trace(err)
}
txnInfos[l.TxnID] = uint64(status)
txnInfos[l.TxnID] = uint64(status.commitTS)
}
logutil.Logger(context.Background()).Info("BatchResolveLocks: lookup txn status",
zap.Duration("cost time", time.Since(startTime)),
Expand Down Expand Up @@ -268,9 +272,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) {
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) {
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return
return msBeforeTxnExpired.value(), nil
}

tikvLockResolverCountWithResolve.Inc()
Expand All @@ -279,61 +284,111 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE
for _, l := range locks {
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
if msBeforeLockExpired <= 0 {
tikvLockResolverCountWithExpired.Inc()
expiredLocks = append(expiredLocks, l)
} else {
if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired {
msBeforeTxnExpired = msBeforeLockExpired
}
msBeforeTxnExpired.update(int64(l.TTL))
tikvLockResolverCountWithNotExpired.Inc()
}
}
if len(expiredLocks) == 0 {
if msBeforeTxnExpired > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return
}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
for _, l := range expiredLocks {
var status TxnStatus
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary)
status, err := lr.getTxnStatusFromLock(bo, l)
if err != nil {
msBeforeTxnExpired = 0
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return
return msBeforeTxnExpired.value(), err
}

cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
if status.ttl == 0 {
tikvLockResolverCountWithExpired.Inc()
// If the lock is committed or rollbacked, resolve lock.
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}

err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
msBeforeTxnExpired = 0
err = errors.Trace(err)
return
err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return msBeforeTxnExpired.value(), err
}
} else {
tikvLockResolverCountWithNotExpired.Inc()
// If the lock is valid, the txn may be a pessimistic transaction.
// Update the txn expire time.
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl)
msBeforeTxnExpired.update(msBeforeLockExpired)
}
}

if msBeforeTxnExpired.value() > 0 {
tikvLockResolverCountWithWaitExpired.Inc()
}
return msBeforeTxnExpired.value(), nil
}

type txnExpireTime struct {
initialized bool
txnExpire int64
}

func (t *txnExpireTime) update(lockExpire int64) {
if lockExpire <= 0 {
lockExpire = 0
}
if !t.initialized {
t.txnExpire = lockExpire
t.initialized = true
return
}
if lockExpire < t.txnExpire {
t.txnExpire = lockExpire
}
return
}

func (t *txnExpireTime) value() int64 {
if !t.initialized {
return 0
}
return t.txnExpire
}

// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
// seconds before calling it after Prewrite.
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) {
var status TxnStatus
bo := NewBackoffer(context.Background(), cleanupMaxBackoff)
status, err := lr.getTxnStatus(bo, txnID, primary)
return status, errors.Trace(err)
currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, currentTS)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**.
// In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call
// getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction.
if l.TTL == 0 {
return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0)
}

currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx)
if err != nil {
return TxnStatus{}, err
}
return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS)
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
Expand All @@ -346,6 +401,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
Cleanup: &kvrpcpb.CleanupRequest{
Key: primary,
StartVersion: txnID,
CurrentTs: currentTS,
},
}
for {
Expand Down Expand Up @@ -373,12 +429,18 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
return status, errors.Trace(ErrBodyMissing)
}
if keyErr := cmdResp.GetError(); keyErr != nil {
// If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL.
if lockInfo := keyErr.GetLocked(); lockInfo != nil {
status.ttl = lockInfo.LockTtl
status.commitTS = 0
return status, nil
}
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
logutil.Logger(context.Background()).Error("getTxnStatus error", zap.Error(err))
return status, err
}
if cmdResp.CommitVersion != 0 {
status = TxnStatus(cmdResp.GetCommitVersion())
status = TxnStatus{0, cmdResp.GetCommitVersion()}
tikvLockResolverCountWithQueryTxnStatusCommitted.Inc()
} else {
tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc()
Expand Down
Loading