diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index d76b108d3b134..437d1626a4357 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -214,12 +214,13 @@ func NewRestoreClient( pdHTTPCli pdhttp.Client, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters, - isRawKv bool, ) *Client { return &Client{ - pdClient: pdClient, - pdHTTPClient: pdHTTPCli, - toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv, maxSplitKeysOnce), + pdClient: pdClient, + pdHTTPClient: pdHTTPCli, + // toolClient reuse the split.SplitClient to do miscellaneous things. It doesn't + // call split related functions so set the arguments to arbitrary values. + toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3), tlsConf: tlsConf, keepaliveConf: keepaliveConf, switchCh: make(chan struct{}), @@ -555,7 +556,11 @@ func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBack useTokenBucket = true } - metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode, maxSplitKeysOnce) + var splitClientOpts []split.ClientOptionalParameter + if isRawKvMode { + splitClientOpts = append(splitClientOpts, split.WithRawKV()) + } + metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, rc.GetStoreCount()+1, splitClientOpts...) importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf) rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, stores, rc.rewriteMode, concurrencyPerStore, useTokenBucket) } @@ -1423,7 +1428,7 @@ func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor() splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx) log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys)) - client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false, maxSplitKeysOnce) + client := split.NewClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), maxSplitKeysOnce, 3) return NewLogFilesIterWithSplitHelper(logIter, rules, client, splitSize, splitKeys), nil } diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index dcd819d4fe281..205e9203d810a 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -47,7 +47,7 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{ func TestCreateTables(t *testing.T) { m := mc g := gluetidb.New() - client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -106,7 +106,7 @@ func TestCreateTables(t *testing.T) { func TestIsOnline(t *testing.T) { m := mc g := gluetidb.New() - client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -130,7 +130,7 @@ func TestNeedCheckTargetClusterFresh(t *testing.T) { defer cluster.Stop() g := gluetidb.New() - client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ 
-160,7 +160,7 @@ func TestCheckTargetClusterFresh(t *testing.T) { defer cluster.Stop() g := gluetidb.New() - client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -177,7 +177,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) { defer cluster.Stop() g := gluetidb.New() - client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -212,7 +212,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) { func TestCheckSysTableCompatibility(t *testing.T) { cluster := mc g := gluetidb.New() - client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -288,7 +288,7 @@ func TestCheckSysTableCompatibility(t *testing.T) { func TestInitFullClusterRestore(t *testing.T) { cluster := mc g := gluetidb.New() - client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -313,7 +313,7 @@ func TestInitFullClusterRestore(t *testing.T) { func TestPreCheckTableClusterIndex(t *testing.T) { m := mc g := gluetidb.New() - client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -402,7 +402,7 @@ func TestGetTSWithRetry(t *testing.T) { t.Run("PD leader is healthy:", func(t *testing.T) { retryTimes := -1000 pDClient := fakePDClient{notLeader: false, retryTimes: &retryTimes} - client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg) _, err := client.GetTSWithRetry(context.Background()) require.NoError(t, err) }) @@ -410,7 +410,7 @@ func TestGetTSWithRetry(t *testing.T) { t.Run("PD leader failure:", func(t *testing.T) { retryTimes := -1000 pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes} - client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg) _, err := client.GetTSWithRetry(context.Background()) require.Error(t, err) }) @@ -418,7 +418,7 @@ func TestGetTSWithRetry(t *testing.T) { t.Run("PD leader switch successfully", func(t *testing.T) { retryTimes := 0 pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes} - client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false) + client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg) _, err := client.GetTSWithRetry(context.Background()) require.NoError(t, err) }) @@ -450,7 +450,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) { g := gluetidb.New() client := restore.NewRestoreClient(fakePDClient{ stores: mockStores, - }, nil, nil, defaultKeepaliveCfg, false) + }, 
nil, nil, defaultKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -574,7 +574,7 @@ func TestSetSpeedLimit(t *testing.T) { // 1. The cost of concurrent communication is expected to be less than the cost of serial communication. client := restore.NewRestoreClient(fakePDClient{ stores: mockStores, - }, nil, nil, defaultKeepaliveCfg, false) + }, nil, nil, defaultKeepaliveCfg) ctx := context.Background() recordStores = NewRecordStores() @@ -600,7 +600,7 @@ func TestSetSpeedLimit(t *testing.T) { mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store client = restore.NewRestoreClient(fakePDClient{ stores: mockStores, - }, nil, nil, defaultKeepaliveCfg, false) + }, nil, nil, defaultKeepaliveCfg) // Concurrency needs to be less than the number of stores err = restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2) @@ -680,7 +680,7 @@ func TestDeleteRangeQuery(t *testing.T) { g := gluetidb.New() client := restore.NewRestoreClient(fakePDClient{ stores: mockStores, - }, nil, nil, defaultKeepaliveCfg, false) + }, nil, nil, defaultKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -730,7 +730,7 @@ func TestDeleteRangeQueryExec(t *testing.T) { client := restore.NewRestoreClient(fakePDClient{ stores: mockStores, retryTimes: &retryCnt, - }, nil, nil, defaultKeepaliveCfg, false) + }, nil, nil, defaultKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) diff --git a/br/pkg/restore/import_retry_test.go b/br/pkg/restore/import_retry_test.go index 411af37c46d2a..c10a6c56b14c8 100644 --- a/br/pkg/restore/import_retry_test.go +++ b/br/pkg/restore/import_retry_test.go @@ -50,6 +50,43 @@ func assertRegions(t *testing.T, regions []*split.RegionInfo, keys ...string) { } } +// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +func initTestClient(isRawKv bool) *TestClient { + peers := make([]*metapb.Peer, 1) + peers[0] = &metapb.Peer{ + Id: 1, + StoreId: 1, + } + keys := [6]string{"", "aay", "bba", "bbh", "cca", ""} + regions := make(map[uint64]*split.RegionInfo) + for i := uint64(1); i < 6; i++ { + startKey := []byte(keys[i-1]) + if len(startKey) != 0 { + startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv) + } + endKey := []byte(keys[i]) + if len(endKey) != 0 { + endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv) + } + regions[i] = &split.RegionInfo{ + Leader: &metapb.Peer{ + Id: i, + }, + Region: &metapb.Region{ + Id: i, + Peers: peers, + StartKey: startKey, + EndKey: endKey, + }, + } + } + stores := make(map[uint64]*metapb.Store) + stores[1] = &metapb.Store{ + Id: 1, + } + return NewTestClient(stores, regions, 6) +} + func TestScanSuccess(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index cc855a05a2266..456c3f0cc4056 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -36,12 +36,6 @@ const ( maxSplitKeysOnce = 10240 ) -type SplitContext struct { - isRawKv bool - storeCount int - onSplit OnSplitFunc -} - // RegionSplitter is a executor of region split by rules. 
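Note on the hunk above: SplitContext goes away because its three fields (isRawKv, storeCount, onSplit) now travel with the split client itself. Below is a minimal sketch of how a caller supplies the same information after this change; the function name and its parameters are placeholders, and the pd client import paths are assumed to match the ones used elsewhere in br. Only split.NewClient, the two options, maxSplitKeysOnce and NewRegionSplitter/ExecuteSplit come from this patch.

```go
package restore

import (
	"context"
	"crypto/tls"

	"github.com/pingcap/tidb/br/pkg/restore/split"
	"github.com/pingcap/tidb/br/pkg/rtree"
	pd "github.com/tikv/pd/client"
	pdhttp "github.com/tikv/pd/client/http"
)

// splitRangesSketch is illustrative only: the values that SplitContext used to
// carry (isRawKv, storeCount, onSplit) are supplied once, when the split client
// is built, so ExecuteSplit only needs the sorted ranges.
func splitRangesSketch(
	ctx context.Context,
	pdCli pd.Client,
	pdHTTPCli pdhttp.Client,
	tlsConf *tls.Config,
	storeCount int,
	isRawKv bool,
	onSplit func(keys [][]byte),
	ranges []rtree.Range,
) error {
	opts := make([]split.ClientOptionalParameter, 0, 2)
	// Progress reporting moves from an ExecuteSplit argument to a client option.
	opts = append(opts, split.WithOnSplit(onSplit))
	if isRawKv {
		// Raw-KV key handling also becomes a client option.
		opts = append(opts, split.WithRawKV())
	}
	// maxSplitKeysOnce is the batch-size constant of this package; storeCount+1
	// mirrors the split concurrency used by InitClients above.
	client := split.NewClient(pdCli, pdHTTPCli, tlsConf, maxSplitKeysOnce, storeCount+1, opts...)
	return NewRegionSplitter(client).ExecuteSplit(ctx, ranges)
}
```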
type RegionSplitter struct { client split.SplitClient @@ -66,9 +60,6 @@ type OnSplitFunc func(key [][]byte) func (rs *RegionSplitter) ExecuteSplit( ctx context.Context, ranges []rtree.Range, - storeCount int, - isRawKv bool, - onSplit OnSplitFunc, ) error { if len(ranges) == 0 { log.Info("skip split regions, no range") @@ -97,22 +88,12 @@ func (rs *RegionSplitter) ExecuteSplit( sortedKeys = append(sortedKeys, r.EndKey) totalRangeSize += r.Size } - // need use first range's start key to scan region - // and the range size must be greater than 0 here - scanStartKey := sortedRanges[0].StartKey - sctx := SplitContext{ - isRawKv: isRawKv, - onSplit: onSplit, - storeCount: storeCount, - } // the range size must be greater than 0 here - return rs.executeSplitByRanges(ctx, sctx, scanStartKey, sortedKeys) + return rs.executeSplitByRanges(ctx, sortedKeys) } func (rs *RegionSplitter) executeSplitByRanges( ctx context.Context, - splitContext SplitContext, - scanStartKey []byte, sortedKeys [][]byte, ) error { startTime := time.Now() @@ -125,14 +106,14 @@ func (rs *RegionSplitter) executeSplitByRanges( roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex]) } if len(roughSortedSplitKeys) > 0 { - if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, roughSortedSplitKeys); err != nil { + if err := rs.executeSplitByKeys(ctx, roughSortedSplitKeys); err != nil { return errors.Trace(err) } } log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime))) // Then send split requests to each TiKV. - if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, sortedKeys); err != nil { + if err := rs.executeSplitByKeys(ctx, sortedKeys); err != nil { return errors.Trace(err) } @@ -143,65 +124,13 @@ func (rs *RegionSplitter) executeSplitByRanges( // executeSplitByKeys will split regions by **sorted** keys with following steps. // 1. locate regions with correspond keys. // 2. split these regions with correspond keys. -// 3. make sure new splitted regions are balanced. +// 3. make sure new split regions are balanced. 
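Aside on executeSplitByRanges above: it keeps its rough-then-fine strategy, splitting first on a sparse sample of the sorted keys so that the full pass is spread across more regions and stores. The throwaway helper below only illustrates the sampling idea; the fixed step parameter is an assumption for demonstration, not the step the surrounding code actually computes.

```go
// sampleRoughSplitKeys picks every step-th key from the already sorted split
// keys. Splitting on this sparse subset first creates coarse regions, so the
// follow-up split on the full key set fans out instead of hammering one region.
func sampleRoughSplitKeys(sortedKeys [][]byte, step int) [][]byte {
	if step <= 0 {
		step = 1
	}
	rough := make([][]byte, 0, len(sortedKeys)/step+1)
	for i := step; i < len(sortedKeys); i += step {
		rough = append(rough, sortedKeys[i])
	}
	return rough
}
```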
func (rs *RegionSplitter) executeSplitByKeys( ctx context.Context, - splitContext SplitContext, - scanStartKey []byte, sortedKeys [][]byte, ) error { - var mutex sync.Mutex startTime := time.Now() - minKey := codec.EncodeBytesExt(nil, scanStartKey, splitContext.isRawKv) - maxKey := codec.EncodeBytesExt(nil, sortedKeys[len(sortedKeys)-1], splitContext.isRawKv) - scatterRegions := make([]*split.RegionInfo, 0) - regionsMap := make(map[uint64]*split.RegionInfo) - - err := utils.WithRetry(ctx, func() error { - clear(regionsMap) - regions, err := split.PaginateScanRegion(ctx, rs.client, minKey, maxKey, split.ScanRegionPaginationLimit) - if err != nil { - return err - } - splitKeyMap := split.GetSplitKeysOfRegions(sortedKeys, regions, splitContext.isRawKv) - workerPool := util.NewWorkerPool(uint(splitContext.storeCount)+1, "split keys") - eg, ectx := errgroup.WithContext(ctx) - for region, splitKeys := range splitKeyMap { - region := region - keys := splitKeys - sctx := splitContext - workerPool.ApplyOnErrorGroup(eg, func() error { - log.Info("get split keys for split regions", - logutil.Region(region.Region), logutil.Keys(keys)) - newRegions, err := rs.splitAndScatterRegions(ectx, region, keys, sctx.isRawKv) - if err != nil { - return err - } - if len(newRegions) != len(keys) { - log.Warn("split key count and new region count mismatch", - zap.Int("new region count", len(newRegions)), - zap.Int("split key count", len(keys))) - } - log.Info("scattered regions", zap.Int("count", len(newRegions))) - mutex.Lock() - for _, r := range newRegions { - regionsMap[r.Region.Id] = r - } - mutex.Unlock() - sctx.onSplit(keys) - return nil - }) - } - err = eg.Wait() - if err != nil { - return err - } - for _, r := range regionsMap { - // merge all scatter regions - scatterRegions = append(scatterRegions, r) - } - return nil - }, newSplitBackoffer()) + scatterRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys) if err != nil { return errors.Trace(err) } @@ -215,13 +144,6 @@ func (rs *RegionSplitter) executeSplitByKeys( return nil } -func (rs *RegionSplitter) splitAndScatterRegions( - ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, _ bool, -) ([]*split.RegionInfo, error) { - newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys) - return newRegions, err -} - // waitRegionsScattered try to wait mutilple regions scatterd in 3 minutes. // this could timeout, but if many regions scatterd the restore could continue // so we don't wait long time here. 
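With splitAndScatterRegions and the hand-rolled retry loop deleted above, the pagination, per-region worker pool and scatter bookkeeping now live behind SplitClient.SplitKeysAndScatter. A minimal sketch of the caller side after the change (splitKeysSketch is an invented name; the client method and return values are taken from this patch):

```go
package restore

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/restore/split"
	"go.uber.org/zap"
)

// splitKeysSketch shows the shape executeSplitByKeys keeps after the patch:
// one call that splits and scatters, then best-effort handling of the result.
func splitKeysSketch(ctx context.Context, cli split.SplitClient, sortedKeys [][]byte) error {
	scatterRegions, err := cli.SplitKeysAndScatter(ctx, sortedKeys)
	if err != nil {
		return errors.Trace(err)
	}
	// Waiting for scattering stays best-effort, as waitRegionsScattered above
	// notes: the restore can proceed even if some regions are still moving.
	log.Info("split done, waiting for scatter", zap.Int("regions", len(scatterRegions)))
	return nil
}
```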
@@ -409,7 +331,7 @@ func (helper *LogSplitHelper) splitRegionByPoints( } helper.pool.ApplyOnErrorGroup(helper.eg, func() error { - newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints, false) + newRegions, errSplit := regionSplitter.client.SplitWaitAndScatter(ctx, region, splitPoints) if errSplit != nil { log.Warn("failed to split the scaned region", zap.Error(errSplit)) _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) @@ -419,7 +341,7 @@ func (helper *LogSplitHelper) splitRegionByPoints( startKey = point } - return regionSplitter.ExecuteSplit(ctx, ranges, 3, false, func([][]byte) {}) + return regionSplitter.ExecuteSplit(ctx, ranges) } select { case <-ctx.Done(): diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index d56c1f7f9888f..448f67d62a579 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -58,7 +58,7 @@ go_test( ], embed = [":split"], flaky = True, - shard_count = 17, + shard_count = 18, deps = [ "//br/pkg/errors", "//br/pkg/utils", diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index bb9f6e497445a..6a7df1d105873 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -121,28 +121,25 @@ type pdClient struct { needScatterInit sync.Once isRawKv bool + onSplit func(key [][]byte) splitConcurrency int splitBatchKeyCnt int } -// NewSplitClient returns a client used by RegionSplitter. -// TODO(lance6716): replace this function with NewClient. -func NewSplitClient( - client pd.Client, - httpCli pdhttp.Client, - tlsConf *tls.Config, - isRawKv bool, - splitBatchKeyCnt int, -) SplitClient { - cli := &pdClient{ - client: client, - httpCli: httpCli, - tlsConf: tlsConf, - storeCache: make(map[uint64]*metapb.Store), - isRawKv: isRawKv, - splitBatchKeyCnt: splitBatchKeyCnt, +type ClientOptionalParameter func(*pdClient) + +// WithRawKV sets the client to use raw kv mode. +func WithRawKV() ClientOptionalParameter { + return func(c *pdClient) { + c.isRawKv = true + } +} + +// WithOnSplit sets a callback function to be called after each split. +func WithOnSplit(onSplit func(key [][]byte)) ClientOptionalParameter { + return func(c *pdClient) { + c.onSplit = onSplit } - return cli } // NewClient creates a SplitClient. @@ -153,19 +150,21 @@ func NewClient( client pd.Client, httpCli pdhttp.Client, tlsConf *tls.Config, - isRawKv bool, splitBatchKeyCnt int, splitConcurrency int, + opts ...ClientOptionalParameter, ) SplitClient { cli := &pdClient{ client: client, httpCli: httpCli, tlsConf: tlsConf, storeCache: make(map[uint64]*metapb.Store), - isRawKv: isRawKv, splitBatchKeyCnt: splitBatchKeyCnt, splitConcurrency: splitConcurrency, } + for _, opt := range opts { + opt(cli) + } return cli } @@ -540,12 +539,15 @@ func (c *pdClient) SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][] } // we need to find the regions that contain the split keys. However, the scan // region API accepts a key range [start, end) where end key is exclusive, and if - // sortedSplitKeys length is 1, we scan region may return empty result. So we + // sortedSplitKeys length is 1, scan region may return empty result. So we // increase the end key a bit. If the end key is on the region boundaries, it // will be skipped by getSplitKeysOfRegions. 
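The guard added in the next hunk matters because an empty last split key means "up to the end of the keyspace": kv.Key.Next appends a 0x00 byte, so calling it unconditionally would turn that open end into the tiny exclusive bound "\x00" and the region scan would stop almost immediately (TestSplitScatterEmptyEndKey further down exercises exactly this case). A small stand-alone illustration, not part of the patch:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/kv"
)

func main() {
	// Next() returns the next key in byte order by appending 0x00.
	fmt.Printf("%q\n", kv.Key("cca").Next()) // "cca\x00": the key right after "cca"
	fmt.Printf("%q\n", kv.Key("").Next())    // "\x00": no longer an unbounded end key
}
```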
scanStart := codec.EncodeBytesExt(nil, sortedSplitKeys[0], c.isRawKv) lastKey := kv.Key(sortedSplitKeys[len(sortedSplitKeys)-1]) - scanEnd := codec.EncodeBytesExt(nil, lastKey.Next(), c.isRawKv) + if len(lastKey) > 0 { + lastKey = lastKey.Next() + } + scanEnd := codec.EncodeBytesExt(nil, lastKey, c.isRawKv) // mu protects ret, retrySplitKeys, lastSplitErr mu := sync.Mutex{} @@ -575,7 +577,7 @@ func (c *pdClient) SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][] allSplitKeys = retrySplitKeys retrySplitKeys = retrySplitKeys[:0] } - splitKeyMap := GetSplitKeysOfRegions(allSplitKeys, regions, c.isRawKv) + splitKeyMap := getSplitKeysOfRegions(allSplitKeys, regions, c.isRawKv) workerPool := tidbutil.NewWorkerPool(uint(c.splitConcurrency), "split keys") eg, eCtx := errgroup.WithContext(ctx) for region, splitKeys := range splitKeyMap { @@ -683,6 +685,9 @@ func (c *pdClient) SplitWaitAndScatter(ctx context.Context, region *RegionInfo, zap.Error(err), ) } + if c.onSplit != nil { + c.onSplit(keys[start:end]) + } // the region with the max start key is the region need to be further split, // depending on the origin region is the first region or last region, we need to diff --git a/br/pkg/restore/split/client_test.go b/br/pkg/restore/split/client_test.go index 432424960529b..e881078777224 100644 --- a/br/pkg/restore/split/client_test.go +++ b/br/pkg/restore/split/client_test.go @@ -181,6 +181,41 @@ func TestSplitScatterRawKV(t *testing.T) { require.Equal(t, len(result)-3, mockPDClient.scatterRegions.regionCount) } +func TestSplitScatterEmptyEndKey(t *testing.T) { + mockPDClient := NewMockPDClientForSplit() + keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + mockPDClient.SetRegions(keys) + mockClient := &pdClient{ + client: mockPDClient, + splitConcurrency: 10, + splitBatchKeyCnt: 100, + isRawKv: true, // make tests more readable + } + ctx := context.Background() + + splitKeys := [][]byte{{'b'}, {'c'}, {}} + + _, err := mockClient.SplitKeysAndScatter(ctx, splitKeys) + require.NoError(t, err) + + // check split ranges + regions, err := PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 5) + require.NoError(t, err) + result := [][]byte{ + []byte(""), []byte("aay"), + []byte("b"), []byte("bba"), []byte("bbh"), + []byte("c"), []byte("cca"), []byte(""), + } + checkRegionsBoundaries(t, regions, result) + + // test only one split key which is empty + _, err = mockClient.SplitKeysAndScatter(ctx, [][]byte{{}}) + require.NoError(t, err) + regions, err = PaginateScanRegion(ctx, mockClient, []byte{}, []byte{}, 5) + require.NoError(t, err) + checkRegionsBoundaries(t, regions, result) +} + func TestScanRegionEmptyResult(t *testing.T) { backup := WaitRegionOnlineAttemptTimes WaitRegionOnlineAttemptTimes = 2 diff --git a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go index 6b5e116b9ef95..cc01d68ecfc45 100644 --- a/br/pkg/restore/split/mock_pd_client.go +++ b/br/pkg/restore/split/mock_pd_client.go @@ -181,6 +181,9 @@ func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) ( c.mu.Lock() defer c.mu.Unlock() + if c.getOperator.responses == nil { + return &pdpb.GetOperatorResponse{Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_SUCCESS}, nil + } ret := c.getOperator.responses[regionID][0] c.getOperator.responses[regionID] = c.getOperator.responses[regionID][1:] return ret, nil diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index 36e831ae91a77..97197df839ccb 
100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -36,7 +36,7 @@ const ( ScanRegionPaginationLimit = 128 ) -func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { +func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { // current pd can't guarantee the consistency of returned regions if len(regions) == 0 { return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s", @@ -119,7 +119,7 @@ func PaginateScanRegion( } lastRegions = regions - if err = CheckRegionConsistency(startKey, endKey, regions); err != nil { + if err = checkRegionConsistency(startKey, endKey, regions); err != nil { log.Warn("failed to scan region, retrying", logutil.ShortError(err), zap.Int("regionLength", len(regions))) @@ -276,7 +276,7 @@ func (b *BackoffMayNotCountBackoffer) Attempt() int { return b.state.Attempt() } -// GetSplitKeysOfRegions checks every input key is necessary to split region on +// getSplitKeysOfRegions checks every input key is necessary to split region on // it. Returns a map from region to split keys belongs to it. // // The key will be skipped if it's the region boundary. @@ -286,7 +286,7 @@ func (b *BackoffMayNotCountBackoffer) Attempt() int { // - sortedRegions are continuous and sorted in ascending order by start key. // - sortedRegions can cover all keys in sortedKeys. // PaginateScanRegion should satisfy the above prerequisites. -func GetSplitKeysOfRegions( +func getSplitKeysOfRegions( sortedKeys [][]byte, sortedRegions []*RegionInfo, isRawKV bool, diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index fb9f2a997650d..077cdcdd1cb54 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -357,7 +357,7 @@ func TestGetSplitKeyPerRegion(t *testing.T) { }, }, } - result := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false) + result := getSplitKeysOfRegions(sortedKeys, sortedRegions, false) require.Equal(t, 3, len(result)) require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[sortedRegions[0]]) require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[sortedRegions[1]]) @@ -414,7 +414,7 @@ func TestGetSplitKeyPerRegion(t *testing.T) { slices.SortFunc(expected[i], bytes.Compare) } - got := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false) + got := getSplitKeysOfRegions(sortedKeys, sortedRegions, false) require.Equal(t, len(expected), len(got)) for region, gotKeys := range got { require.Equal(t, expected[region.Region.GetId()], gotKeys) @@ -608,7 +608,7 @@ func TestRegionConsistency(t *testing.T) { }, } for _, ca := range cases { - err := CheckRegionConsistency(ca.startKey, ca.endKey, ca.regions) + err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions) require.Error(t, err) require.Regexp(t, ca.err, err.Error()) } diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 085686c7a445e..ee54ea291c54c 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -164,14 +164,16 @@ func (c *TestClient) WaitRegionsScattered(context.Context, []*split.RegionInfo) } func TestScanEmptyRegion(t *testing.T) { - client := initTestClient(false) + mockPDCli := split.NewMockPDClientForSplit() + mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) + client := split.NewClient(mockPDCli, nil, nil, 100, 4) ranges := initRanges() // make ranges has only one ranges = ranges[0:1] regionSplitter := NewRegionSplitter(client) ctx := 
context.Background() - err := regionSplitter.ExecuteSplit(ctx, ranges, 1, false, func(key [][]byte) {}) + err := regionSplitter.ExecuteSplit(ctx, ranges) // should not return error with only one range entry require.NoError(t, err) } @@ -184,26 +186,39 @@ func TestScanEmptyRegion(t *testing.T) { // [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), // [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) func TestSplitAndScatter(t *testing.T) { - client := initTestClient(false) - ranges := initRanges() + rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + encodeBytes(rangeBoundaries) + mockPDCli := split.NewMockPDClientForSplit() + mockPDCli.SetRegions(rangeBoundaries) + client := split.NewClient(mockPDCli, nil, nil, 100, 4) regionSplitter := NewRegionSplitter(client) ctx := context.Background() + ranges := initRanges() rules := initRewriteRules() for i, rg := range ranges { tmp, err := RewriteRange(&rg, rules) require.NoError(t, err) ranges[i] = *tmp } - err := regionSplitter.ExecuteSplit(ctx, ranges, 1, false, func(key [][]byte) {}) + err := regionSplitter.ExecuteSplit(ctx, ranges) require.NoError(t, err) - regions := client.GetAllRegions() - if !validateRegions(regions) { - for _, region := range regions { - t.Logf("region: %v\n", region.Region) + regions := mockPDCli.Regions.ScanRange(nil, nil, 100) + expected := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbf"), []byte("bbh"), []byte("bbj"), []byte("cca"), []byte("xxe"), []byte("xxz"), []byte("")} + encodeBytes(expected) + require.Len(t, regions, len(expected)-1) + for i, region := range regions { + require.Equal(t, expected[i], region.Meta.StartKey) + require.Equal(t, expected[i+1], region.Meta.EndKey) + } +} + +func encodeBytes(keys [][]byte) { + for i := range keys { + if len(keys[i]) == 0 { + continue } - t.Log("get wrong result") - t.Fail() + keys[i] = codec.EncodeBytes(nil, keys[i]) } } @@ -215,58 +230,22 @@ func TestRawSplit(t *testing.T) { EndKey: []byte{}, }, } - client := initTestClient(true) ctx := context.Background() + rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + mockPDCli := split.NewMockPDClientForSplit() + mockPDCli.SetRegions(rangeBoundaries) + client := split.NewClient(mockPDCli, nil, nil, 100, 4, split.WithRawKV()) regionSplitter := NewRegionSplitter(client) - err := regionSplitter.ExecuteSplit(ctx, ranges, 1, true, func(key [][]byte) {}) + err := regionSplitter.ExecuteSplit(ctx, ranges) require.NoError(t, err) - regions := client.GetAllRegions() - expectedKeys := []string{"", "aay", "bba", "bbh", "cca", ""} - if !validateRegionsExt(regions, expectedKeys, true) { - for _, region := range regions { - t.Logf("region: %v\n", region.Region) - } - t.Log("get wrong result") - t.Fail() - } -} -// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) -func initTestClient(isRawKv bool) *TestClient { - peers := make([]*metapb.Peer, 1) - peers[0] = &metapb.Peer{ - Id: 1, - StoreId: 1, - } - keys := [6]string{"", "aay", "bba", "bbh", "cca", ""} - regions := make(map[uint64]*split.RegionInfo) - for i := uint64(1); i < 6; i++ { - startKey := []byte(keys[i-1]) - if len(startKey) != 0 { - startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv) - } - endKey := []byte(keys[i]) - if len(endKey) != 0 { - endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv) - } - regions[i] = &split.RegionInfo{ - Leader: &metapb.Peer{ - Id: i, - }, - Region: &metapb.Region{ - Id: i, - Peers: peers, 
- StartKey: startKey, - EndKey: endKey, - }, - } - } - stores := make(map[uint64]*metapb.Store) - stores[1] = &metapb.Store{ - Id: 1, + regions := mockPDCli.Regions.ScanRange(nil, nil, 100) + require.Len(t, regions, len(rangeBoundaries)-1) + for i, region := range regions { + require.Equal(t, rangeBoundaries[i], region.Meta.StartKey) + require.Equal(t, rangeBoundaries[i+1], region.Meta.EndKey) } - return NewTestClient(stores, regions, 6) } // range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) @@ -306,40 +285,6 @@ func initRewriteRules() *RewriteRules { } } -// expected regions after split: -// -// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), -// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) -func validateRegions(regions map[uint64]*split.RegionInfo) bool { - keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""} - return validateRegionsExt(regions, keys[:], false) -} - -func validateRegionsExt(regions map[uint64]*split.RegionInfo, expectedKeys []string, isRawKv bool) bool { - if len(regions) != len(expectedKeys)-1 { - return false - } -FindRegion: - for i := 1; i < len(expectedKeys); i++ { - for _, region := range regions { - startKey := []byte(expectedKeys[i-1]) - if len(startKey) != 0 { - startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv) - } - endKey := []byte(expectedKeys[i]) - if len(endKey) != 0 { - endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv) - } - if bytes.Equal(region.Region.GetStartKey(), startKey) && - bytes.Equal(region.Region.GetEndKey(), endKey) { - continue FindRegion - } - } - return false - } - return true -} - type fakeRestorer struct { mu sync.Mutex diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 0d0e1dceaf6e1..9d8855546a6f8 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -510,19 +510,26 @@ func SplitRanges( updateCh glue.Progress, isRawKv bool, ) error { - splitter := NewRegionSplitter(split.NewSplitClient( + splitClientOpts := make([]split.ClientOptionalParameter, 0, 2) + splitClientOpts = append(splitClientOpts, split.WithOnSplit(func(keys [][]byte) { + for range keys { + updateCh.Inc() + } + })) + if isRawKv { + splitClientOpts = append(splitClientOpts, split.WithRawKV()) + } + + splitter := NewRegionSplitter(split.NewClient( client.GetPDClient(), client.pdHTTPClient, client.GetTLSConfig(), - isRawKv, maxSplitKeysOnce, + client.GetStoreCount()+1, + splitClientOpts..., )) - return splitter.ExecuteSplit(ctx, ranges, client.GetStoreCount(), isRawKv, func(keys [][]byte) { - for range keys { - updateCh.Inc() - } - }) + return splitter.ExecuteSplit(ctx, ranges) } func findMatchedRewriteRule(file AppliedFile, rules *RewriteRules) *import_sstpb.RewriteRule { diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 239791ad1a049..c0230a00d2481 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -734,13 +734,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) keepaliveCfg.PermitWithoutStream = true - client := restore.NewRestoreClient( - mgr.GetPDClient(), - mgr.GetPDHTTPClient(), - mgr.GetTLSConfig(), - keepaliveCfg, - false, - ) + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg) // using tikv config to set the concurrency-per-store for client. 
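Usage note for the SplitRanges change above: WithOnSplit is what keeps the progress channel in step with the keys actually split, since SplitWaitAndScatter now fires the callback after each successfully split batch. A hedged sketch of a reusable option built on it; the helper name is invented, and the counter is atomic because the callback may be invoked from concurrent split workers.

```go
package restore

import (
	"sync/atomic"

	"github.com/pingcap/tidb/br/pkg/restore/split"
)

// keyCountingOption returns a client option that adds the number of split keys
// to the given counter every time a batch of keys has been split.
func keyCountingOption(counter *atomic.Int64) split.ClientOptionalParameter {
	return split.WithOnSplit(func(keys [][]byte) {
		counter.Add(int64(len(keys)))
	})
}
```

It would be passed to split.NewClient alongside WithRawKV, the same way SplitRanges builds its splitClientOpts slice.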
client.SetConcurrencyPerStore(kvConfigs.ImportGoroutines.Value) err = configureRestoreClient(ctx, client, cfg) diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go index fce8d7793b5d8..ff19f19ecac3f 100644 --- a/br/pkg/task/restore_data.go +++ b/br/pkg/task/restore_data.go @@ -76,13 +76,7 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto tc.EnableGlobalKill = false tidbconfig.StoreGlobalConfig(tc) - client := restore.NewRestoreClient( - mgr.GetPDClient(), - mgr.GetPDHTTPClient(), - mgr.GetTLSConfig(), - keepaliveCfg, - false, - ) + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg) restoreTS, err := client.GetTS(ctx) if err != nil { diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 5b9b009853a02..50360d4b4e01e 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -91,13 +91,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR // sometimes we have pooled the connections. // sending heartbeats in idle times is useful. keepaliveCfg.PermitWithoutStream = true - client := restore.NewRestoreClient( - mgr.GetPDClient(), - mgr.GetPDHTTPClient(), - mgr.GetTLSConfig(), - keepaliveCfg, - true, - ) + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg) client.SetRateLimit(cfg.RateLimit) client.SetCrypter(&cfg.CipherInfo) client.SetConcurrency(uint(cfg.Concurrency)) diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 9301c1ea88f39..490f0d1546fa5 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -59,7 +59,7 @@ func TestConfigureRestoreClient(t *testing.T) { RestoreCommonConfig: restoreComCfg, DdlBatchSize: 128, } - client := restore.NewRestoreClient(mockPDClient{}, nil, nil, keepalive.ClientParameters{}, false) + client := restore.NewRestoreClient(mockPDClient{}, nil, nil, keepalive.ClientParameters{}) ctx := context.Background() err := configureRestoreClient(ctx, client, restoreCfg) require.NoError(t, err) diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 596b1d29d714e..a74b8c950601a 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -38,13 +38,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) // sometimes we have pooled the connections. // sending heartbeats in idle times is useful. 
keepaliveCfg.PermitWithoutStream = true - client := restore.NewRestoreClient( - mgr.GetPDClient(), - mgr.GetPDHTTPClient(), - mgr.GetTLSConfig(), - keepaliveCfg, - true, - ) + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg) client.SetRateLimit(cfg.RateLimit) client.SetCrypter(&cfg.CipherInfo) client.SetConcurrency(uint(cfg.Concurrency)) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8703dae12e759..0118a25c96b37 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1462,13 +1462,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m var err error keepaliveCfg := GetKeepalive(&cfg.Config) keepaliveCfg.PermitWithoutStream = true - client := restore.NewRestoreClient( - mgr.GetPDClient(), - mgr.GetPDHTTPClient(), - mgr.GetTLSConfig(), - keepaliveCfg, - false, - ) + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg) err = client.Init(g, mgr.GetStorage()) if err != nil { return nil, errors.Trace(err) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index d79c8b33fa97e..77467a6743b79 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -554,7 +554,7 @@ func NewBackend( pdCli.GetServiceDiscovery(), pdhttp.WithTLSConfig(tls.TLSConfig()), ).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second)) - splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), false, config.RegionSplitBatchSize, config.RegionSplitConcurrency) + splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency) importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType) var writeLimiter StoreWriteLimiter if config.StoreWriteBWLimit > 0 {
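Finally, a sketch of the test-setup pattern the patch converges on in split_test.go: drive the real split client against MockPDClientForSplit instead of a bespoke TestClient, relying on the default GetOperator response added above so scatter checks need no per-test operator fixtures. The test name, region layout and split keys below are invented for illustration; the calls themselves mirror TestRawSplit and TestSplitScatterEmptyEndKey.

```go
package restore_test

import (
	"context"
	"testing"

	"github.com/pingcap/tidb/br/pkg/restore/split"
	"github.com/stretchr/testify/require"
)

func TestSplitSetupSketch(t *testing.T) {
	mockPDCli := split.NewMockPDClientForSplit()
	// Region boundaries; the empty keys mark the unbounded start and end.
	mockPDCli.SetRegions([][]byte{{}, []byte("b"), {}})

	// WithRawKV keeps the keys un-encoded so the assertions stay readable,
	// mirroring TestRawSplit above.
	client := split.NewClient(mockPDCli, nil, nil, 100, 4, split.WithRawKV())

	_, err := client.SplitKeysAndScatter(context.Background(), [][]byte{[]byte("a"), []byte("c")})
	require.NoError(t, err)

	// Expect [, a), [a, b), [b, c), [c, ) after splitting on "a" and "c".
	regions, err := split.PaginateScanRegion(context.Background(), client, []byte{}, []byte{}, 100)
	require.NoError(t, err)
	require.Len(t, regions, 4)
}
```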