fallback to follower when leader is busy #916

Merged 5 commits on Jul 28, 2023

Changes from all commits
internal/locate/region_cache.go (10 changes: 8 additions & 2 deletions)
@@ -572,13 +572,17 @@ func (c *RPCContext) String() string {
}

type contextPatcher struct {
staleRead *bool
staleRead *bool
replicaRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
if patcher.replicaRead != nil {
pbCtx.ReplicaRead = *patcher.replicaRead
}
}

type storeSelectorOp struct {
@@ -1191,9 +1195,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
c.mu.RLock()

Contributor: Is this an extra fix?

Contributor Author: Yes, it fixes a possible data race.

if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
atomic.StoreInt32(&oldRegion.asyncReload, 0)
atomic.CompareAndSwapInt32(&oldRegion.asyncReload, 1, 0)
}
c.mu.RUnlock()
return
}
c.mu.Lock()
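For readers following the data-race exchange above, here is a minimal stand-alone sketch of the pattern. The cache and region types below are stand-ins rather than the real RegionCache; the sketch only illustrates why the lookup is wrapped in an RLock and why the flag is cleared with a compare-and-swap instead of a plain store.

package reloadsketch

import (
	"sync"
	"sync/atomic"
)

type region struct {
	asyncReload int32 // 1 means a background reload has been scheduled
}

type cache struct {
	mu      sync.RWMutex
	regions map[uint64]*region
}

// onReloadFailed mirrors the error path of reloadRegion above: keep the old
// region info, but clear the pending-reload mark so the region can be
// scheduled for reload again later.
func (c *cache) onReloadFailed(regionID uint64) {
	c.mu.RLock() // the regions map can be mutated concurrently, so even a lookup needs the lock
	if old := c.regions[regionID]; old != nil {
		// CompareAndSwap(1 -> 0) clears the flag only if it is still armed,
		// rather than unconditionally overwriting whatever another goroutine
		// may have just written, as StoreInt32(..., 0) would.
		atomic.CompareAndSwapInt32(&old.asyncReload, 1, 0)
	}
	c.mu.RUnlock()
}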
internal/locate/region_request.go (92 changes: 77 additions & 15 deletions)
@@ -371,38 +371,75 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
// the leader will be updated to replicas[0] and give it another chance.
type tryFollower struct {
stateBase
leaderIdx AccessIndex
lastIdx AccessIndex
// if the leader is unavailable, but it still holds the leadership, fallbackFromLeader is true and replica read is enabled.
fallbackFromLeader bool
leaderIdx AccessIndex
lastIdx AccessIndex
labels []*metapb.StoreLabel
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
var targetReplica *replica
// Search replica that is not attempted from the last accessed replica
for i := 1; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
for i := 0; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if idx == state.leaderIdx {
continue
}
selectReplica := selector.replicas[idx]
if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable {
return idx, selectReplica
}
}
targetReplica = selector.replicas[idx]
// Each follower is only tried once
if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
return -1, nil
}

if len(state.labels) > 0 {
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return selectReplica.store.IsLabelsMatch(state.labels)

Contributor: Is the selectReplica.isExhausted(1) check missing here? How about putting it into the default filterReplicas and passing a nil checker function when there are no labels?

Contributor Author: The replica may have been exhausted by a data-is-not-ready error, which does not affect follower reads.

})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
}
// labels only take effect for first try.
state.labels = nil
}
if selector.targetIdx < 0 {
// Search replica that is not attempted from the last accessed replica
idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
return !selectReplica.isExhausted(1)
})
if selectReplica != nil && idx >= 0 {
state.lastIdx = idx
selector.targetIdx = idx
break
}
}

// If all followers are tried and fail, backoff and retry.
if selector.targetIdx < 0 {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return rpcCtx, err
}
if state.fallbackFromLeader {
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
return rpcCtx, nil
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {

Contributor: The former naming and meaning of the switchWorkLeaderToPeer function are quite confusing; I don't understand its purpose.

Contributor Author: The former usage of tryFollower is after a failure of accessKnownLeader; in that case, if one of the followers can serve the leader-read request, it must be the new leader, so the cached leader is switched to that peer.

panic("the store must exist")
if !state.fallbackFromLeader {
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
zap.Uint64("store", peer.StoreId),
zap.Uint64("peer", peer.Id))
}
}
}

@@ -888,6 +925,27 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
s.region.invalidate(StoreNotFound)
}

// For some reason, the leader is unreachable by now, try followers instead.
func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {

Contributor: Is this currently the only situation that uses the fallback chain stale read -> leader -> follower replicas?

Contributor Author: Yes. When falling back from the leader to a replica, it is a follower-read request, not a stale read.

if ctx == nil || s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if state.lastIdx != state.leaderIdx {
return false
}
s.state = &tryFollower{
fallbackFromLeader: true,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
return true
}

func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
@@ -1566,6 +1624,10 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if s.replicaSelector.fallback2Follower(ctx) {
// immediately retry on followers.
return true, nil
}
if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
} else {
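To tie the changes in this file together, here is a condensed stand-alone sketch of the new retry flow. Every type and function name in it is illustrative rather than the client-go API: a stale read that has already fallen back to the leader and then receives ServerIsBusy is switched to a tryFollower-like state, and the retried request goes out as a plain replica read.

package fallbacksketch

type reqCtx struct {
	staleRead   bool
	replicaRead bool
	storeID     uint64
}

type selectorState interface {
	next(replicas []uint64, leaderIdx int) *reqCtx
}

// accessLeader models the request pinned to the leader after the stale read
// has already fallen back once.
type accessLeader struct{}

func (s *accessLeader) next(replicas []uint64, leaderIdx int) *reqCtx {
	return &reqCtx{staleRead: false, storeID: replicas[leaderIdx]}
}

// tryFollower models the new fallback: skip the busy leader and turn the
// request into a follower read.
type tryFollower struct {
	fallbackFromLeader bool
}

func (s *tryFollower) next(replicas []uint64, leaderIdx int) *reqCtx {
	for i := range replicas {
		if i == leaderIdx {
			continue // never send the fallback request back to the busy leader
		}
		ctx := &reqCtx{storeID: replicas[i]}
		if s.fallbackFromLeader {
			// Same effect as setting contextPatcher.replicaRead above: the
			// follower serves the request as a replica read.
			ctx.replicaRead = true
		}
		return ctx
	}
	return nil
}

// onServerIsBusy mirrors fallback2Follower plus the ServerIsBusy branch of
// onRegionError: if the current state is the leader-pinned one, switch to
// tryFollower and retry immediately instead of backing off.
func onServerIsBusy(state selectorState) (selectorState, bool) {
	if _, ok := state.(*accessLeader); ok {
		return &tryFollower{fallbackFromLeader: true}, true
	}
	return state, false
}

In the real code the last step is performed by setting rpcCtx.contextPatcher.replicaRead, which applyTo in region_cache.go copies into kvrpcpb.Context before the request is sent.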
internal/locate/region_request3_test.go (92 changes: 91 additions & 1 deletion)
@@ -1020,7 +1020,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Leader() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
@@ -1100,3 +1100,93 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
s.NotNil(regionErr.GetEpochNotMatch())
s.Nil(regionErr.GetDiskFull())
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
var followerID *uint64
for _, storeID := range s.storeIDs {
if storeID != leaderStore.storeID {
followerID = &storeID
break
}
}
s.NotNil(followerID)
followerLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(*followerID, 10),
},
}

regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)

dataIsNotReady := false
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
if dataIsNotReady && req.StaleRead {
dataIsNotReady = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
if addr == leaderStore.addr {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}}, nil
}
if !req.ReplicaRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
}}

for _, localLeader := range []bool{true, false} {
dataIsNotReady = true
// data is not ready, then server is busy in the first round,
// directly server is busy in the second round.
for i := 0; i < 2; i++ {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
if localLeader {
ops = append(ops, WithMatchLabels(leaderLabel))
} else {
ops = append(ops, WithMatchLabels(followerLabel))
}

ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
if localLeader {
s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
} else {
s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
}
}
}
}
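
The two-pass replica choice this test exercises (request labels first, then any untried follower) can be summarized by the stand-alone sketch below. The names are illustrative; the real logic lives in tryFollower.next in region_request.go.

package selectsketch

type replica struct {
	labels    map[string]string
	attempts  int
	reachable bool
}

func labelsMatch(r *replica, want map[string]string) bool {
	for k, v := range want {
		if r.labels[k] != v {
			return false
		}
	}
	return true
}

// pickFollower returns the index of the follower to try, or -1 if none is usable.
func pickFollower(replicas []*replica, leaderIdx int, want map[string]string) int {
	filter := func(ok func(*replica) bool) int {
		for i, r := range replicas {
			if i == leaderIdx || !r.reachable {
				continue
			}
			if ok(r) {
				return i
			}
		}
		return -1
	}
	// First pass: prefer a label match; an attempt already burned by a
	// data-is-not-ready retry does not disqualify the replica for a follower read.
	if len(want) > 0 {
		if idx := filter(func(r *replica) bool { return labelsMatch(r, want) }); idx >= 0 {
			return idx
		}
	}
	// Second pass: any reachable follower that has not been tried yet.
	return filter(func(r *replica) bool { return r.attempts == 0 })
}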