Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: move ScatterRegions into split package #51614

Merged
merged 4 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 57 additions & 58 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package restore_test
package restore

import (
"context"
Expand All @@ -18,7 +18,6 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -58,35 +57,35 @@ func TestScanSuccess(t *testing.T) {
ctx := context.Background()

// make exclusive to inclusive.
ctl := restore.OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs)
ctl := OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs)
collectedRegions := []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba")

ctl = restore.OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs)
ctl = OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs)
collectedRegions = []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "aay", "bba", "bbh", "cca")

ctl = restore.OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs)
ctl = OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs)
collectedRegions = []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "")

ctl = restore.OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs)
ctl = OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs)
collectedRegions = []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "")
}
Expand All @@ -95,7 +94,7 @@ func TestNotLeader(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(1, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

notLeader := errorpb.Error{
Expand All @@ -109,17 +108,17 @@ func TestNotLeader(t *testing.T) {
meetRegions := []*split.RegionInfo{}
// record all regions we meet with id == 2.
idEqualsTo2Regions := []*split.RegionInfo{}
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if r.Region.Id == 2 {
idEqualsTo2Regions = append(idEqualsTo2Regions, r)
}
if r.Region.Id == 2 && (r.Leader == nil || r.Leader.Id != 42) {
return restore.RPCResult{
return RPCResult{
StoreError: &notLeader,
}
}
meetRegions = append(meetRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})

require.NoError(t, err)
Expand All @@ -135,7 +134,7 @@ func TestServerIsBusy(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

serverIsBusy := errorpb.Error{
Expand All @@ -149,16 +148,16 @@ func TestServerIsBusy(t *testing.T) {
// record all regions we meet with id == 2.
idEqualsTo2Regions := []*split.RegionInfo{}
theFirstRun := true
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if theFirstRun && r.Region.Id == 2 {
idEqualsTo2Regions = append(idEqualsTo2Regions, r)
theFirstRun = false
return restore.RPCResult{
return RPCResult{
StoreError: &serverIsBusy,
}
}
meetRegions = append(meetRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})

require.NoError(t, err)
Expand All @@ -176,7 +175,7 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

serverIsBusy := errorpb.Error{
Expand All @@ -190,16 +189,16 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) {
// record all regions we meet with id == 2.
idEqualsTo2Regions := []*split.RegionInfo{}
theFirstRun := true
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if theFirstRun && r.Region.Id == 2 {
idEqualsTo2Regions = append(idEqualsTo2Regions, r)
theFirstRun = false
return restore.RPCResult{
return RPCResult{
StoreError: &serverIsBusy,
}
}
meetRegions = append(meetRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})

require.NoError(t, err)
Expand Down Expand Up @@ -228,7 +227,7 @@ func TestEpochNotMatch(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

printPDRegion("cli", cli.regionsInfo.Regions)
Expand Down Expand Up @@ -262,18 +261,18 @@ func TestEpochNotMatch(t *testing.T) {
firstRunRegions := []*split.RegionInfo{}
secondRunRegions := []*split.RegionInfo{}
isSecondRun := false
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if !isSecondRun && r.Region.Id == left.Region.Id {
mergeRegion()
isSecondRun = true
return restore.RPCResultFromPBError(epochNotMatch)
return RPCResultFromPBError(epochNotMatch)
}
if isSecondRun {
secondRunRegions = append(secondRunRegions, r)
} else {
firstRunRegions = append(firstRunRegions, r)
}
return restore.RPCResultOK()
return RPCResultOK()
})
printRegion("first", firstRunRegions)
printRegion("second", secondRunRegions)
Expand All @@ -287,7 +286,7 @@ func TestRegionSplit(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, 0, 0)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

printPDRegion("cli", cli.regionsInfo.Regions)
Expand Down Expand Up @@ -338,18 +337,18 @@ func TestRegionSplit(t *testing.T) {
firstRunRegions := []*split.RegionInfo{}
secondRunRegions := []*split.RegionInfo{}
isSecondRun := false
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if !isSecondRun && r.Region.Id == target.Region.Id {
splitRegion()
isSecondRun = true
return restore.RPCResultFromPBError(epochNotMatch)
return RPCResultFromPBError(epochNotMatch)
}
if isSecondRun {
secondRunRegions = append(secondRunRegions, r)
} else {
firstRunRegions = append(firstRunRegions, r)
}
return restore.RPCResultOK()
return RPCResultOK()
})
printRegion("first", firstRunRegions)
printRegion("second", secondRunRegions)
Expand All @@ -363,7 +362,7 @@ func TestRetryBackoff(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond)
ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs)
ctx := context.Background()

printPDRegion("cli", cli.regionsInfo.Regions)
Expand All @@ -380,12 +379,12 @@ func TestRetryBackoff(t *testing.T) {
},
}}
isSecondRun := false
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
if !isSecondRun && r.Region.Id == left.Region.Id {
isSecondRun = true
return restore.RPCResultFromPBError(epochNotLeader)
return RPCResultFromPBError(epochNotLeader)
}
return restore.RPCResultOK()
return RPCResultOK()
})
printPDRegion("cli", cli.regionsInfo.Regions)
require.Equal(t, 1, rs.Attempt())
Expand All @@ -395,10 +394,10 @@ func TestRetryBackoff(t *testing.T) {
}

func TestWrappedError(t *testing.T) {
result := restore.RPCResultFromError(errors.Trace(status.Error(codes.Unavailable, "the server is slacking. ><=·>")))
require.Equal(t, result.StrategyForRetry(), restore.StrategyFromThisRegion)
result = restore.RPCResultFromError(errors.Trace(status.Error(codes.Unknown, "the server said something hard to understand")))
require.Equal(t, result.StrategyForRetry(), restore.StrategyGiveUp)
result := RPCResultFromError(errors.Trace(status.Error(codes.Unavailable, "the server is slacking. ><=·>")))
require.Equal(t, result.StrategyForRetry(), StrategyFromThisRegion)
result = RPCResultFromError(errors.Trace(status.Error(codes.Unknown, "the server said something hard to understand")))
require.Equal(t, result.StrategyForRetry(), StrategyGiveUp)
}

func envInt(name string, def int) int {
Expand All @@ -414,22 +413,22 @@ func TestPaginateScanLeader(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond)
ctl := restore.OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs)
ctl := OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs)
ctx := context.Background()

cli.InjectErr = true
cli.InjectTimes = int32(envInt("PAGINATE_SCAN_LEADER_FAILURE_COUNT", 2))
collectedRegions := []*split.RegionInfo{}
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult {
ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
collectedRegions = append(collectedRegions, r)
return restore.RPCResultOK()
return RPCResultOK()
})
assertRegions(t, collectedRegions, "", "aay", "bba")
}

func TestImportKVFiles(t *testing.T) {
var (
importer = restore.FileImporter{}
importer = FileImporter{}
ctx = context.Background()
shiftStartTS uint64 = 100
startTS uint64 = 200
Expand All @@ -438,7 +437,7 @@ func TestImportKVFiles(t *testing.T) {

err := importer.ImportKVFiles(
ctx,
[]*restore.LogDataFileInfo{
[]*LogDataFileInfo{
{
DataFileInfo: &backuppb.DataFileInfo{
Path: "log3",
Expand All @@ -460,7 +459,7 @@ func TestImportKVFiles(t *testing.T) {
}

func TestFilterFilesByRegion(t *testing.T) {
files := []*restore.LogDataFileInfo{
files := []*LogDataFileInfo{
{
DataFileInfo: &backuppb.DataFileInfo{
Path: "log3",
Expand All @@ -484,7 +483,7 @@ func TestFilterFilesByRegion(t *testing.T) {

testCases := []struct {
r split.RegionInfo
subfiles []*restore.LogDataFileInfo
subfiles []*LogDataFileInfo
err error
}{
{
Expand All @@ -494,7 +493,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("1110"),
},
},
subfiles: []*restore.LogDataFileInfo{},
subfiles: []*LogDataFileInfo{},
err: nil,
},
{
Expand All @@ -504,7 +503,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("1111"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[0],
},
err: nil,
Expand All @@ -516,7 +515,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("2222"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[0],
},
err: nil,
Expand All @@ -528,7 +527,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("3332"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[0],
},
err: nil,
Expand All @@ -540,7 +539,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("3332"),
},
},
subfiles: []*restore.LogDataFileInfo{},
subfiles: []*LogDataFileInfo{},
err: nil,
},
{
Expand All @@ -550,7 +549,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("3333"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[1],
},
err: nil,
Expand All @@ -562,7 +561,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: []byte("5555"),
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[1],
},
err: nil,
Expand All @@ -574,7 +573,7 @@ func TestFilterFilesByRegion(t *testing.T) {
EndKey: nil,
},
},
subfiles: []*restore.LogDataFileInfo{
subfiles: []*LogDataFileInfo{
files[1],
},
err: nil,
Expand All @@ -592,7 +591,7 @@ func TestFilterFilesByRegion(t *testing.T) {
}

for _, c := range testCases {
subfile, err := restore.FilterFilesByRegion(files, ranges, &c.r)
subfile, err := FilterFilesByRegion(files, ranges, &c.r)
require.Equal(t, err, c.err)
require.Equal(t, subfile, c.subfiles)
}
Expand Down
Loading