Skip to content

Commit

Permalink
lightning: fix pd http request using old address (#45680) (#45726)
Browse files Browse the repository at this point in the history
close #43436
  • Loading branch information
ti-chi-bot authored Oct 17, 2023
1 parent b003558 commit acdac74
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 30 deletions.
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,15 @@ func (*encodingBuilder) MakeEmptyRows() encode.Rows {
type targetInfoGetter struct {
tls *common.TLS
targetDB *sql.DB
pdAddr string
pdCli pd.Client
}

// NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdAddr string) backend.TargetInfoGetter {
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter {
return &targetInfoGetter{
tls: tls,
targetDB: db,
pdAddr: pdAddr,
pdCli: pdCli,
}
}

Expand All @@ -296,10 +296,10 @@ func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *back
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinPDVersion, localMaxPDVersion); err != nil {
return err
}
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinTiKVVersion, localMaxTiKVVersion); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_pd_client//:client",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
12 changes: 2 additions & 10 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -37,21 +36,14 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
return nil, nil
}

pdAddr := rc.cfg.TiDB.PdAddr
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, pdAddr)
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
Expand All @@ -60,7 +52,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight))
backoffWeight = local.DefaultBackoffWeight
}
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/mock"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -123,12 +124,14 @@ type TargetInfoGetterImpl struct {
db *sql.DB
tls *common.TLS
backend backend.TargetInfoGetter
pdCli pd.Client
}

// NewTargetInfoGetterImpl creates a TargetInfoGetterImpl object.
func NewTargetInfoGetterImpl(
cfg *config.Config,
targetDB *sql.DB,
pdCli pd.Client,
) (*TargetInfoGetterImpl, error) {
tls, err := cfg.ToTLS()
if err != nil {
Expand All @@ -139,7 +142,10 @@ func NewTargetInfoGetterImpl(
case config.BackendTiDB:
backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB)
case config.BackendLocal:
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, cfg.TiDB.PdAddr)
if pdCli == nil {
return nil, common.ErrUnknown.GenWithStack("pd client is required when using local backend")
}
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdCli)
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
Expand All @@ -148,6 +154,7 @@ func NewTargetInfoGetterImpl(
tls: tls,
db: targetDB,
backend: backendTargetInfoGetter,
pdCli: pdCli,
}, nil
}

Expand Down Expand Up @@ -229,7 +236,7 @@ func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Contex
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) {
result := new(pdtypes.ReplicationConfig)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdReplicate, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -240,7 +247,7 @@ func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtyp
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) {
result := new(pdtypes.StoresInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdStores, result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -251,7 +258,7 @@ func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.Sto
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) {
result := new(pdtypes.RegionsInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/lightning/importer/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,10 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
lnConfig := config.NewConfig()
lnConfig.TikvImporter.Backend = config.BackendLocal
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db)
_, err = NewTargetInfoGetterImpl(lnConfig, db, nil)
require.ErrorContains(t, err, "pd client is required when using local backend")
lnConfig.TikvImporter.Backend = config.BackendTiDB
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db, nil)
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

Expand Down
17 changes: 14 additions & 3 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type Controller struct {
engineMgr backend.EngineManager
backend backend.Backend
db *sql.DB
pdCli pd.Client

alterTableLock sync.Mutex
sysVars map[string]string
Expand Down Expand Up @@ -332,6 +333,7 @@ func NewImportControllerWithPauser(

var encodingBuilder encode.EncodingBuilder
var backendObj backend.Backend
var pdCli pd.Client
switch cfg.TikvImporter.Backend {
case config.BackendTiDB:
encodingBuilder = tidb.NewEncodingBuilder()
Expand All @@ -347,9 +349,13 @@ func NewImportControllerWithPauser(
if maxOpenFiles < 0 {
maxOpenFiles = math.MaxInt32
}
pdCli, err = pd.NewClientWithContext(ctx, []string{cfg.TiDB.PdAddr}, tls.ToPDSecurityOption())
if err != nil {
return nil, errors.Trace(err)
}

if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if err := tikv.CheckTiKVVersion(ctx, tls, pdCli.GetLeaderAddr(), minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if berrors.Is(err, berrors.ErrVersionMismatch) {
log.FromContext(ctx).Warn("TiKV version doesn't support duplicate resolution. The resolution algorithm will fall back to 'none'", zap.Error(err))
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
Expand Down Expand Up @@ -399,7 +405,7 @@ func NewImportControllerWithPauser(

var wrapper backend.TargetInfoGetter
if cfg.TikvImporter.Backend == config.BackendLocal {
wrapper = local.NewTargetInfoGetter(tls, db, cfg.TiDB.PdAddr)
wrapper = local.NewTargetInfoGetter(tls, db, pdCli)
} else {
wrapper = tidb.NewTargetInfoGetter(db)
}
Expand All @@ -409,6 +415,7 @@ func NewImportControllerWithPauser(
db: db,
tls: tls,
backend: wrapper,
pdCli: pdCli,
}
preInfoGetter, err := NewPreImportInfoGetter(
cfg,
Expand Down Expand Up @@ -438,6 +445,7 @@ func NewImportControllerWithPauser(
pauser: p.Pauser,
engineMgr: backend.MakeEngineManager(backendObj),
backend: backendObj,
pdCli: pdCli,
db: db,
sysVars: common.DefaultImportantVariables,
tls: tls,
Expand Down Expand Up @@ -473,6 +481,9 @@ func NewImportControllerWithPauser(
func (rc *Controller) Close() {
rc.backend.Close()
_ = rc.db.Close()
if rc.pdCli != nil {
rc.pdCli.Close()
}
}

// Run starts the restore task.
Expand Down Expand Up @@ -1925,7 +1936,7 @@ func (rc *Controller) fullCompact(ctx context.Context) error {
}

func (rc *Controller) doCompact(ctx context.Context, level int32) error {
tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr)
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())
return tikv.ForAllStores(
ctx,
tls,
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
ropts "github.com/pingcap/tidb/br/pkg/lightning/importer/opts"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/precheck"
pd "github.com/tikv/pd/client"
)

type precheckContextKey string
Expand All @@ -29,7 +30,8 @@ type PrecheckItemBuilder struct {
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand All @@ -39,7 +41,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, o
if err != nil {
return nil, errors.Trace(err)
}
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB)
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB, pdCli)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
29 changes: 25 additions & 4 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/testutils"
pd "github.com/tikv/pd/client"
)

const (
Expand Down Expand Up @@ -1162,6 +1164,8 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
require.NoError(s.T(), err)
mockStore, err := storage.NewLocalStorage(dir)
require.NoError(s.T(), err)
_, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(s.T(), err)
for _, ca := range cases {
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
var err error
Expand All @@ -1178,9 +1182,11 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {

url := strings.TrimPrefix(server.URL, "https://")
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
pdCli := &mockPDClient{Client: pdClient, leaderAddr: url}
targetInfoGetter := &TargetInfoGetterImpl{
cfg: cfg,
tls: tls,
cfg: cfg,
tls: tls,
pdCli: pdCli,
}
preInfoGetter := &PreImportInfoGetterImpl{
cfg: cfg,
Expand All @@ -1195,6 +1201,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
checkTemplate: template,
preInfoGetter: preInfoGetter,
precheckItemBuilder: theCheckBuilder,
pdCli: pdCli,
}
var sourceSize int64
err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
Expand Down Expand Up @@ -1231,6 +1238,15 @@ func (mockTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(ta
return err
}

type mockPDClient struct {
pd.Client
leaderAddr string
}

func (m *mockPDClient) GetLeaderAddr() string {
return m.leaderAddr
}

func (s *tableRestoreSuite) TestCheckClusterRegion() {
type testCase struct {
stores pdtypes.StoresInfo
Expand All @@ -1246,6 +1262,8 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
}
return regions
}
_, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(s.T(), err)

testCases := []testCase{
{
Expand Down Expand Up @@ -1321,10 +1339,12 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {

url := strings.TrimPrefix(server.URL, "https://")
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
pdCli := &mockPDClient{Client: pdClient, leaderAddr: url}

targetInfoGetter := &TargetInfoGetterImpl{
cfg: cfg,
tls: tls,
cfg: cfg,
tls: tls,
pdCli: pdCli,
}
dbMetas := []*mydump.MDDatabaseMeta{}
preInfoGetter := &PreImportInfoGetterImpl{
Expand All @@ -1341,6 +1361,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
preInfoGetter: preInfoGetter,
dbInfos: make(map[string]*checkpoints.TidbDBInfo),
precheckItemBuilder: theCheckBuilder,
pdCli: pdCli,
}

preInfoGetter.dbInfosCache = rc.dbInfos
Expand Down
1 change: 0 additions & 1 deletion executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func prepareSortDir(e *LoadDataController, jobID int64) (string, error) {
tidbCfg := tidb.GetGlobalConfig()
sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port))
sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix, strconv.FormatInt(jobID, 10))

if info, err := os.Stat(sortPath); err != nil {
if !os.IsNotExist(err) {
e.logger.Error("stat sort dir failed", zap.String("path", sortPath), zap.Error(err))
Expand Down

0 comments on commit acdac74

Please sign in to comment.