This is an automated cherry-pick of pingcap#46202
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
mittalrishabh authored and ti-chi-bot committed Aug 23, 2023
1 parent 5212c4f commit d163cb0
Showing 3 changed files with 119 additions and 7 deletions.
6 changes: 6 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -1078,6 +1078,7 @@ func (local *local) writeAndIngestByRange(
ctx, cancel := context.WithCancel(ctxt)
defer cancel()

<<<<<<< HEAD
WriteAndIngest:
for retry := 0; retry < maxRetryTimes; {
if retry != 0 {
@@ -1121,6 +1122,11 @@ WriteAndIngest:
logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
continue WriteAndIngest
}
=======
err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
}

return err
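
The HEAD side of the conflict above keeps writeAndIngestByRange's labeled retry loop, while the upstream side replaces that path with a single SplitAndScatterRegionInBatches call. For readers unfamiliar with the labeled-continue idiom the HEAD side relies on, here is a small self-contained illustration; the region names, failure count and backoff are made up for the example and are not the real lightning code:

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	const maxRetryTimes = 3
	regions := []string{"r1", "r2", "r3"}
	failures := 2 // the first two per-region writes fail, then everything succeeds

	// writeRegion stands in for the per-region write-and-ingest step.
	writeRegion := func(region string) error {
		if failures > 0 {
			failures--
			return errors.New("transient error on " + region)
		}
		return nil
	}

	var err error
WriteAndIngest:
	for retry := 0; retry < maxRetryTimes; {
		if retry != 0 {
			time.Sleep(10 * time.Millisecond) // back off briefly between attempts
		}
		for _, region := range regions {
			if err = writeRegion(region); err != nil {
				retry++
				// "continue WriteAndIngest" restarts the outer labeled loop from the top,
				// which is how the real code bails out of its inner per-region work on error.
				continue WriteAndIngest
			}
		}
		break
	}
	fmt.Println("done, err =", err)
}
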
23 changes: 18 additions & 5 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -65,17 +65,15 @@ var (
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
return errors.Trace(err)
}
}
@@ -89,10 +87,13 @@ func (local *local) SplitAndScatterRegionInBatches(
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
<<<<<<< HEAD
regionSplitSize int64,
) error {
=======
) (err error) {
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
if len(ranges) == 0 {
return nil
}
@@ -108,7 +109,6 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterRegions := make([]*split.RegionInfo, 0)
var retryKeys [][]byte
waitTime := splitRegionBaseBackOffTime
skippedKeys := 0
for i := 0; i < splitRetryTimes; i++ {
log.L().Info("split and scatter region",
logutil.Key("minKey", minKey),
@@ -170,6 +170,7 @@ func (local *local) SplitAndScatterRegionByRanges(
return nil
}

<<<<<<< HEAD
var tableRegionStats map[uint64]int64
if tableInfo != nil {
tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
@@ -180,6 +181,8 @@ func (local *local) SplitAndScatterRegionByRanges(
}
}

=======
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
regionMap := make(map[uint64]*split.RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
@@ -289,6 +292,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}
sendLoop:
for regionID, keys := range splitKeyMap {
<<<<<<< HEAD
// if region not in tableRegionStats, that means this region is newly split, so
// we can skip split it again.
regionSize, ok := tableRegionStats[regionID]
@@ -298,6 +302,8 @@ func (local *local) SplitAndScatterRegionByRanges(
if len(keys) == 1 && regionSize < regionSplitSize {
skippedKeys++
}
=======
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
select {
case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
case <-ctx.Done():
@@ -340,12 +346,19 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterCount++
}
if scatterCount == len(scatterRegions) {
<<<<<<< HEAD
log.L().Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.L().Info("waiting for scattering regions timeout",
zap.Int("skipped_keys", skippedKeys),
=======
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.FromContext(ctx).Info("waiting for scattering regions timeout",
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
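
Taken together, the localhelper.go hunks narrow both helpers: the tableRegionStats lookup and the skippedKeys counter are gone, so callers now pass only the ranges and a needSplit flag, plus a batch size for the batching wrapper. A minimal, hypothetical sketch of the resulting call shape and of the batch slicing it performs; Range and both function bodies are stand-ins, not the real lightning implementation:

package main

import (
	"context"
	"fmt"
)

// Range stands in for lightning's key-range type.
type Range struct {
	start, end []byte
}

// Stand-in for SplitAndScatterRegionByRanges after this change: only the
// context, the ranges and the needSplit flag remain in the signature.
func splitAndScatterRegionByRanges(ctx context.Context, ranges []Range, needSplit bool) error {
	fmt.Printf("split and scatter %d ranges (needSplit=%v)\n", len(ranges), needSplit)
	return nil
}

// Stand-in for SplitAndScatterRegionInBatches: slice the ranges into chunks of
// at most batchCnt and hand each chunk to the per-range helper, stopping on the
// first error, mirroring the loop in the diff above.
func splitAndScatterRegionInBatches(ctx context.Context, ranges []Range, needSplit bool, batchCnt int) error {
	for i := 0; i < len(ranges); i += batchCnt {
		batch := ranges[i:]
		if len(batch) > batchCnt {
			batch = batch[:batchCnt]
		}
		if err := splitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// 10 ranges with batchCnt = 4 produce three calls covering 4 + 4 + 2 ranges.
	_ = splitAndScatterRegionInBatches(context.Background(), make([]Range, 10), true, 4)
}
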
97 changes: 95 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -437,10 +437,15 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
start = end
}

<<<<<<< HEAD
err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
if len(errPat) == 0 {
require.NoError(t, err)
} else {
=======
err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
if len(errPat) != 0 {
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
require.Error(t, err)
require.Regexp(t, errPat, err.Error())
return
@@ -465,6 +470,94 @@ func TestBatchSplitRegionByRanges(t *testing.T) {
doTestBatchSplitRegionByRanges(context.Background(), t, nil, "", nil)
}

<<<<<<< HEAD
=======
type checkScatterClient struct {
*testSplitClient

mu sync.Mutex
notFoundFirstTime map[uint64]struct{}
scatterCounter atomic.Int32
}

func newCheckScatterClient(inner *testSplitClient) *checkScatterClient {
return &checkScatterClient{
testSplitClient: inner,
notFoundFirstTime: map[uint64]struct{}{},
scatterCounter: atomic.Int32{},
}
}

func (c *checkScatterClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
c.scatterCounter.Add(1)
return nil
}

func (c *checkScatterClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.notFoundFirstTime[regionID]; !ok {
c.notFoundFirstTime[regionID] = struct{}{}
return nil, nil
}
return c.testSplitClient.GetRegionByID(ctx, regionID)
}

func TestMissingScatter(t *testing.T) {
ctx := context.Background()
splitHook := defaultHook{}
deferFunc := splitHook.setup(t)
defer deferFunc()

keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
client := initTestSplitClient(keys, nil)
checkClient := newCheckScatterClient(client)
local := &Backend{
splitCli: checkClient,
logger: log.L(),
}
local.RegionSplitBatchSize = 4
local.RegionSplitConcurrency = 4

// current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
rangeStart := codec.EncodeBytes([]byte{}, []byte("b"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("c"))
regions, err := split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
// regions is: [aay, bba), [bba, bbh), [bbh, cca)
checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")})

// generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz)
ranges := make([]Range, 0)
start := []byte{'b'}
for i := byte('a'); i <= 'z'; i++ {
end := []byte{'b', i}
ranges = append(ranges, Range{start: start, end: end})
start = end
}

err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)

splitHook.check(t, client)

// check split ranges
regions, err = split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
result := [][]byte{
[]byte("b"), []byte("ba"), []byte("bb"), []byte("bba"), []byte("bbh"), []byte("bc"),
[]byte("bd"), []byte("be"), []byte("bf"), []byte("bg"), []byte("bh"), []byte("bi"), []byte("bj"),
[]byte("bk"), []byte("bl"), []byte("bm"), []byte("bn"), []byte("bo"), []byte("bp"), []byte("bq"),
[]byte("br"), []byte("bs"), []byte("bt"), []byte("bu"), []byte("bv"), []byte("bw"), []byte("bx"),
[]byte("by"), []byte("bz"), []byte("cca"),
}
checkRegionRanges(t, regions, result)

// the old regions will not be scattered. They are [..., bba), [bba, bbh), [..., cca)
require.Equal(t, len(result)-3, int(checkClient.scatterCounter.Load()))
}

>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
type batchSizeHook struct{}

func (h batchSizeHook) setup(t *testing.T) func() {
@@ -599,7 +692,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
})
}

err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4)
require.NoError(t, err)

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
@@ -695,7 +788,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
start = e
}

err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
err := local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)

startKey := codec.EncodeBytes([]byte{}, rangeKeys[0])
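
One non-obvious bit in the new TestMissingScatter above: the final assertion counts ScatterRegion calls, and the three pre-existing regions called out in the test's comment are not scattered again, which is why the expected count is len(result) - 3. A tiny stand-alone check of that arithmetic, with the boundary list copied from the test:

package main

import "fmt"

func main() {
	// Split boundaries expected by TestMissingScatter, copied from its result slice.
	result := []string{
		"b", "ba", "bb", "bba", "bbh", "bc", "bd", "be", "bf", "bg", "bh",
		"bi", "bj", "bk", "bl", "bm", "bn", "bo", "bp", "bq", "br", "bs",
		"bt", "bu", "bv", "bw", "bx", "by", "bz", "cca",
	}
	oldRegions := 3 // the pre-existing regions the test says are not re-scattered
	fmt.Printf("boundaries=%d, expected scatter calls=%d\n", len(result), len(result)-oldRegions)
	// prints: boundaries=30, expected scatter calls=27
}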
