Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reload region cache when store is resolved from invalid status #843

Merged
merged 14 commits into from
Jul 14, 2023
2 changes: 1 addition & 1 deletion error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
Expand Down
40 changes: 37 additions & 3 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
}

// AccessIndex represent the index for accessIndex array
Expand Down Expand Up @@ -1226,6 +1227,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

func (c *RegionCache) asyncReloadRegion(region *Region) {
if region == nil || !region.asyncReload.CompareAndSwap(false, true) {
// async reload triggered by other thread.
return
}
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be risky to spawn a reload task for each region, especially if many regions become invalid due to the related store being marked as such. To mitigate this risk, we should implement a refreshing strategy in the region cache-related workers and consider incorporating the store's "health checker."

@crazycs520 is now considering refactoring the region cache component, he could give some advice about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the v6.5.x release version, a quick fix may still be needed.

// wait a while for two reasons:
// 1. there may an unavailable duration while recreating the connection.
// 2. the store may just be started, and wait safe ts synced to avoid the
// possible dataIsNotReady error.
time.Sleep(10 * time.Second)
regionID := region.GetID()
if regionID == 0 {
return
}
bo := retry.NewNoopBackoff(context.Background())
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
region.asyncReload.Store(false)
return
}
c.mu.Lock()
c.insertRegionToCache(lr)
c.mu.Unlock()
}()
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
Expand Down Expand Up @@ -1399,8 +1430,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the region info is async reloaded, the old region is still valid.
if !oldRegion.asyncReload.Load() {
you06 marked this conversation as resolved.
Show resolved Hide resolved
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
// is under transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
Expand Down Expand Up @@ -2507,8 +2541,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
var state resolveState
return state
}
return resolveState(atomic.LoadUint64(&s.state))
Expand Down
14 changes: 9 additions & 5 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,18 +573,22 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
if state.option.preferLeader {
state.lastIdx = state.leaderIdx
}
offset := rand.Intn(replicaSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under PreferLeader mode, offset should be set to 0 to make the selection starting from leaderIdx.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I changed the code and not to use offset in the first selection.

for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % replicaSize)
// If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader
// as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution.
for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ {
idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize)
}
if state.isCandidate(idx, selector.replicas[idx]) {
idx := AccessIndex((int(state.lastIdx) + i + offset) % replicaSize)
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
selector.regionCache.asyncReloadRegion(selector.region)
}
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
Expand Down