backup: more stable check liveness when store heartbeat behind too much (#48403)

close #48405
3pointer authored Dec 14, 2023
1 parent c48e3cb commit 7e5a5b2
Showing 4 changed files with 136 additions and 20 deletions.
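
The heart of the change: findTargetPeer's hand-rolled retry loop (five attempts with a fixed sleep) becomes utils.WithRetry driven by a utils.InitialRetryState backoff (up to 60 attempts, 100ms growing to 2s), and storeDisconnectionDuration is raised from 100s to 600s so a TiKV store whose heartbeat lags PD is not treated as dead too early. A minimal, self-contained sketch of that retry-with-backoff shape follows; retryState and withRetry are simplified stand-ins for BR's utils.InitialRetryState and utils.WithRetry, not the real implementations.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// retryState is a simplified stand-in for utils.InitialRetryState:
// a bounded attempt count with exponential backoff capped at maxBackoff.
type retryState struct {
    maxRetries int
    backoff    time.Duration
    maxBackoff time.Duration
}

// withRetry is a simplified stand-in for utils.WithRetry: call fn until it
// succeeds, the attempts are exhausted, or the context is cancelled.
func withRetry(ctx context.Context, fn func() error, st *retryState) error {
    var err error
    for i := 0; i < st.maxRetries; i++ {
        if err = fn(); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(st.backoff):
        }
        if st.backoff *= 2; st.backoff > st.maxBackoff {
            st.backoff = st.maxBackoff
        }
    }
    return err
}

func main() {
    attempts := 0
    st := &retryState{maxRetries: 60, backoff: 100 * time.Millisecond, maxBackoff: 2 * time.Second}
    err := withRetry(context.Background(), func() error {
        attempts++
        if attempts < 3 {
            return errors.New("cannot find region from pd client") // transient PD miss
        }
        return nil // pretend the leader was found on the third attempt
    }, st)
    fmt.Println(attempts, err) // 3 <nil>
}

In the real diff below, failpoints let the tests shrink the retry budget and force GetRegion to misbehave a bounded number of times.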
br/pkg/backup/BUILD.bazel (3 changes: 2 additions & 1 deletion)
@@ -70,7 +70,7 @@ go_test(
     embed = [":backup"],
     flaky = True,
     race = "on",
-    shard_count = 9,
+    shard_count = 10,
     deps = [
         "//br/pkg/conn",
         "//br/pkg/gluetidb",
@@ -89,6 +89,7 @@
         "//pkg/util/codec",
         "//pkg/util/table-filter",
         "@com_github_golang_protobuf//proto",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/brpb",
         "@com_github_pingcap_kvproto//pkg/encryptionpb",
         "@com_github_pingcap_kvproto//pkg/errorpb",
br/pkg/backup/client.go (84 changes: 67 additions & 17 deletions)
@@ -1011,23 +1011,74 @@ func (bc *Client) BackupRange(
     return nil
 }
 
-func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) {
+func (bc *Client) FindTargetPeer(ctx context.Context, key []byte, isRawKv bool, targetStoreIds map[uint64]struct{}) (*metapb.Peer, error) {
     // Keys are saved in encoded format in TiKV, so the key must be encoded
     // in order to find the correct region.
+    var leader *metapb.Peer
     key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
-    for i := 0; i < 5; i++ {
-        // better backoff.
+    state := utils.InitialRetryState(60, 100*time.Millisecond, 2*time.Second)
+    failpoint.Inject("retry-state-on-find-target-peer", func(v failpoint.Value) {
+        logutil.CL(ctx).Info("reset state for FindTargetPeer")
+        state = utils.InitialRetryState(v.(int), 100*time.Millisecond, 100*time.Millisecond)
+    })
+    err := utils.WithRetry(ctx, func() error {
         region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
+        failpoint.Inject("return-region-on-find-target-peer", func(v failpoint.Value) {
+            switch v.(string) {
+            case "nil":
+                {
+                    region = nil
+                }
+            case "hasLeader":
+                {
+                    region = &pd.Region{
+                        Leader: &metapb.Peer{
+                            Id: 42,
+                        },
+                    }
+                }
+            case "hasPeer":
+                {
+                    region = &pd.Region{
+                        Meta: &metapb.Region{
+                            Peers: []*metapb.Peer{
+                                {
+                                    Id:      43,
+                                    StoreId: 42,
+                                },
+                            },
+                        },
+                    }
+                }
+
+            case "noLeader":
+                {
+                    region = &pd.Region{
+                        Leader: nil,
+                    }
+                }
+            case "noPeer":
+                {
+                    {
+                        region = &pd.Region{
+                            Meta: &metapb.Region{
+                                Peers: nil,
+                            },
+                        }
+                    }
+                }
+            }
+        })
         if err != nil || region == nil {
             logutil.CL(ctx).Error("find region failed", zap.Error(err), zap.Reflect("region", region))
-            time.Sleep(time.Millisecond * time.Duration(100*i))
-            continue
+            return errors.Annotate(berrors.ErrPDLeaderNotFound, "cannot find region from pd client")
         }
         if len(targetStoreIds) == 0 {
             if region.Leader != nil {
                 logutil.CL(ctx).Info("find leader",
                     zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
-                return region.Leader, nil
+                leader = region.Leader
+                return nil
             }
         } else {
             candidates := make([]*metapb.Peer, 0, len(region.Meta.Peers))
@@ -1040,19 +1091,18 @@ func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool,
                 peer := candidates[rand.Intn(len(candidates))]
                 logutil.CL(ctx).Info("find target peer for backup",
                     zap.Reflect("Peer", peer), logutil.Key("key", key))
-                return peer, nil
+                leader = peer
+                return nil
             }
         }
-
-        logutil.CL(ctx).Warn("fail to find a target peer", logutil.Key("key", key))
-        time.Sleep(time.Millisecond * time.Duration(1000*i))
-        continue
-    }
-    logutil.CL(ctx).Error("can not find a valid target peer", logutil.Key("key", key))
-    if len(targetStoreIds) == 0 {
-        return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find a valid leader for key %s", key)
+        return errors.Annotate(berrors.ErrPDLeaderNotFound, "cannot find leader or candidate from pd client")
+    }, &state)
+    if err != nil {
+        logutil.CL(ctx).Error("can not find a valid target peer after retry", logutil.Key("key", key))
+        return nil, err
     }
-    return nil, errors.Errorf("can not find a valid target peer for key %s", key)
+    // leader cannot be nil if err is nil
+    return leader, nil
 }
 
 func (bc *Client) fineGrainedBackup(
@@ -1226,7 +1276,7 @@ func (bc *Client) handleFineGrained(
     targetStoreIds map[uint64]struct{},
     respCh chan<- *backuppb.BackupResponse,
 ) (int, error) {
-    targetPeer, pderr := bc.findTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds)
+    targetPeer, pderr := bc.FindTargetPeer(ctx, req.StartKey, req.IsRawKv, targetStoreIds)
     if pderr != nil {
         return 0, errors.Trace(pderr)
     }
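
When targetStoreIds is non-empty, FindTargetPeer keeps only the region peers that live on the requested stores and picks one candidate at random (the loop that builds candidates sits in the collapsed lines between the first two hunks). Below is a small, self-contained sketch of that selection rule; Peer is a simplified stand-in for metapb.Peer, and pickTargetPeer is an illustrative helper, not part of BR.

package main

import (
    "fmt"
    "math/rand"
)

// Peer is a simplified stand-in for metapb.Peer.
type Peer struct {
    ID      uint64
    StoreID uint64
}

// pickTargetPeer keeps only peers on the requested stores and picks one at
// random; it returns false when no peer lives on any target store.
func pickTargetPeer(peers []Peer, targetStoreIDs map[uint64]struct{}) (Peer, bool) {
    candidates := make([]Peer, 0, len(peers))
    for _, p := range peers {
        if _, ok := targetStoreIDs[p.StoreID]; ok {
            candidates = append(candidates, p)
        }
    }
    if len(candidates) == 0 {
        return Peer{}, false
    }
    return candidates[rand.Intn(len(candidates))], true
}

func main() {
    peers := []Peer{{ID: 43, StoreID: 42}, {ID: 44, StoreID: 7}}
    p, ok := pickTargetPeer(peers, map[uint64]struct{}{42: {}})
    fmt.Println(p.ID, ok) // 43 true
}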
br/pkg/backup/client_test.go (65 changes: 64 additions & 1 deletion)
@@ -10,6 +10,7 @@ import (
     "time"
 
     "github.com/golang/protobuf/proto"
+    "github.com/pingcap/failpoint"
     backuppb "github.com/pingcap/kvproto/pkg/brpb"
     "github.com/pingcap/kvproto/pkg/encryptionpb"
     "github.com/pingcap/kvproto/pkg/errorpb"
@@ -41,6 +42,7 @@ type testBackup struct {
     cancel context.CancelFunc
 
     mockPDClient pd.Client
+    mockCluster  *testutils.MockCluster
     mockGlue     *gluetidb.MockGlue
     backupClient *backup.Client
 
@@ -49,11 +51,12 @@ type testBackup struct {
 }
 
 func createBackupSuite(t *testing.T) *testBackup {
-    tikvClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+    tikvClient, mockCluster, pdClient, err := testutils.NewMockTiKV("", nil)
     require.NoError(t, err)
     s := new(testBackup)
     s.mockGlue = &gluetidb.MockGlue{}
     s.mockPDClient = pdClient
+    s.mockCluster = mockCluster
     s.ctx, s.cancel = context.WithCancel(context.Background())
     mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}}
     mockMgr.SetPDClient(s.mockPDClient)
@@ -334,3 +337,63 @@ func TestCheckBackupIsLocked(t *testing.T) {
     require.Error(t, err)
     require.Regexp(t, "backup lock file and sst file exist in(.+)", err.Error())
 }
+
+func TestFindTargetPeer(t *testing.T) {
+    s := createBackupSuite(t)
+
+    ctx := context.Background()
+    testutils.BootstrapWithMultiRegions(s.mockCluster, []byte("g"), []byte("n"), []byte("t"))
+
+    leader1, err := s.backupClient.FindTargetPeer(ctx, []byte("a"), false, nil)
+    require.NoError(t, err)
+
+    leader2, err := s.backupClient.FindTargetPeer(ctx, []byte("b"), false, nil)
+    require.NoError(t, err)
+
+    // check passed keys on same region
+    require.Equal(t, leader1.GetId(), leader2.GetId())
+
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)")
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"hasLeader\")")
+
+    leader, err := s.backupClient.FindTargetPeer(ctx, []byte("m"), false, nil)
+    require.NoError(t, err)
+    // check passed keys on find leader after retry
+    require.Equal(t, 42, int(leader.GetId()))
+
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer")
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer")
+
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)")
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "return(\"noLeader\")")
+
+    leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, nil)
+    // check passed keys with error on find leader after retry
+    require.ErrorContains(t, err, "cannot find leader")
+
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer")
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer")
+
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)")
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"hasPeer\")")
+
+    storeIDMap := make(map[uint64]struct{})
+    storeIDMap[42] = struct{}{}
+    leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, storeIDMap)
+    require.NoError(t, err)
+    // check passed keys with target peer
+    require.Equal(t, 43, int(leader.GetId()))
+
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer")
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer")
+
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer", "return(2)")
+    failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer", "1*return(\"nil\")->1*return(\"noPeer\")")
+
+    leader, err = s.backupClient.FindTargetPeer(ctx, []byte("m"), false, storeIDMap)
+    // check passed keys with error and cannot find target peer
+    require.ErrorContains(t, err, "cannot find leader")
+
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/retry-state-on-find-target-peer")
+    failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/return-region-on-find-target-peer")
+}
br/pkg/utils/misc.go (4 changes: 3 additions & 1 deletion)
@@ -34,7 +34,9 @@ const (
     // (How about network partition between TiKV and PD? Even that is rare.)
     // Also note that the offline threshold in PD is 20s, see
     // https://github.com/tikv/pd/blob/c40e319f50822678cda71ae62ee2fd70a9cac010/pkg/core/store.go#L523
-    storeDisconnectionDuration = 100 * time.Second
+
+    // After talk to PD members 100s is not a safe number. set it to 600s
+    storeDisconnectionDuration = 600 * time.Second
 )
 
 // IsTypeCompatible checks whether type target is compatible with type src
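
storeDisconnectionDuration is the window within which a store's last PD heartbeat still counts as live, so raising it from 100s to 600s makes the backup-side liveness check much more tolerant of a lagging heartbeat. A minimal sketch of how such a threshold is typically applied; isStoreDisconnected is a hypothetical helper for illustration, not BR's actual API.

package main

import (
    "fmt"
    "time"
)

const storeDisconnectionDuration = 600 * time.Second

// isStoreDisconnected treats a store as disconnected once its last heartbeat
// is older than the threshold (hypothetical helper, for illustration only).
func isStoreDisconnected(lastHeartbeat, now time.Time) bool {
    return now.Sub(lastHeartbeat) > storeDisconnectionDuration
}

func main() {
    now := time.Now()
    fmt.Println(isStoreDisconnected(now.Add(-120*time.Second), now)) // false: within 600s
    fmt.Println(isStoreDisconnected(now.Add(-700*time.Second), now)) // true: heartbeat too old
}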
