introduce a random jitter to region cache ttl #1148

Merged · 5 commits · Feb 2, 2024
Changes from 4 commits
218 changes: 154 additions & 64 deletions internal/locate/region_cache.go
@@ -56,6 +56,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
@@ -81,9 +82,9 @@ import (
)

const (
btreeDegree = 32
invalidatedLastAccessTime = -1
defaultRegionsPerBatch = 128
btreeDegree = 32
expiredTTL = -1
defaultRegionsPerBatch = 128
)

// LabelFilter returns false means label doesn't match, and will ignore this store.
@@ -121,6 +122,29 @@ func SetRegionCacheTTLSec(t int64) {
regionCacheTTLSec = t
}

// regionCacheTTLJitterSec is the max jitter time for region cache TTL.
var regionCacheTTLJitterSec int64 = 60

// SetRegionCacheTTLWithJitter sets the region cache TTL with jitter. The actual TTL is in the range [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
regionCacheTTLSec = base
regionCacheTTLJitterSec = jitter
}
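
A usage sketch (not part of the diff; the values are illustrative, not the package defaults): the setter is meant to be called once at startup, and passing a jitter of 0 restores the fixed-TTL behavior, since nextTTL skips the random draw when regionCacheTTLJitterSec <= 0.

```go
// Illustrative only: a 10-minute base TTL with up to 60s of jitter,
// so each region's expiry lands somewhere in [600s, 660s) after access.
SetRegionCacheTTLWithJitter(600, 60)

// Jitter disabled: every region expires exactly 600s after its last refresh.
SetRegionCacheTTLWithJitter(600, 0)
```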

// nextTTL returns a random TTL in the range [ts+base, ts+base+jitter). The input ts must be an epoch timestamp in seconds.
func nextTTL(ts int64) int64 {
jitter := int64(0)
if regionCacheTTLJitterSec > 0 {
jitter = rand.Int63n(regionCacheTTLJitterSec)
}
return ts + regionCacheTTLSec + jitter
Contributor commented:
Is there a way to add some constraint checks to make sure the unit of the input ts is seconds, to avoid it being used mistakenly?

@zyguan (Contributor, Author) replied on Feb 1, 2024:
We pass ts here to avoid the overhead of time.Now(). To validate it, maybe we can:

  • record the process start ts in a global var (e.g. processStartTs)
  • treat the ts as invalid if ts - processStartTs > 100 years or ts < processStartTs
  • if the ts is invalid, fall back to calling time.Now()

But do we really need to do that? There are many mechanisms to invalidate a stale region, so it might not be a big problem even when nextTTL is called with epoch millis, I guess.

Or, we can just change the function signature (along with many related functions like checkRegionCacheTTL) to accept an arg of type time.Time. (I've tried this, but gave up, since it would introduce a lot of changes.)

Contributor replied:
Maybe we could just add a comment explaining the constraints on the input ts, if misuse isn't a big problem.
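
A sketch of the fallback @zyguan describes (illustration only, not part of this PR; processStartTs, safeNextTTL, and hundredYearsSec are hypothetical names):

```go
// Recorded once at startup so later timestamps can be sanity-checked.
var processStartTs = time.Now().Unix()

const hundredYearsSec int64 = 100 * 365 * 24 * 60 * 60

func safeNextTTL(ts int64) int64 {
	// An epoch-millis (or otherwise bogus) ts fails one of these checks;
	// fall back to a fresh clock reading instead of trusting the caller.
	if ts < processStartTs || ts-processStartTs > hundredYearsSec {
		ts = time.Now().Unix()
	}
	return nextTTL(ts)
}
```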

}

// nextTTLWithoutJitter is used in tests.
func nextTTLWithoutJitter(ts int64) int64 {
return ts + regionCacheTTLSec
}

const (
needReloadOnAccess int32 = 1 << iota // indicates the region will be reloaded on next access
needExpireAfterTTL // indicates the region will expire after RegionCacheTTL (even when it's accessed continuously)
@@ -150,11 +174,30 @@ const (
Other
)

func (r InvalidReason) String() string {
switch r {
case Ok:
return "Ok"
case Other:
return "Other"
case EpochNotMatch:
return "EpochNotMatch"
case RegionNotFound:
return "RegionNotFound"
case StoreNotFound:
return "StoreNotFound"
case NoLeader:
return "NoLeader"
default:
return "Unknown"
}
}

// Region represents a kv region
type Region struct {
meta *metapb.Region // raw region meta from PD, immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
lastAccess int64 // last region access time, see checkRegionCacheTTL
ttl int64 // region TTL in epoch seconds, see checkRegionCacheTTL
syncFlags int32 // region need be sync later, see needReloadOnAccess, needExpireAfterTTL
invalidReason InvalidReason // the reason why the region is invalidated
}
@@ -338,7 +381,7 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
}

// mark region has been init accessed.
r.lastAccess = time.Now().Unix()
r.ttl = nextTTL(time.Now().Unix())
return r, nil
}

@@ -356,8 +399,7 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
}

func (r *Region) isCacheTTLExpired(ts int64) bool {
lastAccess := atomic.LoadInt64(&r.lastAccess)
return ts-lastAccess > regionCacheTTLSec
return ts > atomic.LoadInt64(&r.ttl)
}

// checkRegionCacheTTL returns false means the region cache is expired.
@@ -366,28 +408,36 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
r.invalidate(Other)
}
newTTL := int64(0)
for {
lastAccess := atomic.LoadInt64(&r.lastAccess)
if ts-lastAccess > regionCacheTTLSec {
ttl := atomic.LoadInt64(&r.ttl)
if ts > ttl {
return false
}
if r.checkSyncFlags(needExpireAfterTTL) || atomic.CompareAndSwapInt64(&r.lastAccess, lastAccess, ts) {
// skip updating TTL when:
// 1. the region has been marked as `needExpireAfterTTL`
// 2. the TTL is far away from ts (still within jitter time)
if r.checkSyncFlags(needExpireAfterTTL) || ttl > ts+regionCacheTTLSec {
return true
}
if newTTL == 0 {
newTTL = nextTTL(ts)
}
// now we have ts <= ttl <= ts+regionCacheTTLSec <= newTTL = ts+regionCacheTTLSec+randomJitter
if atomic.CompareAndSwapInt64(&r.ttl, ttl, newTTL) {
return true
}
}
}
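
The loop above is a lock-free TTL refresh: readers extend the deadline with a compare-and-swap and retry only on contention, and the `ttl > ts+regionCacheTTLSec` early-out means a hot region pays for at most one CAS per jitter window. A self-contained sketch of the same pattern (hypothetical names, simplified from checkRegionCacheTTL):

```go
package ttl

import (
	"math/rand"
	"sync/atomic"
)

// touch reports whether the entry at *deadline is still live at time now,
// extending the deadline to now+base+[0,jitter) when it is about to lapse.
func touch(deadline *int64, now, base, jitter int64) bool {
	for {
		cur := atomic.LoadInt64(deadline)
		if now > cur {
			return false // expired; the caller should reload
		}
		if cur > now+base {
			return true // still inside the jitter window; skip the CAS
		}
		next := now + base
		if jitter > 0 {
			next += rand.Int63n(jitter)
		}
		if atomic.CompareAndSwapInt64(deadline, cur, next) {
			return true
		}
		// Lost a race with a concurrent toucher: re-read and retry.
	}
}
```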

// invalidate invalidates a region; the next access will get a null result.
func (r *Region) invalidate(reason InvalidReason) {
metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
}

// invalidateWithoutMetrics invalidates a region without metrics; the next access will get a null result.
func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
func (r *Region) invalidate(reason InvalidReason, nocount ...bool) {
if atomic.CompareAndSwapInt32((*int32)(&r.invalidReason), int32(Ok), int32(reason)) {
if len(nocount) == 0 || !nocount[0] {
metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
}
atomic.StoreInt64(&r.ttl, expiredTTL)
}
}
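
Two properties of the rewritten invalidate are worth noting: the CAS from Ok makes invalidation first-writer-wins, so the metric is bumped at most once per region lifetime and the original reason is preserved, and the variadic nocount flag subsumes the old invalidateWithoutMetrics twin. A call-site sketch (r and old are hypothetical *Region values; the last line mirrors insertRegionToCache below):

```go
r.invalidate(EpochNotMatch) // first call wins and is counted
r.invalidate(Other)         // no-op: the reason stays EpochNotMatch

// When replacing a cached region, the caller can skip the metric
// (mirrors region.cachedRegion.invalidate(Other, !shouldCount)):
old.invalidate(Other, true)
```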

func (r *Region) getSyncFlags() int32 {
@@ -1068,9 +1118,15 @@ func (c *RegionCache) LocateEndKey(bo *retry.Backoffer, key []byte) (*KeyLocatio
}

func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey bool) (r *Region, err error) {
r = c.searchCachedRegion(key, isEndKey)
if r == nil {
var expired bool
r, expired = c.searchCachedRegionByKey(key, isEndKey)
tag := "ByKey"
if isEndKey {
tag = "ByEndKey"
}
if r == nil || expired {
// load the region when it does not exist or has expired.
observeLoadRegion(tag, r, expired, 0)
lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle())
if err != nil {
// no region data, return error if failure.
@@ -1083,6 +1139,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
c.mu.Unlock()
// just retry once, it won't bring much overhead.
if stale {
observeLoadRegion(tag+":Retry", r, expired, 0)
lr, err = c.loadRegion(bo, key, isEndKey)
if err != nil {
// no region data, return error if failure.
@@ -1101,8 +1158,10 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
err error
)
if reloadOnAccess {
observeLoadRegion(tag, r, expired, flags)
lr, err = c.loadRegion(bo, key, isEndKey)
} else {
observeLoadRegion("ByID", r, expired, flags)
lr, err = c.loadRegionByID(bo, r.GetID())
}
if err != nil {
@@ -1122,8 +1181,9 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
}

func (c *RegionCache) tryFindRegionByKey(key []byte, isEndKey bool) (r *Region) {
r = c.searchCachedRegion(key, isEndKey)
if r == nil || r.checkSyncFlags(needReloadOnAccess) {
var expired bool
r, expired = c.searchCachedRegionByKey(key, isEndKey)
if r == nil || expired || r.checkSyncFlags(needReloadOnAccess) {
return nil
}
return r
@@ -1242,12 +1302,11 @@ func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleR

// LocateRegionByID searches for the region with ID.
func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) {
c.mu.RLock()
r := c.getRegionByIDFromCache(regionID)
c.mu.RUnlock()
if r != nil {
if flags := r.resetSyncFlags(needReloadOnAccess); flags > 0 {
r, expired := c.searchCachedRegionByID(regionID)
if r != nil && !expired {
if flags := r.resetSyncFlags(needReloadOnAccess | needDelayedReloadReady); flags > 0 {
reloadOnAccess := flags&needReloadOnAccess > 0
observeLoadRegion("ByID", r, expired, flags)
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
@@ -1269,6 +1328,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
return loc, nil
}

observeLoadRegion("ByID", r, expired, 0)
r, err := c.loadRegionByID(bo, regionID)
if err != nil {
return nil, err
@@ -1500,11 +1560,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it hasn't been invalidated and some requests still use the stale region information.
if shouldCount {
region.cachedRegion.invalidate(Other)
} else {
region.cachedRegion.invalidateWithoutMetrics(Other)
}
region.cachedRegion.invalidate(Other, !shouldCount)
}
}
// update related vars.
@@ -1513,47 +1569,33 @@
return true
}

// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region {
ts := time.Now().Unix()
var r *Region
// searchCachedRegionByKey finds the region from cache by key.
func (c *RegionCache) searchCachedRegionByKey(key []byte, isEndKey bool) (*Region, bool) {
c.mu.RLock()
r = c.mu.sorted.DescendLessOrEqual(key, isEndKey, ts)
region := c.mu.sorted.SearchByKey(key, isEndKey)
c.mu.RUnlock()
if r != nil && (!isEndKey && r.Contains(key) || isEndKey && r.ContainsByEnd(key)) {
return r
if region == nil {
return nil, false
}
return nil
return region, !region.checkRegionCacheTTL(time.Now().Unix())
}

// getRegionByIDFromCache tries to get region by regionID from cache. Like
// `getCachedRegion`, it should be called with c.mu.RLock(), and the returned
// Region should not be used after c.mu is RUnlock().
func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
ts := time.Now().Unix()
// searchCachedRegionByID finds the region from cache by id.
func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) {
c.mu.RLock()
ver, ok := c.mu.latestVersions[regionID]
if !ok {
return nil
c.mu.RUnlock()
return nil, false
}
latestRegion, ok := c.mu.regions[ver]
region, ok := c.mu.regions[ver]
c.mu.RUnlock()
if !ok {
// should not happen
logutil.BgLogger().Warn("region version not found",
zap.Uint64("regionID", regionID), zap.Stringer("version", &ver))
return nil
}
lastAccess := atomic.LoadInt64(&latestRegion.lastAccess)
if ts-lastAccess > regionCacheTTLSec {
return nil
}
if !latestRegion.checkSyncFlags(needExpireAfterTTL) {
atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts)
logutil.BgLogger().Warn("region not found", zap.Uint64("id", regionID), zap.Stringer("ver", &ver))
return nil, false
}
return latestRegion
return region, !region.checkRegionCacheTTL(time.Now().Unix())
}
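
Both search helpers now return a (region, expired) pair instead of folding expiry into a nil result, which lets callers distinguish a miss from a stale hit and report the difference to observeLoadRegion. A generic sketch of the same contract (hypothetical, detached from RegionCache; requires Go 1.18+):

```go
package cache

type entry[V any] struct {
	val V
	ttl int64 // epoch seconds
}

// lookup distinguishes a miss (ok == false) from a stale hit (expired ==
// true): a stale value is still returned so the caller can use it as a
// hint while reloading.
func lookup[K comparable, V any](m map[K]entry[V], k K, now int64) (val V, ok, expired bool) {
	e, found := m[k]
	if !found {
		return val, false, false
	}
	return e.val, true, now > e.ttl
}
```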

// GetStoresByType gets stores by type `typ`
@@ -1570,6 +1612,53 @@ func (c *RegionCache) GetAllStores() []*Store {
})
}

var loadRegionCounters sync.Map

const (
loadRegionReasonMissing = "Missing"
loadRegionReasonExpiredNormal = "Expired:Normal"
loadRegionReasonExpiredFrozen = "Expired:Frozen"
loadRegionReasonExpiredInvalid = "Expired:Invalid:"
loadRegionReasonReloadOnAccess = "Reload:OnAccess"
loadRegionReasonReloadDelayed = "Reload:Delayed"
loadRegionReasonUpdateBuckets = "UpdateBuckets"
loadRegionReasonUnknown = "Unknown"
)

func observeLoadRegion(tag string, region *Region, expired bool, reloadFlags int32, explicitReason ...string) {
reason := loadRegionReasonUnknown
if len(explicitReason) > 0 {
reason = strings.Join(explicitReason, ":")
} else if region == nil {
reason = loadRegionReasonMissing
} else if expired {
invalidReason := InvalidReason(atomic.LoadInt32((*int32)(&region.invalidReason)))
if invalidReason != Ok {
reason = loadRegionReasonExpiredInvalid + invalidReason.String()
} else if region.checkSyncFlags(needExpireAfterTTL) {
reason = loadRegionReasonExpiredFrozen
} else {
reason = loadRegionReasonExpiredNormal
}
} else if reloadFlags > 0 {
if reloadFlags&needReloadOnAccess > 0 {
reason = loadRegionReasonReloadOnAccess
} else if reloadFlags&needDelayedReloadReady > 0 {
reason = loadRegionReasonReloadDelayed
}
}
type key struct {
t string
r string
}
counter, ok := loadRegionCounters.Load(key{tag, reason})
if !ok {
counter = metrics.TiKVLoadRegionCounter.WithLabelValues(tag, reason)
loadRegionCounters.Store(key{tag, reason}, counter)
}
counter.(prometheus.Counter).Inc()
}
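
observeLoadRegion caches each (tag, reason) child counter in a sync.Map because CounterVec.WithLabelValues hashes and looks up the label values on every call, which adds up on a path as hot as region lookup. The same pattern in isolation (a sketch; requestTotal is a hypothetical CounterVec, not part of client-go):

```go
package metricsutil

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var requestTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "request_total"},
	[]string{"tag", "reason"},
)

var counterCache sync.Map // map[label]prometheus.Counter

type label struct{ tag, reason string }

// incCached resolves the child counter once per label pair, then reuses it.
func incCached(tag, reason string) {
	k := label{tag, reason}
	c, ok := counterCache.Load(k)
	if !ok {
		// LoadOrStore keeps a single winner if two goroutines race here.
		c, _ = counterCache.LoadOrStore(k, requestTotal.WithLabelValues(tag, reason))
	}
	c.(prometheus.Counter).Inc()
}
```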

// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
@@ -2062,6 +2151,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
// TODO(youjiali1995): use singleflight.
go func() {
bo := retry.NewBackoffer(context.Background(), 20000)
observeLoadRegion("ByID", r, false, 0, loadRegionReasonUpdateBuckets)
new, err := c.loadRegionByID(bo, regionID.id)
if err != nil {
logutil.Logger(bo.GetCtx()).Error("failed to update buckets",