Skip to content

Commit

Permalink
txnkv: add new API for lock->put optimization (tikv#754)
Browse files Browse the repository at this point in the history
Signed-off-by: zyguan <zhongyangguan@gmail.com>
  • Loading branch information
zyguan authored Mar 29, 2023
1 parent ba41798 commit fc18f67
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 17 deletions.
146 changes: 145 additions & 1 deletion integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1731,7 +1732,7 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() {

forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
handle := db.IterWithFlags(key, nil).Handle()
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle)
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle, nil)
})

forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
Expand Down Expand Up @@ -1763,3 +1764,146 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() {
s.True(txn.GetMemBuffer().TryLock())
txn.GetMemBuffer().Unlock()
}

func (s *testCommitterSuite) TestSetLockedKeyValue() {
ctx := context.Background()
k1 := []byte("k1")
k2 := []byte("t00000001_i000000001")
v1 := []byte("v1")
v2 := []byte("v2")

mustLockKey := func(txn transaction.TxnProbe, key []byte) {
s.Require().NoError(txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}, key))
}
checkByOpVals := func(opVals ...interface{}) func(m transaction.CommitterMutations) {
s.Require().Equal(0, len(opVals)%2)
return func(m transaction.CommitterMutations) {
s.Require().Equal(m.Len(), len(opVals)/2)
for i := 0; i < len(opVals); i += 2 {
s.Require().Equal(opVals[i], m.GetOp(0))
if opVals[i+1] == nil {
s.Require().Nil(m.GetValue(0))
} else {
s.Require().Equal(opVals[i+1], m.GetValue(0))
}
}
}
}

for _, tt := range []struct {
name string
actions []func(txn transaction.TxnProbe)
checkPessimistic func(m transaction.CommitterMutations)
checkOptimisitc func(m transaction.CommitterMutations)
}{
{
"NoLock",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
},
checkByOpVals(),
checkByOpVals(),
},
{
"LockOnly",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
},
checkByOpVals(kvrpcpb.Op_Put, v1),
checkByOpVals(kvrpcpb.Op_Lock, nil),
},
{
"LockAndSet",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k1, v2)) },
},
checkByOpVals(kvrpcpb.Op_Put, v2),
checkByOpVals(kvrpcpb.Op_Put, v2),
},
{
"LockAndSetUnnecessaryKeyWithSameValue",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k2, v2) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k2) },
func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k2, v2)) },
},
checkByOpVals(kvrpcpb.Op_Put, v2),
checkByOpVals(kvrpcpb.Op_Lock, v2),
},
{
"LockAndSetUnnecessaryKeyWithDiffValue",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k2, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k2) },
func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k2, v2)) },
},
checkByOpVals(kvrpcpb.Op_Put, v1),
checkByOpVals(kvrpcpb.Op_Lock, v2),
},
{
"LockAndDelete",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
func(txn transaction.TxnProbe) { s.Require().NoError(txn.Delete(k1)) },
},
checkByOpVals(kvrpcpb.Op_Del, []byte{}),
checkByOpVals(kvrpcpb.Op_Del, []byte{}),
},
{
"LockAndDeleteYourWrite",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
func(txn transaction.TxnProbe) {
s.Require().NoError(txn.GetMemBuffer().DeleteWithFlags(k1, kv.SetNewlyInserted))
},
},
checkByOpVals(kvrpcpb.Op_Lock, []byte{}),
checkByOpVals(kvrpcpb.Op_Lock, []byte{}),
},
} {
var testAll func(name string, state []bool, actions []func(txn transaction.TxnProbe))
testAll = func(name string, state []bool, actions []func(txn transaction.TxnProbe)) {
if len(actions) == len(tt.actions) {
s.Run("Pessimistic"+name, func() {
txn := s.begin()
defer txn.Rollback()
txn.SetKVFilter(kvFilter{})
txn.SetPessimistic(true)
for _, action := range actions {
action(txn)
}
c, err := txn.NewCommitter(1)
s.Require().NoError(err)
tt.checkPessimistic(c.GetMutations())
s.Require().NoError(txn.Rollback())
})
s.Run("Optimistic"+name, func() {
txn := s.begin()
defer txn.Rollback()
txn.SetKVFilter(kvFilter{})
for _, action := range actions {
action(txn)
}
c, err := txn.NewCommitter(1)
s.Require().NoError(err)
tt.checkOptimisitc(c.GetMutations())
})
return
}
for i, used := range state {
if used {
continue
}
state[i] = true
testAll(name+"-"+strconv.Itoa(i), state, append(actions, tt.actions[i]))
state[i] = false
}
}
testAll(tt.name, make([]bool, len(tt.actions)), nil)
}
}
69 changes: 54 additions & 15 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ type memBufferMutations struct {
// MSB LSB
// [13 bits: Op][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock]
handles []unionstore.MemKeyHandle
// overlay of mutation values
overlay map[unionstore.MemKeyHandle][]byte
}

func newMemBufferMutations(sizeHint int, storage *unionstore.MemDB) *memBufferMutations {
Expand All @@ -211,7 +213,13 @@ func (m *memBufferMutations) GetKeys() [][]byte {
}

func (m *memBufferMutations) GetValue(i int) []byte {
v, _ := m.storage.GetValueByHandle(m.handles[i])
h := m.handles[i]
if m.overlay != nil {
if v, ok := m.overlay[h]; ok {
return v
}
}
v, _ := m.storage.GetValueByHandle(h)
return v
}

Expand All @@ -235,10 +243,11 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations {
return &memBufferMutations{
handles: m.handles[from:to],
storage: m.storage,
overlay: m.overlay,
}
}

func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle) {
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle, value []byte) {
// See comments of `m.handles` field about the format of the user data `aux`.
aux := uint16(op) << 3
if isPessimisticLock {
Expand All @@ -252,6 +261,18 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist,
}
handle.UserData = aux
m.handles = append(m.handles, handle)
if len(value) > 0 {
if op != kvrpcpb.Op_Put {
panic("op must be PUT when pushing with value")
}
if !isPessimisticLock {
panic("key must be locked when pushing with value")
}
if m.overlay == nil {
m.overlay = make(map[unionstore.MemKeyHandle][]byte)
}
m.overlay[handle] = value
}
}

// CommitterMutationFlags represents various bit flags of mutations.
Expand Down Expand Up @@ -495,7 +516,7 @@ func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, asse
}

func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
var size, putCnt, delCnt, lockCnt, checkCnt int
var size, putCnt, delCnt, lockCnt, checkCnt, putFromLockCnt int

txn := c.txn
memBuf := txn.GetMemBuffer()
Expand All @@ -510,15 +531,25 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
_ = err
key := it.Key()
flags := it.Flags()
var value []byte
var op kvrpcpb.Op
var (
value []byte
cachedValue []byte = nil
op kvrpcpb.Op
)

if !it.HasValue() {
if !flags.HasLocked() {
continue
}
op = kvrpcpb.Op_Lock
lockCnt++
if val, ok := txn.getValueByLockedKey(key); ok && len(val) > 0 && c.isPessimistic {
// Change the LOCK into PUT if the value of this key has a cached value.
cachedValue = val
op = kvrpcpb.Op_Put
putFromLockCnt++
} else {
op = kvrpcpb.Op_Lock
lockCnt++
}
} else {
value = it.Value()
var isUnnecessaryKV bool
Expand All @@ -533,11 +564,18 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
if !flags.HasLocked() {
continue
}
// If the key was locked before, we should prewrite the lock even if
// the KV needn't be committed according to the filter. Otherwise, we
// were forgetting removing pessimistic locks added before.
op = kvrpcpb.Op_Lock
lockCnt++
if val, ok := txn.getValueByLockedKey(key); ok && len(val) > 0 && c.isPessimistic {
// Change the LOCK into PUT if the value of this key has a cached value.
cachedValue = val
op = kvrpcpb.Op_Put
putFromLockCnt++
} else {
// If the key was locked before, we should prewrite the lock even if
// the KV needn't be committed according to the filter. Otherwise, we
// were forgetting removing pessimistic locks added before.
op = kvrpcpb.Op_Lock
lockCnt++
}
} else {
op = kvrpcpb.Op_Put
if flags.HasPresumeKeyNotExists() {
Expand Down Expand Up @@ -583,8 +621,8 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off {
mustExist, mustNotExist, hasAssertUnknown = false, false, false
}
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle())
size += len(key) + len(value)
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle(), cachedValue)
size += len(key) + len(value) + len(cachedValue)

if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off {
// Check mutations for pessimistic-locked keys with the read results of pessimistic lock requests.
Expand Down Expand Up @@ -637,6 +675,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
zap.Int("dels", delCnt),
zap.Int("locks", lockCnt),
zap.Int("checks", checkCnt),
zap.Int("putsFromLocks", putFromLockCnt),
zap.Uint64("txnStartTS", txn.startTS))
}

Expand Down Expand Up @@ -1760,7 +1799,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
return false, err
}
handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle()
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle)
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle, nil)
}
}
return false, nil
Expand Down
26 changes: 25 additions & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ type KVTxn struct {
startTS uint64
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
mu sync.Mutex // For thread-safe LockKeys function.
mu sync.Mutex // For thread-safe LockKeys, SetLockedKeyValue functions.
setCnt int64
vars *tikv.Variables
committer *twoPhaseCommitter
lockedCnt int
// lockedKV is used to cache kv pairs that have been locked, the 2pc committer will read this map when init
// mutations, convert lock into put if needed.
lockedKVs map[string][]byte

valid bool

Expand Down Expand Up @@ -778,6 +781,27 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
return nil
}

// SetLockedKeyValue caches a key-value pair whose key has been locked. Those key-value pairs may be turned to PUT
// record if possible.
func (txn *KVTxn) SetLockedKeyValue(key []byte, value []byte) {
txn.mu.Lock()
if txn.lockedKVs == nil {
txn.lockedKVs = make(map[string][]byte)
}
txn.lockedKVs[string(key)] = value
txn.mu.Unlock()
}

// getValueByLockedKey returns the cached value of the given locked key.
func (txn *KVTxn) getValueByLockedKey(key []byte) (value []byte, ok bool) {
txn.mu.Lock()
if txn.lockedKVs != nil {
value, ok = txn.lockedKVs[string(key)]
}
txn.mu.Unlock()
return
}

// deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation.
func deduplicateKeys(keys [][]byte) [][]byte {
sort.Slice(keys, func(i, j int) bool {
Expand Down

0 comments on commit fc18f67

Please sign in to comment.