Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Update unistore for assertion #29902

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,9 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8=
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c h1:4D/M5eYfbswv3vs0ZtbVgNKwSRMXgAcm+9a+IbC7q0o=
github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
15 changes: 15 additions & 0 deletions store/mockstore/unistore/tikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tikv

import (
"encoding/hex"
"fmt"

deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
Expand Down Expand Up @@ -132,3 +133,17 @@ type ErrTxnNotFound struct {
func (e *ErrTxnNotFound) Error() string {
return "txn not found"
}

// ErrAssertionFailed is returned if any assertion fails on a transaction request.
type ErrAssertionFailed struct {
StartTS uint64
Key []byte
Assertion kvrpcpb.Assertion
ExistingStartTS uint64
ExistingCommitTS uint64
}

func (e *ErrAssertionFailed) Error() string {
return fmt.Sprintf("AssertionFailed { StartTS: %v, Key: %v, Assertion: %v, ExistingStartTS: %v, ExistingCommitTS: %v }",
e.StartTS, hex.EncodeToString(e.Key), e.Assertion.String(), e.ExistingStartTS, e.ExistingCommitTS)
}
42 changes: 39 additions & 3 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,23 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
resp.Value = val
resp.CommitTs = dbMeta.CommitTS()
}
if req.ReturnValues {
if req.ReturnValues || req.CheckExistence {
for _, item := range items {
if item == nil {
resp.Values = append(resp.Values, nil)
if req.ReturnValues {
resp.Values = append(resp.Values, nil)
}
resp.NotFounds = append(resp.NotFounds, true)
continue
}
val, err1 := item.ValueCopy(nil)
if err1 != nil {
return nil, err1
}
resp.Values = append(resp.Values, val)
if req.ReturnValues {
resp.Values = append(resp.Values, val)
}
resp.NotFounds = append(resp.NotFounds, len(val) == 0)
}
}
return nil, err
Expand Down Expand Up @@ -853,6 +859,36 @@ func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutatio
Value: m.Value,
Secondaries: req.Secondaries,
}
// Note that this is not fully consistent with TiKV. TiKV doesn't always get the value from Write CF. In
// AssertionLevel_Fast, TiKV skips checking assertion if Write CF is not read, in order not to harm the performance.
// However, unistore can always check it. It's better not to assume the store's behavior about assertion when the
// mode is set to AssertionLevel_Fast.
if req.AssertionLevel != kvrpcpb.AssertionLevel_Off {
if item == nil || item.IsEmpty() {
if m.Assertion == kvrpcpb.Assertion_Exist {
log.Error("ASSERTION FAIL!!! non-exist for must exist key", zap.Stringer("mutation", m))
return nil, &ErrAssertionFailed{
StartTS: req.StartVersion,
Key: m.Key,
Assertion: m.Assertion,
ExistingStartTS: 0,
ExistingCommitTS: 0,
}
}
} else {
if m.Assertion == kvrpcpb.Assertion_NotExist {
log.Error("ASSERTION FAIL!!! exist for must non-exist key", zap.Stringer("mutation", m))
userMeta := mvcc.DBUserMeta(item.UserMeta())
return nil, &ErrAssertionFailed{
StartTS: req.StartVersion,
Key: m.Key,
Assertion: m.Assertion,
ExistingStartTS: userMeta.StartTS(),
ExistingCommitTS: userMeta.CommitTS(),
}
}
}
}
var err error
lock.Op = uint8(m.Op)
if lock.Op == uint8(kvrpcpb.Op_Insert) {
Expand Down
130 changes: 127 additions & 3 deletions store/mockstore/unistore/tikv/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/pingcap/badger"
"github.com/pingcap/badger/y"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/mockstore/unistore/config"
Expand Down Expand Up @@ -153,32 +154,54 @@ func PessimisticLock(pk []byte, key []byte, startTs uint64, lockTTL uint64, forU
// PrewriteOptimistic raises optimistic prewrite requests on store
func PrewriteOptimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, store *TestStore) error {
return PrewriteOptimisticWithAssertion(pk, key, value, startTs, lockTTL, minCommitTs, useAsyncCommit, secondaries,
kvrpcpb.Assertion_None, kvrpcpb.AssertionLevel_Off, store)
}

// PrewriteOptimisticWithAssertion raises optimistic prewrite requests on store, with specified assertion config
func PrewriteOptimisticWithAssertion(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, assertion kvrpcpb.Assertion,
assertionLevel kvrpcpb.AssertionLevel, store *TestStore) error {
op := kvrpcpb.Op_Put
if value == nil {
op = kvrpcpb.Op_Del
}
mutation := newMutation(op, key, value)
mutation.Assertion = assertion
prewriteReq := &kvrpcpb.PrewriteRequest{
Mutations: []*kvrpcpb.Mutation{newMutation(op, key, value)},
Mutations: []*kvrpcpb.Mutation{mutation},
PrimaryLock: pk,
StartVersion: startTs,
LockTtl: lockTTL,
MinCommitTs: minCommitTs,
UseAsyncCommit: useAsyncCommit,
Secondaries: secondaries,
AssertionLevel: assertionLevel,
}
return store.MvccStore.prewriteOptimistic(store.newReqCtx(), prewriteReq.Mutations, prewriteReq)
}

// PrewritePessimistic raises pessmistic prewrite requests
// PrewritePessimistic raises pessimistic prewrite requests
func PrewritePessimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
isPessimisticLock []bool, forUpdateTs uint64, store *TestStore) error {
return PrewritePessimisticWithAssertion(pk, key, value, startTs, lockTTL, isPessimisticLock, forUpdateTs,
kvrpcpb.Assertion_None, kvrpcpb.AssertionLevel_Off, store)
}

// PrewritePessimisticWithAssertion raises pessimistic prewrite requests, with specified assertion config
func PrewritePessimisticWithAssertion(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64,
isPessimisticLock []bool, forUpdateTs uint64, assertion kvrpcpb.Assertion, assertionLevel kvrpcpb.AssertionLevel,
store *TestStore) error {
mutation := newMutation(kvrpcpb.Op_Put, key, value)
mutation.Assertion = assertion
prewriteReq := &kvrpcpb.PrewriteRequest{
Mutations: []*kvrpcpb.Mutation{newMutation(kvrpcpb.Op_Put, key, value)},
Mutations: []*kvrpcpb.Mutation{mutation},
PrimaryLock: pk,
StartVersion: startTs,
LockTtl: lockTTL,
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: forUpdateTs,
AssertionLevel: assertionLevel,
}
return store.MvccStore.prewritePessimistic(store.newReqCtx(), prewriteReq.Mutations, prewriteReq)
}
Expand Down Expand Up @@ -1649,3 +1672,104 @@ func TestAccessCommittedLocks(t *testing.T) {
}
}
}

func TestAssertion(t *testing.T) {
t.Parallel()

store, close := NewTestStore("TestAssertion", "TestAssertion", t)
defer close()

// Prepare
MustPrewriteOptimistic([]byte("k1"), []byte("k1"), []byte("v1"), 1, 100, 0, store)
MustPrewriteOptimistic([]byte("k1"), []byte("k2"), []byte("v2"), 1, 100, 0, store)
MustPrewriteOptimistic([]byte("k1"), []byte("k3"), []byte("v3"), 1, 100, 0, store)
MustCommit([]byte("k1"), 1, 2, store)
MustCommit([]byte("k2"), 1, 2, store)
MustCommit([]byte("k3"), 1, 2, store)

checkAssertionFailedError := func(err error, disable bool, startTs uint64, key []byte, assertion kvrpcpb.Assertion, existingStartTs uint64, existingCommitTs uint64) {
t.Logf("Check error: %+q", err)
if disable {
require.Nil(t, err)
return
}
require.NotNil(t, err)
e, ok := errors.Cause(err).(*ErrAssertionFailed)
require.True(t, ok)
require.Equal(t, startTs, e.StartTS)
require.Equal(t, key, e.Key)
require.Equal(t, assertion, e.Assertion)
require.Equal(t, existingStartTs, e.ExistingStartTS)
require.Equal(t, existingCommitTs, e.ExistingCommitTS)
}

for _, disable := range []bool{false, true} {
level := kvrpcpb.AssertionLevel_Strict
if disable {
level = kvrpcpb.AssertionLevel_Off
}
// Test with optimistic transaction
err := PrewriteOptimisticWithAssertion([]byte("k1"), []byte("k1"), []byte("v1"), 10, 100, 0, false, nil,
kvrpcpb.Assertion_NotExist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k1"), kvrpcpb.Assertion_NotExist, 1, 2)
err = PrewriteOptimisticWithAssertion([]byte("k11"), []byte("k11"), []byte("v11"), 10, 100, 0, false, nil,
kvrpcpb.Assertion_Exist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k11"), kvrpcpb.Assertion_Exist, 0, 0)

// Test with pessimistic transaction
MustAcquirePessimisticLock([]byte("k2"), []byte("k2"), 10, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k2"), []byte("k2"), []byte("v2"), 10, 100, []bool{true}, 10,
kvrpcpb.Assertion_NotExist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k2"), kvrpcpb.Assertion_NotExist, 1, 2)
MustAcquirePessimisticLock([]byte("k22"), []byte("k22"), 10, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k22"), []byte("k22"), []byte("v22"), 10, 100, []bool{true}, 10,
kvrpcpb.Assertion_Exist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k22"), kvrpcpb.Assertion_Exist, 0, 0)

// Test with pessimistic transaction (non-pessimistic-lock)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k3"), []byte("v3"), 10, 100, []bool{false}, 10,
kvrpcpb.Assertion_NotExist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k3"), kvrpcpb.Assertion_NotExist, 1, 2)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k33"), []byte("v33"), 10, 100, []bool{false}, 10,
kvrpcpb.Assertion_Exist, level, store)
checkAssertionFailedError(err, disable, 10, []byte("k33"), kvrpcpb.Assertion_Exist, 0, 0)
}

for _, k := range [][]byte{
[]byte("k1"),
[]byte("k11"),
[]byte("k2"),
[]byte("k22"),
[]byte("k3"),
[]byte("k33"),
} {
MustRollbackKey(k, 10, store)
}

// Test assertion passes
// Test with optimistic transaction
err := PrewriteOptimisticWithAssertion([]byte("k1"), []byte("k1"), []byte("v1"), 20, 100, 0, false, nil,
kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
err = PrewriteOptimisticWithAssertion([]byte("k11"), []byte("k11"), []byte("v11"), 20, 100, 0, false, nil,
kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)

// Test with pessimistic transaction
MustAcquirePessimisticLock([]byte("k2"), []byte("k2"), 20, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k2"), []byte("k2"), []byte("v2"), 20, 100, []bool{true}, 10,
kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
MustAcquirePessimisticLock([]byte("k22"), []byte("k22"), 20, 10, store)
err = PrewritePessimisticWithAssertion([]byte("k22"), []byte("k22"), []byte("v22"), 20, 100, []bool{true}, 10,
kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)

// Test with pessimistic transaction (non-pessimistic-lock)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k3"), []byte("v3"), 20, 100, []bool{false}, 10,
kvrpcpb.Assertion_Exist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
err = PrewritePessimisticWithAssertion([]byte("pk"), []byte("k33"), []byte("v33"), 20, 100, []bool{false}, 10,
kvrpcpb.Assertion_NotExist, kvrpcpb.AssertionLevel_Strict, store)
require.Nil(t, err)
}
10 changes: 10 additions & 0 deletions store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,16 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
PrimaryKey: x.PrimaryKey,
},
}
case *ErrAssertionFailed:
return &kvrpcpb.KeyError{
AssertionFailed: &kvrpcpb.AssertionFailed{
StartTs: x.StartTS,
Key: x.Key,
Assertion: x.Assertion,
ExistingStartTs: x.ExistingStartTS,
ExistingCommitTs: x.ExistingCommitTS,
},
}
default:
return &kvrpcpb.KeyError{
Abort: err.Error(),
Expand Down