Skip to content

Commit

Permalink
(6.1) lock_resolver: handle pessimistic locks in BatchResolveLocks (#794
Browse files Browse the repository at this point in the history
) (#859)

* lock_resolver: handle pessimistic locks in BatchResolveLocks (#794)

* lock_resolver: handle pessimistic locks in BatchResolveLocks

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add test but failed on unistore because unistore's ScanLock doesnt return lock type

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Address comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix test

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* Fix golangci

Signed-off-by: zyguan <zhongyangguan@gmail.com>

---------

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix test not running; run on CI

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

---------

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Co-authored-by: zyguan <zhongyangguan@gmail.com>
Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 30, 2023
1 parent fc18f67 commit 01e2395
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 8 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Integration Test

on:
push:
branches: [ master ]
branches: [ master, tidb-6.1 ]
pull_request:
branches: [ master ]
branches: [ master, tidb-6.1 ]

jobs:

Expand Down Expand Up @@ -53,14 +53,14 @@ jobs:
uses: shrink/actions-docker-extract@v1
id: extract-pd
with:
image: pingcap/pd:nightly
image: pingcap/pd:v6.1.6
path: /pd-server

- name: Fetch TiKV
uses: shrink/actions-docker-extract@v1
id: extract-tikv
with:
image: pingcap/tikv:nightly
image: pingcap/tikv:v6.1.6
path: /tikv-server

- name: Run PD & TiKV
Expand Down Expand Up @@ -92,14 +92,14 @@ jobs:
uses: shrink/actions-docker-extract@v1
id: extract-pd
with:
image: pingcap/pd:nightly
image: pingcap/pd:v6.1.6
path: /pd-server

- name: Fetch TiKV
uses: shrink/actions-docker-extract@v1
id: extract-tikv
with:
image: pingcap/tikv:nightly
image: pingcap/tikv:v6.1.6
path: /tikv-server

- name: Run PD & TiKV
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Unit Test

on:
push:
branches: [ master ]
branches: [ master, tidb-6.1 ]
pull_request:
branches: [ master ]
branches: [ master, tidb-6.1 ]

jobs:
test:
Expand Down
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
run:
timeout: 5m
linters:
disable-all: true
enable:
Expand Down
217 changes: 217 additions & 0 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ package tikv_test
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"sync"
"sync/atomic"
"testing"
Expand All @@ -47,8 +50,11 @@ import (
"github.com/pingcap/failpoint"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -57,6 +63,7 @@ import (
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
)

var getMaxBackoff = tikv.ConfigProbe{}.GetGetMaxBackoff()
Expand All @@ -65,6 +72,10 @@ func TestLock(t *testing.T) {
suite.Run(t, new(testLockSuite))
}

func TestLockWithTiKV(t *testing.T) {
suite.Run(t, new(testLockWithTiKVSuite))
}

type testLockSuite struct {
suite.Suite
store tikv.StoreProbe
Expand Down Expand Up @@ -1007,3 +1018,209 @@ func (s *testLockSuite) TestLockWaitTimeLimit() {
s.Nil(txn1.Rollback())
s.Nil(txn2.Rollback())
}

type testLockWithTiKVSuite struct {
suite.Suite
store tikv.StoreProbe
}

func (s *testLockWithTiKVSuite) SetupTest() {
if *withTiKV {
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
} else {
s.store = tikv.StoreProbe{KVStore: NewTestUniStore(s.T())}
}
}

func (s *testLockWithTiKVSuite) TearDownTest() {
s.store.Close()
}

func (s *testLockWithTiKVSuite) trySetTiKVConfig(name string, value interface{}) func() {
stores, err := s.store.GetPDClient().GetAllStores(context.Background())
s.NoError(err)

type configItem struct {
url string
name string
value interface{}
}

var recoverConfigs []configItem

httpScheme := "http"
if c, err := config.GetGlobalConfig().Security.ToTLSConfig(); err == nil && c != nil {
httpScheme = "https"
}

t := s.Suite.T()

setCfg := func(url, name string, value interface{}) error {
postBody, err := json.Marshal(map[string]interface{}{name: value})
if err != nil {
return err
}
resp, err := http.Post(url, "text/json", bytes.NewReader(postBody))
if err != nil {
return err
}
s.NoError(resp.Body.Close())
if resp.StatusCode != 200 {
return errors.Errorf("post config got unexpected status code: %v, request body: %s", resp.StatusCode, postBody)
}
t.Logf("set config for tikv at %s finished: %s", url, string(postBody))
return nil
}

storeIter:
for _, store := range stores {
if store.State != metapb.StoreState_Up {
continue
}
for _, label := range store.Labels {
if label.Key == "engine" && label.Value != "tikv" {
continue storeIter
}
}

err := func() (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.Errorf("set config for store at %v panicked: %v", store.StatusAddress, r)
}
}()

url := fmt.Sprintf("%s://%s/config", httpScheme, store.StatusAddress)
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return errors.Errorf("unexpected response status: %v", resp.Status)
}
oldCfgRaw, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

oldCfg := make(map[string]interface{})
err = json.Unmarshal(oldCfgRaw, &oldCfg)
if err != nil {
return err
}

oldValue := oldCfg["pessimistic-txn"].(map[string]interface{})["in-memory"]
if assert.ObjectsAreEqual(oldValue, value) {
return nil
}

err = setCfg(url, name, value)
if err != nil {
return err
}

recoverConfigs = append(recoverConfigs, configItem{
url: url,
name: name,
value: oldValue,
})

return nil
}()

if err != nil {
t.Logf("failed to set config for store at %s: %v", store.StatusAddress, err)
}
}

// Prevent goleak from complaining about its internal connections.
http.DefaultClient.CloseIdleConnections()

if len(recoverConfigs) > 0 {
// Sleep for a while to ensure the new configs are applied.
time.Sleep(time.Second)
}

return func() {
for _, item := range recoverConfigs {
err = setCfg(item.url, item.name, item.value)
if err != nil {
t.Logf("failed to recover config for store at %s: %v", item.url, err)
}
}

// Prevent goleak from complaining about its internal connections.
http.DefaultClient.CloseIdleConnections()
}
}

func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
if *withTiKV {
recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", false)
defer recoverFunc()
} else {
s.T().Skip("this test only works with tikv")
}

s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
}()

k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
v2, v3 := []byte("v2"), []byte("v3")

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))

txn, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)

{
// Produce write conflict on key k2
txn2, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k2, []byte("v0")))
s.NoError(txn2.Commit(ctx))
}

lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))

// k1 has txn's stale pessimistic lock now.

forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3))

s.NoError(txn.Set(k2, v2))
s.NoError(txn.Set(k3, v3))
s.NoError(txn.Commit(ctx))

// k3 has txn's stale prewrite lock now.

// Perform ScanLock - BatchResolveLock.
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1))

// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
snapshot := s.store.GetSnapshot(readTS)
_, err = snapshot.Get(ctx, k1)
s.Equal(tikverr.ErrNotExist, err)
v, err := snapshot.Get(ctx, k2)
s.NoError(err)
s.Equal(v2, v)
v, err = snapshot.Get(ctx, k3)
s.NoError(err)
s.Equal(v3, v)
}
5 changes: 5 additions & 0 deletions tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) {
s.setSafeTS(storeID, safeTS)
}

// GCResolveLockPhase performs the resolve-locks phase of GC, which scans all locks and resolves them.
func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, concurrency int) error {
return s.resolveLocks(ctx, safepoint, concurrency)
}

// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe
Expand Down
16 changes: 16 additions & 0 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,27 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
logutil.Logger(bo.GetCtx()).Debug("BatchResolveLocks handling lock", zap.Stringer("lock", l))

if _, ok := txnInfos[l.TxnID]; ok {
continue
}
metrics.LockResolverCountWithExpired.Inc()

if l.LockType == kvrpcpb.Op_PessimisticLock {
// BatchResolveLocks forces resolving the locks ignoring whether whey are expired.
// For pessimistic locks, committing them makes no sense, but it won't affect transaction
// correctness if we always roll back them.
// Pessimistic locks needs special handling logic because their primary may not point
// to the real primary of that transaction, and their state cannot be put in `txnInfos`.
// (see: https://github.com/pingcap/tidb/issues/42937).
err := lr.resolvePessimisticLock(bo, l)
if err != nil {
return false, err
}
continue
}

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
Expand Down

0 comments on commit 01e2395

Please sign in to comment.