br,lightning: refine GetSplitKeysOfRegions (#52235)
ref #51533
lance6716 authored Apr 1, 2024
1 parent f7a8004 commit 3cfea6a
Showing 8 changed files with 105 additions and 127 deletions.
8 changes: 2 additions & 6 deletions br/pkg/restore/split.go
@@ -174,14 +174,10 @@ func (rs *RegionSplitter) executeSplitByKeys(
 		return err
 	}
 	splitKeyMap := split.GetSplitKeysOfRegions(sortedKeys, regions, splitContext.isRawKv)
-	regionMap := make(map[uint64]*split.RegionInfo)
-	for _, region := range regions {
-		regionMap[region.Region.GetId()] = region
-	}
 	workerPool := util.NewWorkerPool(uint(splitContext.storeCount)+1, "split keys")
 	eg, ectx := errgroup.WithContext(ctx)
-	for regionID, splitKeys := range splitKeyMap {
-		region := regionMap[regionID]
+	for region, splitKeys := range splitKeyMap {
+		region := region
 		keys := splitKeys
 		sctx := splitContext
 		workerPool.ApplyOnErrorGroup(eg, func() error {
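This hunk is the consumer-side effect of the new return type: since the map is now keyed by the region itself, the caller no longer needs to build an ID-to-region lookup table before dispatching work. A minimal sketch of the pattern, with a stand-in type rather than the real split.RegionInfo:

package main

import "fmt"

// RegionInfo is a stand-in for split.RegionInfo in this sketch.
type RegionInfo struct{ ID uint64 }

func main() {
	r1, r2 := &RegionInfo{ID: 1}, &RegionInfo{ID: 2}
	splitKeyMap := map[*RegionInfo][][]byte{
		r1: {[]byte("b")},
		r2: {[]byte("g")},
	}
	var tasks []func()
	for region, keys := range splitKeyMap {
		// Before Go 1.22, loop variables are reused across iterations, so a
		// closure handed to a worker pool must rebind them first; that is
		// what the `region := region` line in the hunk above does.
		region, keys := region, keys
		tasks = append(tasks, func() { fmt.Println(region.ID, len(keys)) })
	}
	for _, task := range tasks {
		task()
	}
}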
3 changes: 3 additions & 0 deletions br/pkg/restore/split/client.go
@@ -558,6 +558,9 @@ func (c *pdClient) hasHealthyRegion(ctx context.Context, regionID uint64) (bool,
 func (c *pdClient) SplitWaitAndScatter(
 	ctx context.Context, region *RegionInfo, keys [][]byte,
 ) (*RegionInfo, []*RegionInfo, error) {
+	failpoint.Inject("failToSplit", func(_ failpoint.Value) {
+		failpoint.Return(nil, nil, errors.New("retryable error"))
+	})
 	if len(keys) == 0 {
 		return region, []*RegionInfo{region}, nil
 	}
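For context on the mechanism: failpoint.Inject compiles to a no-op unless TiDB's test builds rewrite these markers with failpoint-ctl and the named failpoint is activated through the GO_FAILPOINTS environment variable, where a term such as 5*return("") fires on the first five evaluations. A minimal sketch of the pattern; the failpoint name and package path here are hypothetical:

package demo

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
)

// splitOnce sketches the failpoint pattern used above. Enabled at run time
// with, e.g., GO_FAILPOINTS='example.com/demo/mockSplitError=2*return("")'.
func splitOnce() error {
	failpoint.Inject("mockSplitError", func(_ failpoint.Value) {
		// After failpoint-ctl rewriting, this marker becomes a real return.
		failpoint.Return(errors.New("injected split failure"))
	})
	return nil // normal path when the failpoint is inactive
}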
46 changes: 35 additions & 11 deletions br/pkg/restore/split/split.go
@@ -288,34 +288,58 @@ func (b *BackoffMayNotCountBackoffer) Attempt() int {
 }
 
 // GetSplitKeysOfRegions checks every input key is necessary to split region on
-// it. Returns a map from original region ID to split keys belongs to each
-// region.
+// it. Returns a map from region to split keys belongs to it.
+//
+// The key will be skipped if it's the region boundary.
 //
 // prerequisite:
 // - sortedKeys are sorted in ascending order.
 // - sortedRegions are continuous and sorted in ascending order by start key.
 // - sortedRegions can cover all keys in sortedKeys.
 // PaginateScanRegion should satisfy the above prerequisites.
-func GetSplitKeysOfRegions(sortedKeys [][]byte, sortedRegions []*RegionInfo, isRawKV bool) map[uint64][][]byte {
-	splitKeyMap := make(map[uint64][][]byte, len(sortedRegions))
+func GetSplitKeysOfRegions(
+	sortedKeys [][]byte,
+	sortedRegions []*RegionInfo,
+	isRawKV bool,
+) map[*RegionInfo][][]byte {
+	splitKeyMap := make(map[*RegionInfo][][]byte, len(sortedRegions))
 	curKeyIndex := 0
+	splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
+
 	for _, region := range sortedRegions {
-		for ; curKeyIndex < len(sortedKeys); curKeyIndex += 1 {
+		for {
 			if len(sortedKeys[curKeyIndex]) == 0 {
-				continue
+				// should not happen?
+				goto nextKey
 			}
-			splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
 			// If splitKey is the boundary of the region, don't need to split on it.
 			if bytes.Equal(splitKey, region.Region.GetStartKey()) {
-				continue
+				goto nextKey
 			}
-			// If splitKey is not in a region, we should move to the next region.
+			// If splitKey is not in this region, we should move to the next region.
 			if !region.ContainsInterior(splitKey) {
 				break
 			}
-			regionID := region.Region.GetId()
-			splitKeyMap[regionID] = append(splitKeyMap[regionID], sortedKeys[curKeyIndex])
+
+			splitKeyMap[region] = append(splitKeyMap[region], sortedKeys[curKeyIndex])
+
+		nextKey:
+			curKeyIndex++
+			if curKeyIndex >= len(sortedKeys) {
+				return splitKeyMap
+			}
+			splitKey = codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
 		}
 	}
+	lastKey := sortedKeys[len(sortedKeys)-1]
+	endOfLastRegion := sortedRegions[len(sortedRegions)-1].Region.GetEndKey()
+	if !bytes.Equal(lastKey, endOfLastRegion) {
+		log.Error("in getSplitKeysOfRegions, regions don't cover all keys",
+			zap.String("firstKey", hex.EncodeToString(sortedKeys[0])),
+			zap.String("lastKey", hex.EncodeToString(lastKey)),
+			zap.String("firstRegionStartKey", hex.EncodeToString(sortedRegions[0].Region.GetStartKey())),
+			zap.String("lastRegionEndKey", hex.EncodeToString(endOfLastRegion)),
+		)
+	}
 	return splitKeyMap
 }
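To make the rewritten contract concrete, here is a self-contained sketch of the same lockstep walk over plain byte strings; it deliberately omits the raw-KV key encoding, the empty-key guard, and the final coverage check, so it is an illustration of the algorithm rather than the TiDB implementation:

package main

import (
	"bytes"
	"fmt"
)

type region struct{ start, end []byte }

// containsInterior reports whether key lies strictly inside (start, end).
func (r region) containsInterior(key []byte) bool {
	return bytes.Compare(key, r.start) > 0 &&
		(len(r.end) == 0 || bytes.Compare(key, r.end) < 0)
}

// splitKeysOfRegions walks sorted keys and sorted regions in lockstep:
// a key equal to a region's start key is a boundary and is skipped, a key
// inside the region is recorded as a split key, and a key beyond the
// region's end advances the walk to the next region.
func splitKeysOfRegions(sortedKeys [][]byte, regions []region) map[int][][]byte {
	out := make(map[int][][]byte)
	i := 0
	for ri, r := range regions {
		for i < len(sortedKeys) {
			k := sortedKeys[i]
			if bytes.Equal(k, r.start) { // region boundary: no split needed
				i++
				continue
			}
			if !r.containsInterior(k) { // belongs to a later region
				break
			}
			out[ri] = append(out[ri], k)
			i++
		}
	}
	return out
}

func main() {
	regions := []region{
		{start: []byte("a"), end: []byte("e")},
		{start: []byte("e"), end: []byte("k")},
	}
	keys := [][]byte{[]byte("b"), []byte("e"), []byte("g")}
	// "b" splits region 0, "e" is a boundary and is skipped, "g" splits region 1.
	fmt.Println(splitKeysOfRegions(keys, regions))
}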
50 changes: 26 additions & 24 deletions br/pkg/restore/split/split_test.go
@@ -359,19 +359,14 @@ func TestGetSplitKeyPerRegion(t *testing.T) {
 	}
 	result := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false)
 	require.Equal(t, 3, len(result))
-	require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[1])
-	require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[2])
-	require.Equal(t, [][]byte{[]byte("l")}, result[3])
+	require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[sortedRegions[0]])
+	require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[sortedRegions[1]])
+	require.Equal(t, [][]byte{[]byte("l")}, result[sortedRegions[2]])
 
 	// test case moved from lightning
-	sortedRegions = sortedRegions[:0]
 	tableID := int64(1)
-	peers := make([]*metapb.Peer, 1)
-	peers[0] = &metapb.Peer{
-		Id:      1,
-		StoreId: 1,
-	}
-	keys := []int64{10, 100, 500, 1000, 999999, -1}
+	keys := []int64{1, 10, 100, 1000, 10000, -1}
+	sortedRegions = make([]*RegionInfo, 0, len(keys))
 	start := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(0))
 	regionStart := codec.EncodeBytes([]byte{}, start)
 	for i, end := range keys {
@@ -382,26 +377,25 @@
 		}
 		region := &RegionInfo{
 			Region: &metapb.Region{
-				Id:          uint64(i),
-				Peers:       peers,
-				StartKey:    regionStart,
-				EndKey:      regionEndKey,
-				RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
+				Id:       uint64(i),
+				StartKey: regionStart,
+				EndKey:   regionEndKey,
 			},
 		}
 		sortedRegions = append(sortedRegions, region)
 		regionStart = regionEndKey
 	}
 
 	checkKeys := map[int64]int{
-		0:         -1,
-		5:         0,
-		99:        1,
-		100:       -1,
-		512:       3,
-		8888:      4,
-		999999:    -1,
-		100000000: 5,
+		0:     -1,
+		5:     1,
+		6:     1,
+		7:     1,
+		50:    2,
+		60:    2,
+		70:    2,
+		100:   -1,
+		50000: 5,
 	}
 	expected := map[uint64][][]byte{}
 	sortedKeys = make([][]byte, 0, len(checkKeys))
@@ -414,9 +408,17 @@
 		}
 		expected[uint64(idx)] = append(expected[uint64(idx)], key)
 	}
+
 	slices.SortFunc(sortedKeys, bytes.Compare)
 	for i := range expected {
 		slices.SortFunc(expected[i], bytes.Compare)
 	}
+
 	got := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false)
-	require.Equal(t, expected, got)
+	require.Equal(t, len(expected), len(got))
+	for region, gotKeys := range got {
+		require.Equal(t, expected[region.Region.GetId()], gotKeys)
+	}
 }
 
 func checkRegionsBoundaries(t *testing.T, regions []*RegionInfo, expected [][]byte) {
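The assertion change at the end of this test follows from the new key type: a map keyed by *RegionInfo pointers cannot be compared wholesale against an ID-keyed literal, so the test checks the size and then each region's keys through its ID. An equivalent alternative, sketched here under the assumption of the same got and expected variables (this is not code from the commit), is to flatten the result back to IDs and keep a single Equal:

// Flatten map[*RegionInfo][][]byte into map[uint64][][]byte for one compare.
byID := make(map[uint64][][]byte, len(got))
for region, keys := range got {
	byID[region.Region.GetId()] = keys
}
require.Equal(t, expected, byID)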
4 changes: 2 additions & 2 deletions br/tests/lightning_local_backend/run.sh
@@ -37,11 +37,11 @@ grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning
 
 
 # First, verify that inject with not leader error is fine.
-export GO_FAILPOINTS='github.com/pingcap/tidb/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/pkg/lightning/backend/local/failToSplit=5*return("")'
+export GO_FAILPOINTS='github.com/pingcap/tidb/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/br/pkg/restore/split/failToSplit=5*return("");github.com/pingcap/tidb/pkg/lightning/backend/local/failToSplit=5*return("")'
 rm -f "$TEST_DIR/lightning-local.log"
 run_sql 'DROP DATABASE IF EXISTS cpeng;'
 run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "$CUR/config.toml" -L debug
-grep -Eq "split regions.*retryable error" "$TEST_DIR/lightning-local.log"
+grep -q "retryable error" "$TEST_DIR/lightning-local.log"
 
 # Check that everything is correctly imported
 run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
52 changes: 27 additions & 25 deletions pkg/lightning/backend/local/local.go
@@ -840,40 +840,42 @@ func (local *Backend) prepareAndSendJob(
 	// if all the kv can fit in one region, skip split regions. TiDB will split one region for
 	// the table when table is created.
 	needSplit := len(initialSplitRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
-	var err error
 	// split region by given ranges
 	failpoint.Inject("failToSplit", func(_ failpoint.Value) {
 		needSplit = true
 	})
-	logger := log.FromContext(ctx).With(zap.String("uuid", engine.ID())).Begin(zap.InfoLevel, "split and scatter ranges")
-	backOffTime := 10 * time.Second
-	maxbackoffTime := 120 * time.Second
-	for i := 0; i < maxRetryTimes; i++ {
-		failpoint.Inject("skipSplitAndScatter", func() {
-			failpoint.Break()
-		})
+	if needSplit {
+		var err error
+		logger := log.FromContext(ctx).With(zap.String("uuid", engine.ID())).Begin(zap.InfoLevel, "split and scatter ranges")
+		backOffTime := 10 * time.Second
+		maxbackoffTime := 120 * time.Second
+		for i := 0; i < maxRetryTimes; i++ {
+			failpoint.Inject("skipSplitAndScatter", func() {
+				failpoint.Break()
+			})
 
-		err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges)
-		if err == nil || common.IsContextCanceledError(err) {
-			break
-		}
+			err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, maxBatchSplitRanges)
+			if err == nil || common.IsContextCanceledError(err) {
+				break
+			}
 
-		log.FromContext(ctx).Warn("split and scatter failed in retry", zap.String("engine ID", engine.ID()),
-			log.ShortError(err), zap.Int("retry", i))
-		select {
-		case <-time.After(backOffTime):
-		case <-ctx.Done():
-			return ctx.Err()
+			log.FromContext(ctx).Warn("split and scatter failed in retry", zap.String("engine ID", engine.ID()),
+				log.ShortError(err), zap.Int("retry", i))
+			select {
+			case <-time.After(backOffTime):
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+			backOffTime *= 2
+			if backOffTime > maxbackoffTime {
+				backOffTime = maxbackoffTime
+			}
 		}
-		backOffTime *= 2
-		if backOffTime > maxbackoffTime {
-			backOffTime = maxbackoffTime
-		}
+		logger.End(zap.ErrorLevel, err)
+		if err != nil {
+			return err
+		}
 	}
-	logger.End(zap.ErrorLevel, err)
-	if err != nil {
-		return err
-	}
 
 	return local.generateAndSendJob(
 		ctx,
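The retry loop itself is not changed by this commit, only moved inside the if needSplit block so that split-and-scatter work is skipped entirely when no split is needed. Its backoff policy, extracted as a standalone sketch (the function name and op callback are illustrative, not TiDB API):

package demo

import (
	"context"
	"time"
)

// retryWithBackoff retries op with exponential backoff, starting at ten
// seconds and capped at two minutes, aborting early if ctx is canceled.
func retryWithBackoff(ctx context.Context, attempts int, op func() error) error {
	backoff := 10 * time.Second
	const maxBackoff = 120 * time.Second
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		select {
		case <-time.After(backoff): // wait before the next attempt
		case <-ctx.Done():
			return ctx.Err()
		}
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return err
}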
61 changes: 6 additions & 55 deletions pkg/lightning/backend/local/localhelper.go
@@ -19,14 +19,12 @@ import (
 	"context"
 	"math"
 	"slices"
-	"sort"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/docker/go-units"
 	"github.com/pingcap/errors"
-	"github.com/pingcap/failpoint"
 	sst "github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/br/pkg/logutil"
@@ -64,15 +62,14 @@
 func (local *Backend) SplitAndScatterRegionInBatches(
 	ctx context.Context,
 	ranges []common.Range,
-	needSplit bool,
 	batchCnt int,
 ) error {
 	for i := 0; i < len(ranges); i += batchCnt {
 		batch := ranges[i:]
 		if len(batch) > batchCnt {
 			batch = batch[:batchCnt]
 		}
-		if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
+		if err := local.SplitAndScatterRegionByRanges(ctx, batch); err != nil {
 			return errors.Trace(err)
 		}
 	}
@@ -86,7 +83,6 @@ func (local *Backend) SplitAndScatterRegionInBatches(
 func (local *Backend) SplitAndScatterRegionByRanges(
 	ctx context.Context,
 	ranges []common.Range,
-	needSplit bool,
 ) (err error) {
 	if len(ranges) == 0 {
 		return nil
@@ -138,42 +134,12 @@
 		log.FromContext(ctx).Info("paginate scan region finished", logutil.Key("minKey", minKey), logutil.Key("maxKey", maxKey),
 			zap.Int("regions", len(regions)))
 
-		if !needSplit {
-			scatterRegions = append(scatterRegions, regions...)
-			break
-		}
-
-		needSplitRanges := make([]common.Range, 0, len(ranges))
-		startKey := make([]byte, 0)
-		endKey := make([]byte, 0)
-		for _, r := range ranges {
-			startKey = codec.EncodeBytes(startKey, r.Start)
-			endKey = codec.EncodeBytes(endKey, r.End)
-			idx := sort.Search(len(regions), func(i int) bool {
-				return beforeEnd(startKey, regions[i].Region.EndKey)
-			})
-			if idx < 0 || idx >= len(regions) {
-				log.FromContext(ctx).Error("target region not found", logutil.Key("start_key", startKey),
-					logutil.RegionBy("first_region", regions[0].Region),
-					logutil.RegionBy("last_region", regions[len(regions)-1].Region))
-				return errors.New("target region not found")
-			}
-			if bytes.Compare(startKey, regions[idx].Region.StartKey) > 0 || bytes.Compare(endKey, regions[idx].Region.EndKey) < 0 {
-				needSplitRanges = append(needSplitRanges, r)
-			}
-		}
-		ranges = needSplitRanges
-		if len(ranges) == 0 {
-			log.FromContext(ctx).Info("no ranges need to be split, skipped.")
-			return nil
-		}
-
 		regionMap := make(map[uint64]*split.RegionInfo)
 		for _, region := range regions {
 			regionMap[region.Region.GetId()] = region
 		}
 
-		var splitKeyMap map[uint64][][]byte
+		var splitKeyMap map[*split.RegionInfo][][]byte
 		if len(retryKeys) > 0 {
 			firstKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[0])
 			lastKeyEnc := codec.EncodeBytes([]byte{}, retryKeys[len(retryKeys)-1])
@@ -224,7 +190,7 @@
 						logutil.Key("regionStart", splitRegion.Region.StartKey), logutil.Key("regionEnd", splitRegion.Region.EndKey),
 						logutil.Region(splitRegion.Region), logutil.Leader(splitRegion.Leader))
 				}
-				splitRegion, newRegions, err1 = local.BatchSplitRegions(splitCtx, splitRegion, keys[startIdx:endIdx])
+				splitRegion, newRegions, err1 = local.splitCli.SplitWaitAndScatter(splitCtx, splitRegion, keys[startIdx:endIdx])
 				if err1 != nil {
 					if strings.Contains(err1.Error(), "no valid key") {
 						for _, key := range keys {
@@ -275,9 +241,9 @@
 		})
 	}
 sendLoop:
-	for regionID, keys := range splitKeyMap {
+	for region, keys := range splitKeyMap {
 		select {
-		case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
+		case ch <- &splitInfo{region: region, keys: keys}:
 		case <-ctx.Done():
 			// outer context is canceled, can directly return
 			close(ch)
@@ -320,21 +286,6 @@
 	return nil
 }
 
-// BatchSplitRegions will split regions by the given split keys and tries to
-// scatter new regions. If split/scatter fails because new region is not ready,
-// this function will not return error.
-func (local *Backend) BatchSplitRegions(
-	ctx context.Context,
-	region *split.RegionInfo,
-	keys [][]byte,
-) (*split.RegionInfo, []*split.RegionInfo, error) {
-	failpoint.Inject("failToSplit", func(_ failpoint.Value) {
-		failpoint.Return(nil, nil, errors.New("retryable error"))
-	})
-
-	return local.splitCli.SplitWaitAndScatter(ctx, region, keys)
-}
-
 func (local *Backend) hasRegion(ctx context.Context, regionID uint64) (bool, error) {
 	regionInfo, err := local.splitCli.GetRegionByID(ctx, regionID)
 	if err != nil {
@@ -343,7 +294,7 @@ func (local *Backend) hasRegion(ctx context.Context, regionID uint64) (bool, err
 	return regionInfo != nil, nil
 }
 
-func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo) map[uint64][][]byte {
+func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo) map[*split.RegionInfo][][]byte {
 	checkKeys := make([][]byte, 0)
 	var lastEnd []byte
 	for _, rg := range ranges {
(The diff for the eighth changed file was not loaded on this page.)
