avoid dataIsNotReady error while retrying stale read on the leader (#765) (#789)

* avoid dataIsNotReady error while retrying stale read on the leader (#765)

* avoid dataIsNotReady error while retrying stale read on the leader

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

* move StaleRead flag reset to retry section

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

* move all logic to #next and allow retry on the leader

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

---------

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>
Co-authored-by: artem_danilov <artem_danilov@airbnb.com>
Signed-off-by: you06 <you1474600@gmail.com>

* add context patcher for 65

Signed-off-by: you06 <you1474600@gmail.com>

* fmt

Signed-off-by: you06 <you1474600@gmail.com>

---------

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>
Signed-off-by: you06 <you1474600@gmail.com>
Co-authored-by: Artem Danilov <329970+Tema@users.noreply.github.com>
Co-authored-by: artem_danilov <artem_danilov@airbnb.com>
3 people authored May 6, 2023
1 parent cd83d15 commit 7c3284b
Showing 3 changed files with 53 additions and 21 deletions.
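
Before the diffs, a minimal sketch of the behavior this commit aims for, using stand-in types rather than client-go APIs (request, replica and send below are illustrative only): a global stale read that hits DataIsNotReady on a follower is retried on the leader with the StaleRead flag cleared, so the leader serves it as a normal read instead of failing again.

package main

import "fmt"

// request and replica are stand-in types for this sketch; the real client
// works with tikvrpc.Request, kvrpcpb.Context and the replica selector in
// internal/locate.
type request struct {
    StaleRead       bool
    GlobalStaleRead bool
}

type replica struct {
    name string
    // dataReady reports whether the replica's safe-ts has advanced far
    // enough to serve a stale read at the requested timestamp.
    dataReady bool
}

// send simulates one attempt: a stale read against a replica whose safe-ts
// lags fails with DataIsNotReady; a normal read always succeeds here.
func send(r replica, req request) error {
    if req.StaleRead && !r.dataReady {
        return fmt.Errorf("DataIsNotReady on %s", r.name)
    }
    return nil
}

func main() {
    follower := replica{name: "follower", dataReady: false}
    leader := replica{name: "leader", dataReady: false} // even the leader's safe-ts may lag

    req := request{StaleRead: true, GlobalStaleRead: true}
    if err := send(follower, req); err != nil {
        fmt.Println("first attempt:", err)
        // Retry a global stale read on the leader with the stale-read flag
        // cleared, so the leader serves it as a normal read and cannot
        // reply with DataIsNotReady again.
        if req.GlobalStaleRead {
            req.StaleRead = false
        }
        if send(leader, req) == nil {
            fmt.Println("retry on leader succeeded as a normal read")
        }
    }
}
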
12 changes: 12 additions & 0 deletions internal/locate/region_cache.go
@@ -525,6 +525,8 @@ type RPCContext struct {
ProxyStore *Store // nil means proxy is not used
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.

contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden
}

func (c *RPCContext) String() string {
@@ -540,6 +542,16 @@ func (c *RPCContext) String() string {
return res
}

type contextPatcher struct {
staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
}

type storeSelectorOp struct {
leaderOnly bool
labels []*metapb.StoreLabel
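For illustration, a hedged sketch of how the new contextPatcher is meant to be used; requestContext below stands in for kvrpcpb.Context and the flow is simplified: the replica selector arms the patcher when it decides to retry on the leader, and the patcher overrides the request context right before the RPC is sent.

package main

import "fmt"

// requestContext stands in for the kvrpcpb.Context fields relevant here.
type requestContext struct {
    StaleRead bool
}

// contextPatcher mirrors the struct added in region_cache.go: optional
// overrides applied to the request context just before sending.
type contextPatcher struct {
    staleRead *bool
}

func (p *contextPatcher) applyTo(ctx *requestContext) {
    if p.staleRead != nil {
        ctx.StaleRead = *p.staleRead
    }
}

func main() {
    reqCtx := requestContext{StaleRead: true}

    // The selector decided to retry on the leader, so it arms the patcher
    // to disable stale read for this attempt only.
    var patcher contextPatcher
    staleRead := false
    patcher.staleRead = &staleRead

    // Here we apply the patcher directly; in the diff, sendReqToRegion
    // applies it after tikvrpc.SetContext has populated req.Context.
    patcher.applyTo(&reqCtx)
    fmt.Println("StaleRead after patching:", reqCtx.StaleRead) // false
}
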
29 changes: 23 additions & 6 deletions internal/locate/region_request.go
@@ -283,6 +283,7 @@ type replicaSelector struct {
// | reachable +-----+-----+ all proxies are tried ^
// +------------+tryNewProxy+-------------------------+
// +-----------+

type selectorState interface {
next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
onSendSuccess(*replicaSelector)
@@ -523,6 +524,7 @@ type accessFollower struct {
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas)))
@@ -543,6 +545,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
if state.isGlobalStaleRead {
WithLeaderOnly()(&state.option)
// a retry on the leader should not carry the stale read flag, to avoid a possible DataIsNotReady error: the leader can always serve any read
resetStaleRead = true
}
state.lastIdx++
}
@@ -561,15 +565,23 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
logutil.BgLogger().Warn("unable to find stores with given labels")
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || leader.isExhausted(1) {
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
state.lastIdx = state.leaderIdx
selector.targetIdx = state.leaderIdx
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return nil, err
}
if resetStaleRead {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
}
return rpcCtx, nil
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -1133,6 +1145,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, err
}
rpcCtx.contextPatcher.applyTo(&req.Context)
// judge the store limit switch.
if limit := kv.StoreLimit.Load(); limit > 0 {
if err := s.getStoreToken(rpcCtx.Store, limit); err != nil {
@@ -1599,10 +1612,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
zap.Stringer("ctx", ctx),
)
if !req.IsGlobalStaleRead() {
// only back off for local stale reads; a global stale read should retry immediately against the leader as a normal read
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
}
}
return true, nil
}
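A small sketch of the decision the onRegionError hunk above encodes (shouldBackoffOnDataIsNotReady is an illustrative helper, not a client-go function): only local stale reads back off on DataIsNotReady, since their follower may catch up shortly; a global stale read retries immediately because the next attempt goes to the leader as a normal read.

package main

import "fmt"

// shouldBackoffOnDataIsNotReady mirrors the new condition in onRegionError:
// back off only when the stale read is not global; a global stale read is
// retried right away on the leader with the stale-read flag cleared.
func shouldBackoffOnDataIsNotReady(isGlobalStaleRead bool) bool {
    return !isGlobalStaleRead
}

func main() {
    fmt.Println("local stale read  -> backoff:", shouldBackoffOnDataIsNotReady(false)) // true
    fmt.Println("global stale read -> backoff:", shouldBackoffOnDataIsNotReady(true))  // false
}
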
33 changes: 18 additions & 15 deletions internal/locate/region_request3_test.go
@@ -846,30 +846,33 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
for i := 0; i < 5; i++ {
for i := 0; i < 10; i++ {
req.EnableStaleRead()
// The request may be sent to the leader directly. We have to distinguish it.
failureOnFollower := false
failureOnFollower := 0
failureOnLeader := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower = true
failureOnFollower++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else if failureOnLeader == 0 && i%2 == 0 {
failureOnLeader++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}}
sender.SendReq(bo, req, region.Region, time.Second)
state, ok := sender.replicaSelector.state.(*accessFollower)
s.True(ok)
s.True(!failureOnFollower || state.option.leaderOnly)
totalAttempts := 0
for idx, replica := range sender.replicaSelector.replicas {
totalAttempts += replica.attempts
if idx == int(state.leaderIdx) {
s.Equal(1, replica.attempts)
} else {
s.True(replica.attempts <= 1)
}
s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower is allowed
if failureOnFollower == 0 && failureOnLeader == 0 {
// if the request goes to the leader and succeeds, it is executed as a stale read
s.True(req.StaleRead)
} else {
// otherwise the leaderOnly flag should be set and the retry executed as a normal read
s.True(state.option.leaderOnly)
s.False(req.StaleRead)
}
s.True(totalAttempts <= 2)
}
}
