Skip to content

Commit

Permalink
reload region cache when store is resolved from invalid status (#843) (
Browse files Browse the repository at this point in the history
#846)

* reload region cache when store is resolved from invalid

Signed-off-by: you06 <you1474600@gmail.com>

* reload region once

Signed-off-by: you06 <you1474600@gmail.com>

* build in go1.18

Signed-off-by: you06 <you1474600@gmail.com>

* build in go1.18

Signed-off-by: you06 <you1474600@gmail.com>

* handle region reload in resolve goroutine

Signed-off-by: you06 <you1474600@gmail.com>

* retest

Signed-off-by: you06 <you1474600@gmail.com>

* fix data race (#736)

Signed-off-by: Smityz <smityz@qq.com>
Co-authored-by: disksing <i@disksing.com>
Signed-off-by: you06 <you1474600@gmail.com>

* build with go 1.18

Signed-off-by: you06 <you1474600@gmail.com>

* fix integration test (#673)

Signed-off-by: disksing <i@disksing.com>
Signed-off-by: you06 <you1474600@gmail.com>

* Update internal/locate/region_cache.go

Co-authored-by: crazycs <crazycs520@gmail.com>
Signed-off-by: you06 <you1474600@gmail.com>

* address comment

Signed-off-by: you06 <you1474600@gmail.com>

* address comment

Signed-off-by: you06 <you1474600@gmail.com>

---------

Signed-off-by: you06 <you1474600@gmail.com>
Signed-off-by: Smityz <smityz@qq.com>
Signed-off-by: disksing <i@disksing.com>
Co-authored-by: Smilencer <smityz@qq.com>
Co-authored-by: disksing <i@disksing.com>
Co-authored-by: crazycs <crazycs520@gmail.com>
  • Loading branch information
4 people authored Jun 28, 2023
1 parent 5fde309 commit b113831
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 41 deletions.
2 changes: 1 addition & 1 deletion error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues`` is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ type checkRequestClient struct {

func (c *checkRequestClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
resp, err := c.Client.SendRequest(ctx, addr, req, timeout)
if err != nil {
return resp, err
}
if c.priority != req.Priority {
if resp.Resp != nil {
if getResp, ok := resp.Resp.(*kvrpcpb.GetResponse); ok {
Expand Down
97 changes: 81 additions & 16 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload int32 // the region need to be reloaded in async mode
}

// AccessIndex represent the index for accessIndex array
Expand Down Expand Up @@ -363,6 +364,8 @@ func (r *Region) isValid() bool {
return r != nil && !r.checkNeedReload() && r.checkRegionCacheTTL(time.Now().Unix())
}

type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState

// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
// purposes only.
Expand Down Expand Up @@ -395,7 +398,12 @@ type RegionCache struct {
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
// requestLiveness always returns unreachable.
mockRequestLiveness func(s *Store, bo *retry.Backoffer) livenessState
mockRequestLiveness atomic.Value
}

regionsNeedReload struct {
sync.Mutex
regions []uint64
}
}

Expand Down Expand Up @@ -447,8 +455,13 @@ func (c *RegionCache) Close() {
// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
reloadRegionTicker := time.NewTicker(10 * time.Second)
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
}()
var needCheckStores []*Store
reloadNextLoop := make(map[uint64]struct{})
for {
needCheckStores = needCheckStores[:0]
select {
Expand All @@ -466,6 +479,22 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
// there's a deleted store in the stores map which guaranteed by reReslve().
return state != unresolved && state != tombstone && state != deleted
})

case <-reloadRegionTicker.C:
for regionID := range reloadNextLoop {
c.reloadRegion(regionID)
delete(reloadNextLoop, regionID)
}
c.regionsNeedReload.Lock()
for _, regionID := range c.regionsNeedReload.regions {
// will reload in next tick, wait a while for two reasons:
// 1. there may an unavailable duration while recreating the connection.
// 2. the store may just be started, and wait safe ts synced to avoid the
// possible dataIsNotReady error.
reloadNextLoop[regionID] = struct{}{}
}
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
c.regionsNeedReload.Unlock()
}
}
}
Expand Down Expand Up @@ -967,7 +996,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it be marked as need reload.
Expand All @@ -980,7 +1009,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand Down Expand Up @@ -1113,7 +1142,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
} else {
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand All @@ -1132,7 +1161,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}

c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
return &KeyLocation{
Region: r.VerID(),
Expand All @@ -1142,6 +1171,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

func (c *RegionCache) scheduleReloadRegion(region *Region) {
if region == nil || !atomic.CompareAndSwapInt32(&region.asyncReload, 0, 1) {
// async reload triggered by other thread.
return
}
regionID := region.GetID()
if regionID > 0 {
c.regionsNeedReload.Lock()
c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
c.regionsNeedReload.Unlock()
}
}

func (c *RegionCache) reloadRegion(regionID uint64) {
bo := retry.NewNoopBackoff(c.ctx)
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
atomic.StoreInt32(&oldRegion.asyncReload, 0)
}
return
}
c.mu.Lock()
c.insertRegionToCache(lr, false)
c.mu.Unlock()
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
Expand Down Expand Up @@ -1226,7 +1285,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
for _, region := range regions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}

return
Expand Down Expand Up @@ -1300,7 +1359,7 @@ func (c *RegionCache) removeVersionFromCache(oldVer RegionVerID, regionID uint64

// insertRegionToCache tries to insert the Region to cache.
// It should be protected by c.mu.Lock().
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
oldRegion := c.mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
Expand All @@ -1315,8 +1374,11 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the region info is async reloaded, the old region is still valid.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
// is under transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
Expand Down Expand Up @@ -1804,7 +1866,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext

c.mu.Lock()
for _, region := range newRegions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}
c.mu.Unlock()

Expand Down Expand Up @@ -1918,7 +1980,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
return
}
c.mu.Lock()
c.insertRegionToCache(new)
c.insertRegionToCache(new, true)
c.mu.Unlock()
}()
}
Expand Down Expand Up @@ -2371,9 +2433,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
return state
return unresolved
}
return resolveState(atomic.LoadUint64(&s.state))
}
Expand Down Expand Up @@ -2544,8 +2605,12 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness
return unknown
}
}
if c != nil && c.testingKnobs.mockRequestLiveness != nil {
return c.testingKnobs.mockRequestLiveness(s, bo)

if c != nil {
lf := c.testingKnobs.mockRequestLiveness.Load()
if lf != nil {
return (*lf.(*livenessFunc))(s, bo)
}
}

if storeLivenessTimeout == 0 {
Expand Down
14 changes: 7 additions & 7 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
filterUnavailablePeers(cpRegion)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
Expand Down Expand Up @@ -1428,12 +1428,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
fakeRegion.setStore(cachedRegion.getStore().clone())
// no buckets
fakeRegion.getStore().buckets = nil
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// stale buckets
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// new buckets
Expand All @@ -1443,7 +1443,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
Keys: buckets.Keys,
}
fakeRegion.getStore().buckets = newBuckets
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

Expand Down Expand Up @@ -1576,7 +1576,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand All @@ -1587,7 +1587,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand Down
12 changes: 11 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,13 +551,23 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
state.lastIdx++
}

reloadRegion := false
for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if state.isCandidate(idx, selector.replicas[idx]) {
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
reloadRegion = true
}
}
if reloadRegion {
selector.regionCache.scheduleReloadRegion(selector.region)
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
Expand Down
Loading

0 comments on commit b113831

Please sign in to comment.