diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 34a1117b55fc9..1c6eb1e7cb8d6 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -267,15 +267,15 @@ func (*encodingBuilder) MakeEmptyRows() encode.Rows { type targetInfoGetter struct { tls *common.TLS targetDB *sql.DB - pdAddr string + pdCli pd.Client } // NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation. -func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdAddr string) backend.TargetInfoGetter { +func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter { return &targetInfoGetter{ tls: tls, targetDB: db, - pdAddr: pdAddr, + pdCli: pdCli, } } @@ -296,10 +296,10 @@ func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *back if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil { return err } - if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil { + if err := tikv.CheckPDVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinPDVersion, localMaxPDVersion); err != nil { return err } - if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil { + if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinTiKVVersion, localMaxTiKVVersion); err != nil { return err } diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel index a49a1db60caf1..a7f6a63212b7d 100644 --- a/br/pkg/lightning/importer/BUILD.bazel +++ b/br/pkg/lightning/importer/BUILD.bazel @@ -164,6 +164,8 @@ go_test( "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_pd_client//:client", "@com_github_xitongsys_parquet_go//writer", "@com_github_xitongsys_parquet_go_source//buffer", "@io_etcd_go_etcd_client_v3//:client", diff --git a/br/pkg/lightning/importer/checksum_helper.go b/br/pkg/lightning/importer/checksum_helper.go index 88bc40d5a72e1..124bddb92b57a 100644 --- a/br/pkg/lightning/importer/checksum_helper.go +++ b/br/pkg/lightning/importer/checksum_helper.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/kv" - pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -37,8 +36,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) ( return nil, nil } - pdAddr := rc.cfg.TiDB.PdAddr - pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, pdAddr) + pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, rc.pdCli.GetLeaderAddr()) if err != nil { return nil, errors.Trace(err) } @@ -46,12 +44,6 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) ( // for v4.0.0 or upper, we can use the gc ttl api var manager local.ChecksumManager if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL { - tlsOpt := rc.tls.ToPDSecurityOption() - pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt) - if err != nil { - return nil, errors.Trace(err) - } - backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db) // only set backoff weight when it's smaller than default value if err == nil && backoffWeight >= local.DefaultBackoffWeight { @@ -60,7 +52,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) ( log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight)) backoffWeight = local.DefaultBackoffWeight } - manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight) + manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight) } else { manager = local.NewTiDBChecksumExecutor(rc.db) } diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go index 8ad3937faa19a..191fed628bdf6 100644 --- a/br/pkg/lightning/importer/get_pre_info.go +++ b/br/pkg/lightning/importer/get_pre_info.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/mock" + pd "github.com/tikv/pd/client" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -123,12 +124,14 @@ type TargetInfoGetterImpl struct { db *sql.DB tls *common.TLS backend backend.TargetInfoGetter + pdCli pd.Client } // NewTargetInfoGetterImpl creates a TargetInfoGetterImpl object. func NewTargetInfoGetterImpl( cfg *config.Config, targetDB *sql.DB, + pdCli pd.Client, ) (*TargetInfoGetterImpl, error) { tls, err := cfg.ToTLS() if err != nil { @@ -139,7 +142,10 @@ func NewTargetInfoGetterImpl( case config.BackendTiDB: backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB) case config.BackendLocal: - backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, cfg.TiDB.PdAddr) + if pdCli == nil { + return nil, common.ErrUnknown.GenWithStack("pd client is required when using local backend") + } + backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdCli) default: return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend) } @@ -148,6 +154,7 @@ func NewTargetInfoGetterImpl( tls: tls, db: targetDB, backend: backendTargetInfoGetter, + pdCli: pdCli, }, nil } @@ -229,7 +236,7 @@ func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Contex // It uses the PD interface through TLS to get the information. func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) { result := new(pdtypes.ReplicationConfig) - if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result); err != nil { + if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdReplicate, &result); err != nil { return nil, errors.Trace(err) } return result, nil @@ -240,7 +247,7 @@ func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtyp // It uses the PD interface through TLS to get the information. func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) { result := new(pdtypes.StoresInfo) - if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil { + if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdStores, result); err != nil { return nil, errors.Trace(err) } return result, nil @@ -251,7 +258,7 @@ func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.Sto // It uses the PD interface through TLS to get the information. func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) { result := new(pdtypes.RegionsInfo) - if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil { + if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdEmptyRegions, &result); err != nil { return nil, errors.Trace(err) } return result, nil diff --git a/br/pkg/lightning/importer/get_pre_info_test.go b/br/pkg/lightning/importer/get_pre_info_test.go index 66480654cdfd8..7fda215beaa85 100644 --- a/br/pkg/lightning/importer/get_pre_info_test.go +++ b/br/pkg/lightning/importer/get_pre_info_test.go @@ -757,7 +757,10 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) { require.NoError(t, err) lnConfig := config.NewConfig() lnConfig.TikvImporter.Backend = config.BackendLocal - targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db) + _, err = NewTargetInfoGetterImpl(lnConfig, db, nil) + require.ErrorContains(t, err, "pd client is required when using local backend") + lnConfig.TikvImporter.Backend = config.BackendTiDB + targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db, nil) require.NoError(t, err) require.Equal(t, lnConfig, targetGetter.cfg) diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 5e5a44f0b8b1d..a1e26abd209ab 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -203,6 +203,7 @@ type Controller struct { engineMgr backend.EngineManager backend backend.Backend db *sql.DB + pdCli pd.Client alterTableLock sync.Mutex sysVars map[string]string @@ -332,6 +333,7 @@ func NewImportControllerWithPauser( var encodingBuilder encode.EncodingBuilder var backendObj backend.Backend + var pdCli pd.Client switch cfg.TikvImporter.Backend { case config.BackendTiDB: encodingBuilder = tidb.NewEncodingBuilder() @@ -347,9 +349,13 @@ func NewImportControllerWithPauser( if maxOpenFiles < 0 { maxOpenFiles = math.MaxInt32 } + pdCli, err = pd.NewClientWithContext(ctx, []string{cfg.TiDB.PdAddr}, tls.ToPDSecurityOption()) + if err != nil { + return nil, errors.Trace(err) + } if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { - if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil { + if err := tikv.CheckTiKVVersion(ctx, tls, pdCli.GetLeaderAddr(), minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil { if berrors.Is(err, berrors.ErrVersionMismatch) { log.FromContext(ctx).Warn("TiKV version doesn't support duplicate resolution. The resolution algorithm will fall back to 'none'", zap.Error(err)) cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone @@ -399,7 +405,7 @@ func NewImportControllerWithPauser( var wrapper backend.TargetInfoGetter if cfg.TikvImporter.Backend == config.BackendLocal { - wrapper = local.NewTargetInfoGetter(tls, db, cfg.TiDB.PdAddr) + wrapper = local.NewTargetInfoGetter(tls, db, pdCli) } else { wrapper = tidb.NewTargetInfoGetter(db) } @@ -409,6 +415,7 @@ func NewImportControllerWithPauser( db: db, tls: tls, backend: wrapper, + pdCli: pdCli, } preInfoGetter, err := NewPreImportInfoGetter( cfg, @@ -438,6 +445,7 @@ func NewImportControllerWithPauser( pauser: p.Pauser, engineMgr: backend.MakeEngineManager(backendObj), backend: backendObj, + pdCli: pdCli, db: db, sysVars: common.DefaultImportantVariables, tls: tls, @@ -473,6 +481,9 @@ func NewImportControllerWithPauser( func (rc *Controller) Close() { rc.backend.Close() _ = rc.db.Close() + if rc.pdCli != nil { + rc.pdCli.Close() + } } // Run starts the restore task. @@ -1925,7 +1936,7 @@ func (rc *Controller) fullCompact(ctx context.Context) error { } func (rc *Controller) doCompact(ctx context.Context, level int32) error { - tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr) + tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr()) return tikv.ForAllStores( ctx, tls, diff --git a/br/pkg/lightning/importer/precheck.go b/br/pkg/lightning/importer/precheck.go index 1658229321edb..735e17f163ca2 100644 --- a/br/pkg/lightning/importer/precheck.go +++ b/br/pkg/lightning/importer/precheck.go @@ -9,6 +9,7 @@ import ( ropts "github.com/pingcap/tidb/br/pkg/lightning/importer/opts" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/precheck" + pd "github.com/tikv/pd/client" ) type precheckContextKey string @@ -29,7 +30,8 @@ type PrecheckItemBuilder struct { } // NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config -func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) { +// pdCli **must not** be nil for local backend +func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) { var gerr error builderCfg := new(ropts.PrecheckItemBuilderConfig) for _, o := range opts { @@ -39,7 +41,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, o if err != nil { return nil, errors.Trace(err) } - targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB) + targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB, pdCli) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index 01ccf311f0e65..aa669560c6cea 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -70,6 +70,8 @@ import ( filter "github.com/pingcap/tidb/util/table-filter" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/testutils" + pd "github.com/tikv/pd/client" ) const ( @@ -1162,6 +1164,8 @@ func (s *tableRestoreSuite) TestCheckClusterResource() { require.NoError(s.T(), err) mockStore, err := storage.NewLocalStorage(dir) require.NoError(s.T(), err) + _, _, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(s.T(), err) for _, ca := range cases { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { var err error @@ -1178,9 +1182,11 @@ func (s *tableRestoreSuite) TestCheckClusterResource() { url := strings.TrimPrefix(server.URL, "https://") cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}} + pdCli := &mockPDClient{Client: pdClient, leaderAddr: url} targetInfoGetter := &TargetInfoGetterImpl{ - cfg: cfg, - tls: tls, + cfg: cfg, + tls: tls, + pdCli: pdCli, } preInfoGetter := &PreImportInfoGetterImpl{ cfg: cfg, @@ -1195,6 +1201,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() { checkTemplate: template, preInfoGetter: preInfoGetter, precheckItemBuilder: theCheckBuilder, + pdCli: pdCli, } var sourceSize int64 err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { @@ -1231,6 +1238,15 @@ func (mockTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(ta return err } +type mockPDClient struct { + pd.Client + leaderAddr string +} + +func (m *mockPDClient) GetLeaderAddr() string { + return m.leaderAddr +} + func (s *tableRestoreSuite) TestCheckClusterRegion() { type testCase struct { stores pdtypes.StoresInfo @@ -1246,6 +1262,8 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { } return regions } + _, _, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(s.T(), err) testCases := []testCase{ { @@ -1321,10 +1339,12 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { url := strings.TrimPrefix(server.URL, "https://") cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}} + pdCli := &mockPDClient{Client: pdClient, leaderAddr: url} targetInfoGetter := &TargetInfoGetterImpl{ - cfg: cfg, - tls: tls, + cfg: cfg, + tls: tls, + pdCli: pdCli, } dbMetas := []*mydump.MDDatabaseMeta{} preInfoGetter := &PreImportInfoGetterImpl{ @@ -1341,6 +1361,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { preInfoGetter: preInfoGetter, dbInfos: make(map[string]*checkpoints.TidbDBInfo), precheckItemBuilder: theCheckBuilder, + pdCli: pdCli, } preInfoGetter.dbInfosCache = rc.dbInfos diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index 42d850a1df8a0..637be6e9905fa 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -52,7 +52,6 @@ func prepareSortDir(e *LoadDataController, jobID int64) (string, error) { tidbCfg := tidb.GetGlobalConfig() sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port)) sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix, strconv.FormatInt(jobID, 10)) - if info, err := os.Stat(sortPath); err != nil { if !os.IsNotExist(err) { e.logger.Error("stat sort dir failed", zap.String("path", sortPath), zap.Error(err))