Commit

Cherry pick some bug fixes to the tidb-6.5 branch (tikv#681)
* RuntimeStat: Clone `SnapshotRuntimeStats` Completely (tikv#641)

* add more info for cloning SnapshotRuntimeStats

Signed-off-by: TonsnakeLin <lpbgytong@163.com>

* add more info for cloning SnapshotRuntimeStats

Signed-off-by: TonsnakeLin <lpbgytong@163.com>

* shallow copy SnapshotRuntimeStats

Signed-off-by: TonsnakeLin <lpbgytong@163.com>

Signed-off-by: TonsnakeLin <lpbgytong@163.com>
Co-authored-by: Yilin Chen <sticnarf@gmail.com>

* fix data race in the LockKeys (tikv#655)

Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>

* Add a background region cache GC goroutine (tikv#664)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* Rewind after cacheGC reaches the end (tikv#678)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

---------

Signed-off-by: TonsnakeLin <lpbgytong@163.com>
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
Co-authored-by: TonsnakeLin <87681388+TonsnakeLin@users.noreply.github.com>
Co-authored-by: Weizhen Wang <wangweizhen@pingcap.com>
3 people authored Jan 31, 2023
1 parent 6d5cb2d commit cd83d15
Showing 5 changed files with 151 additions and 1 deletion.
62 changes: 62 additions & 0 deletions internal/locate/region_cache.go
@@ -308,6 +308,11 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
}

func (r *Region) isCacheTTLExpired(ts int64) bool {
lastAccess := atomic.LoadInt64(&r.lastAccess)
return ts-lastAccess > regionCacheTTLSec
}

func (r *Region) checkRegionCacheTTL(ts int64) bool {
// Only consider use percentage on this failpoint, for example, "2%return"
if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
@@ -417,6 +422,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
interval := config.GetGlobalConfig().StoresRefreshInterval
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
go c.cacheGC()
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
return c
}
@@ -1906,6 +1912,62 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
}
}

const cleanCacheInterval = time.Second
const cleanRegionNumPerRound = 50

// This function is expected to run in a background goroutine.
// It keeps iterating over the whole region cache, searching for stale region
// info. It runs at cleanCacheInterval and checks only cleanRegionNumPerRound
// regions. In this way, the impact of this background goroutine should be
// negligible.
func (c *RegionCache) cacheGC() {
ticker := time.NewTicker(cleanCacheInterval)
defer ticker.Stop()

beginning := newBtreeSearchItem([]byte(""))
iterItem := beginning
expired := make([]*btreeItem, cleanRegionNumPerRound)
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
count := 0
expired = expired[:0]

// Only RLock when checking TTL to avoid blocking other readers
c.mu.RLock()
ts := time.Now().Unix()
c.mu.sorted.b.AscendGreaterOrEqual(iterItem, func(item *btreeItem) bool {
if count > cleanRegionNumPerRound {
iterItem = item
return false
}
count++
if item.cachedRegion.isCacheTTLExpired(ts) {
expired = append(expired, item)
}
return true
})
c.mu.RUnlock()

// Reach the end of the region cache, start from the beginning
if count <= cleanRegionNumPerRound {
iterItem = beginning
}

if len(expired) > 0 {
c.mu.Lock()
for _, item := range expired {
c.mu.sorted.b.Delete(item)
c.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
}
c.mu.Unlock()
}
}
}
}

// btreeItem is BTree's Item that uses []byte to compare.
type btreeItem struct {
key []byte
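
To make the background-GC idea easier to follow outside the diff, here is a minimal, self-contained Go sketch of the same pattern: check a bounded number of entries per tick under a read lock, remember where the scan stopped, and rewind to the beginning once a full pass completes (the behaviour the "Rewind after cacheGC reaches the end" commit adds). All names here (entry, gcLoop, ttlSec, perRound) are illustrative and not part of client-go.

    package cachegc

    import (
        "context"
        "sync"
        "sync/atomic"
        "time"
    )

    // entry is an illustrative stand-in for a cached region; only the
    // last-access timestamp matters to the GC.
    type entry struct {
        lastAccess int64
    }

    const (
        ttlSec       = 600         // illustrative TTL, not the client-go value
        perRound     = 50          // entries checked per tick, like cleanRegionNumPerRound
        tickInterval = time.Second // like cleanCacheInterval
    )

    // gcLoop checks at most perRound entries per tick under a read lock,
    // resumes where the previous tick stopped, and rewinds to the start of
    // the slice once a pass reaches the end.
    func gcLoop(ctx context.Context, mu *sync.RWMutex, entries []*entry) {
        ticker := time.NewTicker(tickInterval)
        defer ticker.Stop()

        next := 0 // resume point for the next tick
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                now := time.Now().Unix()
                mu.RLock()
                end := next + perRound
                if end > len(entries) {
                    end = len(entries)
                }
                for _, e := range entries[next:end] {
                    if now-atomic.LoadInt64(&e.lastAccess) > ttlSec {
                        // A real implementation collects the expired entry
                        // here and deletes it later under the write lock.
                        _ = e
                    }
                }
                mu.RUnlock()
                if end == len(entries) {
                    next = 0 // full pass finished: rewind to the beginning
                } else {
                    next = end
                }
            }
        }
    }
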
57 changes: 57 additions & 0 deletions internal/locate/region_cache_test.go
@@ -40,6 +40,7 @@ import (
"fmt"
"math/rand"
"reflect"
"sync/atomic"
"testing"
"time"

@@ -1608,3 +1609,59 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() {
s.Error(err)
s.False(shouldRetry)
}

func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
// Prepare 100 regions
regionCnt := 100
regions := s.cluster.AllocIDs(regionCnt)
regions = append([]uint64{s.region1}, regions...)
peers := [][]uint64{{s.peer1, s.peer2}}
for i := 0; i < regionCnt; i++ {
peers = append(peers, s.cluster.AllocIDs(2))
}
for i := 0; i < regionCnt; i++ {
s.cluster.Split(regions[i], regions[i+1], []byte(fmt.Sprintf(regionSplitKeyFormat, i)), peers[i+1], peers[i+1][0])
}
loadRegionsToCache(s.cache, regionCnt)
s.checkCache(regionCnt)

// Make parts of the regions stale
remaining := 0
s.cache.mu.Lock()
now := time.Now().Unix()
for verID, r := range s.cache.mu.regions {
if verID.id%3 == 0 {
atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
} else {
remaining++
}
}
s.cache.mu.Unlock()

s.Eventually(func() bool {
s.cache.mu.RLock()
defer s.cache.mu.RUnlock()
return len(s.cache.mu.regions) == remaining
}, 3*time.Second, 200*time.Millisecond)
s.checkCache(remaining)

// Make another part of the regions stale
remaining = 0
s.cache.mu.Lock()
now = time.Now().Unix()
for verID, r := range s.cache.mu.regions {
if verID.id%3 == 1 {
atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
} else {
remaining++
}
}
s.cache.mu.Unlock()

s.Eventually(func() bool {
s.cache.mu.RLock()
defer s.cache.mu.RUnlock()
return len(s.cache.mu.regions) == remaining
}, 3*time.Second, 200*time.Millisecond)
s.checkCache(remaining)
}
4 changes: 3 additions & 1 deletion internal/locate/region_request3_test.go
@@ -304,7 +304,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {

cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.mu.Unlock()

// Verify creating the replicaSelector.
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
@@ -537,7 +539,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)

// Test accessFollower state filtering epoch-stale stores.
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
refreshEpochs(regionStore)
// Mark all followers as stale.
tiKVNum := regionStore.accessStoreNum(tiKVOnly)
16 changes: 16 additions & 0 deletions txnkv/transaction/txn.go
@@ -832,6 +832,17 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][]
// LockKeys tries to lock the entries with the keys in KV store.
// lockCtx is the context for lock, lockCtx.lockWaitTime in ms
func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error {
return txn.lockKeys(ctx, lockCtx, nil, keysInput...)
}

// LockKeysFunc tries to lock the entries with the keys in KV store.
// lockCtx is the context for lock, lockCtx.lockWaitTime in ms
// fn is a function which runs before the lock is released.
func (txn *KVTxn) LockKeysFunc(ctx context.Context, lockCtx *tikv.LockCtx, fn func(), keysInput ...[]byte) error {
return txn.lockKeys(ctx, lockCtx, fn, keysInput...)
}

func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func(), keysInput ...[]byte) error {
if txn.interceptor != nil {
// User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we
// need to bind it to ctx so that the internal client can perceive and execute
@@ -869,6 +880,11 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
}
}()
defer func() {
if fn != nil {
fn()
}
}()

if !txn.IsPessimistic() && txn.aggressiveLockingContext != nil {
return errors.New("trying to perform aggressive locking in optimistic transaction")
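
A hedged usage sketch of the new LockKeysFunc hook follows. Only the LockKeysFunc signature is taken from the diff above; the helper, its name, and the assumption that LockCtx lives in the kv package (aliased as tikv inside txn.go) are mine, and the caller is assumed to already have a started pessimistic transaction and a prepared lock context.

    package lockdemo

    import (
        "context"
        "log"

        "github.com/tikv/client-go/v2/kv"
        "github.com/tikv/client-go/v2/txnkv/transaction"
    )

    // lockWithCallback is a hypothetical helper. The callback passed to
    // LockKeysFunc runs in a deferred call inside lockKeys, so it fires when
    // the lock attempt returns, whether or not it succeeded.
    func lockWithCallback(ctx context.Context, txn *transaction.KVTxn, lockCtx *kv.LockCtx, keys ...[]byte) error {
        onDone := func() {
            log.Printf("lock attempt for %d keys finished", len(keys))
        }
        return txn.LockKeysFunc(ctx, lockCtx, onDone, keys...)
    }
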
13 changes: 13 additions & 0 deletions txnkv/txnsnapshot/snapshot.go
@@ -952,6 +952,19 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
newRs.backoffTimes[k] += v
}
}

if rs.scanDetail != nil {
newRs.scanDetail = rs.scanDetail
}

if rs.timeDetail != nil {
newRs.timeDetail = rs.timeDetail
}

if rs.resolveLockDetail != nil {
newRs.resolveLockDetail = rs.resolveLockDetail
}

return &newRs
}

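
The commit message notes that Clone performs a shallow copy of these detail fields: the clone receives the same pointers rather than fresh copies. The toy snippet below uses stand-in types, not the real SnapshotRuntimeStats fields, to illustrate the consequence: the original and the clone see each other's mutations through the shared detail struct.

    package clonedemo

    // scanDetail stands in for the pointer-typed detail fields of
    // SnapshotRuntimeStats; the field name is illustrative only.
    type scanDetail struct {
        processedKeys int64
    }

    type stats struct {
        scanDetail *scanDetail
    }

    // clone copies the pointer itself (a shallow copy), mirroring the
    // approach in the diff: both stats values reference the same scanDetail.
    func (s *stats) clone() *stats {
        c := &stats{}
        if s.scanDetail != nil {
            c.scanDetail = s.scanDetail
        }
        return c
    }

    // Example: mutating through the clone is visible through the original.
    //
    //  orig := &stats{scanDetail: &scanDetail{}}
    //  cp := orig.clone()
    //  cp.scanDetail.processedKeys = 7
    //  // orig.scanDetail.processedKeys is now 7 as well
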
