From 7c3284bc22a29c9e36aad74b99fa9ba32275b1cc Mon Sep 17 00:00:00 2001
From: you06
Date: Sat, 6 May 2023 12:00:45 +0800
Subject: [PATCH] avoid dataIsNotReady error while retrying stale read on the
 leader(#765) (#789)

* avoid dataIsNotReady error while retrying stale read on the leader (#765)

* avoid dataIsNotReady error while retrying stale read on the leader

Signed-off-by: artem_danilov

* move StaleRead flag reset to retry section

Signed-off-by: artem_danilov

* move all logic to #next and allow retry on the leader

Signed-off-by: artem_danilov

---------

Signed-off-by: artem_danilov
Co-authored-by: artem_danilov
Signed-off-by: you06

* add context patcher for 65

Signed-off-by: you06

* fmt

Signed-off-by: you06

---------

Signed-off-by: artem_danilov
Signed-off-by: you06
Co-authored-by: Artem Danilov <329970+Tema@users.noreply.github.com>
Co-authored-by: artem_danilov
---
 internal/locate/region_cache.go         | 12 +++++++++
 internal/locate/region_request.go       | 29 +++++++++++++++++-----
 internal/locate/region_request3_test.go | 33 ++++++++++++++-----------
 3 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 8f3269df5..76a71cdfb 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -525,6 +525,8 @@ type RPCContext struct {
 	ProxyStore *Store // nil means proxy is not used
 	ProxyAddr  string // valid when ProxyStore is not nil
 	TiKVNum    int    // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
+
+	contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden
 }
 
 func (c *RPCContext) String() string {
@@ -540,6 +542,16 @@ func (c *RPCContext) String() string {
 	return res
 }
 
+type contextPatcher struct {
+	staleRead *bool
+}
+
+func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
+	if patcher.staleRead != nil {
+		pbCtx.StaleRead = *patcher.staleRead
+	}
+}
+
 type storeSelectorOp struct {
 	leaderOnly bool
 	labels     []*metapb.StoreLabel
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index b39eb5a43..9a82cb6d0 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -283,6 +283,7 @@ type replicaSelector struct {
 // | reachable  +-----+-----+ all proxies are tried ^
 // +------------+tryNewProxy+-------------------------+
 //              +-----------+
+
 type selectorState interface {
 	next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
 	onSendSuccess(*replicaSelector)
@@ -523,6 +524,7 @@ type accessFollower struct {
 }
 
 func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	resetStaleRead := false
 	if state.lastIdx < 0 {
 		if state.tryLeader {
 			state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas)))
@@ -543,6 +545,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
 		if state.isGlobalStaleRead {
 			WithLeaderOnly()(&state.option)
+			// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read
+			resetStaleRead = true
 		}
 		state.lastIdx++
 	}
@@ -561,7 +565,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			logutil.BgLogger().Warn("unable to find stores with given labels")
 		}
 		leader := selector.replicas[state.leaderIdx]
-		if leader.isEpochStale() || leader.isExhausted(1) {
+		if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
 			metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 			selector.invalidateRegion()
 			return nil, nil
@@ -569,7 +573,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		state.lastIdx = state.leaderIdx
 		selector.targetIdx = state.leaderIdx
 	}
-	return selector.buildRPCContext(bo)
+	rpcCtx, err := selector.buildRPCContext(bo)
+	if err != nil || rpcCtx == nil {
+		return nil, err
+	}
+	if resetStaleRead {
+		staleRead := false
+		rpcCtx.contextPatcher.staleRead = &staleRead
+	}
+	return rpcCtx, nil
 }
 
 func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -1133,6 +1145,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
 	if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
 		return nil, false, err
 	}
+	rpcCtx.contextPatcher.applyTo(&req.Context)
 	// judge the store limit switch.
 	if limit := kv.StoreLimit.Load(); limit > 0 {
 		if err := s.getStoreToken(rpcCtx.Store, limit); err != nil {
@@ -1599,10 +1612,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 			zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
 			zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
 			zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
-			zap.Stringer("ctx", ctx))
-		err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
-		if err != nil {
-			return false, err
+			zap.Stringer("ctx", ctx),
+		)
+		if !req.IsGlobalStaleRead() {
+			// only backoff local stale reads as global should retry immediately against the leader as a normal read
+			err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
+			if err != nil {
+				return false, err
+			}
 		}
 		return true, nil
 	}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index c0f579ab6..34cd7e856 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -846,30 +846,33 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
 	req.ReadReplicaScope = oracle.GlobalTxnScope
 	req.TxnScope = oracle.GlobalTxnScope
-	req.EnableStaleRead()
-	for i := 0; i < 5; i++ {
+	for i := 0; i < 10; i++ {
+		req.EnableStaleRead()
 		// The request may be sent to the leader directly. We have to distinguish it.
-		failureOnFollower := false
+		failureOnFollower := 0
+		failureOnLeader := 0
 		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
 			if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
-				failureOnFollower = true
+				failureOnFollower++
+				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
+			} else if failureOnLeader == 0 && i%2 == 0 {
+				failureOnLeader++
 				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
+			} else {
+				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
 			}
-			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
 		}}
 		sender.SendReq(bo, req, region.Region, time.Second)
 		state, ok := sender.replicaSelector.state.(*accessFollower)
 		s.True(ok)
-		s.True(!failureOnFollower || state.option.leaderOnly)
-		totalAttempts := 0
-		for idx, replica := range sender.replicaSelector.replicas {
-			totalAttempts += replica.attempts
-			if idx == int(state.leaderIdx) {
-				s.Equal(1, replica.attempts)
-			} else {
-				s.True(replica.attempts <= 1)
-			}
+		s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower allowed
+		if failureOnFollower == 0 && failureOnLeader == 0 {
+			// if the request goes to the leader and succeeds then it is executed as a StaleRead
+			s.True(req.StaleRead)
+		} else {
+			// otherwise #leaderOnly flag should be set and retry request as a normal read
+			s.True(state.option.leaderOnly)
+			s.False(req.StaleRead)
 		}
-		s.True(totalAttempts <= 2)
 	}
 }
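
Note for reviewers (not part of the patch itself): the standalone Go sketch below illustrates the override pattern this change introduces. The contextPatcher type and its applyTo method are copied from the region_cache.go hunk above; the package main wrapper and the sample values are illustrative only, and the only external API assumed is kvrpcpb.Context with its StaleRead field, which the patch itself relies on. In the real code path, accessFollower.next records the override on the RPCContext and RegionRequestSender.sendReqToRegion applies it to req.Context right before the request goes out, so a retry that is forced onto the leader is sent as a normal read and can no longer hit DataIsNotReady.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// contextPatcher mirrors the type added to region_cache.go above: a nil field
// means "keep whatever the request already carries", a non-nil field overrides
// it just before the RPC is sent.
type contextPatcher struct {
	staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
	if patcher.staleRead != nil {
		pbCtx.StaleRead = *patcher.staleRead
	}
}

func main() {
	// The request was originally built as a stale read.
	pbCtx := &kvrpcpb.Context{StaleRead: true}

	// The selector decided to retry on the leader, so this attempt must be a
	// normal read; a stale read could still be rejected with DataIsNotReady.
	staleRead := false
	patcher := contextPatcher{staleRead: &staleRead}
	patcher.applyTo(pbCtx)

	fmt.Println(pbCtx.StaleRead) // false: the leader retry is sent as a normal read
}

Keeping the override as a *bool rather than a plain bool lets the zero value of contextPatcher mean "change nothing", so request paths that never set it are left untouched.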