Skip to content

Commit

Permalink
br: move ScatterRegions into split package (#51614)
Browse files Browse the repository at this point in the history
ref #51533
  • Loading branch information
lance6716 authored Mar 11, 2024
1 parent 7be9a1e commit 5cb6c0e
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 337 deletions.
115 changes: 57 additions & 58 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package restore_test
package restore

import (
"context"
Expand All @@ -18,7 +18,6 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -58,35 +57,35 @@ func TestScanSuccess(t *testing.T) {
ctx := context.Background()

// make exclusive to inclusive.
ctl := restore.OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs)
ctl := OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs)
collectedRegions := []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba")

ctl = restore.OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs)
ctl = OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs)
collectedRegions = []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "aay", "bba", "bbh", "cca")

ctl = restore.OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs)
ctl = OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs)
collectedRegions = []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "")

ctl = restore.OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs)
ctl = OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs)
collectedRegions = []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "")
}
Expand All @@ -95,7 +94,7 @@ func TestNotLeader(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(1, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

notLeader := errorpb.Error{
Expand All @@ -109,17 +108,17 @@ func TestNotLeader(t *testing.T) {
meetRegions := []*split.RegionInfo{}
// record all regions we meet with id == 2.
idEqualsTo2Regions := []*split.RegionInfo{}
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if r.Region.Id == 2 {
idEqualsTo2Regions = append(idEqualsTo2Regions, r)
}
if r.Region.Id == 2 && (r.Leader == nil || r.Leader.Id != 42) {
return restore.RPCResult{
return RPCResult{
StoreError: &notLeader,
}
}
meetRegions = append(meetRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})

require.NoError(t, err)
Expand All @@ -135,7 +134,7 @@ func TestServerIsBusy(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

serverIsBusy := errorpb.Error{
Expand All @@ -149,16 +148,16 @@ func TestServerIsBusy(t *testing.T) {
// record all regions we meet with id == 2.
idEqualsTo2Regions := []*split.RegionInfo{}
theFirstRun := true
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if theFirstRun && r.Region.Id == 2 {
idEqualsTo2Regions = append(idEqualsTo2Regions, r)
theFirstRun = false
return restore.RPCResult{
return RPCResult{
StoreError: &serverIsBusy,
}
}
meetRegions = append(meetRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})

require.NoError(t, err)
Expand All @@ -176,7 +175,7 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

serverIsBusy := errorpb.Error{
Expand All @@ -190,16 +189,16 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) {
// record all regions we meet with id == 2.
idEqualsTo2Regions := []*split.RegionInfo{}
theFirstRun := true
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if theFirstRun && r.Region.Id == 2 {
idEqualsTo2Regions = append(idEqualsTo2Regions, r)
theFirstRun = false
return restore.RPCResult{
return RPCResult{
StoreError: &serverIsBusy,
}
}
meetRegions = append(meetRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})

require.NoError(t, err)
Expand Down Expand Up @@ -228,7 +227,7 @@ func TestEpochNotMatch(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

printPDRegion("cli", cli.regionsInfo.Regions)
Expand Down Expand Up @@ -262,18 +261,18 @@ func TestEpochNotMatch(t *testing.T) {
firstRunRegions := []*split.RegionInfo{}
secondRunRegions := []*split.RegionInfo{}
isSecondRun := false
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if !isSecondRun && r.Region.Id == left.Region.Id {
mergeRegion()
isSecondRun = true
return restore.RPCResultFromPBError(epochNotMatch)
return RPCResultFromPBError(epochNotMatch)
}
if isSecondRun {
secondRunRegions = append(secondRunRegions, r)
} else {
firstRunRegions = append(firstRunRegions, r)
}
return restore.RPCResultOK()
return RPCResultOK()
})
printRegion("first", firstRunRegions)
printRegion("second", secondRunRegions)
Expand All @@ -287,7 +286,7 @@ func TestRegionSplit(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

printPDRegion("cli", cli.regionsInfo.Regions)
Expand Down Expand Up @@ -338,18 +337,18 @@ func TestRegionSplit(t *testing.T) {
firstRunRegions := []*split.RegionInfo{}
secondRunRegions := []*split.RegionInfo{}
isSecondRun := false
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if !isSecondRun && r.Region.Id == target.Region.Id {
splitRegion()
isSecondRun = true
return restore.RPCResultFromPBError(epochNotMatch)
return RPCResultFromPBError(epochNotMatch)
}
if isSecondRun {
secondRunRegions = append(secondRunRegions, r)
} else {
firstRunRegions = append(firstRunRegions, r)
}
return restore.RPCResultOK()
return RPCResultOK()
})
printRegion("first", firstRunRegions)
printRegion("second", secondRunRegions)
Expand All @@ -363,7 +362,7 @@ func TestRetryBackoff(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

printPDRegion("cli", cli.regionsInfo.Regions)
Expand All @@ -380,12 +379,12 @@ func TestRetryBackoff(t *testing.T) {
},
}}
isSecondRun := false
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if !isSecondRun && r.Region.Id == left.Region.Id {
isSecondRun = true
return restore.RPCResultFromPBError(epochNotLeader)
return RPCResultFromPBError(epochNotLeader)
}
return restore.RPCResultOK()
return RPCResultOK()
})
printPDRegion("cli", cli.regionsInfo.Regions)
require.Equal(t, 1, rs.Attempt())
Expand All @@ -395,10 +394,10 @@ func TestRetryBackoff(t *testing.T) {
}

func TestWrappedError(t *testing.T) {
result := restore.RPCResultFromError(errors.Trace(status.Error(codes.Unavailable, "the server is slacking. ><=·>")))
require.Equal(t, result.StrategyForRetry(), restore.StrategyFromThisRegion)
result = restore.RPCResultFromError(errors.Trace(status.Error(codes.Unknown, "the server said something hard to understand")))
require.Equal(t, result.StrategyForRetry(), restore.StrategyGiveUp)
result := RPCResultFromError(errors.Trace(status.Error(codes.Unavailable, "the server is slacking. ><=·>")))
require.Equal(t, result.StrategyForRetry(), StrategyFromThisRegion)
result = RPCResultFromError(errors.Trace(status.Error(codes.Unknown, "the server said something hard to understand")))
require.Equal(t, result.StrategyForRetry(), StrategyGiveUp)
}

func envInt(name string, def int) int {
Expand All @@ -414,22 +413,22 @@ func TestPaginateScanLeader(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond)
ctl := restore.OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs)
ctl := OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs)
ctx := context.Background()

cli.InjectErr = true
cli.InjectTimes = int32(envInt("PAGINATE_SCAN_LEADER_FAILURE_COUNT", 2))
collectedRegions := []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba")
}

func TestImportKVFiles(t *testing.T) {
var (
importer = restore.FileImporter{}
importer = FileImporter{}
ctx = context.Background()
shiftStartTS uint64 = 100
startTS uint64 = 200
Expand All @@ -438,7 +437,7 @@ func TestImportKVFiles(t *testing.T) {

err := importer.ImportKVFiles(
ctx,
[]*restore.LogDataFileInfo{
[]*LogDataFileInfo{
{
DataFileInfo: &backuppb.DataFileInfo{
Path: "log3",
Expand All @@ -460,7 +459,7 @@ func TestImportKVFiles(t *testing.T) {
}

func TestFilterFilesByRegion(t *testing.T) {
files := []*restore.LogDataFileInfo{
files := []*LogDataFileInfo{
{
DataFileInfo: &backuppb.DataFileInfo{
Path: "log3",
Expand All @@ -484,7 +483,7 @@ func TestFilterFilesByRegion(t *testing.T) {

testCases := []struct {
r split.RegionInfo
subfiles []*restore.LogDataFileInfo
subfiles []*LogDataFileInfo
err error
}{
{
Expand All @@ -494,7 +493,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("1110"),
},
},
subfiles: []*restore.LogDataFileInfo{},
subfiles: []*LogDataFileInfo{},
err: nil,
},
{
Expand All @@ -504,7 +503,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("1111"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[0],
},
err: nil,
Expand All @@ -516,7 +515,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("2222"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[0],
},
err: nil,
Expand All @@ -528,7 +527,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("3332"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[0],
},
err: nil,
Expand All @@ -540,7 +539,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("3332"),
},
},
subfiles: []*restore.LogDataFileInfo{},
subfiles: []*LogDataFileInfo{},
err: nil,
},
{
Expand All @@ -550,7 +549,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("3333"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[1],
},
err: nil,
Expand All @@ -562,7 +561,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("5555"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[1],
},
err: nil,
Expand All @@ -574,7 +573,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: nil,
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[1],
},
err: nil,
Expand All @@ -592,7 +591,7 @@ func TestFilterFilesByRegion(t *testing.T) {
}

for _, c := range testCases {
subfile, err := restore.FilterFilesByRegion(files, ranges, &c.r)
subfile, err := FilterFilesByRegion(files, ranges, &c.r)
require.Equal(t, err, c.err)
require.Equal(t, subfile, c.subfiles)
}
Expand Down
Loading

0 comments on commit 5cb6c0e

Please sign in to comment.