From a6201f97fd938b433162a3a2d2f12eeb54ec5c50 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 29 Jan 2024 18:37:39 +0800 Subject: [PATCH 01/11] restore: get proper config from tikv server --- br/pkg/conn/conn.go | 70 ++++++++++++++++++++++++++++--------- br/pkg/rtree/logging.go | 2 +- br/pkg/task/common_test.go | 4 +-- br/pkg/task/restore.go | 52 +++++++++++++++++---------- br/pkg/task/restore_raw.go | 7 ++-- br/pkg/task/restore_test.go | 4 +-- 6 files changed, 96 insertions(+), 43 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index bf67f0d96ddff..e9723d9e2e032 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -48,6 +48,11 @@ const ( // DefaultMergeRegionKeyCount is the default region key count, 960000. DefaultMergeRegionKeyCount uint64 = 960000 + + // DefaultImportNumThreads is the default number of threads for import. + // use 128 as default value, which is 8 times of the default value of tidb. + // we think is proper for IO-bound cases. + DefaultImportNumThreads uint = 128 ) type VersionCheckerType int @@ -287,12 +292,24 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) { return oracle.ComposeTS(p, l), nil } -// GetMergeRegionSizeAndCount returns the tikv config -// `coprocessor.region-split-size` and `coprocessor.region-split-key`. -// returns the default config when failed. -func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) { - regionSplitSize := DefaultMergeRegionSizeBytes - regionSplitKeys := DefaultMergeRegionKeyCount +func ParseImportThreadsFromConfig(resp *http.Response) (uint, error) { + type importer struct { + Threads uint `json:"num-threads"` + } + + type config struct { + Import importer `json:"import"` + } + c := &config{} + e := json.NewDecoder(resp.Body).Decode(c) + if e != nil { + return 0, e + } + + return c.Import.Threads, nil +} + +func ParseMergeRegionSizeAndCountFromConfig(resp *http.Response) (uint64, uint64, error) { type coprocessor struct { RegionSplitKeys uint64 `json:"region-split-keys"` RegionSplitSize string `json:"region-split-size"` @@ -301,28 +318,49 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli type config struct { Cop coprocessor `json:"coprocessor"` } + c := &config{} + e := json.NewDecoder(resp.Body).Decode(c) + if e != nil { + return 0, 0, e + } + rs, e := units.RAMInBytes(c.Cop.RegionSplitSize) + if e != nil { + return 0, 0, e + } + urs := uint64(rs) + return urs, c.Cop.RegionSplitKeys, nil +} + +// GetTiKVConfigs returns the tikv config +// `coprocessor.region-split-size` and `coprocessor.region-split-key` and `import.num-threads`. +// returns the default config when failed. +func (mgr *Mgr) GetTiKVConfigs(ctx context.Context, client *http.Client) (uint64, uint64, uint) { + regionSplitSize := DefaultMergeRegionSizeBytes + regionSplitKeys := DefaultMergeRegionKeyCount + importThreads := DefaultImportNumThreads err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error { - c := &config{} - e := json.NewDecoder(resp.Body).Decode(c) + rs, rk, e := ParseMergeRegionSizeAndCountFromConfig(resp) if e != nil { return e } - rs, e := units.RAMInBytes(c.Cop.RegionSplitSize) + if regionSplitSize == DefaultMergeRegionSizeBytes || rs < regionSplitSize { + regionSplitSize = rs + regionSplitKeys = rk + } + n, e := ParseImportThreadsFromConfig(resp) if e != nil { + log.Warn("meet error when parsing import num-threads, ignore it", logutil.ShortError(e)) return e } - urs := uint64(rs) - if regionSplitSize == DefaultMergeRegionSizeBytes || urs < regionSplitSize { - regionSplitSize = urs - regionSplitKeys = c.Cop.RegionSplitKeys - } + // we use 8 times of the default value because it's an IO-bound cases. + importThreads = 8 * n return nil }) if err != nil { log.Warn("meet error when getting config from TiKV; using default", logutil.ShortError(err)) - return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount + return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount, DefaultImportNumThreads } - return regionSplitSize, regionSplitKeys + return regionSplitSize, regionSplitKeys, importThreads } // GetConfigFromTiKV get configs from all alive tikv stores. diff --git a/br/pkg/rtree/logging.go b/br/pkg/rtree/logging.go index 1dc4434d9f27a..08dfbf5a32322 100644 --- a/br/pkg/rtree/logging.go +++ b/br/pkg/rtree/logging.go @@ -55,6 +55,6 @@ func (rs rangesMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error encoder.AddInt("totalFiles", totalFile) encoder.AddUint64("totalKVs", totalKV) encoder.AddUint64("totalBytes", totalBytes) - encoder.AddUint64("totalSize", totalBytes) + encoder.AddUint64("totalSize", totalSize) return nil } diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index 5db9d638a1658..fcc8bdc5c1236 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -208,8 +208,8 @@ func expectedDefaultRestoreConfig() RestoreConfig { Config: defaultConfig, RestoreCommonConfig: RestoreCommonConfig{Online: false, Granularity: "fine-grained", - MergeSmallRegionSizeBytes: 0x6000000, - MergeSmallRegionKeyCount: 0xea600, + MergeSmallRegionSizeBytes: ConfigSet[uint64]{value: 0x6000000}, + MergeSmallRegionKeyCount: ConfigSet[uint64]{value: 0xea600}, WithSysTable: true, ResetSysUsers: []string{"cloud_admin", "root"}}, NoSchema: false, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index bc60b155ec101..596be687e730d 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -100,17 +100,22 @@ const ( TxnRestoreCmd = "Txn Restore" ) +type ConfigSet[T uint | uint64] struct { + value T + hasSet bool +} + // RestoreCommonConfig is the common configuration for all BR restore tasks. type RestoreCommonConfig struct { - Online bool `json:"online" toml:"online"` - Granularity string `json:"granularity" toml:"granularity"` - ConcurrencyPerStore uint `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` + Online bool `json:"online" toml:"online"` + Granularity string `json:"granularity" toml:"granularity"` + ConcurrencyPerStore ConfigSet[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` // MergeSmallRegionSizeBytes is the threshold of merging small regions (Default 96MB, region split size). // MergeSmallRegionKeyCount is the threshold of merging smalle regions (Default 960_000, region split key count). // See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38 - MergeSmallRegionSizeBytes uint64 `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` - MergeSmallRegionKeyCount uint64 `json:"merge-region-key-count" toml:"merge-region-key-count"` + MergeSmallRegionSizeBytes ConfigSet[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` + MergeSmallRegionKeyCount ConfigSet[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"` // determines whether enable restore sys table on default, see fullClusterRestore in restore/client.go WithSysTable bool `json:"with-sys-table" toml:"with-sys-table"` @@ -121,17 +126,17 @@ type RestoreCommonConfig struct { // adjust adjusts the abnormal config value in the current config. // useful when not starting BR from CLI (e.g. from BRIE in SQL). func (cfg *RestoreCommonConfig) adjust() { - if cfg.MergeSmallRegionKeyCount == 0 { - cfg.MergeSmallRegionKeyCount = conn.DefaultMergeRegionKeyCount + if !cfg.MergeSmallRegionKeyCount.hasSet { + cfg.MergeSmallRegionKeyCount.value = conn.DefaultMergeRegionKeyCount } - if cfg.MergeSmallRegionSizeBytes == 0 { - cfg.MergeSmallRegionSizeBytes = conn.DefaultMergeRegionSizeBytes + if !cfg.MergeSmallRegionSizeBytes.hasSet { + cfg.MergeSmallRegionSizeBytes.value = conn.DefaultMergeRegionSizeBytes } if len(cfg.Granularity) == 0 { cfg.Granularity = string(restore.FineGrained) } - if cfg.ConcurrencyPerStore == 0 { - cfg.ConcurrencyPerStore = 128 + if cfg.ConcurrencyPerStore.hasSet { + cfg.ConcurrencyPerStore.value = conn.DefaultImportNumThreads } } @@ -178,14 +183,23 @@ func (cfg *RestoreCommonConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionKeyCount, err = flags.GetUint64(FlagMergeRegionKeyCount) + cfg.ConcurrencyPerStore.value, err = flags.GetUint(flagConcurrencyPerStore) + if err != nil { + return errors.Trace(err) + } + cfg.ConcurrencyPerStore.hasSet = flags.Changed(flagConcurrencyPerStore) + + cfg.MergeSmallRegionKeyCount.value, err = flags.GetUint64(FlagMergeRegionKeyCount) if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionSizeBytes, err = flags.GetUint64(FlagMergeRegionSizeBytes) + cfg.MergeSmallRegionKeyCount.hasSet = flags.Changed(FlagMergeRegionKeyCount) + + cfg.MergeSmallRegionSizeBytes.value, err = flags.GetUint64(FlagMergeRegionSizeBytes) if err != nil { return errors.Trace(err) } + cfg.MergeSmallRegionSizeBytes.hasSet = flags.Changed(FlagMergeRegionSizeBytes) if flags.Lookup(flagWithSysTable) != nil { cfg.WithSysTable, err = flags.GetBool(flagWithSysTable) @@ -529,7 +543,6 @@ func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *Re return errors.Trace(err) } client.SetConcurrency(uint(cfg.Concurrency)) - client.SetConcurrencyPerStore(cfg.ConcurrencyPerStore) return nil } @@ -708,12 +721,13 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf mergeRegionSize := cfg.MergeSmallRegionSizeBytes mergeRegionCount := cfg.MergeSmallRegionKeyCount - if mergeRegionSize == conn.DefaultMergeRegionSizeBytes && - mergeRegionCount == conn.DefaultMergeRegionKeyCount { + importNumThreads := cfg.ConcurrencyPerStore + + if !mergeRegionSize.hasSet || !mergeRegionCount.hasSet || !importNumThreads.hasSet { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize, mergeRegionCount = mgr.GetMergeRegionSizeAndCount(ctx, httpCli) + mergeRegionSize.value, mergeRegionCount.value, importNumThreads.value = mgr.GetTiKVConfigs(ctx, httpCli) } keepaliveCfg.PermitWithoutStream = true @@ -724,6 +738,8 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf keepaliveCfg, false, ) + // using tikv config to set the concurrency-per-store for client. + client.SetConcurrencyPerStore(importNumThreads.value) err = configureRestoreClient(ctx, client, cfg) if err != nil { return errors.Trace(err) @@ -982,7 +998,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf log.Debug("mapped table to files", zap.Any("result map", tableFileMap)) rangeStream := restore.GoValidateFileRanges( - ctx, tableStream, tableFileMap, mergeRegionSize, mergeRegionCount, errCh) + ctx, tableStream, tableFileMap, mergeRegionSize.value, mergeRegionCount.value, errCh) rangeSize := restore.EstimateRangeSize(files) summary.CollectInt("restore ranges", rangeSize) diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 31763a9a9a638..50c7194f8fbc4 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -75,12 +75,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR mergeRegionSize := cfg.MergeSmallRegionSizeBytes mergeRegionCount := cfg.MergeSmallRegionKeyCount - if mergeRegionSize == conn.DefaultMergeRegionSizeBytes && - mergeRegionCount == conn.DefaultMergeRegionKeyCount { + if !mergeRegionSize.hasSet || !mergeRegionSize.hasSet { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize, mergeRegionCount = mgr.GetMergeRegionSizeAndCount(ctx, httpCli) + mergeRegionSize.value, mergeRegionCount.value, _ = mgr.GetTiKVConfigs(ctx, httpCli) } keepaliveCfg := GetKeepalive(&cfg.Config) @@ -134,7 +133,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR summary.CollectInt("restore files", len(files)) ranges, _, err := restore.MergeFileRanges( - files, mergeRegionSize, mergeRegionCount) + files, mergeRegionSize.value, mergeRegionCount.value) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index f279aaea65061..e64220810baaf 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -31,8 +31,8 @@ func TestRestoreConfigAdjust(t *testing.T) { require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency) require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval) - require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount) - require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes) + require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount.value) + require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes.value) } type mockPDClient struct { From 064365afa1311e17c637347b0f4c2bcd6f34e0f0 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 31 Jan 2024 13:34:58 +0800 Subject: [PATCH 02/11] add test --- br/pkg/conn/conn.go | 14 ++++++++------ br/pkg/conn/conn_test.go | 38 +++++++++++++++++++++++--------------- br/pkg/task/restore.go | 10 +++++----- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index e9723d9e2e032..cb7d5276cca7c 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -49,10 +49,10 @@ const ( // DefaultMergeRegionKeyCount is the default region key count, 960000. DefaultMergeRegionKeyCount uint64 = 960000 - // DefaultImportNumThreads is the default number of threads for import. + // DefaultImportNumGoroutines is the default number of threads for import. // use 128 as default value, which is 8 times of the default value of tidb. // we think is proper for IO-bound cases. - DefaultImportNumThreads uint = 128 + DefaultImportNumGoroutines uint = 128 ) type VersionCheckerType int @@ -337,7 +337,7 @@ func ParseMergeRegionSizeAndCountFromConfig(resp *http.Response) (uint64, uint64 func (mgr *Mgr) GetTiKVConfigs(ctx context.Context, client *http.Client) (uint64, uint64, uint) { regionSplitSize := DefaultMergeRegionSizeBytes regionSplitKeys := DefaultMergeRegionKeyCount - importThreads := DefaultImportNumThreads + importGoroutines := DefaultImportNumGoroutines err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error { rs, rk, e := ParseMergeRegionSizeAndCountFromConfig(resp) if e != nil { @@ -353,14 +353,16 @@ func (mgr *Mgr) GetTiKVConfigs(ctx context.Context, client *http.Client) (uint64 return e } // we use 8 times of the default value because it's an IO-bound cases. - importThreads = 8 * n + if n > 0 && n*8 < importGoroutines { + importGoroutines = n * 8 + } return nil }) if err != nil { log.Warn("meet error when getting config from TiKV; using default", logutil.ShortError(err)) - return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount, DefaultImportNumThreads + return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount, DefaultImportNumGoroutines } - return regionSplitSize, regionSplitKeys, importThreads + return regionSplitSize, regionSplitKeys, importGoroutines } // GetConfigFromTiKV get configs from all alive tikv stores. diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index fc822fac123d9..6f17be4f2a2d1 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -269,10 +269,11 @@ func TestGetConnOnCanceledContext(t *testing.T) { func TestGetMergeRegionSizeAndCount(t *testing.T) { cases := []struct { - stores []*metapb.Store - content []string - regionSplitSize uint64 - regionSplitKeys uint64 + stores []*metapb.Store + content []string + importNumGoroutines uint + regionSplitSize uint64 + regionSplitKeys uint64 }{ { stores: []*metapb.Store{ @@ -289,8 +290,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, content: []string{""}, // no tikv detected in this case - regionSplitSize: DefaultMergeRegionSizeBytes, - regionSplitKeys: DefaultMergeRegionKeyCount, + importNumGoroutines: DefaultImportNumGoroutines, + regionSplitSize: DefaultMergeRegionSizeBytes, + regionSplitKeys: DefaultMergeRegionKeyCount, }, { stores: []*metapb.Store{ @@ -321,8 +323,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { "", }, // no tikv detected in this case - regionSplitSize: DefaultMergeRegionSizeBytes, - regionSplitKeys: DefaultMergeRegionKeyCount, + importNumGoroutines: DefaultImportNumGoroutines, + regionSplitSize: DefaultMergeRegionSizeBytes, + regionSplitKeys: DefaultMergeRegionKeyCount, }, { stores: []*metapb.Store{ @@ -338,8 +341,10 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}, \"import\": {\"num-threads\": 6}}", }, + // the number of import goroutines is 8 times than import.num-threads. + importNumGoroutines: 48, // one tikv detected in this case we are not update default size and keys because they are too small. regionSplitSize: 1 * units.MiB, regionSplitKeys: 1, @@ -358,8 +363,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, import\": {\"num-threads\": 128}}", }, + importNumGoroutines: 1024, // one tikv detected in this case and we update with new size and keys. regionSplitSize: 1 * units.GiB, regionSplitKeys: 10000000, @@ -388,12 +394,13 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}}", - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, import\": {\"num-threads\": 128}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}, import\": {\"num-threads\": 12}}", }, // two tikv detected in this case and we choose the small one. - regionSplitSize: 900 * units.MiB, - regionSplitKeys: 12000000, + importNumGoroutines: 96, + regionSplitSize: 900 * units.MiB, + regionSplitKeys: 12000000, }, } @@ -420,8 +427,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { httpCli := mockServer.Client() mgr := &Mgr{PdController: &pdutil.PdController{}} mgr.PdController.SetPDClient(pdCli) - rs, rk := mgr.GetMergeRegionSizeAndCount(ctx, httpCli) + rs, rk, threads := mgr.GetTiKVConfigs(ctx, httpCli) require.Equal(t, ca.regionSplitSize, rs) + require.Equal(t, ca.importNumGoroutines, threads) require.Equal(t, ca.regionSplitKeys, rk) mockServer.Close() } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 596be687e730d..a4598f60c1b5a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -136,7 +136,7 @@ func (cfg *RestoreCommonConfig) adjust() { cfg.Granularity = string(restore.FineGrained) } if cfg.ConcurrencyPerStore.hasSet { - cfg.ConcurrencyPerStore.value = conn.DefaultImportNumThreads + cfg.ConcurrencyPerStore.value = conn.DefaultImportNumGoroutines } } @@ -721,13 +721,13 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf mergeRegionSize := cfg.MergeSmallRegionSizeBytes mergeRegionCount := cfg.MergeSmallRegionKeyCount - importNumThreads := cfg.ConcurrencyPerStore + importNumGoroutines := cfg.ConcurrencyPerStore - if !mergeRegionSize.hasSet || !mergeRegionCount.hasSet || !importNumThreads.hasSet { + if !mergeRegionSize.hasSet || !mergeRegionCount.hasSet || !importNumGoroutines.hasSet { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize.value, mergeRegionCount.value, importNumThreads.value = mgr.GetTiKVConfigs(ctx, httpCli) + mergeRegionSize.value, mergeRegionCount.value, importNumGoroutines.value = mgr.GetTiKVConfigs(ctx, httpCli) } keepaliveCfg.PermitWithoutStream = true @@ -739,7 +739,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf false, ) // using tikv config to set the concurrency-per-store for client. - client.SetConcurrencyPerStore(importNumThreads.value) + client.SetConcurrencyPerStore(importNumGoroutines.value) err = configureRestoreClient(ctx, client, cfg) if err != nil { return errors.Trace(err) From ff8a611b626f1cd970270cea9b961ca5cb424142 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 5 Feb 2024 17:40:38 +0800 Subject: [PATCH 03/11] update test --- br/pkg/config/kv.go | 58 +++++++++++++++++++ br/pkg/conn/conn.go | 104 ++++++++++++++--------------------- br/pkg/conn/conn_test.go | 24 +++++--- br/pkg/restore/split_test.go | 14 +++++ br/pkg/task/restore.go | 65 +++++++++++----------- br/pkg/task/restore_raw.go | 17 ++++-- 6 files changed, 173 insertions(+), 109 deletions(-) create mode 100644 br/pkg/config/kv.go diff --git a/br/pkg/config/kv.go b/br/pkg/config/kv.go new file mode 100644 index 0000000000000..af8aaa2717977 --- /dev/null +++ b/br/pkg/config/kv.go @@ -0,0 +1,58 @@ +// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. +package config + +import ( + "encoding/json" + + "github.com/docker/go-units" +) + +type ConfigSet[T uint | uint64] struct { + Value T + HasSet bool +} + +type KVConfig struct { + ImportGoroutines ConfigSet[uint] + MergeRegionSize ConfigSet[uint64] + MergeRegionKeyCount ConfigSet[uint64] +} + +func ParseImportThreadsFromConfig(resp []byte) (uint, error) { + type importer struct { + Threads uint `json:"num-threads"` + } + + type config struct { + Import importer `json:"import"` + } + var c config + e := json.Unmarshal(resp, &c) + if e != nil { + return 0, e + } + + return c.Import.Threads, nil +} + +func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) { + type coprocessor struct { + RegionSplitSize string `json:"region-split-size"` + RegionSplitKeys uint64 `json:"region-split-keys"` + } + + type config struct { + Cop coprocessor `json:"coprocessor"` + } + var c config + e := json.Unmarshal(resp, &c) + if e != nil { + return 0, 0, e + } + rs, e := units.RAMInBytes(c.Cop.RegionSplitSize) + if e != nil { + return 0, 0, e + } + urs := uint64(rs) + return urs, c.Cop.RegionSplitKeys, nil +} diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index cb7d5276cca7c..4bf7419b1512b 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -5,8 +5,8 @@ package conn import ( "context" "crypto/tls" - "encoding/json" "fmt" + "io" "net" "net/http" "net/url" @@ -20,6 +20,7 @@ import ( logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn/util" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" @@ -292,77 +293,56 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) { return oracle.ComposeTS(p, l), nil } -func ParseImportThreadsFromConfig(resp *http.Response) (uint, error) { - type importer struct { - Threads uint `json:"num-threads"` +// ProcessTiKVConfigs handle the tikv config for region split size, region split keys, and import goroutines in place. +// It retrieves the config from all alive tikv stores and returns the minimum values. +// If retrieving the config fails, it returns the default config values. +func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, client *http.Client) { + mergeRegionSize := cfg.MergeRegionSize + mergeRegionKeyCount := cfg.MergeRegionKeyCount + importGoroutines := cfg.ImportGoroutines + + if mergeRegionSize.HasSet && mergeRegionKeyCount.HasSet && importGoroutines.HasSet { + log.Info("no need to retrieve the config from tikv if user has set the config") + return } - type config struct { - Import importer `json:"import"` - } - c := &config{} - e := json.NewDecoder(resp.Body).Decode(c) - if e != nil { - return 0, e - } - - return c.Import.Threads, nil -} - -func ParseMergeRegionSizeAndCountFromConfig(resp *http.Response) (uint64, uint64, error) { - type coprocessor struct { - RegionSplitKeys uint64 `json:"region-split-keys"` - RegionSplitSize string `json:"region-split-size"` - } - - type config struct { - Cop coprocessor `json:"coprocessor"` - } - c := &config{} - e := json.NewDecoder(resp.Body).Decode(c) - if e != nil { - return 0, 0, e - } - rs, e := units.RAMInBytes(c.Cop.RegionSplitSize) - if e != nil { - return 0, 0, e - } - urs := uint64(rs) - return urs, c.Cop.RegionSplitKeys, nil -} - -// GetTiKVConfigs returns the tikv config -// `coprocessor.region-split-size` and `coprocessor.region-split-key` and `import.num-threads`. -// returns the default config when failed. -func (mgr *Mgr) GetTiKVConfigs(ctx context.Context, client *http.Client) (uint64, uint64, uint) { - regionSplitSize := DefaultMergeRegionSizeBytes - regionSplitKeys := DefaultMergeRegionKeyCount - importGoroutines := DefaultImportNumGoroutines err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error { - rs, rk, e := ParseMergeRegionSizeAndCountFromConfig(resp) - if e != nil { - return e - } - if regionSplitSize == DefaultMergeRegionSizeBytes || rs < regionSplitSize { - regionSplitSize = rs - regionSplitKeys = rk + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return err } - n, e := ParseImportThreadsFromConfig(resp) - if e != nil { - log.Warn("meet error when parsing import num-threads, ignore it", logutil.ShortError(e)) - return e + if !mergeRegionSize.HasSet || !mergeRegionKeyCount.HasSet { + size, keys, e := kvconfig.ParseMergeRegionSizeFromConfig(respBytes) + if e != nil { + log.Warn("Failed to parse region split size and keys from config", logutil.ShortError(e)) + return e + } + if mergeRegionKeyCount.Value == DefaultMergeRegionKeyCount || keys < mergeRegionKeyCount.Value { + mergeRegionSize.Value = size + mergeRegionKeyCount.Value = keys + } } - // we use 8 times of the default value because it's an IO-bound cases. - if n > 0 && n*8 < importGoroutines { - importGoroutines = n * 8 + if !importGoroutines.HasSet { + threads, e := kvconfig.ParseImportThreadsFromConfig(respBytes) + if e != nil { + log.Warn("Failed to parse import num-threads from config", logutil.ShortError(e)) + return e + } + // We use 8 times the default value because it's an IO-bound case. + if importGoroutines.Value == DefaultImportNumGoroutines || (threads > 0 && threads*8 < importGoroutines.Value) { + importGoroutines.Value = threads * 8 + } } + // replace the value + cfg.MergeRegionSize = mergeRegionSize + cfg.MergeRegionKeyCount = mergeRegionKeyCount + cfg.ImportGoroutines = importGoroutines return nil }) + if err != nil { - log.Warn("meet error when getting config from TiKV; using default", logutil.ShortError(err)) - return DefaultMergeRegionSizeBytes, DefaultMergeRegionKeyCount, DefaultImportNumGoroutines + log.Warn("Failed to get config from TiKV; using default", logutil.ShortError(err)) } - return regionSplitSize, regionSplitKeys, importGoroutines } // GetConfigFromTiKV get configs from all alive tikv stores. diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 6f17be4f2a2d1..ac129189c8b99 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/utils" @@ -363,7 +364,7 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, import\": {\"num-threads\": 128}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}", }, importNumGoroutines: 1024, // one tikv detected in this case and we update with new size and keys. @@ -394,13 +395,13 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { }, }, content: []string{ - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, import\": {\"num-threads\": 128}}", - "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}, import\": {\"num-threads\": 12}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}", + "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}, \"import\": {\"num-threads\": 12}}", }, // two tikv detected in this case and we choose the small one. importNumGoroutines: 96, - regionSplitSize: 900 * units.MiB, - regionSplitKeys: 12000000, + regionSplitSize: 1 * units.GiB, + regionSplitKeys: 10000000, }, } @@ -427,10 +428,15 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { httpCli := mockServer.Client() mgr := &Mgr{PdController: &pdutil.PdController{}} mgr.PdController.SetPDClient(pdCli) - rs, rk, threads := mgr.GetTiKVConfigs(ctx, httpCli) - require.Equal(t, ca.regionSplitSize, rs) - require.Equal(t, ca.importNumGoroutines, threads) - require.Equal(t, ca.regionSplitKeys, rk) + kvConfigs := &kvconfig.KVConfig{ + ImportGoroutines: kvconfig.ConfigSet[uint]{Value: DefaultImportNumGoroutines, HasSet: false}, + MergeRegionSize: kvconfig.ConfigSet[uint64]{Value: DefaultMergeRegionSizeBytes, HasSet: false}, + MergeRegionKeyCount: kvconfig.ConfigSet[uint64]{Value: DefaultMergeRegionKeyCount, HasSet: false}, + } + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) + require.EqualValues(t, ca.regionSplitSize, kvConfigs.MergeRegionSize.Value) + require.EqualValues(t, ca.regionSplitKeys, kvConfigs.MergeRegionKeyCount.Value) + require.EqualValues(t, ca.importNumGoroutines, kvConfigs.ImportGoroutines.Value) mockServer.Close() } } diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 583847bf6e2a3..fb602037ce52a 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -723,6 +723,20 @@ func TestChooseSplitKeysBySize(t *testing.T) { require.EqualValues(t, size, 0) } +func TestPickupRangesForRegions(t *testing.T) { + cases := []struct { + ranges []rtree.Range + regions []*split.RegionInfo + rangesMap map[uint64][]rtree.Range + regionsMap map[uint64][]*split.RegionInfo + }{} + + for _, ca := range cases { + rangesMap, regionsMap := restore.PickupRangesForRegions(ca.ranges, ca.regions) + require.Equal() + } +} + func TestNeedSplit(t *testing.T) { testNeedSplit(t, false) testNeedSplit(t, true) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index a4598f60c1b5a..b304d2430e734 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -100,22 +100,17 @@ const ( TxnRestoreCmd = "Txn Restore" ) -type ConfigSet[T uint | uint64] struct { - value T - hasSet bool -} - // RestoreCommonConfig is the common configuration for all BR restore tasks. type RestoreCommonConfig struct { - Online bool `json:"online" toml:"online"` - Granularity string `json:"granularity" toml:"granularity"` - ConcurrencyPerStore ConfigSet[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` + Online bool `json:"online" toml:"online"` + Granularity string `json:"granularity" toml:"granularity"` + ConcurrencyPerStore pconfig.ConfigSet[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` // MergeSmallRegionSizeBytes is the threshold of merging small regions (Default 96MB, region split size). // MergeSmallRegionKeyCount is the threshold of merging smalle regions (Default 960_000, region split key count). // See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38 - MergeSmallRegionSizeBytes ConfigSet[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` - MergeSmallRegionKeyCount ConfigSet[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"` + MergeSmallRegionSizeBytes pconfig.ConfigSet[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` + MergeSmallRegionKeyCount pconfig.ConfigSet[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"` // determines whether enable restore sys table on default, see fullClusterRestore in restore/client.go WithSysTable bool `json:"with-sys-table" toml:"with-sys-table"` @@ -126,18 +121,21 @@ type RestoreCommonConfig struct { // adjust adjusts the abnormal config value in the current config. // useful when not starting BR from CLI (e.g. from BRIE in SQL). func (cfg *RestoreCommonConfig) adjust() { - if !cfg.MergeSmallRegionKeyCount.hasSet { - cfg.MergeSmallRegionKeyCount.value = conn.DefaultMergeRegionKeyCount + if !cfg.MergeSmallRegionKeyCount.HasSet { + cfg.MergeSmallRegionKeyCount.Value = conn.DefaultMergeRegionKeyCount } - if !cfg.MergeSmallRegionSizeBytes.hasSet { - cfg.MergeSmallRegionSizeBytes.value = conn.DefaultMergeRegionSizeBytes + cfg.MergeSmallRegionKeyCount.Name = FlagMergeRegionKeyCount + if !cfg.MergeSmallRegionSizeBytes.HasSet { + cfg.MergeSmallRegionSizeBytes.Value = conn.DefaultMergeRegionSizeBytes } + cfg.MergeSmallRegionSizeBytes.Name = FlagMergeRegionSizeBytes if len(cfg.Granularity) == 0 { cfg.Granularity = string(restore.FineGrained) } - if cfg.ConcurrencyPerStore.hasSet { - cfg.ConcurrencyPerStore.value = conn.DefaultImportNumGoroutines + if cfg.ConcurrencyPerStore.HasSet { + cfg.ConcurrencyPerStore.Value = conn.DefaultImportNumGoroutines } + cfg.ConcurrencyPerStore.Name = flagConcurrencyPerStore } // DefineRestoreCommonFlags defines common flags for the restore command. @@ -183,23 +181,23 @@ func (cfg *RestoreCommonConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - cfg.ConcurrencyPerStore.value, err = flags.GetUint(flagConcurrencyPerStore) + cfg.ConcurrencyPerStore.Value, err = flags.GetUint(flagConcurrencyPerStore) if err != nil { return errors.Trace(err) } - cfg.ConcurrencyPerStore.hasSet = flags.Changed(flagConcurrencyPerStore) + cfg.ConcurrencyPerStore.HasSet = flags.Changed(flagConcurrencyPerStore) - cfg.MergeSmallRegionKeyCount.value, err = flags.GetUint64(FlagMergeRegionKeyCount) + cfg.MergeSmallRegionKeyCount.Value, err = flags.GetUint64(FlagMergeRegionKeyCount) if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionKeyCount.hasSet = flags.Changed(FlagMergeRegionKeyCount) + cfg.MergeSmallRegionKeyCount.HasSet = flags.Changed(FlagMergeRegionKeyCount) - cfg.MergeSmallRegionSizeBytes.value, err = flags.GetUint64(FlagMergeRegionSizeBytes) + cfg.MergeSmallRegionSizeBytes.Value, err = flags.GetUint64(FlagMergeRegionSizeBytes) if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionSizeBytes.hasSet = flags.Changed(FlagMergeRegionSizeBytes) + cfg.MergeSmallRegionSizeBytes.HasSet = flags.Changed(FlagMergeRegionSizeBytes) if flags.Lookup(flagWithSysTable) != nil { cfg.WithSysTable, err = flags.GetBool(flagWithSysTable) @@ -719,17 +717,18 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf defer mgr.Close() codec := mgr.GetStorage().GetCodec() - mergeRegionSize := cfg.MergeSmallRegionSizeBytes - mergeRegionCount := cfg.MergeSmallRegionKeyCount - importNumGoroutines := cfg.ConcurrencyPerStore - - if !mergeRegionSize.hasSet || !mergeRegionCount.hasSet || !importNumGoroutines.hasSet { - // according to https://github.com/pingcap/tidb/issues/34167. - // we should get the real config from tikv to adapt the dynamic region. - httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize.value, mergeRegionCount.value, importNumGoroutines.value = mgr.GetTiKVConfigs(ctx, httpCli) + // need retrive these configs from tikv if not set in command. + kvConfigs := &pconfig.KVConfig{ + ImportGoroutines: cfg.ConcurrencyPerStore, + MergeRegionSize: cfg.MergeSmallRegionSizeBytes, + MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount, } + // according to https://github.com/pingcap/tidb/issues/34167. + // we should get the real config from tikv to adapt the dynamic region. + httpCli := httputil.NewClient(mgr.GetTLSConfig()) + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) + keepaliveCfg.PermitWithoutStream = true client := restore.NewRestoreClient( mgr.GetPDClient(), @@ -739,7 +738,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf false, ) // using tikv config to set the concurrency-per-store for client. - client.SetConcurrencyPerStore(importNumGoroutines.value) + client.SetConcurrencyPerStore(kvConfigs.ImportGoroutines.Value) err = configureRestoreClient(ctx, client, cfg) if err != nil { return errors.Trace(err) @@ -998,7 +997,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf log.Debug("mapped table to files", zap.Any("result map", tableFileMap)) rangeStream := restore.GoValidateFileRanges( - ctx, tableStream, tableFileMap, mergeRegionSize.value, mergeRegionCount.value, errCh) + ctx, tableStream, tableFileMap, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value, errCh) rangeSize := restore.EstimateRangeSize(files) summary.CollectInt("restore ranges", rangeSize) diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 50c7194f8fbc4..dd6e27551fa64 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -7,6 +7,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" @@ -73,13 +74,19 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer mgr.Close() - mergeRegionSize := cfg.MergeSmallRegionSizeBytes - mergeRegionCount := cfg.MergeSmallRegionKeyCount - if !mergeRegionSize.hasSet || !mergeRegionSize.hasSet { + // need retrive these configs from tikv if not set in command. + kvConfigs := &kvconfig.KVConfig{ + MergeRegionSize: cfg.MergeSmallRegionSizeBytes, + MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount, + } + + if !kvConfigs.MergeRegionSize.HasSet || !kvConfigs.MergeRegionKeyCount.HasSet { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. + kvConfigs.MergeRegionSize.ParseFn = kvconfig.ParseMergeRegionSizeFromConfig + kvConfigs.MergeRegionKeyCount.ParseFn = kvconfig.ParseMergeRegionKeysFromConfig httpCli := httputil.NewClient(mgr.GetTLSConfig()) - mergeRegionSize.value, mergeRegionCount.value, _ = mgr.GetTiKVConfigs(ctx, httpCli) + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) } keepaliveCfg := GetKeepalive(&cfg.Config) @@ -133,7 +140,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR summary.CollectInt("restore files", len(files)) ranges, _, err := restore.MergeFileRanges( - files, mergeRegionSize.value, mergeRegionCount.value) + files, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value) if err != nil { return errors.Trace(err) } From 75258a8417925ba102527080a805649c9ef984b6 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 5 Feb 2024 17:42:11 +0800 Subject: [PATCH 04/11] fix --- br/pkg/restore/split_test.go | 14 -------------- br/pkg/task/restore_test.go | 4 ++-- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index fb602037ce52a..583847bf6e2a3 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -723,20 +723,6 @@ func TestChooseSplitKeysBySize(t *testing.T) { require.EqualValues(t, size, 0) } -func TestPickupRangesForRegions(t *testing.T) { - cases := []struct { - ranges []rtree.Range - regions []*split.RegionInfo - rangesMap map[uint64][]rtree.Range - regionsMap map[uint64][]*split.RegionInfo - }{} - - for _, ca := range cases { - rangesMap, regionsMap := restore.PickupRangesForRegions(ca.ranges, ca.regions) - require.Equal() - } -} - func TestNeedSplit(t *testing.T) { testNeedSplit(t, false) testNeedSplit(t, true) diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index e64220810baaf..b42a1ab095dce 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -31,8 +31,8 @@ func TestRestoreConfigAdjust(t *testing.T) { require.Equal(t, uint32(defaultRestoreConcurrency), cfg.Config.Concurrency) require.Equal(t, defaultSwitchInterval, cfg.Config.SwitchModeInterval) - require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount.value) - require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes.value) + require.Equal(t, conn.DefaultMergeRegionKeyCount, cfg.MergeSmallRegionKeyCount.Value) + require.Equal(t, conn.DefaultMergeRegionSizeBytes, cfg.MergeSmallRegionSizeBytes.Value) } type mockPDClient struct { From 6cfdda26ea954fc128d1fce209255cbd64b9cdfb Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 6 Feb 2024 10:46:45 +0800 Subject: [PATCH 05/11] fix build --- br/pkg/task/restore.go | 3 --- br/pkg/task/restore_raw.go | 2 -- 2 files changed, 5 deletions(-) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index b304d2430e734..a157de20f5d34 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -124,18 +124,15 @@ func (cfg *RestoreCommonConfig) adjust() { if !cfg.MergeSmallRegionKeyCount.HasSet { cfg.MergeSmallRegionKeyCount.Value = conn.DefaultMergeRegionKeyCount } - cfg.MergeSmallRegionKeyCount.Name = FlagMergeRegionKeyCount if !cfg.MergeSmallRegionSizeBytes.HasSet { cfg.MergeSmallRegionSizeBytes.Value = conn.DefaultMergeRegionSizeBytes } - cfg.MergeSmallRegionSizeBytes.Name = FlagMergeRegionSizeBytes if len(cfg.Granularity) == 0 { cfg.Granularity = string(restore.FineGrained) } if cfg.ConcurrencyPerStore.HasSet { cfg.ConcurrencyPerStore.Value = conn.DefaultImportNumGoroutines } - cfg.ConcurrencyPerStore.Name = flagConcurrencyPerStore } // DefineRestoreCommonFlags defines common flags for the restore command. diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index dd6e27551fa64..942248ab0e30d 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -83,8 +83,6 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR if !kvConfigs.MergeRegionSize.HasSet || !kvConfigs.MergeRegionKeyCount.HasSet { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. - kvConfigs.MergeRegionSize.ParseFn = kvconfig.ParseMergeRegionSizeFromConfig - kvConfigs.MergeRegionKeyCount.ParseFn = kvconfig.ParseMergeRegionKeysFromConfig httpCli := httputil.NewClient(mgr.GetTLSConfig()) mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) } From a73b1f2d631fdac198b4947479f9a5ccd7469c4a Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 7 Feb 2024 18:34:37 +0800 Subject: [PATCH 06/11] fix bazel --- br/pkg/config/BUILD.bazel | 6 +++++- br/pkg/conn/BUILD.bazel | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/br/pkg/config/BUILD.bazel b/br/pkg/config/BUILD.bazel index 4bddb6debfa2a..022078ea4f0fc 100644 --- a/br/pkg/config/BUILD.bazel +++ b/br/pkg/config/BUILD.bazel @@ -2,12 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "config", - srcs = ["ebs.go"], + srcs = [ + "ebs.go", + "kv.go", + ], importpath = "github.com/pingcap/tidb/br/pkg/config", visibility = ["//visibility:public"], deps = [ "//br/pkg/metautil", "//br/pkg/storage", + "@com_github_docker_go_units//:go-units", "@com_github_masterminds_semver//:semver", "@com_github_pingcap_errors//:errors", "@io_k8s_api//core/v1:core", diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index 0d0227fc9db48..e42fdf7b7f046 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/conn", visibility = ["//visibility:public"], deps = [ + "//br/pkg/config", "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/glue", @@ -47,6 +48,7 @@ go_test( flaky = True, shard_count = 7, deps = [ + "//br/pkg/config", "//br/pkg/conn/util", "//br/pkg/pdutil", "//br/pkg/utils", From 2f45f83d525963513b8c4fbebca811d3736f6d7e Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 7 Feb 2024 20:05:52 +0800 Subject: [PATCH 07/11] fix build --- br/pkg/task/restore.go | 2 +- br/pkg/task/restore_raw.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index a157de20f5d34..e1c975d0bf27f 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -714,7 +714,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf defer mgr.Close() codec := mgr.GetStorage().GetCodec() - // need retrive these configs from tikv if not set in command. + // need retrieve these configs from tikv if not set in command. kvConfigs := &pconfig.KVConfig{ ImportGoroutines: cfg.ConcurrencyPerStore, MergeRegionSize: cfg.MergeSmallRegionSizeBytes, diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 942248ab0e30d..5097c5426cda6 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -74,7 +74,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } defer mgr.Close() - // need retrive these configs from tikv if not set in command. + // need retrieve these configs from tikv if not set in command. kvConfigs := &kvconfig.KVConfig{ MergeRegionSize: cfg.MergeSmallRegionSizeBytes, MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount, From b5f0f9acde1e0dd9004917bb14b3d3fb5f34e82a Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 7 Feb 2024 20:26:17 +0800 Subject: [PATCH 08/11] fix test --- br/pkg/rtree/logging_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/rtree/logging_test.go b/br/pkg/rtree/logging_test.go index 9630cda766aeb..7e977a1d9e598 100644 --- a/br/pkg/rtree/logging_test.go +++ b/br/pkg/rtree/logging_test.go @@ -34,7 +34,7 @@ func TestLogRanges(t *testing.T) { ranges := make([]rtree.Range, cs.count) for j := 0; j < cs.count; j++ { ranges[j] = *newRange([]byte(fmt.Sprintf("%d", j)), []byte(fmt.Sprintf("%d", j+1))) - ranges[j].Files = append(ranges[j].Files, &backuppb.File{TotalKvs: uint64(j), TotalBytes: uint64(j)}) + ranges[j].Files = append(ranges[j].Files, &backuppb.File{TotalKvs: uint64(j), TotalBytes: uint64(j), Size_: uint64(j)}) } out, err := encoder.EncodeEntry(zapcore.Entry{}, []zap.Field{rtree.ZapRanges(ranges)}) require.NoError(t, err) From a893759d1a84c03c01846951f72a9869543de6fd Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 8 Feb 2024 16:28:20 +0800 Subject: [PATCH 09/11] fix build --- br/pkg/task/common_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index fcc8bdc5c1236..eb350fe1872e6 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -9,6 +9,7 @@ import ( backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + kvconfig "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/config" @@ -208,8 +209,8 @@ func expectedDefaultRestoreConfig() RestoreConfig { Config: defaultConfig, RestoreCommonConfig: RestoreCommonConfig{Online: false, Granularity: "fine-grained", - MergeSmallRegionSizeBytes: ConfigSet[uint64]{value: 0x6000000}, - MergeSmallRegionKeyCount: ConfigSet[uint64]{value: 0xea600}, + MergeSmallRegionSizeBytes: kvconfig.ConfigSet[uint64]{Value: 0x6000000}, + MergeSmallRegionKeyCount: kvconfig.ConfigSet[uint64]{Value: 0xea600}, WithSysTable: true, ResetSysUsers: []string{"cloud_admin", "root"}}, NoSchema: false, From 21b478dfd280ccd635212f38c57efdd4c459f0f6 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 8 Feb 2024 16:34:53 +0800 Subject: [PATCH 10/11] update --- br/pkg/task/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index bd381b38a2c7a..760a968a65b3e 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -102,6 +102,7 @@ go_test( flaky = True, shard_count = 22, deps = [ + "//br/pkg/config", "//br/pkg/conn", "//br/pkg/errors", "//br/pkg/metautil", From c04e0adfdeaf322176279113af387a6c3d219b67 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Sun, 25 Feb 2024 10:33:49 +0800 Subject: [PATCH 11/11] address some comments --- br/pkg/config/kv.go | 12 ++++++------ br/pkg/conn/conn.go | 6 +++--- br/pkg/conn/conn_test.go | 6 +++--- br/pkg/task/common_test.go | 4 ++-- br/pkg/task/restore.go | 22 +++++++++++----------- br/pkg/task/restore_raw.go | 2 +- 6 files changed, 26 insertions(+), 26 deletions(-) diff --git a/br/pkg/config/kv.go b/br/pkg/config/kv.go index af8aaa2717977..0283c8d3cc721 100644 --- a/br/pkg/config/kv.go +++ b/br/pkg/config/kv.go @@ -7,15 +7,15 @@ import ( "github.com/docker/go-units" ) -type ConfigSet[T uint | uint64] struct { - Value T - HasSet bool +type ConfigTerm[T uint | uint64] struct { + Value T + Modified bool } type KVConfig struct { - ImportGoroutines ConfigSet[uint] - MergeRegionSize ConfigSet[uint64] - MergeRegionKeyCount ConfigSet[uint64] + ImportGoroutines ConfigTerm[uint] + MergeRegionSize ConfigTerm[uint64] + MergeRegionKeyCount ConfigTerm[uint64] } func ParseImportThreadsFromConfig(resp []byte) (uint, error) { diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 4bf7419b1512b..d76b11202cd9f 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -301,7 +301,7 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, mergeRegionKeyCount := cfg.MergeRegionKeyCount importGoroutines := cfg.ImportGoroutines - if mergeRegionSize.HasSet && mergeRegionKeyCount.HasSet && importGoroutines.HasSet { + if mergeRegionSize.Modified && mergeRegionKeyCount.Modified && importGoroutines.Modified { log.Info("no need to retrieve the config from tikv if user has set the config") return } @@ -311,7 +311,7 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, if err != nil { return err } - if !mergeRegionSize.HasSet || !mergeRegionKeyCount.HasSet { + if !mergeRegionSize.Modified || !mergeRegionKeyCount.Modified { size, keys, e := kvconfig.ParseMergeRegionSizeFromConfig(respBytes) if e != nil { log.Warn("Failed to parse region split size and keys from config", logutil.ShortError(e)) @@ -322,7 +322,7 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, mergeRegionKeyCount.Value = keys } } - if !importGoroutines.HasSet { + if !importGoroutines.Modified { threads, e := kvconfig.ParseImportThreadsFromConfig(respBytes) if e != nil { log.Warn("Failed to parse import num-threads from config", logutil.ShortError(e)) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index ac129189c8b99..576c3f7feb826 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -429,9 +429,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { mgr := &Mgr{PdController: &pdutil.PdController{}} mgr.PdController.SetPDClient(pdCli) kvConfigs := &kvconfig.KVConfig{ - ImportGoroutines: kvconfig.ConfigSet[uint]{Value: DefaultImportNumGoroutines, HasSet: false}, - MergeRegionSize: kvconfig.ConfigSet[uint64]{Value: DefaultMergeRegionSizeBytes, HasSet: false}, - MergeRegionKeyCount: kvconfig.ConfigSet[uint64]{Value: DefaultMergeRegionKeyCount, HasSet: false}, + ImportGoroutines: kvconfig.ConfigTerm[uint]{Value: DefaultImportNumGoroutines, Modified: false}, + MergeRegionSize: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionSizeBytes, Modified: false}, + MergeRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionKeyCount, Modified: false}, } mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) require.EqualValues(t, ca.regionSplitSize, kvConfigs.MergeRegionSize.Value) diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index eb350fe1872e6..65369afd1a018 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -209,8 +209,8 @@ func expectedDefaultRestoreConfig() RestoreConfig { Config: defaultConfig, RestoreCommonConfig: RestoreCommonConfig{Online: false, Granularity: "fine-grained", - MergeSmallRegionSizeBytes: kvconfig.ConfigSet[uint64]{Value: 0x6000000}, - MergeSmallRegionKeyCount: kvconfig.ConfigSet[uint64]{Value: 0xea600}, + MergeSmallRegionSizeBytes: kvconfig.ConfigTerm[uint64]{Value: 0x6000000}, + MergeSmallRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: 0xea600}, WithSysTable: true, ResetSysUsers: []string{"cloud_admin", "root"}}, NoSchema: false, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index e1c975d0bf27f..ea5ce851f395d 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -102,15 +102,15 @@ const ( // RestoreCommonConfig is the common configuration for all BR restore tasks. type RestoreCommonConfig struct { - Online bool `json:"online" toml:"online"` - Granularity string `json:"granularity" toml:"granularity"` - ConcurrencyPerStore pconfig.ConfigSet[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` + Online bool `json:"online" toml:"online"` + Granularity string `json:"granularity" toml:"granularity"` + ConcurrencyPerStore pconfig.ConfigTerm[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"` // MergeSmallRegionSizeBytes is the threshold of merging small regions (Default 96MB, region split size). // MergeSmallRegionKeyCount is the threshold of merging smalle regions (Default 960_000, region split key count). // See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38 - MergeSmallRegionSizeBytes pconfig.ConfigSet[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` - MergeSmallRegionKeyCount pconfig.ConfigSet[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"` + MergeSmallRegionSizeBytes pconfig.ConfigTerm[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"` + MergeSmallRegionKeyCount pconfig.ConfigTerm[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"` // determines whether enable restore sys table on default, see fullClusterRestore in restore/client.go WithSysTable bool `json:"with-sys-table" toml:"with-sys-table"` @@ -121,16 +121,16 @@ type RestoreCommonConfig struct { // adjust adjusts the abnormal config value in the current config. // useful when not starting BR from CLI (e.g. from BRIE in SQL). func (cfg *RestoreCommonConfig) adjust() { - if !cfg.MergeSmallRegionKeyCount.HasSet { + if !cfg.MergeSmallRegionKeyCount.Modified { cfg.MergeSmallRegionKeyCount.Value = conn.DefaultMergeRegionKeyCount } - if !cfg.MergeSmallRegionSizeBytes.HasSet { + if !cfg.MergeSmallRegionSizeBytes.Modified { cfg.MergeSmallRegionSizeBytes.Value = conn.DefaultMergeRegionSizeBytes } if len(cfg.Granularity) == 0 { cfg.Granularity = string(restore.FineGrained) } - if cfg.ConcurrencyPerStore.HasSet { + if cfg.ConcurrencyPerStore.Modified { cfg.ConcurrencyPerStore.Value = conn.DefaultImportNumGoroutines } } @@ -182,19 +182,19 @@ func (cfg *RestoreCommonConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - cfg.ConcurrencyPerStore.HasSet = flags.Changed(flagConcurrencyPerStore) + cfg.ConcurrencyPerStore.Modified = flags.Changed(flagConcurrencyPerStore) cfg.MergeSmallRegionKeyCount.Value, err = flags.GetUint64(FlagMergeRegionKeyCount) if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionKeyCount.HasSet = flags.Changed(FlagMergeRegionKeyCount) + cfg.MergeSmallRegionKeyCount.Modified = flags.Changed(FlagMergeRegionKeyCount) cfg.MergeSmallRegionSizeBytes.Value, err = flags.GetUint64(FlagMergeRegionSizeBytes) if err != nil { return errors.Trace(err) } - cfg.MergeSmallRegionSizeBytes.HasSet = flags.Changed(FlagMergeRegionSizeBytes) + cfg.MergeSmallRegionSizeBytes.Modified = flags.Changed(FlagMergeRegionSizeBytes) if flags.Lookup(flagWithSysTable) != nil { cfg.WithSysTable, err = flags.GetBool(flagWithSysTable) diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 5097c5426cda6..0e4a89a99fe5e 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -80,7 +80,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount, } - if !kvConfigs.MergeRegionSize.HasSet || !kvConfigs.MergeRegionKeyCount.HasSet { + if !kvConfigs.MergeRegionSize.Modified || !kvConfigs.MergeRegionKeyCount.Modified { // according to https://github.com/pingcap/tidb/issues/34167. // we should get the real config from tikv to adapt the dynamic region. httpCli := httputil.NewClient(mgr.GetTLSConfig())