From d163cb07787568ae8944fdca4d2bf7112ad36930 Mon Sep 17 00:00:00 2001
From: mittalrishabh
Date: Tue, 22 Aug 2023 20:00:34 -0700
Subject: [PATCH] This is an automated cherry-pick of #46202

Signed-off-by: ti-chi-bot
---
 br/pkg/lightning/backend/local/local.go       |  6 ++
 br/pkg/lightning/backend/local/localhelper.go | 23 ++++-
 .../backend/local/localhelper_test.go         | 97 ++++++++++++++++++-
 3 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index a47e30eb43819..91bbd8312e53d 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1078,6 +1078,7 @@ func (local *local) writeAndIngestByRange(
 	ctx, cancel := context.WithCancel(ctxt)
 	defer cancel()
 
+<<<<<<< HEAD
 WriteAndIngest:
 	for retry := 0; retry < maxRetryTimes; {
 		if retry != 0 {
@@ -1121,6 +1122,11 @@
 				logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
 			continue WriteAndIngest
 		}
+=======
+	err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges)
+	if err == nil || common.IsContextCanceledError(err) {
+		break
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 	}
 
 	return err
diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go
index 4f1c926d3fc44..188c9cc5c88fc 100644
--- a/br/pkg/lightning/backend/local/localhelper.go
+++ b/br/pkg/lightning/backend/local/localhelper.go
@@ -65,9 +65,7 @@ var (
 func (local *local) SplitAndScatterRegionInBatches(
 	ctx context.Context,
 	ranges []Range,
-	tableInfo *checkpoints.TidbTableInfo,
 	needSplit bool,
-	regionSplitSize int64,
 	batchCnt int,
 ) error {
 	for i := 0; i < len(ranges); i += batchCnt {
@@ -75,7 +73,7 @@ func (local *local) SplitAndScatterRegionInBatches(
 		if len(batch) > batchCnt {
 			batch = batch[:batchCnt]
 		}
-		if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
+		if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
 			return errors.Trace(err)
 		}
 	}
@@ -89,10 +87,13 @@
 func (local *local) SplitAndScatterRegionByRanges(
 	ctx context.Context,
 	ranges []Range,
-	tableInfo *checkpoints.TidbTableInfo,
 	needSplit bool,
+<<<<<<< HEAD
 	regionSplitSize int64,
 ) error {
+=======
+) (err error) {
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 	if len(ranges) == 0 {
 		return nil
 	}
@@ -108,7 +109,6 @@ func (local *local) SplitAndScatterRegionByRanges(
 	scatterRegions := make([]*split.RegionInfo, 0)
 	var retryKeys [][]byte
 	waitTime := splitRegionBaseBackOffTime
-	skippedKeys := 0
 	for i := 0; i < splitRetryTimes; i++ {
 		log.L().Info("split and scatter region",
 			logutil.Key("minKey", minKey),
@@ -170,6 +170,7 @@ func (local *local) SplitAndScatterRegionByRanges(
 		return nil
 	}
 
+<<<<<<< HEAD
 	var tableRegionStats map[uint64]int64
 	if tableInfo != nil {
 		tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
@@ -180,6 +181,8 @@
 		}
 	}
 
+=======
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 	regionMap := make(map[uint64]*split.RegionInfo)
 	for _, region := range regions {
 		regionMap[region.Region.GetId()] = region
@@ -289,6 +292,7 @@
 	}
 sendLoop:
 	for regionID, keys := range splitKeyMap {
+<<<<<<< HEAD
 		// if region not in tableRegionStats, that means this region is newly split, so
 		// we can skip split it again.
 		regionSize, ok := tableRegionStats[regionID]
@@ -298,6 +302,8 @@
 		if len(keys) == 1 && regionSize < regionSplitSize {
 			skippedKeys++
 		}
+=======
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 		select {
 		case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
 		case <-ctx.Done():
@@ -340,12 +346,19 @@
 			scatterCount++
 		}
 		if scatterCount == len(scatterRegions) {
+<<<<<<< HEAD
 			log.L().Info("waiting for scattering regions done",
 				zap.Int("skipped_keys", skippedKeys),
 				zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
 		} else {
 			log.L().Info("waiting for scattering regions timeout",
 				zap.Int("skipped_keys", skippedKeys),
+=======
+			log.FromContext(ctx).Info("waiting for scattering regions done",
+				zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
+		} else {
+			log.FromContext(ctx).Info("waiting for scattering regions timeout",
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 				zap.Int("scatterCount", scatterCount),
 				zap.Int("regions", len(scatterRegions)),
 				zap.Duration("take", time.Since(startTime)))
diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go
index 48ce64da5e3b6..7ffdd8068b840 100644
--- a/br/pkg/lightning/backend/local/localhelper_test.go
+++ b/br/pkg/lightning/backend/local/localhelper_test.go
@@ -437,10 +437,15 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
 		start = end
 	}
 
+<<<<<<< HEAD
 	err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
 	if len(errPat) == 0 {
 		require.NoError(t, err)
 	} else {
+=======
+	err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
+	if len(errPat) != 0 {
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 		require.Error(t, err)
 		require.Regexp(t, errPat, err.Error())
 		return
@@ -465,6 +470,94 @@ func TestBatchSplitRegionByRanges(t *testing.T) {
 	doTestBatchSplitRegionByRanges(context.Background(), t, nil, "", nil)
 }
 
+<<<<<<< HEAD
+=======
+type checkScatterClient struct {
+	*testSplitClient
+
+	mu                sync.Mutex
+	notFoundFirstTime map[uint64]struct{}
+	scatterCounter    atomic.Int32
+}
+
+func newCheckScatterClient(inner *testSplitClient) *checkScatterClient {
+	return &checkScatterClient{
+		testSplitClient:   inner,
+		notFoundFirstTime: map[uint64]struct{}{},
+		scatterCounter:    atomic.Int32{},
+	}
+}
+
+func (c *checkScatterClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
+	c.scatterCounter.Add(1)
+	return nil
+}
+
+func (c *checkScatterClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if _, ok := c.notFoundFirstTime[regionID]; !ok {
+		c.notFoundFirstTime[regionID] = struct{}{}
+		return nil, nil
+	}
+	return c.testSplitClient.GetRegionByID(ctx, regionID)
+}
+
+func TestMissingScatter(t *testing.T) {
+	ctx := context.Background()
+	splitHook := defaultHook{}
+	deferFunc := splitHook.setup(t)
+	defer deferFunc()
+
+	keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
+	client := initTestSplitClient(keys, nil)
+	checkClient := newCheckScatterClient(client)
+	local := &Backend{
+		splitCli: checkClient,
+		logger:   log.L(),
+	}
+	local.RegionSplitBatchSize = 4
+	local.RegionSplitConcurrency = 4
+
+	// current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
+	rangeStart := codec.EncodeBytes([]byte{}, []byte("b"))
+	rangeEnd := codec.EncodeBytes([]byte{}, []byte("c"))
+	regions, err := split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
+	require.NoError(t, err)
+	// regions is: [aay, bba), [bba, bbh), [bbh, cca)
+	checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")})
+
+	// generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz)
+	ranges := make([]Range, 0)
+	start := []byte{'b'}
+	for i := byte('a'); i <= 'z'; i++ {
+		end := []byte{'b', i}
+		ranges = append(ranges, Range{start: start, end: end})
+		start = end
+	}
+
+	err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
+	require.NoError(t, err)
+
+	splitHook.check(t, client)
+
+	// check split ranges
+	regions, err = split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
+	require.NoError(t, err)
+	result := [][]byte{
+		[]byte("b"), []byte("ba"), []byte("bb"), []byte("bba"), []byte("bbh"), []byte("bc"),
+		[]byte("bd"), []byte("be"), []byte("bf"), []byte("bg"), []byte("bh"), []byte("bi"), []byte("bj"),
+		[]byte("bk"), []byte("bl"), []byte("bm"), []byte("bn"), []byte("bo"), []byte("bp"), []byte("bq"),
+		[]byte("br"), []byte("bs"), []byte("bt"), []byte("bu"), []byte("bv"), []byte("bw"), []byte("bx"),
+		[]byte("by"), []byte("bz"), []byte("cca"),
+	}
+	checkRegionRanges(t, regions, result)
+
+	// the old regions will not be scattered. They are [..., bba), [bba, bbh), [..., cca)
+	require.Equal(t, len(result)-3, int(checkClient.scatterCounter.Load()))
+}
+
+>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
 type batchSizeHook struct{}
 
 func (h batchSizeHook) setup(t *testing.T) func() {
@@ -599,7 +692,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
 		})
 	}
 
-	err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
+	err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4)
 	require.NoError(t, err)
 
 	rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
@@ -695,7 +788,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
 		start = e
 	}
 
-	err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
+	err := local.SplitAndScatterRegionByRanges(ctx, ranges, true)
 	require.NoError(t, err)
 
 	startKey := codec.EncodeBytes([]byte{}, rangeKeys[0])