Skip to content

Commit

Permalink
br,lightning: move GetSplitKeyPerRegion into shared package (#52052)
Browse files Browse the repository at this point in the history
ref #51533
  • Loading branch information
lance6716 authored Mar 25, 2024
1 parent ca2a568 commit 90b21d9
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 177 deletions.
52 changes: 4 additions & 48 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func (local *Backend) SplitAndScatterRegionByRanges(
logutil.Key("lastRegionEnd", regions[len(regions)-1].Region.EndKey))
return errors.New("check split keys failed")
}
splitKeyMap = getSplitKeys(retryKeys, regions, log.FromContext(ctx))
splitKeyMap = split.GetSplitKeysOfRegions(retryKeys, regions, false)
retryKeys = retryKeys[:0]
} else {
splitKeyMap = getSplitKeysByRanges(ranges, regions, log.FromContext(ctx))
splitKeyMap = getSplitKeysByRanges(ranges, regions)
}

type splitInfo struct {
Expand Down Expand Up @@ -343,7 +343,7 @@ func (local *Backend) hasRegion(ctx context.Context, regionID uint64) (bool, err
return regionInfo != nil, nil
}

func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte {
func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo) map[uint64][][]byte {
checkKeys := make([][]byte, 0)
var lastEnd []byte
for _, rg := range ranges {
Expand All @@ -353,51 +353,7 @@ func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo, lo
checkKeys = append(checkKeys, rg.End)
lastEnd = rg.End
}
return getSplitKeys(checkKeys, regions, logger)
}

func getSplitKeys(checkKeys [][]byte, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
for _, key := range checkKeys {
if region := needSplit(key, regions, logger); region != nil {
splitKeys, ok := splitKeyMap[region.Region.GetId()]
if !ok {
splitKeys = make([][]byte, 0, 1)
}
splitKeyMap[region.Region.GetId()] = append(splitKeys, key)
logger.Debug("get key for split region",
zap.Binary("key", key),
zap.Binary("startKey", region.Region.StartKey),
zap.Binary("endKey", region.Region.EndKey))
}
}
return splitKeyMap
}

// needSplit checks whether a key is necessary to split, if true returns the split region
func needSplit(key []byte, regions []*split.RegionInfo, logger log.Logger) *split.RegionInfo {
// If splitKey is the max key.
if len(key) == 0 {
return nil
}
splitKey := codec.EncodeBytes([]byte{}, key)

idx := sort.Search(len(regions), func(i int) bool {
return beforeEnd(splitKey, regions[i].Region.EndKey)
})
if idx < len(regions) {
// If splitKey is in a region
if bytes.Compare(splitKey, regions[idx].Region.GetStartKey()) > 0 && beforeEnd(splitKey, regions[idx].Region.GetEndKey()) {
logger.Debug("need split",
zap.Binary("splitKey", key),
zap.Binary("encodedKey", splitKey),
zap.Binary("region start", regions[idx].Region.GetStartKey()),
zap.Binary("region end", regions[idx].Region.GetEndKey()),
)
return regions[idx]
}
}
return nil
return split.GetSplitKeysOfRegions(checkKeys, regions, false)
}

func beforeEnd(key []byte, end []byte) bool {
Expand Down
52 changes: 0 additions & 52 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,58 +752,6 @@ func TestBatchSplitByRangesWithClusteredIndexEpochNotMatch(t *testing.T) {
doTestBatchSplitByRangesWithClusteredIndex(t, &splitRegionEpochNotMatchHookRandom{})
}

func TestNeedSplit(t *testing.T) {
tableID := int64(1)
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Id: 1,
StoreId: 1,
}
keys := []int64{10, 100, 500, 1000, 999999, -1}
start := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(0))
regionStart := codec.EncodeBytes([]byte{}, start)
regions := make([]*split.RegionInfo, 0)
for _, end := range keys {
var regionEndKey []byte
if end >= 0 {
endKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(end))
regionEndKey = codec.EncodeBytes([]byte{}, endKey)
}
region := &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
Peers: peers,
StartKey: regionStart,
EndKey: regionEndKey,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
},
}
regions = append(regions, region)
regionStart = regionEndKey
}

checkMap := map[int64]int{
0: -1,
5: 0,
99: 1,
100: -1,
512: 3,
8888: 4,
999999: -1,
100000000: 5,
}

for hdl, idx := range checkMap {
checkKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(hdl))
res := needSplit(checkKey, regions, log.L())
if idx < 0 {
require.Nil(t, res)
} else {
require.Equal(t, regions[idx], res)
}
}
}

func TestStoreWriteLimiter(t *testing.T) {
// Test create store write limiter with limit math.MaxInt.
limiter := newStoreWriteLimiter(math.MaxInt)
Expand Down
36 changes: 1 addition & 35 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (rs *RegionSplitter) executeSplitByKeys(
if err != nil {
return err
}
splitKeyMap := getSplitSortedKeysFromSortedRegions(splitContext, sortedKeys, regions)
splitKeyMap := split.GetSplitKeysOfRegions(sortedKeys, regions, splitContext.isRawKv)
regionMap := make(map[uint64]*split.RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
Expand Down Expand Up @@ -261,40 +261,6 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
return leftRegions
}

// TestGetSplitSortedKeysFromSortedRegionsTest is used only in unit test
var TestGetSplitSortedKeysFromSortedRegionsTest = getSplitSortedKeysFromSortedRegions

// getSplitSortedKeysFromSortedRegions checks if the sorted regions should be split by the end key of
// the sorted ranges, and groups the split keys by region id.
//
// ASSERT: sortedRegions[0].StartKey <= sortedKeys[0]
func getSplitSortedKeysFromSortedRegions(splitContext SplitContext, sortedKeys [][]byte, sortedRegions []*split.RegionInfo) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
curKeyIndex := 0
for _, region := range sortedRegions {
for ; curKeyIndex < len(sortedKeys); curKeyIndex += 1 {
if len(sortedKeys[curKeyIndex]) == 0 {
continue
}
splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], splitContext.isRawKv)
// If splitKey is the boundary of the region
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
continue
}
// If splitKey is not in a region
if !region.ContainsInterior(splitKey) {
break
}
splitKeys, ok := splitKeyMap[region.Region.GetId()]
if !ok {
splitKeys = make([][]byte, 0, 1)
}
splitKeyMap[region.Region.GetId()] = append(splitKeys, sortedKeys[curKeyIndex])
}
}
return splitKeyMap
}

func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRule) {
// We should search the dataRules firstly.
for _, rule := range rewriteRules.Data {
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//br/pkg/redact",
"//br/pkg/utils",
"//pkg/kv",
"//pkg/util/codec",
"//pkg/util/intest",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
Expand Down Expand Up @@ -52,10 +53,13 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
"//pkg/kv",
"//pkg/tablecodec",
"//pkg/util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/split/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ type RegionInfo struct {
// that the key does not fall on the boundary (start key) of the region.
func (region *RegionInfo) ContainsInterior(key []byte) bool {
return bytes.Compare(key, region.Region.GetStartKey()) > 0 &&
(len(region.Region.GetEndKey()) == 0 ||
bytes.Compare(key, region.Region.GetEndKey()) < 0)
beforeEnd(key, region.Region.GetEndKey())
}

func beforeEnd(key []byte, end []byte) bool {
return bytes.Compare(key, end) < 0 || len(end) == 0
}

// ToZapFields returns zap fields for the RegionInfo. It can handle nil RegionInfo.
Expand Down
34 changes: 34 additions & 0 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/util/codec"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -285,3 +286,36 @@ func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration {
func (b *BackoffMayNotCountBackoffer) Attempt() int {
return b.state.Attempt()
}

// GetSplitKeysOfRegions checks every input key is necessary to split region on
// it. Returns a map from original region ID to split keys belongs to each
// region.
//
// prerequisite:
// - sortedKeys are sorted in ascending order.
// - sortedRegions are continuous and sorted in ascending order by start key.
// - sortedRegions can cover all keys in sortedKeys.
// PaginateScanRegion should satisfy the above prerequisites.
func GetSplitKeysOfRegions(sortedKeys [][]byte, sortedRegions []*RegionInfo, isRawKV bool) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte, len(sortedRegions))
curKeyIndex := 0
for _, region := range sortedRegions {
for ; curKeyIndex < len(sortedKeys); curKeyIndex += 1 {
if len(sortedKeys[curKeyIndex]) == 0 {
continue
}
splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
// If splitKey is the boundary of the region, don't need to split on it.
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
continue
}
// If splitKey is not in a region, we should move to the next region.
if !region.ContainsInterior(splitKey) {
break
}
regionID := region.Region.GetId()
splitKeyMap[regionID] = append(splitKeyMap[regionID], sortedKeys[curKeyIndex])
}
}
return splitKeyMap
}
99 changes: 99 additions & 0 deletions br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
package split

import (
"bytes"
"context"
goerrors "errors"
"slices"
"testing"
"time"

Expand All @@ -14,6 +16,9 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -316,3 +321,97 @@ func TestSplitCtxCancel(t *testing.T) {
_, _, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}})
require.ErrorIs(t, err, context.Canceled)
}

func TestGetSplitKeyPerRegion(t *testing.T) {
// test case moved from BR
sortedKeys := [][]byte{
[]byte("b"),
[]byte("d"),
[]byte("g"),
[]byte("j"),
[]byte("l"),
}
sortedRegions := []*RegionInfo{
{
Region: &metapb.Region{
Id: 1,
StartKey: []byte("a"),
EndKey: []byte("g"),
},
},
{
Region: &metapb.Region{
Id: 2,
StartKey: []byte("g"),
EndKey: []byte("k"),
},
},
{
Region: &metapb.Region{
Id: 3,
StartKey: []byte("k"),
EndKey: []byte("m"),
},
},
}
result := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false)
require.Equal(t, 3, len(result))
require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[1])
require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[2])
require.Equal(t, [][]byte{[]byte("l")}, result[3])

// test case moved from lightning
sortedRegions = sortedRegions[:0]
tableID := int64(1)
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Id: 1,
StoreId: 1,
}
keys := []int64{10, 100, 500, 1000, 999999, -1}
start := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(0))
regionStart := codec.EncodeBytes([]byte{}, start)
for i, end := range keys {
var regionEndKey []byte
if end >= 0 {
endKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(end))
regionEndKey = codec.EncodeBytes([]byte{}, endKey)
}
region := &RegionInfo{
Region: &metapb.Region{
Id: uint64(i),
Peers: peers,
StartKey: regionStart,
EndKey: regionEndKey,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
},
}
sortedRegions = append(sortedRegions, region)
regionStart = regionEndKey
}

checkKeys := map[int64]int{
0: -1,
5: 0,
99: 1,
100: -1,
512: 3,
8888: 4,
999999: -1,
100000000: 5,
}
expected := map[uint64][][]byte{}
sortedKeys = make([][]byte, 0, len(checkKeys))

for hdl, idx := range checkKeys {
key := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(hdl))
sortedKeys = append(sortedKeys, key)
if idx < 0 {
continue
}
expected[uint64(idx)] = append(expected[uint64(idx)], key)
}
slices.SortFunc(sortedKeys, bytes.Compare)
got := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false)
require.Equal(t, expected, got)
}
Loading

0 comments on commit 90b21d9

Please sign in to comment.