Skip to content

Commit

Permalink
Resume the max retry check for stale read processing
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed Jul 24, 2023
1 parent 59adec2 commit 392ff23
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
15 changes: 14 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
logutil.BgLogger().Warn("unable to find stores with given labels")
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
if leader.isEpochStale() || state.IsLeaderExhausted(leader) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
Expand All @@ -593,6 +593,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
return rpcCtx, nil
}

func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
// Allow another extra retry for the following case:
// 1. The stale read is enabled and leader peer is selected as the target peer at first.
// 2. Data is not ready is returned from the leader peer.
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
// 4. The leader peer should be retried again using snapshot read.
if state.isStaleRead && state.option.leaderOnly {
return leader.isExhausted(2)
} else {
return leader.isExhausted(1)
}
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
Expand Down
56 changes: 56 additions & 0 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package locate

import (
"context"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1018,3 +1019,58 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
s.Equal(0, bo.GetTotalBackoffTimes())
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")
isFirstReq := true

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
// Return `DataIsNotReady` for the first time on leader.
if isFirstReq {
isFirstReq = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}

region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
s.True(region.isValid())

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)
}

0 comments on commit 392ff23

Please sign in to comment.