diff --git a/go.mod b/go.mod index b047aabfad63b..dc95049345efb 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3 github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f + github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible diff --git a/go.sum b/go.sum index 1ab204f14388d..53398f3042ae9 100644 --- a/go.sum +++ b/go.sum @@ -582,8 +582,9 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8= github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c h1:4D/M5eYfbswv3vs0ZtbVgNKwSRMXgAcm+9a+IbC7q0o= +github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/mockstore/unistore/tikv/errors.go b/store/mockstore/unistore/tikv/errors.go index 6043f2fd7a753..59635eb073a1b 100644 --- a/store/mockstore/unistore/tikv/errors.go +++ b/store/mockstore/unistore/tikv/errors.go @@ -15,6 +15,7 @@ package tikv import ( + "encoding/hex" "fmt" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" @@ -132,3 +133,17 @@ type ErrTxnNotFound struct { func (e *ErrTxnNotFound) Error() string { return "txn not found" } + +// ErrAssertionFailed is returned if any assertion fails on a transaction request. +type ErrAssertionFailed struct { + StartTS uint64 + Key []byte + Assertion kvrpcpb.Assertion + ExistingStartTS uint64 + ExistingCommitTS uint64 +} + +func (e *ErrAssertionFailed) Error() string { + return fmt.Sprintf("AssertionFailed { StartTS: %v, Key: %v, Assertion: %v, ExistingStartTS: %v, ExistingCommitTS: %v }", + e.StartTS, hex.EncodeToString(e.Key), e.Assertion.String(), e.ExistingStartTS, e.ExistingCommitTS) +} diff --git a/store/mockstore/unistore/tikv/mvcc.go b/store/mockstore/unistore/tikv/mvcc.go index bba8d53409352..47300b35f5994 100644 --- a/store/mockstore/unistore/tikv/mvcc.go +++ b/store/mockstore/unistore/tikv/mvcc.go @@ -293,17 +293,23 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi resp.Value = val resp.CommitTs = dbMeta.CommitTS() } - if req.ReturnValues { + if req.ReturnValues || req.CheckExistence { for _, item := range items { if item == nil { - resp.Values = append(resp.Values, nil) + if req.ReturnValues { + resp.Values = append(resp.Values, nil) + } + resp.NotFounds = append(resp.NotFounds, true) continue } val, err1 := item.ValueCopy(nil) if err1 != nil { return nil, err1 } - resp.Values = append(resp.Values, val) + if req.ReturnValues { + resp.Values = append(resp.Values, val) + } + resp.NotFounds = append(resp.NotFounds, len(val) == 0) } } return nil, err @@ -853,6 +859,36 @@ func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutatio Value: m.Value, Secondaries: req.Secondaries, } + // Note that this is not fully consistent with TiKV. TiKV doesn't always get the value from Write CF. In + // AssertionLevel_Fast, TiKV skips checking assertion if Write CF is not read, in order not to harm the performance. + // However, unistore can always check it. It's better not to assume the store's behavior about assertion when the + // mode is set to AssertionLevel_Fast. + if req.AssertionLevel != kvrpcpb.AssertionLevel_Off { + if item == nil || item.IsEmpty() { + if m.Assertion == kvrpcpb.Assertion_Exist { + log.Error("ASSERTION FAIL!!! non-exist for must exist key", zap.Stringer("mutation", m)) + return nil, &ErrAssertionFailed{ + StartTS: req.StartVersion, + Key: m.Key, + Assertion: m.Assertion, + ExistingStartTS: 0, + ExistingCommitTS: 0, + } + } + } else { + if m.Assertion == kvrpcpb.Assertion_NotExist { + log.Error("ASSERTION FAIL!!! exist for must non-exist key", zap.Stringer("mutation", m)) + userMeta := mvcc.DBUserMeta(item.UserMeta()) + return nil, &ErrAssertionFailed{ + StartTS: req.StartVersion, + Key: m.Key, + Assertion: m.Assertion, + ExistingStartTS: userMeta.StartTS(), + ExistingCommitTS: userMeta.CommitTS(), + } + } + } + } var err error lock.Op = uint8(m.Op) if lock.Op == uint8(kvrpcpb.Op_Insert) { diff --git a/store/mockstore/unistore/tikv/mvcc_test.go b/store/mockstore/unistore/tikv/mvcc_test.go index f9d681511ced4..cb55a5548860f 100644 --- a/store/mockstore/unistore/tikv/mvcc_test.go +++ b/store/mockstore/unistore/tikv/mvcc_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/badger" "github.com/pingcap/badger/y" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/mockstore/unistore/config" @@ -153,32 +154,54 @@ func PessimisticLock(pk []byte, key []byte, startTs uint64, lockTTL uint64, forU // PrewriteOptimistic raises optimistic prewrite requests on store func PrewriteOptimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64, minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, store *TestStore) error { + return PrewriteOptimisticWithAssertion(pk, key, value, startTs, lockTTL, minCommitTs, useAsyncCommit, secondaries, + kvrpcpb.Assertion_None, kvrpcpb.AssertionLevel_Off, store) +} + +// PrewriteOptimisticWithAssertion raises optimistic prewrite requests on store, with specified assertion config +func PrewriteOptimisticWithAssertion(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64, + minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, assertion kvrpcpb.Assertion, + assertionLevel kvrpcpb.AssertionLevel, store *TestStore) error { op := kvrpcpb.Op_Put if value == nil { op = kvrpcpb.Op_Del } + mutation := newMutation(op, key, value) + mutation.Assertion = assertion prewriteReq := &kvrpcpb.PrewriteRequest{ - Mutations: []*kvrpcpb.Mutation{newMutation(op, key, value)}, + Mutations: []*kvrpcpb.Mutation{mutation}, PrimaryLock: pk, StartVersion: startTs, LockTtl: lockTTL, MinCommitTs: minCommitTs, UseAsyncCommit: useAsyncCommit, Secondaries: secondaries, + AssertionLevel: assertionLevel, } return store.MvccStore.prewriteOptimistic(store.newReqCtx(), prewriteReq.Mutations, prewriteReq) } -// PrewritePessimistic raises pessmistic prewrite requests +// PrewritePessimistic raises pessimistic prewrite requests func PrewritePessimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64, isPessimisticLock []bool, forUpdateTs uint64, store *TestStore) error { + return PrewritePessimisticWithAssertion(pk, key, value, startTs, lockTTL, isPessimisticLock, forUpdateTs, + kvrpcpb.Assertion_None, kvrpcpb.AssertionLevel_Off, store) +} + +// PrewritePessimisticWithAssertion raises pessimistic prewrite requests, with specified assertion config +func PrewritePessimisticWithAssertion(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64, + isPessimisticLock []bool, forUpdateTs uint64, assertion kvrpcpb.Assertion, assertionLevel kvrpcpb.AssertionLevel, + store *TestStore) error { + mutation := newMutation(kvrpcpb.Op_Put, key, value) + mutation.Assertion = assertion prewriteReq := &kvrpcpb.PrewriteRequest{ - Mutations: []*kvrpcpb.Mutation{newMutation(kvrpcpb.Op_Put, key, value)}, + Mutations: []*kvrpcpb.Mutation{mutation}, PrimaryLock: pk, StartVersion: startTs, LockTtl: lockTTL, IsPessimisticLock: isPessimisticLock, ForUpdateTs: forUpdateTs, + AssertionLevel: assertionLevel, } return store.MvccStore.prewritePessimistic(store.newReqCtx(), prewriteReq.Mutations, prewriteReq) } @@ -1649,3 +1672,104 @@ func TestAccessCommittedLocks(t *testing.T) { } } } + +func TestAssertion(t *testing.T) { + t.Parallel() + + store, close := NewTestStore("TestAssertion", "TestAssertion", t) + defer close() + + // Prepare + MustPrewriteOptimistic([]byte("k1"), []byte("k1"), []byte("v1"), 1, 100, 0, store) + MustPrewriteOptimistic([]byte("k1"), []byte("k2"), []byte("v2"), 1, 100, 0, store) + MustPrewriteOptimistic([]byte("k1"), []byte("k3"), []byte("v3"), 1, 100, 0, store) + MustCommit([]byte("k1"), 1, 2, store) + MustCommit([]byte("k2"), 1, 2, store) + MustCommit([]byte("k3"), 1, 2, store) + + checkAssertionFailedError := func(err error, disable bool, startTs uint64, key []byte, assertion kvrpcpb.Assertion, existingStartTs uint64, existingCommitTs uint64) { + t.Logf("Check error: %+q", err) + if disable { + require.Nil(t, err) + return + } + require.NotNil(t, err) + e, ok := errors.Cause(err).(*ErrAssertionFailed) + require.True(t, ok) + require.Equal(t, startTs, e.StartTS) + require.Equal(t, key, e.Key) + require.Equal(t, assertion, e.Assertion) + require.Equal(t, existingStartTs, e.ExistingStartTS) + require.Equal(t, existingCommitTs, e.ExistingCommitTS) + } + + for _, disable := range []bool{false, true} { + level := kvrpcpb.AssertionLevel_Strict + if disable { + level = kvrpcpb.AssertionLevel_Off + } + // Test with optimistic transaction + err := PrewriteOptimisticWithAssertion([]byte("k1"), []byte("k1"), []byte("v1"), 10, 100, 0, false, nil, + kvrpcpb.Assertion_NotExist, level, store) + checkAssertionFailedError(err, disable, 10, []byte("k1"), kvrpcpb.Assertion_NotExist, 1, 2) + err = PrewriteOptimisticWithAssertion([]byte("k11"), []byte("k11"), []byte("v11"), 10, 100, 0, false, nil, + kvrpcpb.Assertion_Exist, level, store) + checkAssertionFailedError(err, disable, 10, []byte("k11"), kvrpcpb.Assertion_Exist, 0, 0) + + // Test with pessimistic transaction + MustAcquirePessimisticLock([]byte("k2"), []byte("k2"), 10, 10, store) + err = PrewritePessimisticWithAssertion([]byte("k2"), []byte("k2"), []byte("v2"), 10, 100, []bool{true}, 10, + kvrpcpb.Assertion_NotExist, level, store) + checkAssertionFailedError(err, disable, 10, []byte("k2"), kvrpcpb.Assertion_NotExist, 1, 2) + MustAcquirePessimisticLock([]byte("k22"), []byte("k22"), 10, 10, store) + err = PrewritePessimisticWithAssertion([]byte("k22"), []byte("k22"), []byte("v22"), 10, 100, []bool{true}, 10, + kvrpcpb.Assertion_Exist, level, store) + checkAssertionFailedError(err, disable, 10, []byte("k22"), kvrpcpb.Assertion_Exist, 0, 0) + + // Test with pessimistic transaction (non-pessimistic-lock) + err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k3"), []byte("v3"), 10, 100, []bool{false}, 10, + kvrpcpb.Assertion_NotExist, level, store) + checkAssertionFailedError(err, disable, 10, []byte("k3"), kvrpcpb.Assertion_NotExist, 1, 2) + err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k33"), []byte("v33"), 10, 100, []bool{false}, 10, + kvrpcpb.Assertion_Exist, level, store) + checkAssertionFailedError(err, disable, 10, []byte("k33"), kvrpcpb.Assertion_Exist, 0, 0) + } + + for _, k := range [][]byte{ + []byte("k1"), + []byte("k11"), + []byte("k2"), + []byte("k22"), + []byte("k3"), + []byte("k33"), + } { + MustRollbackKey(k, 10, store) + } + + // Test assertion passes + // Test with optimistic transaction + err := PrewriteOptimisticWithAssertion([]byte("k1"), []byte("k1"), []byte("v1"), 20, 100, 0, false, nil, + kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store) + require.Nil(t, err) + err = PrewriteOptimisticWithAssertion([]byte("k11"), []byte("k11"), []byte("v11"), 20, 100, 0, false, nil, + kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store) + require.Nil(t, err) + + // Test with pessimistic transaction + MustAcquirePessimisticLock([]byte("k2"), []byte("k2"), 20, 10, store) + err = PrewritePessimisticWithAssertion([]byte("k2"), []byte("k2"), []byte("v2"), 20, 100, []bool{true}, 10, + kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store) + require.Nil(t, err) + MustAcquirePessimisticLock([]byte("k22"), []byte("k22"), 20, 10, store) + err = PrewritePessimisticWithAssertion([]byte("k22"), []byte("k22"), []byte("v22"), 20, 100, []bool{true}, 10, + kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store) + require.Nil(t, err) + + // Test with pessimistic transaction (non-pessimistic-lock) + err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k3"), []byte("v3"), 20, 100, []bool{false}, 10, + kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store) + require.Nil(t, err) + err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k33"), []byte("v33"), 20, 100, []bool{false}, 10, + kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store) + require.Nil(t, err) +} diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index 940f2770ecf0a..fd99e055d8f55 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -1031,6 +1031,16 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { PrimaryKey: x.PrimaryKey, }, } + case *ErrAssertionFailed: + return &kvrpcpb.KeyError{ + AssertionFailed: &kvrpcpb.AssertionFailed{ + StartTs: x.StartTS, + Key: x.Key, + Assertion: x.Assertion, + ExistingStartTs: x.ExistingStartTS, + ExistingCommitTs: x.ExistingCommitTS, + }, + } default: return &kvrpcpb.KeyError{ Abort: err.Error(),