From e601a9d7f69926b27dbb9339d5f002dbe235b9d3 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Mon, 24 Jul 2023 17:28:36 +0800
Subject: [PATCH] Resume max retry time check for stale read retry with leader option

Signed-off-by: cfzjywxk
---
 internal/locate/region_request.go       | 18 ++++++-
 internal/locate/region_request3_test.go | 61 +++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 1edad08e8..f54320c08 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -574,7 +574,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			logutil.BgLogger().Warn("unable to find stores with given labels")
 		}
 		leader := selector.replicas[state.leaderIdx]
-		if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
+		if leader.isEpochStale() || state.IsLeaderExhausted(leader) {
 			metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 			selector.invalidateRegion()
 			return nil, nil
@@ -593,6 +593,22 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 	return rpcCtx, nil
 }
 
+// IsLeaderExhausted reports whether the leader replica has reached its attempt
+// limit, as judged by replica.isExhausted.
+//
+// The limit is 2 rather than 1 for a leader-only stale read, allowing one extra
+// retry in the following case:
+//  1. The stale read is enabled and leader peer is selected as the target peer at first.
+//  2. Data is not ready is returned from the leader peer.
+//  3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
+//  4. The leader peer should be retried again using snapshot read.
+func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
+	if state.isStaleRead && state.option.leaderOnly {
+		return leader.isExhausted(2)
+	}
+	return leader.isExhausted(1)
+}
+
 func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
 	if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
 		selector.invalidateReplicaStore(selector.targetReplica(), cause)
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 62912d143..d85afd01c 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -36,6 +36,7 @@ package locate
 
 import (
 	"context"
+	"strconv"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -1018,3 +1019,63 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 		s.Equal(0, bo.GetTotalBackoffTimes())
 	}
 }
+
+// TestStaleReadFallback sends a stale-read Get with a store selector label that
+// carries the leader store's ID. The mock client answers the first request with
+// a DataIsNotReady region error and every later request with the value, so the
+// test passes only if the sender issues another request after that first error.
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
+	leaderStore, _ := s.loadAndGetLeaderStore()
+	leaderLabel := []*metapb.StoreLabel{
+		{
+			Key:   "id",
+			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
+		},
+	}
+	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	s.NotNil(regionLoc)
+	value := []byte("value")
+	isFirstReq := true
+
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		select {
+		case <-ctx.Done():
+			return nil, errors.New("timeout")
+		default:
+		}
+		// Return `DataIsNotReady` for the first time on leader.
+		if isFirstReq {
+			isFirstReq = false
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				DataIsNotReady: &errorpb.DataIsNotReady{},
+			}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
+	}}
+
+	region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
+	s.True(region.isValid())
+
+	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
+	req.ReadReplicaScope = oracle.GlobalTxnScope
+	req.TxnScope = oracle.GlobalTxnScope
+	req.EnableStaleRead()
+	req.ReplicaReadType = kv.ReplicaReadMixed
+	var ops []StoreSelectorOption
+	ops = append(ops, WithMatchLabels(leaderLabel))
+
+	// Keep the cancel func so the timeout context's resources are released on return.
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	bo := retry.NewBackoffer(ctx, -1)
+	resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+	s.Nil(err)
+
+	regionErr, err := resp.GetRegionError()
+	s.Nil(err)
+	s.Nil(regionErr)
+	getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
+	s.True(ok)
+	s.Equal(value, getResp.Value)
+}