Skip to content

Commit

Permalink
ddl: use latest PD address to register lightning (#48687)
Browse files Browse the repository at this point in the history
close #48680
  • Loading branch information
lance6716 authored Nov 18, 2023
1 parent 36cb29f commit 6260e66
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 12 deletions.
4 changes: 3 additions & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -142,7 +143,8 @@ func (s *backfillDistScheduler) Init(ctx context.Context) error {
if idx == nil {
return errors.Trace(errors.Errorf("index info not found: %d", bgm.EleIDs[0]))
}
bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
if err != nil {
return errors.Trace(err)
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,10 +770,15 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
if err != nil {
return model.ReorgTypeNone, err
}
var pdLeaderAddr string
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
}
if variable.EnableDistTask.Load() {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
}
if err != nil {
return model.ReorgTypeNone, err
Expand Down Expand Up @@ -964,7 +969,12 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
return true, 0, nil
}
ctx := logutil.WithCategory(w.ctx, "ddl-ingest")
bc, err = ingest.LitBackCtxMgr.Register(ctx, allIndexInfos[0].Unique, job.ID, nil, job.ReorgMeta.ResourceGroupName)
var pdLeaderAddr string
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
}
bc, err = ingest.LitBackCtxMgr.Register(ctx, allIndexInfos[0].Unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
return false, ver, errors.Trace(err)
Expand Down Expand Up @@ -1976,7 +1986,9 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
if err != nil {
return err
}
return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo)
//nolint:forcetypeassert
pdLeaderAddr := w.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
return checkDuplicateForUniqueIndex(w.ctx, t, reorgInfo, pdLeaderAddr)
}
}

Expand Down Expand Up @@ -2013,7 +2025,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
return errors.Trace(err)
}

func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo) error {
func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, pdAddr string) error {
var bc ingest.BackendCtx
var err error
defer func() {
Expand All @@ -2029,7 +2041,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
if indexInfo.Unique {
ctx := logutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, reorgInfo.ReorgMeta.ResourceGroupName)
bc, err = ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, reorgInfo.ID, nil, pdAddr, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error)
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, resourceGroupName string) (BackendCtx, error) {
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if !exist {
m.memRoot.RefreshConsumption()
Expand All @@ -92,6 +92,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
cfg.Lightning.TiDB.PdAddr = pdAddr
bd, err := createLocalBackend(ctx, cfg, resourceGroupName)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool) (
} else {
cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone
}
cfg.TiDB.PdAddr = tidbCfg.Path
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
// Set TLS related information
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
}

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string) (BackendCtx, error) {
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client, _ string, _ string) (BackendCtx, error) {
logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest4/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

// Mock there is a running ingest job.
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, "")
_, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil, realtikvtest.PDAddr, "")
require.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(2)
Expand Down
3 changes: 3 additions & 0 deletions tests/realtikvtest/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ var (
// TiKVPath is the path of the TiKV Storage.
TiKVPath = flag.String("tikv-path", "tikv://127.0.0.1:2379?disableGC=true", "TiKV addr")

// PDAddr is the address of PD.
PDAddr = "127.0.0.1:2379"

// KeyspaceName is an option to specify the name of keyspace that the tests run on,
// this option is only valid while the flag WithRealTiKV is set.
KeyspaceName = flag.String("keyspace-name", "", "the name of keyspace that the tests run on")
Expand Down

0 comments on commit 6260e66

Please sign in to comment.