diff --git a/br/pkg/config/BUILD.bazel b/br/pkg/config/BUILD.bazel index 4bddb6debfa2a..022078ea4f0fc 100644 --- a/br/pkg/config/BUILD.bazel +++ b/br/pkg/config/BUILD.bazel @@ -2,12 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "config", - srcs = ["ebs.go"], + srcs = [ + "ebs.go", + "kv.go", + ], importpath = "github.com/pingcap/tidb/br/pkg/config", visibility = ["//visibility:public"], deps = [ "//br/pkg/metautil", "//br/pkg/storage", + "@com_github_docker_go_units//:go-units", "@com_github_masterminds_semver//:semver", "@com_github_pingcap_errors//:errors", "@io_k8s_api//core/v1:core", diff --git a/br/pkg/config/kv.go b/br/pkg/config/kv.go new file mode 100644 index 0000000000000..0283c8d3cc721 --- /dev/null +++ b/br/pkg/config/kv.go @@ -0,0 +1,58 @@ +// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. +package config + +import ( + "encoding/json" + + "github.com/docker/go-units" +) + +type ConfigTerm[T uint | uint64] struct { + Value T + Modified bool +} + +type KVConfig struct { + ImportGoroutines ConfigTerm[uint] + MergeRegionSize ConfigTerm[uint64] + MergeRegionKeyCount ConfigTerm[uint64] +} + +func ParseImportThreadsFromConfig(resp []byte) (uint, error) { + type importer struct { + Threads uint `json:"num-threads"` + } + + type config struct { + Import importer `json:"import"` + } + var c config + e := json.Unmarshal(resp, &c) + if e != nil { + return 0, e + } + + return c.Import.Threads, nil +} + +func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) { + type coprocessor struct { + RegionSplitSize string `json:"region-split-size"` + RegionSplitKeys uint64 `json:"region-split-keys"` + } + + type config struct { + Cop coprocessor `json:"coprocessor"` + } + var c config + e := json.Unmarshal(resp, &c) + if e != nil { + return 0, 0, e + } + rs, e := units.RAMInBytes(c.Cop.RegionSplitSize) + if e != nil { + return 0, 0, e + } + urs := uint64(rs) + return urs, c.Cop.RegionSplitKeys, nil +} diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index 0d0227fc9db48..e42fdf7b7f046 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/conn", visibility = ["//visibility:public"], deps = [ + "//br/pkg/config", "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/glue", @@ -47,6 +48,7 @@ go_test( flaky = True, shard_count = 7, deps = [ + "//br/pkg/config", "//br/pkg/conn/util", "//br/pkg/pdutil", "//br/pkg/utils", diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index bf67f0d96ddff..d76b11202cd9f 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -5,8 +5,8 @@ package conn import ( "context" "crypto/tls" - "encoding/json" "fmt" + "io" "net" "net/http" "net/url" @@ -20,6 +20,7 @@ import ( logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn/util" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" @@ -48,6 +49,11 @@ const ( // DefaultMergeRegionKeyCount is the default region key count, 960000. DefaultMergeRegionKeyCount uint64 = 960000 + + // DefaultImportNumGoroutines is the default number of threads for import. + // use 128 as default value, which is 8 times of the default value of tidb. + // we think is proper for IO-bound cases. + DefaultImportNumGoroutines uint = 128 ) type VersionCheckerType int @@ -287,42 +293,56 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) { return oracle.ComposeTS(p, l), nil } -// GetMergeRegionSizeAndCount returns the tikv config -// `coprocessor.region-split-size` and `coprocessor.region-split-key`. -// returns the default config when failed. -func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) { - regionSplitSize := DefaultMergeRegionSizeBytes - regionSplitKeys := DefaultMergeRegionKeyCount - type coprocessor struct { - RegionSplitKeys uint64 `json:"region-split-keys"` - RegionSplitSize string `json:"region-split-size"` +// ProcessTiKVConfigs handle the tikv config for region split size, region split keys, and import goroutines in place. +// It retrieves the config from all alive tikv stores and returns the minimum values. +// If retrieving the config fails, it returns the default config values. +func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, client *http.Client) { + mergeRegionSize := cfg.MergeRegionSize + mergeRegionKeyCount := cfg.MergeRegionKeyCount + importGoroutines := cfg.ImportGoroutines + + if mergeRegionSize.Modified && mergeRegionKeyCount.Modified && importGoroutines.Modified { + log.Info("no need to retrieve the config from tikv if user has set the config") + return } - type config struct { - Cop coprocessor `json:"coprocessor"` - } err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error { - c := &config{} - e := json.NewDecoder(resp.Body).Decode(c) - if e != nil { - return e + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return err } - rs, e := units.RAMInBytes(c.Cop.RegionSplitSize) - if e != nil { - return e + if !mergeRegionSize.Modified || !mergeRegionKeyCount.Modified { + size, keys, e := kvconfig.ParseMergeRegionSizeFromConfig(respBytes) + if e != nil { + log.Warn("Failed to parse region split size and keys from config", logutil.ShortError(e)) + return e + } + if mergeRegionKeyCount.Value == DefaultMergeRegionKeyCount || keys < mergeRegionKeyCount.Value { + mergeRegionSize.Value = size + mergeRegionKeyCount.Value = keys + } } - urs := uint64(rs) - if regionSplitSize == DefaultMergeRegionSizeBytes || urs < regionSplitSize { - regionSplitSize = urs - regionSplitKeys = c.Cop.RegionSplitKeys + if !importGoroutines.Modified { + threads, e := kvconfig.ParseImportThreadsFromConfig(respBytes) + if e != nil { + log.Warn("Failed to parse import num-threads from config", logutil.ShortError(e)) + return e + } + // We use 8 times the default value because it's an IO-bound case. + if importGoroutines.Value == DefaultImportNumGoroutines || (threads > 0 && threads*8 < importGoroutines.Value) { + importGoroutines.Value = threads * 8 + } } + // replace the value + cfg.MergeRegionSize = mergeRegionSize + cfg.MergeRegionKeyCount = mergeRegionKeyCount + cfg.ImportGoroutines = importGoroutines return nil }) + if err != nil { - log.Warn("meet error when getting config from TiKV; using default", logutil.ShortError(err)) - return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount + log.Warn("Failed to get config from TiKV; using default", logutil.ShortError(err)) } - return regionSplitSize, regionSplitKeys } // GetConfigFromTiKV get configs from all alive tikv stores. diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index fc822fac123d9..576c3f7feb826 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/utils" @@ -269,10 +270,11 @@ func TestGetConnOnCanceledContext(t *testing.T) { func TestGetMergeRegionSizeAndCount(t *testing.T) { cases := []struct { - stores []*metapb.Store - content []string - regionSplitSize uint64 - regionSplitKeys uint64 + stores []*metapb.Store + content []string + importNumGoroutines uint + regionSplitSize uint64 + regionSplitKeys uint64 }{ { stores: []*metapb.Store{ @@ -289,8 +291,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, content: []string{""}, // no tikv detected in this case - regionSplitSize: DefaultMergeRegionSizeBytes, - regionSplitKeys: DefaultMergeRegionKeyCount, + importNumGoroutines: DefaultImportNumGoroutines, + regionSplitSize: DefaultMergeRegionSizeBytes, + regionSplitKeys: DefaultMergeRegionKeyCount, }, { stores: []*metapb.Store{ @@ -321,8 +324,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { "", }, // no tikv detected in this case - regionSplitSize: DefaultMergeRegionSizeBytes, - regionSplitKeys: DefaultMergeRegionKeyCount, + importNumGoroutines: DefaultImportNumGoroutines, + regionSplitSize: DefaultMergeRegionSizeBytes, + regionSplitKeys: DefaultMergeRegionKeyCount, }, { stores: []*metapb.Store{ @@ -338,8 +342,10 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}, \"import\": {\"num-threads\": 6}}", }, + // the number of import goroutines is 8 times than import.num-threads. + importNumGoroutines: 48, // one tikv detected in this case we are not update default size and keys because they are too small. regionSplitSize: 1 * units.MiB, regionSplitKeys: 1, @@ -358,8 +364,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}", }, + importNumGoroutines: 1024, // one tikv detected in this case and we update with new size and keys. regionSplitSize: 1 * units.GiB, regionSplitKeys: 10000000, @@ -388,12 +395,13 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}}", - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}, \"import\": {\"num-threads\": 12}}", }, // two tikv detected in this case and we choose the small one. - regionSplitSize: 900 * units.MiB, - regionSplitKeys: 12000000, + importNumGoroutines: 96, + regionSplitSize: 1 * units.GiB, + regionSplitKeys: 10000000, }, } @@ -420,9 +428,15 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { httpCli := mockServer.Client() mgr := &Mgr{PdController: &pdutil.PdController{}} mgr.PdController.SetPDClient(pdCli) - rs, rk := mgr.GetMergeRegionSizeAndCount(ctx, httpCli) - require.Equal(t, ca.regionSplitSize, rs) - require.Equal(t, ca.regionSplitKeys, rk) + kvConfigs := &kvconfig.KVConfig{ + ImportGoroutines: kvconfig.ConfigTerm[uint]{Value: DefaultImportNumGoroutines, Modified: false}, + MergeRegionSize: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionSizeBytes, Modified: false}, + MergeRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionKeyCount, Modified: false}, + } + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) + require.EqualValues(t, ca.regionSplitSize, kvConfigs.MergeRegionSize.Value) + require.EqualValues(t, ca.regionSplitKeys, kvConfigs.MergeRegionKeyCount.Value) + require.EqualValues(t, ca.importNumGoroutines, kvConfigs.ImportGoroutines.Value) mockServer.Close() } } diff --git a/br/pkg/rtree/logging.go b/br/pkg/rtree/logging.go index 1dc4434d9f27a..08dfbf5a32322 100644 --- a/br/pkg/rtree/logging.go +++ b/br/pkg/rtree/logging.go @@ -55,6 +55,6 @@ func (rs rangesMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error encoder.AddInt("totalFiles", totalFile) encoder.AddUint64("totalKVs", totalKV) encoder.AddUint64("totalBytes", totalBytes) - encoder.AddUint64("totalSize", totalBytes) + encoder.AddUint64("totalSize", totalSize) return nil } diff --git a/br/pkg/rtree/logging_test.go b/br/pkg/rtree/logging_test.go index 9630cda766aeb..7e977a1d9e598 100644 --- a/br/pkg/rtree/logging_test.go +++ b/br/pkg/rtree/logging_test.go @@ -34,7 +34,7 @@ func TestLogRanges(t *testing.T) { ranges := make([]rtree.Range, cs.count) for j := 0; j < cs.count; j++ { ranges[j] = *newRange([]byte(fmt.Sprintf("%d", j)), []byte(fmt.Sprintf("%d", j+1))) - ranges[j].Files = append(ranges[j].Files, &backuppb.File{TotalKvs: uint64(j), TotalBytes: uint64(j)}) + ranges[j].Files = append(ranges[j].Files, &backuppb.File{TotalKvs: uint64(j), TotalBytes: uint64(j), Size_: uint64(j)}) } out, err := encoder.EncodeEntry(zapcore.Entry{}, []zap.Field{rtree.ZapRanges(ranges)}) require.NoError(t, err) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index bd381b38a2c7a..760a968a65b3e 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -102,6 +102,7 @@ go_test( flaky = True, shard_count = 22, deps = [ + "//br/pkg/config", "//br/pkg/conn", "//br/pkg/errors", "//br/pkg/metautil", diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index 5db9d638a1658..65369afd1a018 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -9,6 +9,7 @@ import ( backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/config" @@ -208,8 +209,8 @@ func expectedDefaultRestoreConfig() RestoreConfig { Config: defaultConfig, RestoreCommonConfig: RestoreCommonConfig{Online: false, Granularity: "fine-grained", - MergeSmallRegionSizeBytes: 0x6000000, - MergeSmallRegionKeyCount: 0xea600, + MergeSmallRegionSizeBytes: kvconfig.ConfigTerm[uint64]{Value: 0x6000000}, + MergeSmallRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: 0xea600}, WithSysTable: true, ResetSysUsers: []string{"cloud_admin", "root"}}, NoSchema: false, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index bc60b155ec101..ea5ce851f395d 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -102,15 +102,15 @@ const ( // RestoreCommonConfig is the common configuration for all BR restore tasks. type RestoreCommonConfig struct { - Online bool `json:"online" toml:"online"` - Granularity string `json:"granularity" toml:"granularity"` - ConcurrencyPerStore uint `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` + Online bool `json:"online" toml:"online"` + Granularity string `json:"granularity" toml:"granularity"` + ConcurrencyPerStore pconfig.ConfigTerm[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` // MergeSmallRegionSizeBytes is the threshold of merging small regions (Default 96MB, region split size). // MergeSmallRegionKeyCount is the threshold of merging smalle regions (Default 960_000, region split key count). // See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38 - MergeSmallRegionSizeBytes uint64 `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` - MergeSmallRegionKeyCount uint64 `json:"merge-region-key-count" toml:"merge-region-key-count"` + MergeSmallRegionSizeBytes pconfig.ConfigTerm[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` + MergeSmallRegionKeyCount pconfig.ConfigTerm[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"` // determines whether enable restore sys table on default, see fullClusterRestore in restore/client.go WithSysTable bool `json:"with-sys-table" toml:"with-sys-table"` @@ -121,17 +121,17 @@ type RestoreCommonConfig struct { // adjust adjusts the abnormal config value in the current config. // useful when not starting BR from CLI (e.g. from BRIE in SQL). func (cfg *RestoreCommonConfig) adjust() { - if cfg.MergeSmallRegionKeyCount == 0 { - cfg.MergeSmallRegionKeyCount = conn.DefaultMergeRegionKeyCount + if !cfg.MergeSmallRegionKeyCount.Modified { + cfg.MergeSmallRegionKeyCount.Value = conn.DefaultMergeRegionKeyCount } - if cfg.MergeSmallRegionSizeBytes == 0 { - cfg.MergeSmallRegionSizeBytes = conn.DefaultMergeRegionSizeBytes + if !cfg.MergeSmallRegionSizeBytes.Modified { + cfg.MergeSmallRegionSizeBytes.Value = conn.DefaultMergeRegionSizeBytes } if len(cfg.Granularity) == 0 { cfg.Granularity = string(restore.FineGrained) } - if cfg.ConcurrencyPerStore == 0 { - cfg.ConcurrencyPerStore = 128 + if cfg.ConcurrencyPerStore.Modified { + cfg.ConcurrencyPerStore.Value = conn.DefaultImportNumGoroutines } } @@ -178,14 +178,23 @@ func (cfg *RestoreCommonConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionKeyCount, err = flags.GetUint64(FlagMergeRegionKeyCount) + cfg.ConcurrencyPerStore.Value, err = flags.GetUint(flagConcurrencyPerStore) if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionSizeBytes, err = flags.GetUint64(FlagMergeRegionSizeBytes) + cfg.ConcurrencyPerStore.Modified = flags.Changed(flagConcurrencyPerStore) + + cfg.MergeSmallRegionKeyCount.Value, err = flags.GetUint64(FlagMergeRegionKeyCount) + if err != nil { + return errors.Trace(err) + } + cfg.MergeSmallRegionKeyCount.Modified = flags.Changed(FlagMergeRegionKeyCount) + + cfg.MergeSmallRegionSizeBytes.Value, err = flags.GetUint64(FlagMergeRegionSizeBytes) if err != nil { return errors.Trace(err) } + cfg.MergeSmallRegionSizeBytes.Modified = flags.Changed(FlagMergeRegionSizeBytes) if flags.Lookup(flagWithSysTable) != nil { cfg.WithSysTable, err = flags.GetBool(flagWithSysTable) @@ -529,7 +538,6 @@ func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *Re return errors.Trace(err) } client.SetConcurrency(uint(cfg.Concurrency)) - client.SetConcurrencyPerStore(cfg.ConcurrencyPerStore) return nil } @@ -706,16 +714,18 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf defer mgr.Close() codec := mgr.GetStorage().GetCodec() - mergeRegionSize := cfg.MergeSmallRegionSizeBytes - mergeRegionCount := cfg.MergeSmallRegionKeyCount - if mergeRegionSize == conn.DefaultMergeRegionSizeBytes && - mergeRegionCount == conn.DefaultMergeRegionKeyCount { - // according to https://github.com/pingcap/tidb/issues/34167. - // we should get the real config from tikv to adapt the dynamic region. - httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize, mergeRegionCount = mgr.GetMergeRegionSizeAndCount(ctx, httpCli) + // need retrieve these configs from tikv if not set in command. + kvConfigs := &pconfig.KVConfig{ + ImportGoroutines: cfg.ConcurrencyPerStore, + MergeRegionSize: cfg.MergeSmallRegionSizeBytes, + MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount, } + // according to https://github.com/pingcap/tidb/issues/34167. + // we should get the real config from tikv to adapt the dynamic region. + httpCli := httputil.NewClient(mgr.GetTLSConfig()) + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) + keepaliveCfg.PermitWithoutStream = true client := restore.NewRestoreClient( mgr.GetPDClient(), @@ -724,6 +734,8 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf keepaliveCfg, false, ) + // using tikv config to set the concurrency-per-store for client. + client.SetConcurrencyPerStore(kvConfigs.ImportGoroutines.Value) err = configureRestoreClient(ctx, client, cfg) if err != nil { return errors.Trace(err) @@ -982,7 +994,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf log.Debug("mapped table to files", zap.Any("result map", tableFileMap)) rangeStream := restore.GoValidateFileRanges( - ctx, tableStream, tableFileMap, mergeRegionSize, mergeRegionCount, errCh) + ctx, tableStream, tableFileMap, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value, errCh) rangeSize := restore.EstimateRangeSize(files) summary.CollectInt("restore ranges", rangeSize) diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 31763a9a9a638..0e4a89a99fe5e 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -7,6 +7,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" @@ -73,14 +74,17 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer mgr.Close() - mergeRegionSize := cfg.MergeSmallRegionSizeBytes - mergeRegionCount := cfg.MergeSmallRegionKeyCount - if mergeRegionSize == conn.DefaultMergeRegionSizeBytes && - mergeRegionCount == conn.DefaultMergeRegionKeyCount { + // need retrieve these configs from tikv if not set in command. + kvConfigs := &kvconfig.KVConfig{ + MergeRegionSize: cfg.MergeSmallRegionSizeBytes, + MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount, + } + + if !kvConfigs.MergeRegionSize.Modified || !kvConfigs.MergeRegionKeyCount.Modified { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize, mergeRegionCount = mgr.GetMergeRegionSizeAndCount(ctx, httpCli) + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) } keepaliveCfg := GetKeepalive(&cfg.Config) @@ -134,7 +138,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR summary.CollectInt("restore files", len(files)) ranges, _, err := restore.MergeFileRanges( - files, mergeRegionSize, mergeRegionCount) + files, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index f279aaea65061..b42a1ab095dce 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -31,8 +31,8 @@ func TestRestoreConfigAdjust(t *testing.T) { require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency) require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval) - require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount) - require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes) + require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount.Value) + require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes.Value) } type mockPDClient struct {