avoid dataIsNotReady error while retrying stale read on the leader (tikv#765)

* avoid dataIsNotReady error while retrying stale read on the leader

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

* move StaleRead flag reset to retry section

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

* move all logic to #next and allow retry on the leader

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

---------

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>
Co-authored-by: artem_danilov <artem_danilov@airbnb.com>
2 people authored and you06 committed May 4, 2023
1 parent cd83d15 commit 1c140f4
Showing 3 changed files with 55 additions and 24 deletions.
10 changes: 10 additions & 0 deletions internal/locate/region_cache.go
@@ -540,6 +540,16 @@ func (c *RPCContext) String() string {
return res
}

type contextPatcher struct {
staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
}

type storeSelectorOp struct {
leaderOnly bool
labels []*metapb.StoreLabel
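As a reading aid, here is a minimal, self-contained sketch (not the committed code, where the patcher lives on RPCContext and is applied by the request sender) of how the new contextPatcher override is meant to work against a kvrpcpb.Context:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// contextPatcher mirrors the type added above: it records overrides that the
// replica selector wants applied to the request's kvrpcpb.Context right
// before the RPC is sent.
type contextPatcher struct {
	staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
	if patcher.staleRead != nil {
		pbCtx.StaleRead = *patcher.staleRead
	}
}

func main() {
	// The request was originally issued as a stale read.
	pbCtx := &kvrpcpb.Context{StaleRead: true}

	// When the selector falls back to the leader it pins staleRead=false so
	// the leader serves a normal read instead of answering DataIsNotReady.
	staleRead := false
	patcher := contextPatcher{staleRead: &staleRead}
	patcher.applyTo(pbCtx)

	fmt.Println(pbCtx.StaleRead) // false
}
```

Using a *bool rather than a plain bool lets the selector distinguish "no override" from "force stale read off".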
36 changes: 27 additions & 9 deletions internal/locate/region_request.go
@@ -264,8 +264,9 @@ type replicaSelector struct {
// selectorState is the interface of states of the replicaSelector.
// Here is the main state transition diagram:
//
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
//
// +-------->+ accessKnownLeader +----------------+
// | +------+------------+ |
// | | |
Expand All @@ -282,7 +283,8 @@ type replicaSelector struct {
// | leader becomes v +---+---+
// | reachable +-----+-----+ all proxies are tried ^
// +------------+tryNewProxy+-------------------------+
//
// +-----------+
type selectorState interface {
next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
onSendSuccess(*replicaSelector)
@@ -523,6 +525,8 @@ type accessFollower struct {
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
replicaSize := len(selector.replicas)
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas)))
@@ -543,6 +547,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
if state.isGlobalStaleRead {
WithLeaderOnly()(&state.option)
// A retry on the leader should not use the stale read flag: the leader can always serve any read, so this avoids a spurious DataIsNotReady error.
resetStaleRead = true
}
state.lastIdx++
}
@@ -561,15 +567,23 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
logutil.BgLogger().Warn("unable to find stores with given labels")
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || leader.isExhausted(1) {
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
state.lastIdx = state.leaderIdx
selector.targetIdx = state.leaderIdx
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return nil, err
}
if resetStaleRead {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
}
return rpcCtx, nil
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
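The two hunks above change accessFollower.next so that when a global stale read has to be retried, the selector restricts itself to the leader and resets the stale-read flag on the outgoing context. A heavily simplified, hypothetical sketch of that decision (placeholder types and names, not the real selector):

```go
package main

import "fmt"

// rpcContext and staleReadSelector are illustrative placeholders only.
type rpcContext struct {
	Addr      string
	StaleRead bool
}

type staleReadSelector struct {
	leaderAddr string
	leaderOnly bool
	attempts   int
}

func (s *staleReadSelector) next(followerAddr string) *rpcContext {
	s.attempts++
	if s.attempts == 1 {
		// First attempt: a stale read against the chosen replica.
		return &rpcContext{Addr: followerAddr, StaleRead: true}
	}
	// Retry: restrict to the leader and reset the stale-read flag so the
	// leader cannot reply with DataIsNotReady.
	s.leaderOnly = true
	return &rpcContext{Addr: s.leaderAddr, StaleRead: false}
}

func main() {
	sel := &staleReadSelector{leaderAddr: "store1"}
	fmt.Printf("%+v\n", *sel.next("store2")) // {Addr:store2 StaleRead:true}
	fmt.Printf("%+v\n", *sel.next("store2")) // {Addr:store1 StaleRead:false}
}
```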
@@ -1599,10 +1613,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
zap.Stringer("ctx", ctx),
)
if !req.IsGlobalStaleRead() {
// Only back off for local stale reads; a global stale read is retried immediately against the leader as a normal read.
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
}
}
return true, nil
}
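The onRegionError hunk above only backs off for local stale reads, since a global stale read is retried right away on the leader as a normal read. A hedged sketch of that branch, with the backoffer reduced to a plain function placeholder:

```go
package main

import (
	"errors"
	"fmt"
)

// onDataIsNotReady sketches the new DataIsNotReady branch: only local stale
// reads back off before retrying. The backoff argument stands in for
// bo.Backoff(retry.BoMaxDataNotReady, ...).
func onDataIsNotReady(isGlobalStaleRead bool, backoff func() error) (shouldRetry bool, err error) {
	if !isGlobalStaleRead {
		// Local stale read: wait for the replica's safe-ts to advance.
		if err := backoff(); err != nil {
			return false, err
		}
	}
	return true, nil
}

func main() {
	noBackoff := func() error { return errors.New("backoff should not be called") }
	someBackoff := func() error { return nil }

	fmt.Println(onDataIsNotReady(true, noBackoff))    // true <nil>: global stale read retries immediately
	fmt.Println(onDataIsNotReady(false, someBackoff)) // true <nil>: local stale read backs off first
}
```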
33 changes: 18 additions & 15 deletions internal/locate/region_request3_test.go
@@ -846,30 +846,33 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
for i := 0; i < 5; i++ {
for i := 0; i < 10; i++ {
req.EnableStaleRead()
// The request may be sent to the leader directly; we have to distinguish that case.
failureOnFollower := false
failureOnFollower := 0
failureOnLeader := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower = true
failureOnFollower++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else if failureOnLeader == 0 && i%2 == 0 {
failureOnLeader++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}}
sender.SendReq(bo, req, region.Region, time.Second)
state, ok := sender.replicaSelector.state.(*accessFollower)
s.True(ok)
s.True(!failureOnFollower || state.option.leaderOnly)
totalAttempts := 0
for idx, replica := range sender.replicaSelector.replicas {
totalAttempts += replica.attempts
if idx == int(state.leaderIdx) {
s.Equal(1, replica.attempts)
} else {
s.True(replica.attempts <= 1)
}
s.True(failureOnFollower <= 1) // any retry should go to the leader, so at most one failure on a follower is allowed
if failureOnFollower == 0 && failureOnLeader == 0 {
// if the request goes to the leader and succeeds then it is executed as a StaleRead
s.True(req.StaleRead)
} else {
// otherwise the leaderOnly flag should be set and the request retried as a normal read
s.True(state.option.leaderOnly)
s.False(req.StaleRead)
}
s.True(totalAttempts <= 2)
}
}
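For reference, a standalone example of constructing the global stale-read request exercised by the test; the import paths are assumed from the client-go v2 and kvproto module layout, and the printed expectations mirror the assertions above:

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func main() {
	// Build a global stale-read Get request the same way the test does.
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
	req.ReadReplicaScope = oracle.GlobalTxnScope
	req.TxnScope = oracle.GlobalTxnScope
	req.EnableStaleRead()

	fmt.Println(req.StaleRead)           // true until a retry on the leader resets it
	fmt.Println(req.IsGlobalStaleRead()) // expected true: global scope with stale read enabled
}
```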
