From bb30b6172e62af6de79990ac31118b26ccc0bd69 Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 15 Jun 2023 21:01:35 +0800 Subject: [PATCH 1/9] reload region cache when store is resolved from invalid Signed-off-by: you06 --- error/error.go | 2 +- internal/locate/region_cache.go | 40 ++++++++++++++++++++++++++++--- internal/locate/region_request.go | 16 +++++++------ 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/error/error.go b/error/error.go index 761ebef6d..62a75ff65 100644 --- a/error/error.go +++ b/error/error.go @@ -246,7 +246,7 @@ type ErrAssertionFailed struct { *kvrpcpb.AssertionFailed } -// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues“ is not. +// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not. type ErrLockOnlyIfExistsNoReturnValue struct { StartTS uint64 ForUpdateTs uint64 diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 59c413ecb..5ad23fe68 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -153,6 +153,7 @@ type Region struct { syncFlag int32 // region need be sync in next turn lastAccess int64 // last region access time, see checkRegionCacheTTL invalidReason InvalidReason // the reason why the region is invalidated + asyncReload atomic.Bool // the region need to be reloaded in async mode } // AccessIndex represent the index for accessIndex array @@ -1226,6 +1227,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K }, nil } +func (c *RegionCache) asyncReloadRegion(region *Region) { + if region == nil || !region.asyncReload.CompareAndSwap(false, true) { + // async reload triggered by other thread. + return + } + go func() { + // wait a while for two reasons: + // 1. there may an unavailable duration while recreating the connection. + // 2. the store may just be started, and wait safe ts synced to avoid the + // possible dataIsNotReady error. + time.Sleep(10 * time.Second) + regionID := region.GetID() + if regionID == 0 { + return + } + bo := retry.NewNoopBackoff(context.Background()) + lr, err := c.loadRegionByID(bo, regionID) + if err != nil { + // ignore error and use old region info. + logutil.Logger(bo.GetCtx()).Error("load region failure", + zap.Uint64("regionID", regionID), zap.Error(err)) + region.asyncReload.Store(false) + return + } + c.mu.Lock() + c.insertRegionToCache(lr) + c.mu.Unlock() + }() +} + // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. @@ -1399,8 +1430,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader { store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly)) } - // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. - oldRegion.invalidate(Other) + // If the region info is async reloaded, the old region is still valid. + if !oldRegion.asyncReload.Load() { + // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. + oldRegion.invalidate(Other) + } // Don't refresh TiFlash work idx for region. 
Otherwise, it will always goto a invalid store which // is under transferring regions. store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load()) @@ -2507,8 +2541,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { } func (s *Store) getResolveState() resolveState { - var state resolveState if s == nil { + var state resolveState return state } return resolveState(atomic.LoadUint64(&s.state)) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c877dfadc..c745bda93 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -573,18 +573,20 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } + offset := rand.Intn(len(selector.replicas)) for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - idx := AccessIndex((int(state.lastIdx) + i) % replicaSize) - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ { - idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) - } - if state.isCandidate(idx, selector.replicas[idx]) { + idx := AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) + selectReplica := selector.replicas[idx] + if state.isCandidate(idx, selectReplica) { state.lastIdx = idx selector.targetIdx = idx break } + if selectReplica.isEpochStale() && + selectReplica.store.getResolveState() == resolved && + selectReplica.store.getLivenessState() == reachable { + selector.regionCache.asyncReloadRegion(selector.region) + } } // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { From 5119e70d80330202e78c381a7ad7d52fcdeef9a3 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 19 Jun 2023 11:49:22 +0800 Subject: [PATCH 2/9] refine code Signed-off-by: you06 --- internal/locate/region_request.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c745bda93..a94da298f 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -573,9 +573,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } - offset := rand.Intn(len(selector.replicas)) for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - idx := AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) + var idx AccessIndex + if i == 0 { + idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) + } else { + // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader + // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. 
+ idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) + } selectReplica := selector.replicas[idx] if state.isCandidate(idx, selectReplica) { state.lastIdx = idx From 9f798e2c61043a11b5a71f4afa464b2ed3cc12a4 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 19 Jun 2023 12:01:42 +0800 Subject: [PATCH 3/9] use offset to select replica randomly Signed-off-by: you06 --- internal/locate/region_request.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index a94da298f..97719fc2b 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -573,15 +573,11 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } + offset := rand.Intn(replicaSize) for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - var idx AccessIndex - if i == 0 { - idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) - } else { - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) - } + // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader + // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. + idx := AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) selectReplica := selector.replicas[idx] if state.isCandidate(idx, selectReplica) { state.lastIdx = idx From ba45117b246e96bbab57fd77edff544593d9210a Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 19 Jun 2023 12:46:13 +0800 Subject: [PATCH 4/9] choose leader firstly for prefer leader Signed-off-by: you06 --- internal/locate/region_request.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 97719fc2b..ea00e0992 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -574,10 +574,25 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector state.lastIdx = state.leaderIdx } offset := rand.Intn(replicaSize) + reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - idx := AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) + var idx AccessIndex + if i == 0 { + if state.option.preferLeader { + // In prefer leader mode, the selector choose leader firstly, then randomly choose other followers. + idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) + } else { + idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) + } + } else { + if state.option.preferLeader && (i+offset)%replicaSize == 0 { + // skip leader because prefer leader already tried it. + offset++ + } + // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader + // as candidates to serve the Read request. 
Meanwhile, we should make the choice of next() meet Uniform Distribution. + idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) + } selectReplica := selector.replicas[idx] if state.isCandidate(idx, selectReplica) { state.lastIdx = idx @@ -587,9 +602,12 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if selectReplica.isEpochStale() && selectReplica.store.getResolveState() == resolved && selectReplica.store.getLivenessState() == reachable { - selector.regionCache.asyncReloadRegion(selector.region) + reloadRegion = true } } + if reloadRegion { + selector.regionCache.asyncReloadRegion(selector.region) + } // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { if len(state.option.labels) > 0 { From deb5a8862421ebf1d63a0a0c7dfb6645fcdaec64 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 19 Jun 2023 13:07:21 +0800 Subject: [PATCH 5/9] refine code Signed-off-by: you06 --- internal/locate/region_request.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index ea00e0992..9725d5205 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -577,18 +577,18 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { var idx AccessIndex - if i == 0 { - if state.option.preferLeader { + if state.option.preferLeader { + if i == 0 { // In prefer leader mode, the selector choose leader firstly, then randomly choose other followers. idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) } else { + if state.option.preferLeader && (i+offset)%replicaSize == 0 { + // skip leader because prefer leader already tried it. + offset++ + } idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) } } else { - if state.option.preferLeader && (i+offset)%replicaSize == 0 { - // skip leader because prefer leader already tried it. - offset++ - } // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) From 696a06a494df3f021c8cc2494e6d0cebff6951bc Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 19 Jun 2023 16:07:49 +0800 Subject: [PATCH 6/9] simplify code Signed-off-by: you06 --- internal/locate/region_request.go | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9725d5205..7bbd00816 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -573,24 +573,20 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } - offset := rand.Intn(replicaSize) + var offset int + if state.lastIdx >= 0 { + offset = rand.Intn(replicaSize) + } reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { var idx AccessIndex - if state.option.preferLeader { - if i == 0 { - // In prefer leader mode, the selector choose leader firstly, then randomly choose other followers. 
- idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) - } else { - if state.option.preferLeader && (i+offset)%replicaSize == 0 { - // skip leader because prefer leader already tried it. - offset++ - } - idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) - } + if state.lastIdx == state.leaderIdx && i == 0 { + idx = state.lastIdx } else { - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. + // randomly select next replica, but skip state.lastIdx + if (i+offset)%replicaSize == 0 { + offset++ + } idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) } selectReplica := selector.replicas[idx] From 753346d1ab8a464dce7b3f3c323ca5928e46408f Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 20 Jun 2023 18:05:25 +0800 Subject: [PATCH 7/9] handle region reload in check-resovle goroutine Signed-off-by: you06 --- internal/locate/region_cache.go | 75 ++++++++++++++++++++----------- internal/locate/region_request.go | 2 +- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 5ad23fe68..1e65dfc6c 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -458,6 +458,12 @@ type RegionCache struct { // requestLiveness always returns unreachable. mockRequestLiveness atomic.Pointer[livenessFunc] } + + regionsNeedReload struct { + sync.Mutex + regions []uint64 + toReload map[uint64]struct{} + } } // NewRegionCache creates a RegionCache. @@ -523,7 +529,11 @@ func (c *RegionCache) Close() { // asyncCheckAndResolveLoop with func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { ticker := time.NewTicker(interval) - defer ticker.Stop() + reloadRegionTicker := time.NewTicker(10 * time.Second) + defer func() { + ticker.Stop() + reloadRegionTicker.Stop() + }() var needCheckStores []*Store for { needCheckStores = needCheckStores[:0] @@ -542,6 +552,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { // there's a deleted store in the stores map which guaranteed by reReslve(). return state != unresolved && state != tombstone && state != deleted }) + case <-reloadRegionTicker.C: + for regionID := range c.regionsNeedReload.toReload { + c.reloadRegion(regionID) + delete(c.regionsNeedReload.toReload, regionID) + } + c.regionsNeedReload.Lock() + for _, regionID := range c.regionsNeedReload.regions { + // will reload in next tick, wait a while for two reasons: + // 1. there may an unavailable duration while recreating the connection. + // 2. the store may just be started, and wait safe ts synced to avoid the + // possible dataIsNotReady error. + c.regionsNeedReload.toReload[regionID] = struct{}{} + } + c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0] + c.regionsNeedReload.Unlock() } } } @@ -1227,34 +1252,34 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K }, nil } -func (c *RegionCache) asyncReloadRegion(region *Region) { +func (c *RegionCache) scheduleReloadRegion(region *Region) { if region == nil || !region.asyncReload.CompareAndSwap(false, true) { - // async reload triggered by other thread. + // async reload scheduled by other thread. return } - go func() { - // wait a while for two reasons: - // 1. there may an unavailable duration while recreating the connection. - // 2. 
the store may just be started, and wait safe ts synced to avoid the - // possible dataIsNotReady error. - time.Sleep(10 * time.Second) - regionID := region.GetID() - if regionID == 0 { - return - } - bo := retry.NewNoopBackoff(context.Background()) - lr, err := c.loadRegionByID(bo, regionID) - if err != nil { - // ignore error and use old region info. - logutil.Logger(bo.GetCtx()).Error("load region failure", - zap.Uint64("regionID", regionID), zap.Error(err)) - region.asyncReload.Store(false) - return + regionID := region.GetID() + if regionID > 0 { + c.regionsNeedReload.Lock() + c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID) + c.regionsNeedReload.Unlock() + } +} + +func (c *RegionCache) reloadRegion(regionID uint64) { + bo := retry.NewNoopBackoff(context.Background()) + lr, err := c.loadRegionByID(bo, regionID) + if err != nil { + // ignore error and use old region info. + logutil.Logger(bo.GetCtx()).Error("load region failure", + zap.Uint64("regionID", regionID), zap.Error(err)) + if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil { + oldRegion.asyncReload.Store(false) } - c.mu.Lock() - c.insertRegionToCache(lr) - c.mu.Unlock() - }() + return + } + c.mu.Lock() + c.insertRegionToCache(lr) + c.mu.Unlock() } // GroupKeysByRegion separates keys into groups by their belonging Regions. diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 7bbd00816..c438dfc51 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -602,7 +602,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } } if reloadRegion { - selector.regionCache.asyncReloadRegion(selector.region) + selector.regionCache.scheduleReloadRegion(selector.region) } // If there is no candidate, fallback to the leader. 
if selector.targetIdx < 0 { From ba1eaef14955c30889d72cdda6c018edc984a35d Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 27 Jun 2023 20:02:04 +0800 Subject: [PATCH 8/9] fix test Signed-off-by: you06 --- internal/locate/region_request.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c438dfc51..c40418be8 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -580,14 +580,17 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { var idx AccessIndex - if state.lastIdx == state.leaderIdx && i == 0 { - idx = state.lastIdx - } else { - // randomly select next replica, but skip state.lastIdx - if (i+offset)%replicaSize == 0 { - offset++ + if state.option.preferLeader { + if i == 0 { + idx = state.lastIdx + } else { + // randomly select next replica, but skip state.lastIdx + if (i+offset)%replicaSize == 0 { + offset++ + } } - idx = AccessIndex((int(state.lastIdx) + i + offset) % replicaSize) + } else { + idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) } selectReplica := selector.replicas[idx] if state.isCandidate(idx, selectReplica) { From 034f36e40e19072a160a4abd2791320c2a93f0bf Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 29 Jun 2023 16:50:45 +0800 Subject: [PATCH 9/9] address comment Signed-off-by: you06 --- internal/locate/region_cache.go | 40 +++++++++++++------------ internal/locate/region_cache_test.go | 18 +++++------ internal/locate/region_request3_test.go | 4 +-- internal/locate/region_request_test.go | 4 +-- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 1e65dfc6c..0090c9812 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -412,7 +412,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu { r.latestVersions = make(map[uint64]RegionVerID) r.sorted = NewSortedRegions(btreeDegree) for _, region := range rs { - r.insertRegionToCache(region) + r.insertRegionToCache(region, true) } return r } @@ -461,8 +461,7 @@ type RegionCache struct { regionsNeedReload struct { sync.Mutex - regions []uint64 - toReload map[uint64]struct{} + regions []uint64 } } @@ -517,8 +516,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region) { - c.mu.insertRegionToCache(cachedRegion) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { + c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) } // Close releases region cache's resource. 
@@ -535,6 +534,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { reloadRegionTicker.Stop() }() var needCheckStores []*Store + reloadNextLoop := make(map[uint64]struct{}) for { needCheckStores = needCheckStores[:0] select { @@ -553,9 +553,9 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { return state != unresolved && state != tombstone && state != deleted }) case <-reloadRegionTicker.C: - for regionID := range c.regionsNeedReload.toReload { + for regionID := range reloadNextLoop { c.reloadRegion(regionID) - delete(c.regionsNeedReload.toReload, regionID) + delete(reloadNextLoop, regionID) } c.regionsNeedReload.Lock() for _, regionID := range c.regionsNeedReload.regions { @@ -563,7 +563,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { // 1. there may an unavailable duration while recreating the connection. // 2. the store may just be started, and wait safe ts synced to avoid the // possible dataIsNotReady error. - c.regionsNeedReload.toReload[regionID] = struct{}{} + reloadNextLoop[regionID] = struct{}{} } c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0] c.regionsNeedReload.Unlock() @@ -1069,7 +1069,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } else if r.checkNeedReloadAndMarkUpdated() { // load region when it be marked as need reload. @@ -1082,7 +1082,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1223,7 +1223,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } else { r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1242,7 +1242,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), @@ -1278,7 +1278,7 @@ func (c *RegionCache) reloadRegion(regionID uint64) { return } c.mu.Lock() - c.insertRegionToCache(lr) + c.insertRegionToCache(lr, false) c.mu.Unlock() } @@ -1366,7 +1366,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey // TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information // for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it. for _, region := range regions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } return @@ -1440,7 +1440,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // insertRegionToCache tries to insert the Region to cache. // It should be protected by c.mu.l.Lock(). -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { +// if `invalidateOldRegion` is false, the old region cache should be still valid, +// and it may still be used by some kv requests. 
+func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { store := cachedRegion.getStore() @@ -1455,8 +1457,8 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader { store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly)) } - // If the region info is async reloaded, the old region is still valid. - if !oldRegion.asyncReload.Load() { + // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. + if invalidateOldRegion { // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. oldRegion.invalidate(Other) } @@ -1978,7 +1980,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext c.mu.Lock() for _, region := range newRegions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } c.mu.Unlock() @@ -2096,7 +2098,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV return } c.mu.Lock() - c.insertRegionToCache(new) + c.insertRegionToCache(new, true) c.mu.Unlock() }() } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 03ef3e309..619da2d2e 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() { region := createSampleRegion([]byte("k1"), []byte("k2")) region.meta.Id = 1 region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10} - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}} r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}} @@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() { filterUnavailablePeers(cpRegion) region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() { cpRegion := &pd.Region{Meta: cpMeta} region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() { fakeRegion.setStore(cachedRegion.getStore().clone()) // no buckets fakeRegion.getStore().buckets = nil - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // stale buckets fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1} - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // new buckets @@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() { Keys: buckets.Keys, } fakeRegion.getStore().buckets 
= newBuckets - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) @@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err := s.cache.loadRegion(s.bo, []byte("c"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'c'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err = s.cache.loadRegion(s.bo, []byte("e"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'e'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) s.Equal(len(r), 2) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e05c2cac5..ad29e4710 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -322,7 +322,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Test accessFollower state with kv.ReplicaReadLearner request type. @@ -373,7 +373,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Verify creating the replicaSelector. 
diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index b587e6c0f..151b2dbc1 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) @@ -618,7 +618,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} st = &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region)
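Taken together, the later patches replace the per-region reload goroutine from patch 1 with a queue that the existing check-resolve loop drains on its own ticker, and reloads triggered this way insert the fresh region with invalidateOldRegion=false so the still-valid old entry is not invalidated. The sketch below is a minimal, self-contained model of that scheduling pattern only; the region, reloader, schedule, and loop names are invented for illustration and are not the client-go API. It shows the two ideas the patches rely on: a CompareAndSwap flag deduplicates concurrent schedule requests, and IDs queued in one tick are only reloaded on the following tick, giving a recovering store roughly one ticker interval to re-establish connections and sync safe ts, as the comments in patch 7 explain.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// region is a stand-in for the cache's Region type: only the fields that
// reload scheduling needs (the ID and the asyncReload guard) are modeled.
type region struct {
	id          uint64
	asyncReload atomic.Bool
}

// reloader models the deferred-reload queue owned by the region cache.
type reloader struct {
	mu      sync.Mutex
	pending []uint64 // region IDs scheduled since the last tick
}

// schedule mirrors the CAS guard in scheduleReloadRegion: only the first
// caller queues the region; later callers see the flag already set.
func (r *reloader) schedule(reg *region) {
	if reg == nil || !reg.asyncReload.CompareAndSwap(false, true) {
		return // already scheduled by another goroutine
	}
	r.mu.Lock()
	r.pending = append(r.pending, reg.id)
	r.mu.Unlock()
}

// loop models the reload ticker added to the check-resolve goroutine:
// IDs queued during one tick are promoted to a "reload next tick" set,
// so each region waits at least one interval before being reloaded.
func (r *reloader) loop(interval time.Duration, reload func(uint64), stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	next := make(map[uint64]struct{}) // reload these on the current tick
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for id := range next {
				reload(id) // the real code calls loadRegionByID here
				delete(next, id)
			}
			r.mu.Lock()
			for _, id := range r.pending {
				next[id] = struct{}{} // defer to the next tick
			}
			r.pending = r.pending[:0]
			r.mu.Unlock()
		}
	}
}

func main() {
	rl := &reloader{}
	stop := make(chan struct{})
	go rl.loop(100*time.Millisecond, func(id uint64) {
		fmt.Println("reloading region", id)
	}, stop)

	reg := &region{id: 42}
	rl.schedule(reg)
	rl.schedule(reg) // no-op: the asyncReload flag is already set
	time.Sleep(300 * time.Millisecond)
	close(stop)
}

Under these assumptions the program prints a single "reloading region 42" on the second tick, which is the behavior the patch series aims for: one reload per scheduled region, delayed by one ticker interval, without invalidating the cached region in the meantime.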