From bd5d90d22fafdf7959378c9c184419390482d1df Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 8 Mar 2024 12:12:33 +0800 Subject: [PATCH 1/3] --wip-- [skip ci] --- br/pkg/restore/split.go | 105 +++-------------------------- br/pkg/restore/split/client.go | 89 ++++++++++++++++++++++-- br/pkg/restore/split/split_test.go | 75 +++++++++++++++++++-- br/pkg/restore/split_test.go | 67 ------------------ 4 files changed, 162 insertions(+), 174 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 0790999e8be24..43052e8ef6588 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "sort" - "strconv" "strings" "sync" "time" @@ -24,11 +23,8 @@ import ( "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" - "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) type Granularity string @@ -254,7 +250,10 @@ func (rs *RegionSplitter) splitAndScatterRegions( } return nil, errors.Trace(err) } - rs.ScatterRegions(ctx, append(newRegions, regionInfo)) + err2 := rs.client.ScatterRegions(ctx, append(newRegions, regionInfo)) + if err2 != nil { + log.Warn("failed to scatter regions", zap.Error(err2)) + } return newRegions, nil } @@ -274,32 +273,6 @@ func (rs *RegionSplitter) splitRegions( return newRegions, nil } -// scatterRegions scatter the regions. -// for same reason just log and ignore error. -// See the comments of function waitRegionScattered. -func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*split.RegionInfo) { - log.Info("start to scatter regions", zap.Int("regions", len(newRegions))) - // the retry is for the temporary network errors during sending request. - err := utils.WithRetry(ctx, func() error { - err := rs.client.ScatterRegions(ctx, newRegions) - if isUnsupportedError(err) { - log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) - rs.ScatterRegionsSequentially( - ctx, newRegions, - // backoff about 6s, or we give up scattering this region. - &split.ExponentialBackoffer{ - Attempts: 7, - BaseBackoff: 100 * time.Millisecond, - }) - return nil - } - return err - }, &split.ExponentialBackoffer{Attempts: 3, BaseBackoff: 500 * time.Millisecond}) - if err != nil { - log.Warn("failed to scatter regions", logutil.ShortError(err)) - } -} - // waitRegionsSplitted check multiple regions have finished the split. func (rs *RegionSplitter) waitRegionsSplitted(ctx context.Context, splitRegions []*split.RegionInfo) { // Wait for a while until the regions successfully split. @@ -350,50 +323,6 @@ func (rs *RegionSplitter) waitRegionsScattered(ctx context.Context, scatterRegio } } -// ScatterRegionsSequentially scatter the region with some backoffer. -// This function is for testing the retry mechanism. -// For a real cluster, directly use ScatterRegions would be fine. -func (rs *RegionSplitter) ScatterRegionsSequentially(ctx context.Context, newRegions []*split.RegionInfo, backoffer utils.Backoffer) { - newRegionSet := make(map[uint64]*split.RegionInfo, len(newRegions)) - for _, newRegion := range newRegions { - newRegionSet[newRegion.Region.Id] = newRegion - } - - if err := utils.WithRetry(ctx, func() error { - log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet))) - var errs error - for _, region := range newRegionSet { - err := rs.client.ScatterRegion(ctx, region) - if err == nil { - // it is safe according to the Go language spec. - delete(newRegionSet, region.Region.Id) - } else if !split.PdErrorCanRetry(err) { - log.Warn("scatter meet error cannot be retried, skipping", - logutil.ShortError(err), - logutil.Region(region.Region), - ) - delete(newRegionSet, region.Region.Id) - } - errs = multierr.Append(errs, err) - } - return errs - }, backoffer); err != nil { - log.Warn("Some regions haven't been scattered because errors.", - zap.Int("count", len(newRegionSet)), - // if all region are failed to scatter, the short error might also be verbose... - logutil.ShortError(err), - logutil.AbbreviatedArray("failed-regions", newRegionSet, func(i any) []string { - m := i.(map[uint64]*split.RegionInfo) - result := make([]string, 0, len(m)) - for id := range m { - result = append(result, strconv.Itoa(int(id))) - } - return result - }), - ) - } -} - // hasHealthyRegion is used to check whether region splitted success func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64) (bool, error) { regionInfo, err := rs.client.GetRegionByID(ctx, regionID) @@ -462,7 +391,10 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi } if len(reScatterRegions) > 0 { - rs.ScatterRegions(ctx, reScatterRegions) + err2 := rs.client.ScatterRegions(ctx, reScatterRegions) + if err2 != nil { + log.Warn("failed to scatter regions", zap.Error(err2)) + } } if time.Since(startTime) > timeout { @@ -950,27 +882,6 @@ func (splitIter *LogFilesIterWithSplitHelper) TryNext(ctx context.Context) iter. return res } -// isUnsupportedError checks whether we should fallback to ScatterRegion API when meeting the error. -func isUnsupportedError(err error) bool { - s, ok := status.FromError(errors.Cause(err)) - if !ok { - // Not a gRPC error. Something other went wrong. - return false - } - // In two conditions, we fallback to ScatterRegion: - // (1) If the RPC endpoint returns UNIMPLEMENTED. (This is just for making test cases not be so magic.) - // (2) If the Message is "region 0 not found": - // In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering. - // When the request contains the field `regionIDs`, it would use the batch version, - // Otherwise, it uses the old version and scatter the region with `regionID` in the request. - // When facing 4.x, BR(which uses v5.x PD clients and call `ScatterRegions`!) would set `regionIDs` - // which would be ignored by protocol buffers, and leave the `regionID` be zero. - // Then the older version of PD would try to search the region with ID 0. - // (Then it consistently fails, and returns "region 0 not found".) - return s.Code() == codes.Unimplemented || - strings.Contains(s.Message(), "region 0 not found") -} - type splitBackoffer struct { state utils.RetryState } diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index cad86e6602cca..edfd18fd484c4 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "crypto/tls" + "strconv" "strings" "sync" "time" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/utils" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" "go.uber.org/multierr" @@ -138,10 +140,27 @@ func (c *pdClient) needScatter(ctx context.Context) bool { return c.needScatterVal } -// ScatterRegions scatters regions in a batch. -func (c *pdClient) ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error { - c.mu.Lock() - defer c.mu.Unlock() +func (c *pdClient) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) error { + log.Info("scatter regions", zap.Int("regions", len(newRegions))) + // the retry is for the temporary network errors during sending request. + return utils.WithRetry(ctx, func() error { + err := c.scatterRegions(ctx, newRegions) + if isUnsupportedError(err) { + log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) + c.scatterRegionsSequentially( + ctx, newRegions, + // backoff about 6s, or we give up scattering this region. + &ExponentialBackoffer{ + Attempts: 7, + BaseBackoff: 100 * time.Millisecond, + }) + return nil + } + return err + }, &ExponentialBackoffer{Attempts: 3, BaseBackoff: 500 * time.Millisecond}) +} + +func (c *pdClient) scatterRegions(ctx context.Context, regionInfo []*RegionInfo) error { regionsID := make([]uint64, 0, len(regionInfo)) for _, v := range regionInfo { regionsID = append(regionsID, v.Region.Id) @@ -564,6 +583,47 @@ func (c *pdClient) SetStoresLabel( return nil } +func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) { + newRegionSet := make(map[uint64]*RegionInfo, len(newRegions)) + for _, newRegion := range newRegions { + newRegionSet[newRegion.Region.Id] = newRegion + } + + if err := utils.WithRetry(ctx, func() error { + log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet))) + var errs error + for _, region := range newRegionSet { + err := c.ScatterRegion(ctx, region) + if err == nil { + // it is safe according to the Go language spec. + delete(newRegionSet, region.Region.Id) + } else if !PdErrorCanRetry(err) { + log.Warn("scatter meet error cannot be retried, skipping", + logutil.ShortError(err), + logutil.Region(region.Region), + ) + delete(newRegionSet, region.Region.Id) + } + errs = multierr.Append(errs, err) + } + return errs + }, backoffer); err != nil { + log.Warn("Some regions haven't been scattered because errors.", + zap.Int("count", len(newRegionSet)), + // if all region are failed to scatter, the short error might also be verbose... + logutil.ShortError(err), + logutil.AbbreviatedArray("failed-regions", newRegionSet, func(i any) []string { + m := i.(map[uint64]*RegionInfo) + result := make([]string, 0, len(m)) + for id := range m { + result = append(result, strconv.Itoa(int(id))) + } + return result + }), + ) + } +} + func (c *pdClient) IsScatterRegionFinished( ctx context.Context, regionID uint64, @@ -665,3 +725,24 @@ func (b *ExponentialBackoffer) NextBackoff(error) time.Duration { func (b *ExponentialBackoffer) Attempt() int { return b.Attempts } + +// isUnsupportedError checks whether we should fallback to ScatterRegion API when meeting the error. +func isUnsupportedError(err error) bool { + s, ok := status.FromError(errors.Cause(err)) + if !ok { + // Not a gRPC error. Something other went wrong. + return false + } + // In two conditions, we fallback to ScatterRegion: + // (1) If the RPC endpoint returns UNIMPLEMENTED. (This is just for making test cases not be so magic.) + // (2) If the Message is "region 0 not found": + // In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering. + // When the request contains the field `regionIDs`, it would use the batch version, + // Otherwise, it uses the old version and scatter the region with `regionID` in the request. + // When facing 4.x, BR(which uses v5.x PD clients and call `ScatterRegions`!) would set `regionIDs` + // which would be ignored by protocol buffers, and leave the `regionID` be zero. + // Then the older version of PD would try to search the region with ID 0. + // (Then it consistently fails, and returns "region 0 not found".) + return s.Code() == codes.Unimplemented || + strings.Contains(s.Message(), "region 0 not found") +} diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index 1b76a9fafc693..3b1c9470222c4 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -1,20 +1,24 @@ // Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. -package split_test +package split import ( "context" "testing" + "time" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestScanRegionBackOfferWithSuccess(t *testing.T) { var counter int - bo := split.NewWaitRegionOnlineBackoffer() + bo := NewWaitRegionOnlineBackoffer() err := utils.WithRetry(context.Background(), func() error { defer func() { @@ -37,7 +41,7 @@ func TestScanRegionBackOfferWithFail(t *testing.T) { }() var counter int - bo := split.NewWaitRegionOnlineBackoffer() + bo := NewWaitRegionOnlineBackoffer() err := utils.WithRetry(context.Background(), func() error { defer func() { @@ -46,7 +50,7 @@ func TestScanRegionBackOfferWithFail(t *testing.T) { return berrors.ErrPDBatchScanRegion }, bo) require.Error(t, err) - require.Equal(t, counter, split.WaitRegionOnlineAttemptTimes) + require.Equal(t, counter, WaitRegionOnlineAttemptTimes) } func TestScanRegionBackOfferWithStopRetry(t *testing.T) { @@ -56,7 +60,7 @@ func TestScanRegionBackOfferWithStopRetry(t *testing.T) { }() var counter int - bo := split.NewWaitRegionOnlineBackoffer() + bo := NewWaitRegionOnlineBackoffer() err := utils.WithRetry(context.Background(), func() error { defer func() { @@ -71,3 +75,62 @@ func TestScanRegionBackOfferWithStopRetry(t *testing.T) { require.Error(t, err) require.Equal(t, counter, 6) } + +type mockScatterPDClient struct { + pd.Client + failed map[uint64]int + failedBefore int +} + +func (c *mockScatterPDClient) ScatterRegion(ctx context.Context, regionID uint64) error { + if c.failed == nil { + c.failed = make(map[uint64]int) + } + c.failed[regionID]++ + if c.failed[regionID] > c.failedBefore { + return nil + } + return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionID) +} + +type recordCntBackoffer struct { + already int +} + +func (b *recordCntBackoffer) NextBackoff(err error) time.Duration { + b.already++ + return 0 +} + +func (b *recordCntBackoffer) Attempt() int { + return 100 +} + +func TestScatterSequentiallyRetryCnt(t *testing.T) { + client := pdClient{ + needScatterVal: true, + client: &mockScatterPDClient{failedBefore: 7}, + } + client.needScatterInit.Do(func() {}) + + ctx := context.Background() + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + }, + }, + { + Region: &metapb.Region{ + Id: 2, + }, + }, + } + backoffer := &recordCntBackoffer{} + client.scatterRegionsSequentially( + ctx, + regions, + backoffer, + ) + require.Equal(t, 7, backoffer.already) +} diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 15a5f0a12663d..42008849ec58d 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/store/pdtypes" @@ -254,34 +253,6 @@ func (c *TestClient) IsScatterRegionFinished( return split.IsScatterRegionFinished(resp) } -type assertRetryLessThanBackoffer struct { - max int - already int - t *testing.T -} - -func assertRetryLessThan(t *testing.T, times int) utils.Backoffer { - return &assertRetryLessThanBackoffer{ - max: times, - already: 0, - t: t, - } -} - -// NextBackoff returns a duration to wait before retrying again -func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration { - b.already++ - if b.already >= b.max { - b.t.Logf("retry more than %d time: test failed", b.max) - b.t.FailNow() - } - return 0 -} - -// Attempt returns the remain attempt times -func (b *assertRetryLessThanBackoffer) Attempt() int { - return b.max - b.already -} func TestScanEmptyRegion(t *testing.T) { client := initTestClient(false) ranges := initRanges() @@ -296,44 +267,6 @@ func TestScanEmptyRegion(t *testing.T) { require.NoError(t, err) } -func TestScatterFinishInTime(t *testing.T) { - client := initTestClient(false) - ranges := initRanges() - rewriteRules := initRewriteRules() - regionSplitter := restore.NewRegionSplitter(client) - - ctx := context.Background() - err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 1, false, func(key [][]byte) {}) - require.NoError(t, err) - regions := client.GetAllRegions() - if !validateRegions(regions) { - for _, region := range regions { - t.Logf("region: %v\n", region.Region) - } - t.Log("get wrong result") - t.Fail() - } - - regionInfos := make([]*split.RegionInfo, 0, len(regions)) - for _, info := range regions { - regionInfos = append(regionInfos, info) - } - failed := map[uint64]int{} - client.injectInScatter = func(r *split.RegionInfo) error { - failed[r.Region.Id]++ - if failed[r.Region.Id] > 7 { - return nil - } - return status.Errorf(codes.Unknown, "region %d is not fully replicated", r.Region.Id) - } - - // When using a exponential backoffer, if we try to backoff more than 40 times in 10 regions, - // it would cost time unacceptable. - regionSplitter.ScatterRegionsSequentially(ctx, - regionInfos, - assertRetryLessThan(t, 40)) -} - // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) // range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) // rewrite rules: aa -> xx, cc -> bb From 5449a92dcaf6efdc944c195c91ea85c94d28111c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 8 Mar 2024 13:00:12 +0800 Subject: [PATCH 2/3] try fix UT Signed-off-by: lance6716 --- br/pkg/restore/import_retry_test.go | 115 ++++++++++++++-------------- br/pkg/restore/range_test.go | 11 ++- br/pkg/restore/split/split_test.go | 52 ++++++++++++- br/pkg/restore/split_test.go | 100 +++++++++++------------- br/pkg/restore/util_test.go | 79 ++++++++++--------- 5 files changed, 192 insertions(+), 165 deletions(-) diff --git a/br/pkg/restore/import_retry_test.go b/br/pkg/restore/import_retry_test.go index 97d1d10aacae0..6dbc05e2d402e 100644 --- a/br/pkg/restore/import_retry_test.go +++ b/br/pkg/restore/import_retry_test.go @@ -1,6 +1,6 @@ // Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0. -package restore_test +package restore import ( "context" @@ -18,7 +18,6 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/kv" @@ -58,35 +57,35 @@ func TestScanSuccess(t *testing.T) { ctx := context.Background() // make exclusive to inclusive. - ctl := restore.OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs) + ctl := OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs) collectedRegions := []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { collectedRegions = append(collectedRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) assertRegions(t, collectedRegions, "", "aay", "bba") - ctl = restore.OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs) + ctl = OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs) collectedRegions = []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { collectedRegions = append(collectedRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) assertRegions(t, collectedRegions, "aay", "bba", "bbh", "cca") - ctl = restore.OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs) + ctl = OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs) collectedRegions = []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { collectedRegions = append(collectedRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "") - ctl = restore.OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs) + ctl = OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs) collectedRegions = []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { collectedRegions = append(collectedRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "") } @@ -95,7 +94,7 @@ func TestNotLeader(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(1, 0, 0) - ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs) ctx := context.Background() notLeader := errorpb.Error{ @@ -109,17 +108,17 @@ func TestNotLeader(t *testing.T) { meetRegions := []*split.RegionInfo{} // record all regions we meet with id == 2. idEqualsTo2Regions := []*split.RegionInfo{} - err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { if r.Region.Id == 2 { idEqualsTo2Regions = append(idEqualsTo2Regions, r) } if r.Region.Id == 2 && (r.Leader == nil || r.Leader.Id != 42) { - return restore.RPCResult{ + return RPCResult{ StoreError: ¬Leader, } } meetRegions = append(meetRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) require.NoError(t, err) @@ -135,7 +134,7 @@ func TestServerIsBusy(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs) ctx := context.Background() serverIsBusy := errorpb.Error{ @@ -149,16 +148,16 @@ func TestServerIsBusy(t *testing.T) { // record all regions we meet with id == 2. idEqualsTo2Regions := []*split.RegionInfo{} theFirstRun := true - err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { if theFirstRun && r.Region.Id == 2 { idEqualsTo2Regions = append(idEqualsTo2Regions, r) theFirstRun = false - return restore.RPCResult{ + return RPCResult{ StoreError: &serverIsBusy, } } meetRegions = append(meetRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) require.NoError(t, err) @@ -176,7 +175,7 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs) ctx := context.Background() serverIsBusy := errorpb.Error{ @@ -190,16 +189,16 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) { // record all regions we meet with id == 2. idEqualsTo2Regions := []*split.RegionInfo{} theFirstRun := true - err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { if theFirstRun && r.Region.Id == 2 { idEqualsTo2Regions = append(idEqualsTo2Regions, r) theFirstRun = false - return restore.RPCResult{ + return RPCResult{ StoreError: &serverIsBusy, } } meetRegions = append(meetRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) require.NoError(t, err) @@ -228,7 +227,7 @@ func TestEpochNotMatch(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs) ctx := context.Background() printPDRegion("cli", cli.regionsInfo.Regions) @@ -262,18 +261,18 @@ func TestEpochNotMatch(t *testing.T) { firstRunRegions := []*split.RegionInfo{} secondRunRegions := []*split.RegionInfo{} isSecondRun := false - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { if !isSecondRun && r.Region.Id == left.Region.Id { mergeRegion() isSecondRun = true - return restore.RPCResultFromPBError(epochNotMatch) + return RPCResultFromPBError(epochNotMatch) } if isSecondRun { secondRunRegions = append(secondRunRegions, r) } else { firstRunRegions = append(firstRunRegions, r) } - return restore.RPCResultOK() + return RPCResultOK() }) printRegion("first", firstRunRegions) printRegion("second", secondRunRegions) @@ -287,7 +286,7 @@ func TestRegionSplit(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs) ctx := context.Background() printPDRegion("cli", cli.regionsInfo.Regions) @@ -338,18 +337,18 @@ func TestRegionSplit(t *testing.T) { firstRunRegions := []*split.RegionInfo{} secondRunRegions := []*split.RegionInfo{} isSecondRun := false - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { if !isSecondRun && r.Region.Id == target.Region.Id { splitRegion() isSecondRun = true - return restore.RPCResultFromPBError(epochNotMatch) + return RPCResultFromPBError(epochNotMatch) } if isSecondRun { secondRunRegions = append(secondRunRegions, r) } else { firstRunRegions = append(firstRunRegions, r) } - return restore.RPCResultOK() + return RPCResultOK() }) printRegion("first", firstRunRegions) printRegion("second", secondRunRegions) @@ -363,7 +362,7 @@ func TestRetryBackoff(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond) - ctl := restore.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := OverRegionsInRange([]byte(""), []byte(""), cli, &rs) ctx := context.Background() printPDRegion("cli", cli.regionsInfo.Regions) @@ -380,12 +379,12 @@ func TestRetryBackoff(t *testing.T) { }, }} isSecondRun := false - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { if !isSecondRun && r.Region.Id == left.Region.Id { isSecondRun = true - return restore.RPCResultFromPBError(epochNotLeader) + return RPCResultFromPBError(epochNotLeader) } - return restore.RPCResultOK() + return RPCResultOK() }) printPDRegion("cli", cli.regionsInfo.Regions) require.Equal(t, 1, rs.Attempt()) @@ -395,10 +394,10 @@ func TestRetryBackoff(t *testing.T) { } func TestWrappedError(t *testing.T) { - result := restore.RPCResultFromError(errors.Trace(status.Error(codes.Unavailable, "the server is slacking. ><=·>"))) - require.Equal(t, result.StrategyForRetry(), restore.StrategyFromThisRegion) - result = restore.RPCResultFromError(errors.Trace(status.Error(codes.Unknown, "the server said something hard to understand"))) - require.Equal(t, result.StrategyForRetry(), restore.StrategyGiveUp) + result := RPCResultFromError(errors.Trace(status.Error(codes.Unavailable, "the server is slacking. ><=·>"))) + require.Equal(t, result.StrategyForRetry(), StrategyFromThisRegion) + result = RPCResultFromError(errors.Trace(status.Error(codes.Unknown, "the server said something hard to understand"))) + require.Equal(t, result.StrategyForRetry(), StrategyGiveUp) } func envInt(name string, def int) int { @@ -414,22 +413,22 @@ func TestPaginateScanLeader(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond) - ctl := restore.OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs) + ctl := OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs) ctx := context.Background() cli.InjectErr = true cli.InjectTimes = int32(envInt("PAGINATE_SCAN_LEADER_FAILURE_COUNT", 2)) collectedRegions := []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) restore.RPCResult { + ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { collectedRegions = append(collectedRegions, r) - return restore.RPCResultOK() + return RPCResultOK() }) assertRegions(t, collectedRegions, "", "aay", "bba") } func TestImportKVFiles(t *testing.T) { var ( - importer = restore.FileImporter{} + importer = FileImporter{} ctx = context.Background() shiftStartTS uint64 = 100 startTS uint64 = 200 @@ -438,7 +437,7 @@ func TestImportKVFiles(t *testing.T) { err := importer.ImportKVFiles( ctx, - []*restore.LogDataFileInfo{ + []*LogDataFileInfo{ { DataFileInfo: &backuppb.DataFileInfo{ Path: "log3", @@ -460,7 +459,7 @@ func TestImportKVFiles(t *testing.T) { } func TestFilterFilesByRegion(t *testing.T) { - files := []*restore.LogDataFileInfo{ + files := []*LogDataFileInfo{ { DataFileInfo: &backuppb.DataFileInfo{ Path: "log3", @@ -484,7 +483,7 @@ func TestFilterFilesByRegion(t *testing.T) { testCases := []struct { r split.RegionInfo - subfiles []*restore.LogDataFileInfo + subfiles []*LogDataFileInfo err error }{ { @@ -494,7 +493,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("1110"), }, }, - subfiles: []*restore.LogDataFileInfo{}, + subfiles: []*LogDataFileInfo{}, err: nil, }, { @@ -504,7 +503,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("1111"), }, }, - subfiles: []*restore.LogDataFileInfo{ + subfiles: []*LogDataFileInfo{ files[0], }, err: nil, @@ -516,7 +515,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("2222"), }, }, - subfiles: []*restore.LogDataFileInfo{ + subfiles: []*LogDataFileInfo{ files[0], }, err: nil, @@ -528,7 +527,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("3332"), }, }, - subfiles: []*restore.LogDataFileInfo{ + subfiles: []*LogDataFileInfo{ files[0], }, err: nil, @@ -540,7 +539,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("3332"), }, }, - subfiles: []*restore.LogDataFileInfo{}, + subfiles: []*LogDataFileInfo{}, err: nil, }, { @@ -550,7 +549,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("3333"), }, }, - subfiles: []*restore.LogDataFileInfo{ + subfiles: []*LogDataFileInfo{ files[1], }, err: nil, @@ -562,7 +561,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: []byte("5555"), }, }, - subfiles: []*restore.LogDataFileInfo{ + subfiles: []*LogDataFileInfo{ files[1], }, err: nil, @@ -574,7 +573,7 @@ func TestFilterFilesByRegion(t *testing.T) { EndKey: nil, }, }, - subfiles: []*restore.LogDataFileInfo{ + subfiles: []*LogDataFileInfo{ files[1], }, err: nil, @@ -592,7 +591,7 @@ func TestFilterFilesByRegion(t *testing.T) { } for _, c := range testCases { - subfile, err := restore.FilterFilesByRegion(files, ranges, &c.r) + subfile, err := FilterFilesByRegion(files, ranges, &c.r) require.Equal(t, err, c.err) require.Equal(t, subfile, c.subfiles) } diff --git a/br/pkg/restore/range_test.go b/br/pkg/restore/range_test.go index a03271de3da03..322789ec023c1 100644 --- a/br/pkg/restore/range_test.go +++ b/br/pkg/restore/range_test.go @@ -1,12 +1,11 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package restore_test +package restore import ( "testing" "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/stretchr/testify/require" @@ -25,7 +24,7 @@ func TestSortRange(t *testing.T) { {OldKeyPrefix: tablecodec.GenTableRecordPrefix(1), NewKeyPrefix: tablecodec.GenTableRecordPrefix(4)}, {OldKeyPrefix: tablecodec.GenTableRecordPrefix(2), NewKeyPrefix: tablecodec.GenTableRecordPrefix(5)}, } - rewriteRules := &restore.RewriteRules{ + rewriteRules := &RewriteRules{ Data: dataRules, } ranges1 := []rtree.Range{ @@ -34,7 +33,7 @@ func TestSortRange(t *testing.T) { EndKey: append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...), Files: nil, }, } - rs1, err := restore.SortRanges(ranges1, rewriteRules) + rs1, err := SortRanges(ranges1, rewriteRules) require.NoErrorf(t, err, "sort range1 failed: %v", err) rangeEquals(t, rs1, []rtree.Range{ { @@ -49,13 +48,13 @@ func TestSortRange(t *testing.T) { EndKey: append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...), Files: nil, }, } - _, err = restore.SortRanges(ranges2, rewriteRules) + _, err = SortRanges(ranges2, rewriteRules) require.Error(t, err) require.Regexp(t, "table id mismatch.*", err.Error()) ranges3 := initRanges() rewriteRules1 := initRewriteRules() - rs3, err := restore.SortRanges(ranges3, rewriteRules1) + rs3, err := SortRanges(ranges3, rewriteRules1) require.NoErrorf(t, err, "sort range1 failed: %v", err) rangeEquals(t, rs3, []rtree.Range{ {StartKey: []byte("bbd"), EndKey: []byte("bbf"), Files: nil}, diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index 3b1c9470222c4..060f09b688632 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -8,6 +8,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" @@ -76,13 +77,13 @@ func TestScanRegionBackOfferWithStopRetry(t *testing.T) { require.Equal(t, counter, 6) } -type mockScatterPDClient struct { +type mockScatterFailedPDClient struct { pd.Client failed map[uint64]int failedBefore int } -func (c *mockScatterPDClient) ScatterRegion(ctx context.Context, regionID uint64) error { +func (c *mockScatterFailedPDClient) ScatterRegion(ctx context.Context, regionID uint64) error { if c.failed == nil { c.failed = make(map[uint64]int) } @@ -97,7 +98,7 @@ type recordCntBackoffer struct { already int } -func (b *recordCntBackoffer) NextBackoff(err error) time.Duration { +func (b *recordCntBackoffer) NextBackoff(error) time.Duration { b.already++ return 0 } @@ -109,7 +110,7 @@ func (b *recordCntBackoffer) Attempt() int { func TestScatterSequentiallyRetryCnt(t *testing.T) { client := pdClient{ needScatterVal: true, - client: &mockScatterPDClient{failedBefore: 7}, + client: &mockScatterFailedPDClient{failedBefore: 7}, } client.needScatterInit.Do(func() {}) @@ -134,3 +135,46 @@ func TestScatterSequentiallyRetryCnt(t *testing.T) { ) require.Equal(t, 7, backoffer.already) } + +type mockOldPDClient struct { + pd.Client + + scattered map[uint64]struct{} +} + +func (c *mockOldPDClient) ScatterRegion(_ context.Context, regionID uint64) error { + if c.scattered == nil { + c.scattered = make(map[uint64]struct{}) + } + c.scattered[regionID] = struct{}{} + return nil +} + +func (c *mockOldPDClient) ScatterRegions(context.Context, []uint64, ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error) { + return nil, status.Error(codes.Unimplemented, "Ah, yep") +} + +func TestScatterBackwardCompatibility(t *testing.T) { + client := pdClient{ + needScatterVal: true, + client: &mockOldPDClient{}, + } + client.needScatterInit.Do(func() {}) + + ctx := context.Background() + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + Id: 1, + }, + }, + { + Region: &metapb.Region{ + Id: 2, + }, + }, + } + err := client.ScatterRegions(ctx, regions) + require.NoError(t, err) + require.Equal(t, map[uint64]struct{}{1: {}, 2: {}}, client.client.(*mockOldPDClient).scattered) +} diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 42008849ec58d..a2a266bf51bdc 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package restore_test +package restore import ( "bytes" @@ -19,7 +19,6 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils/iter" @@ -36,14 +35,13 @@ import ( type TestClient struct { split.SplitClient - mu sync.RWMutex - stores map[uint64]*metapb.Store - regions map[uint64]*split.RegionInfo - regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions - nextRegionID uint64 - injectInScatter func(*split.RegionInfo) error - injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error) - supportBatchScatter bool + mu sync.RWMutex + stores map[uint64]*metapb.Store + regions map[uint64]*split.RegionInfo + regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions + nextRegionID uint64 + injectInScatter func(*split.RegionInfo) error + injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error) scattered map[uint64]bool InjectErr bool @@ -69,15 +67,8 @@ func NewTestClient( } } -func (c *TestClient) InstallBatchScatterSupport() { - c.supportBatchScatter = true -} - // ScatterRegions scatters regions in a batch. func (c *TestClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error { - if !c.supportBatchScatter { - return status.Error(codes.Unimplemented, "Ah, yep") - } regions := map[uint64]*split.RegionInfo{} for _, region := range regionInfo { regions[region.Region.Id] = region @@ -259,7 +250,7 @@ func TestScanEmptyRegion(t *testing.T) { // make ranges has only one ranges = ranges[0:1] rewriteRules := initRewriteRules() - regionSplitter := restore.NewRegionSplitter(client) + regionSplitter := NewRegionSplitter(client) ctx := context.Background() err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 1, false, func(key [][]byte) {}) @@ -276,17 +267,11 @@ func TestScanEmptyRegion(t *testing.T) { // [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) func TestSplitAndScatter(t *testing.T) { t.Run("BatchScatter", func(t *testing.T) { - client := initTestClient(false) - client.InstallBatchScatterSupport() - runTestSplitAndScatterWith(t, client) - }) - t.Run("BackwardCompatibility", func(t *testing.T) { client := initTestClient(false) runTestSplitAndScatterWith(t, client) }) t.Run("WaitScatter", func(t *testing.T) { client := initTestClient(false) - client.InstallBatchScatterSupport() runWaitScatter(t, client) }) } @@ -383,7 +368,7 @@ func runWaitScatter(t *testing.T, client *TestClient) { for _, info := range regionsMap { regions = append(regions, info) } - regionSplitter := restore.NewRegionSplitter(client) + regionSplitter := NewRegionSplitter(client) leftCnt := regionSplitter.WaitForScatterRegionsTimeout(ctx, regions, 2000*time.Second) require.Equal(t, leftCnt, 0) } @@ -391,7 +376,7 @@ func runWaitScatter(t *testing.T, client *TestClient) { func runTestSplitAndScatterWith(t *testing.T, client *TestClient) { ranges := initRanges() rewriteRules := initRewriteRules() - regionSplitter := restore.NewRegionSplitter(client) + regionSplitter := NewRegionSplitter(client) ctx := context.Background() err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 1, false, func(key [][]byte) {}) @@ -418,7 +403,8 @@ func runTestSplitAndScatterWith(t *testing.T, client *TestClient) { scattered[regionInfo.Region.Id] = true return nil } - regionSplitter.ScatterRegions(ctx, regionInfos) + err = regionSplitter.client.ScatterRegions(ctx, regionInfos) + require.NoError(t, err) for key := range regions { if key == alwaysFailedRegionID { require.Falsef(t, scattered[key], "always failed region %d was scattered successfully", key) @@ -439,7 +425,7 @@ func TestRawSplit(t *testing.T) { client := initTestClient(true) ctx := context.Background() - regionSplitter := restore.NewRegionSplitter(client) + regionSplitter := NewRegionSplitter(client) err := regionSplitter.ExecuteSplit(ctx, ranges, nil, 1, true, func(key [][]byte) {}) require.NoError(t, err) regions := client.GetAllRegions() @@ -512,7 +498,7 @@ func initRanges() []rtree.Range { return ranges[:] } -func initRewriteRules() *restore.RewriteRules { +func initRewriteRules() *RewriteRules { var rules [2]*import_sstpb.RewriteRule rules[0] = &import_sstpb.RewriteRule{ OldKeyPrefix: []byte("aa"), @@ -522,7 +508,7 @@ func initRewriteRules() *restore.RewriteRules { OldKeyPrefix: []byte("cc"), NewKeyPrefix: []byte("bb"), } - return &restore.RewriteRules{ + return &RewriteRules{ Data: rules[:], } } @@ -641,7 +627,7 @@ type fakeRestorer struct { tableIDIsInsequence bool } -func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *restore.RewriteRules, updateCh glue.Progress, isRawKv bool) error { +func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool) error { f.mu.Lock() defer f.mu.Unlock() @@ -658,7 +644,7 @@ func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, re return nil } -func (f *fakeRestorer) RestoreSSTFiles(ctx context.Context, tableIDWithFiles []restore.TableIDWithFiles, rewriteRules *restore.RewriteRules, updateCh glue.Progress) error { +func (f *fakeRestorer) RestoreSSTFiles(ctx context.Context, tableIDWithFiles []TableIDWithFiles, rewriteRules *RewriteRules, updateCh glue.Progress) error { f.mu.Lock() defer f.mu.Unlock() @@ -676,7 +662,7 @@ func (f *fakeRestorer) RestoreSSTFiles(ctx context.Context, tableIDWithFiles []r return err } -func fakeRanges(keys ...string) (r restore.DrainResult) { +func fakeRanges(keys ...string) (r DrainResult) { for i := range keys { if i+1 == len(keys) { return @@ -687,7 +673,7 @@ func fakeRanges(keys ...string) (r restore.DrainResult) { Files: []*backuppb.File{{Name: "fake.sst"}}, }) r.TableEndOffsetInRanges = append(r.TableEndOffsetInRanges, len(r.Ranges)) - r.TablesToSend = append(r.TablesToSend, restore.CreatedTable{ + r.TablesToSend = append(r.TablesToSend, CreatedTable{ Table: &model.TableInfo{ ID: int64(i), }, @@ -702,7 +688,7 @@ type errorInTimeSink struct { t *testing.T } -func (e errorInTimeSink) EmitTables(tables ...restore.CreatedTable) {} +func (e errorInTimeSink) EmitTables(tables ...CreatedTable) {} func (e errorInTimeSink) EmitError(err error) { e.errCh <- err @@ -729,7 +715,7 @@ func assertErrorEmitInTime(ctx context.Context, t *testing.T) errorInTimeSink { } func TestRestoreFailed(t *testing.T) { - ranges := []restore.DrainResult{ + ranges := []DrainResult{ fakeRanges("aax", "abx", "abz"), fakeRanges("abz", "bbz", "bcy"), fakeRanges("bcy", "cad", "xxy"), @@ -737,7 +723,7 @@ func TestRestoreFailed(t *testing.T) { r := &fakeRestorer{ tableIDIsInsequence: true, } - sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1, string(restore.FineGrained)) + sender, err := NewTiKVSender(context.TODO(), r, nil, 1, string(FineGrained)) require.NoError(t, err) dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -754,13 +740,13 @@ func TestRestoreFailed(t *testing.T) { } func TestSplitFailed(t *testing.T) { - ranges := []restore.DrainResult{ + ranges := []DrainResult{ fakeRanges("aax", "abx", "abz"), fakeRanges("abz", "bbz", "bcy"), fakeRanges("bcy", "cad", "xxy"), } r := &fakeRestorer{errorInSplit: true, tableIDIsInsequence: true} - sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1, string(restore.FineGrained)) + sender, err := NewTiKVSender(context.TODO(), r, nil, 1, string(FineGrained)) require.NoError(t, err) dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -785,7 +771,7 @@ func TestSplitPoint(t *testing.T) { ctx := context.Background() var oldTableID int64 = 50 var tableID int64 = 100 - rewriteRules := &restore.RewriteRules{ + rewriteRules := &RewriteRules{ Data: []*import_sstpb.RewriteRule{ { OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), @@ -808,8 +794,8 @@ func TestSplitPoint(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) - iter := restore.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []split.Valued) error { + iter := NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) + err := SplitPoint(ctx, iter, client, func(ctx context.Context, rs *RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []split.Valued) error { require.Equal(t, u, uint64(0)) require.Equal(t, o, int64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) @@ -835,7 +821,7 @@ func TestSplitPoint2(t *testing.T) { ctx := context.Background() var oldTableID int64 = 50 var tableID int64 = 100 - rewriteRules := &restore.RewriteRules{ + rewriteRules := &RewriteRules{ Data: []*import_sstpb.RewriteRule{ { OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), @@ -866,8 +852,8 @@ func TestSplitPoint2(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "o"), keyWithTablePrefix(tableID+1, "a")) firstSplit := true - iter := restore.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := restore.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *restore.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []split.Valued) error { + iter := NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) + err := SplitPoint(ctx, iter, client, func(ctx context.Context, rs *RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []split.Valued) error { if firstSplit { require.Equal(t, u, uint64(0)) require.Equal(t, o, int64(0)) @@ -940,7 +926,7 @@ func TestGetRewriteTableID(t *testing.T) { var tableID int64 = 76 var oldTableID int64 = 80 { - rewriteRules := &restore.RewriteRules{ + rewriteRules := &RewriteRules{ Data: []*import_sstpb.RewriteRule{ { OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), @@ -949,12 +935,12 @@ func TestGetRewriteTableID(t *testing.T) { }, } - newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) + newTableID := GetRewriteTableID(oldTableID, rewriteRules) require.Equal(t, tableID, newTableID) } { - rewriteRules := &restore.RewriteRules{ + rewriteRules := &RewriteRules{ Data: []*import_sstpb.RewriteRule{ { OldKeyPrefix: tablecodec.GenTableRecordPrefix(oldTableID), @@ -963,7 +949,7 @@ func TestGetRewriteTableID(t *testing.T) { }, } - newTableID := restore.GetRewriteTableID(oldTableID, rewriteRules) + newTableID := GetRewriteTableID(oldTableID, rewriteRules) require.Equal(t, tableID, newTableID) } } @@ -972,12 +958,12 @@ type mockLogIter struct { next int } -func (m *mockLogIter) TryNext(ctx context.Context) iter.IterResult[*restore.LogDataFileInfo] { +func (m *mockLogIter) TryNext(ctx context.Context) iter.IterResult[*LogDataFileInfo] { if m.next > 10000 { - return iter.Done[*restore.LogDataFileInfo]() + return iter.Done[*LogDataFileInfo]() } m.next += 1 - return iter.Emit(&restore.LogDataFileInfo{ + return iter.Emit(&LogDataFileInfo{ DataFileInfo: &backuppb.DataFileInfo{ StartKey: []byte(fmt.Sprintf("a%d", m.next)), EndKey: []byte("b"), @@ -989,7 +975,7 @@ func (m *mockLogIter) TryNext(ctx context.Context) iter.IterResult[*restore.LogD func TestLogFilesIterWithSplitHelper(t *testing.T) { var tableID int64 = 76 var oldTableID int64 = 80 - rewriteRules := &restore.RewriteRules{ + rewriteRules := &RewriteRules{ Data: []*import_sstpb.RewriteRule{ { OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), @@ -997,12 +983,12 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) { }, }, } - rewriteRulesMap := map[int64]*restore.RewriteRules{ + rewriteRulesMap := map[int64]*RewriteRules{ oldTableID: rewriteRules, } mockIter := &mockLogIter{} ctx := context.Background() - logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, newFakeSplitClient(), 144*1024*1024, 1440000) + logIter := NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, newFakeSplitClient(), 144*1024*1024, 1440000) next := 0 for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) { require.NoError(t, r.Err) @@ -1056,7 +1042,7 @@ func TestSplitCheckPartRegionConsistency(t *testing.T) { } func TestGetSplitSortedKeysFromSortedRegions(t *testing.T) { - splitContext := restore.SplitContext{} + splitContext := SplitContext{} sortedKeys := [][]byte{ []byte("b"), []byte("d"), @@ -1087,7 +1073,7 @@ func TestGetSplitSortedKeysFromSortedRegions(t *testing.T) { }, }, } - result := restore.TestGetSplitSortedKeysFromSortedRegionsTest(splitContext, sortedKeys, sortedRegions) + result := TestGetSplitSortedKeysFromSortedRegionsTest(splitContext, sortedKeys, sortedRegions) require.Equal(t, 3, len(result)) require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[1]) require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[2]) diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index 2740594a79cf1..e6edc8f81334d 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package restore_test +package restore import ( "context" @@ -14,7 +14,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" recover_data "github.com/pingcap/kvproto/pkg/recoverdatapb" berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/tablecodec" @@ -23,19 +22,19 @@ import ( ) func TestParseQuoteName(t *testing.T) { - schema, table := restore.ParseQuoteName("`a`.`b`") + schema, table := ParseQuoteName("`a`.`b`") require.Equal(t, "a", schema) require.Equal(t, "b", table) - schema, table = restore.ParseQuoteName("`a``b`.``````") + schema, table = ParseQuoteName("`a``b`.``````") require.Equal(t, "a`b", schema) require.Equal(t, "``", table) - schema, table = restore.ParseQuoteName("`.`.`.`") + schema, table = ParseQuoteName("`.`.`.`") require.Equal(t, ".", schema) require.Equal(t, ".", table) - schema, table = restore.ParseQuoteName("`.``.`.`.`") + schema, table = ParseQuoteName("`.``.`.`.`") require.Equal(t, ".`.", schema) require.Equal(t, ".", table) } @@ -54,7 +53,7 @@ func TestGetSSTMetaFromFile(t *testing.T) { StartKey: []byte("t2abc"), EndKey: []byte("t3a"), } - sstMeta, err := restore.GetSSTMetaFromFile([]byte{}, file, region, rule, restore.RewriteModeLegacy) + sstMeta, err := GetSSTMetaFromFile([]byte{}, file, region, rule, RewriteModeLegacy) require.Nil(t, err) require.Equal(t, "t2abc", string(sstMeta.GetRange().GetStart())) require.Equal(t, "t2\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", string(sstMeta.GetRange().GetEnd())) @@ -91,14 +90,14 @@ func TestMapTableToFiles(t *testing.T) { }, } - result := restore.MapTableToFiles(append(filesOfTable2, filesOfTable1...)) + result := MapTableToFiles(append(filesOfTable2, filesOfTable1...)) require.Equal(t, filesOfTable1, result[1]) require.Equal(t, filesOfTable2, result[2]) } func TestValidateFileRewriteRule(t *testing.T) { - rules := &restore.RewriteRules{ + rules := &RewriteRules{ Data: []*import_sstpb.RewriteRule{{ OldKeyPrefix: []byte(tablecodec.EncodeTablePrefix(1)), NewKeyPrefix: []byte(tablecodec.EncodeTablePrefix(2)), @@ -106,7 +105,7 @@ func TestValidateFileRewriteRule(t *testing.T) { } // Empty start/end key is not allowed. - err := restore.ValidateFileRewriteRule( + err := ValidateFileRewriteRule( &backuppb.File{ Name: "file_write.sst", StartKey: []byte(""), @@ -118,7 +117,7 @@ func TestValidateFileRewriteRule(t *testing.T) { require.Regexp(t, ".*cannot find rewrite rule.*", err.Error()) // Range is not overlap, no rule found. - err = restore.ValidateFileRewriteRule( + err = ValidateFileRewriteRule( &backuppb.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(0), @@ -130,7 +129,7 @@ func TestValidateFileRewriteRule(t *testing.T) { require.Regexp(t, ".*cannot find rewrite rule.*", err.Error()) // No rule for end key. - err = restore.ValidateFileRewriteRule( + err = ValidateFileRewriteRule( &backuppb.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(1), @@ -146,7 +145,7 @@ func TestValidateFileRewriteRule(t *testing.T) { OldKeyPrefix: tablecodec.EncodeTablePrefix(2), NewKeyPrefix: tablecodec.EncodeTablePrefix(3), }) - err = restore.ValidateFileRewriteRule( + err = ValidateFileRewriteRule( &backuppb.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(1), @@ -162,7 +161,7 @@ func TestValidateFileRewriteRule(t *testing.T) { OldKeyPrefix: tablecodec.EncodeTablePrefix(2), NewKeyPrefix: tablecodec.EncodeTablePrefix(1), }) - err = restore.ValidateFileRewriteRule( + err = ValidateFileRewriteRule( &backuppb.File{ Name: "file_write.sst", StartKey: tablecodec.EncodeTablePrefix(1), @@ -345,7 +344,7 @@ func (c *regionOnlineSlowClient) ScanRegions(ctx context.Context, key, endKey [] } func TestRewriteFileKeys(t *testing.T) { - rewriteRules := restore.RewriteRules{ + rewriteRules := RewriteRules{ Data: []*import_sstpb.RewriteRule{ { NewKeyPrefix: tablecodec.GenTablePrefix(2), @@ -362,7 +361,7 @@ func TestRewriteFileKeys(t *testing.T) { StartKey: tablecodec.GenTableRecordPrefix(1), EndKey: tablecodec.GenTableRecordPrefix(1).PrefixNext(), } - start, end, err := restore.GetRewriteRawKeys(&rawKeyFile, &rewriteRules) + start, end, err := GetRewriteRawKeys(&rawKeyFile, &rewriteRules) require.NoError(t, err) _, end, err = codec.DecodeBytes(end, nil) require.NoError(t, err) @@ -376,7 +375,7 @@ func TestRewriteFileKeys(t *testing.T) { StartKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(1)), EndKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(1).PrefixNext()), } - start, end, err = restore.GetRewriteEncodedKeys(&encodeKeyFile, &rewriteRules) + start, end, err = GetRewriteEncodedKeys(&encodeKeyFile, &rewriteRules) require.NoError(t, err) require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(2)), start) require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(2).PrefixNext()), end) @@ -388,12 +387,12 @@ func TestRewriteFileKeys(t *testing.T) { EndKey: codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(767).PrefixNext()), } // use raw rewrite should no error but not equal - start, end, err = restore.GetRewriteRawKeys(&encodeKeyFile767, &rewriteRules) + start, end, err = GetRewriteRawKeys(&encodeKeyFile767, &rewriteRules) require.NoError(t, err) require.NotEqual(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511)), start) require.NotEqual(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511).PrefixNext()), end) // use encode rewrite should no error and equal - start, end, err = restore.GetRewriteEncodedKeys(&encodeKeyFile767, &rewriteRules) + start, end, err = GetRewriteEncodedKeys(&encodeKeyFile767, &rewriteRules) require.NoError(t, err) require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511)), start) require.Equal(t, codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(511).PrefixNext()), end) @@ -410,8 +409,8 @@ func newPeerMeta( commitIndex uint64, version uint64, tombstone bool, -) *restore.RecoverRegion { - return &restore.RecoverRegion{ +) *RecoverRegion { + return &RecoverRegion{ &recover_data.RegionMeta{ RegionId: regionId, PeerId: peerId, @@ -427,12 +426,12 @@ func newPeerMeta( } } -func newRecoverRegionInfo(r *restore.RecoverRegion) *restore.RecoverRegionInfo { - return &restore.RecoverRegionInfo{ +func newRecoverRegionInfo(r *RecoverRegion) *RecoverRegionInfo { + return &RecoverRegionInfo{ RegionVersion: r.Version, RegionId: r.RegionId, - StartKey: restore.PrefixStartKey(r.StartKey), - EndKey: restore.PrefixEndKey(r.EndKey), + StartKey: PrefixStartKey(r.StartKey), + EndKey: PrefixEndKey(r.EndKey), TombStone: r.Tombstone, } } @@ -441,7 +440,7 @@ func TestSortRecoverRegions(t *testing.T) { selectedPeer1 := newPeerMeta(9, 11, 2, []byte("aa"), nil, 2, 0, 0, 0, false) selectedPeer2 := newPeerMeta(19, 22, 3, []byte("bbb"), nil, 2, 1, 0, 1, false) selectedPeer3 := newPeerMeta(29, 30, 1, []byte("c"), nil, 2, 1, 1, 2, false) - regions := map[uint64][]*restore.RecoverRegion{ + regions := map[uint64][]*RecoverRegion{ 9: { // peer 11 should be selected because of log term newPeerMeta(9, 10, 1, []byte("a"), nil, 1, 1, 1, 1, false), @@ -461,8 +460,8 @@ func TestSortRecoverRegions(t *testing.T) { newPeerMeta(29, 32, 3, []byte("ccc"), nil, 2, 1, 0, 0, false), }, } - regionsInfos := restore.SortRecoverRegions(regions) - expectRegionInfos := []*restore.RecoverRegionInfo{ + regionsInfos := SortRecoverRegions(regions) + expectRegionInfos := []*RecoverRegionInfo{ newRecoverRegionInfo(selectedPeer3), newRecoverRegionInfo(selectedPeer2), newRecoverRegionInfo(selectedPeer1), @@ -476,13 +475,13 @@ func TestCheckConsistencyAndValidPeer(t *testing.T) { validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false) validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 1, 2, false) - validRegionInfos := []*restore.RecoverRegionInfo{ + validRegionInfos := []*RecoverRegionInfo{ newRecoverRegionInfo(validPeer1), newRecoverRegionInfo(validPeer2), newRecoverRegionInfo(validPeer3), } - validPeer, err := restore.CheckConsistencyAndValidPeer(validRegionInfos) + validPeer, err := CheckConsistencyAndValidPeer(validRegionInfos) require.NoError(t, err) require.Equal(t, 3, len(validPeer)) var regions = make(map[uint64]struct{}, 3) @@ -497,13 +496,13 @@ func TestCheckConsistencyAndValidPeer(t *testing.T) { invalidPeer2 := newPeerMeta(19, 22, 3, []byte("dd"), []byte("cc"), 2, 1, 0, 1, false) invalidPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte("dd"), 2, 1, 1, 2, false) - invalidRegionInfos := []*restore.RecoverRegionInfo{ + invalidRegionInfos := []*RecoverRegionInfo{ newRecoverRegionInfo(invalidPeer1), newRecoverRegionInfo(invalidPeer2), newRecoverRegionInfo(invalidPeer3), } - _, err = restore.CheckConsistencyAndValidPeer(invalidRegionInfos) + _, err = CheckConsistencyAndValidPeer(invalidRegionInfos) require.Error(t, err) require.Regexp(t, ".*invalid restore range.*", err.Error()) } @@ -514,13 +513,13 @@ func TestLeaderCandidates(t *testing.T) { validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false) validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false) - peers := []*restore.RecoverRegion{ + peers := []*RecoverRegion{ validPeer1, validPeer2, validPeer3, } - candidates, err := restore.LeaderCandidates(peers) + candidates, err := LeaderCandidates(peers) require.NoError(t, err) require.Equal(t, 3, len(candidates)) } @@ -530,30 +529,30 @@ func TestSelectRegionLeader(t *testing.T) { validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false) validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false) - peers := []*restore.RecoverRegion{ + peers := []*RecoverRegion{ validPeer1, validPeer2, validPeer3, } // init store banlance score all is 0 storeBalanceScore := make(map[uint64]int, len(peers)) - leader := restore.SelectRegionLeader(storeBalanceScore, peers) + leader := SelectRegionLeader(storeBalanceScore, peers) require.Equal(t, validPeer1, leader) // change store banlance store storeBalanceScore[2] = 3 storeBalanceScore[3] = 2 storeBalanceScore[1] = 1 - leader = restore.SelectRegionLeader(storeBalanceScore, peers) + leader = SelectRegionLeader(storeBalanceScore, peers) require.Equal(t, validPeer3, leader) // one peer - peer := []*restore.RecoverRegion{ + peer := []*RecoverRegion{ validPeer3, } // init store banlance score all is 0 storeScore := make(map[uint64]int, len(peer)) - leader = restore.SelectRegionLeader(storeScore, peer) + leader = SelectRegionLeader(storeScore, peer) require.Equal(t, validPeer3, leader) } @@ -567,7 +566,7 @@ func TestLogFilesSkipMap(t *testing.T) { ) for ratio < 1 { - skipmap := restore.NewLogFilesSkipMap() + skipmap := NewLogFilesSkipMap() nativemap := make(map[string]map[int]map[int]struct{}) count := 0 for i := 0; i < int(ratio*float64(metaNum*groupNum*fileNum)); i++ { From 885de77c23fbca3f8dc7f007471178f702ab40b2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 8 Mar 2024 13:03:30 +0800 Subject: [PATCH 3/3] fix bazel Signed-off-by: lance6716 --- br/pkg/restore/split/BUILD.bazel | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 45b2d8907f718..2663c89bcd56c 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -47,13 +47,18 @@ go_test( "split_test.go", "sum_sorted_test.go", ], + embed = [":split"], flaky = True, - shard_count = 4, + shard_count = 6, deps = [ - ":split", "//br/pkg/errors", "//br/pkg/utils", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/pdpb", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", ], )