From 64ec72195cd8a74203dd2c39173088a09eb24e76 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 21 May 2024 12:49:23 +0800 Subject: [PATCH] lightning: skip CREATE DATABASE when downstream exists, and return error when parse failed (#51801) (#53070) close pingcap/tidb#51800 --- .gitignore | 1 + br/pkg/lightning/backend/backend.go | 4 ++ br/pkg/lightning/backend/local/local.go | 5 ++ br/pkg/lightning/backend/tidb/tidb.go | 32 +++++++++++ br/pkg/lightning/importer/get_pre_info.go | 13 +++++ br/pkg/lightning/importer/import.go | 57 ++++++++++++++----- br/pkg/lightning/importer/mock/mock.go | 9 +++ .../lightning/importer/restore_schema_test.go | 4 ++ br/pkg/mock/backend.go | 15 +++++ br/tests/lightning_character_sets/run.sh | 2 + br/tests/run.sh | 1 + 11 files changed, 128 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index f12c5532ca8b6..9c4a07aa04bbf 100644 --- a/.gitignore +++ b/.gitignore @@ -35,5 +35,6 @@ bazel-bin bazel-out bazel-testlogs bazel-tidb +MODULE.bazel.lock .ijwb/ /oom_record/ diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index 026dda56e8744..57752403194d2 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -124,6 +124,10 @@ type CheckCtx struct { // TargetInfoGetter defines the interfaces to get target information. type TargetInfoGetter interface { + // FetchRemoteDBModels obtains the models of all databases. Currently, only + // the database name is filled. + FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) + // FetchRemoteTableModels obtains the models of all tables given the schema // name. The returned table info does not need to be precise if the encoder, // is not requiring them, but must at least fill in the following fields for diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index cca73bb4a806a..9d901f4455b74 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -276,6 +276,11 @@ func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.T } } +// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface. +func (g *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return tikv.FetchRemoteDBModelsFromTLS(ctx, g.tls) +} + // FetchRemoteTableModels obtains the models of all tables given the schema name. // It implements the `TargetInfoGetter` interface. func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 7636e960e498e..741dccb92c725 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -138,6 +138,38 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter { } } +// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface. +func (b *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + results := []*model.DBInfo{} + logger := log.FromContext(ctx) + s := common.SQLWithRetry{ + DB: b.db, + Logger: logger, + } + err := s.Transact(ctx, "fetch db models", func(_ context.Context, tx *sql.Tx) error { + results = results[:0] + + rows, e := tx.Query("SHOW DATABASES") + if e != nil { + return e + } + defer rows.Close() + + for rows.Next() { + var dbName string + if e := rows.Scan(&dbName); e != nil { + return e + } + dbInfo := &model.DBInfo{ + Name: model.NewCIStr(dbName), + } + results = append(results, dbInfo) + } + return rows.Err() + }) + return results, err +} + // FetchRemoteTableModels obtains the models of all tables given the schema name. // It implements the `backend.TargetInfoGetter` interface. // TODO: refactor diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go index d36bd39b4f937..c19acf96b5961 100644 --- a/br/pkg/lightning/importer/get_pre_info.go +++ b/br/pkg/lightning/importer/get_pre_info.go @@ -91,6 +91,8 @@ type PreImportInfoGetter interface { // TargetInfoGetter defines the operations to get information from target. type TargetInfoGetter interface { + // FetchRemoteDBModels fetches the database structures from the remote target. + FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) // FetchRemoteTableModels fetches the table structures from the remote target. FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) // CheckVersionRequirements performs the check whether the target satisfies the version requirements. @@ -158,6 +160,11 @@ func NewTargetInfoGetterImpl( }, nil } +// FetchRemoteDBModels implements TargetInfoGetter. +func (g *TargetInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return g.backend.FetchRemoteDBModels(ctx) +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the TargetInfoGetter interface. func (g *TargetInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { @@ -800,6 +807,12 @@ func (p *PreImportInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName s return p.targetInfoGetter.IsTableEmpty(ctx, schemaName, tableName) } +// FetchRemoteDBModels fetches the database structures from the remote target. +// It implements the PreImportInfoGetter interface. +func (p *PreImportInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return p.targetInfoGetter.FetchRemoteDBModels(ctx) +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the PreImportInfoGetter interface. func (p *PreImportInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 7522195c933cb..8a9fd085aeb80 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -617,29 +617,47 @@ type restoreSchemaWorker struct { func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error { stmts, err := createIfNotExistsStmt(worker.parser, sqlStr, job.dbName, job.tblName) if err != nil { - worker.logger.Warn("failed to rewrite statement, will use raw input instead", - zap.String("db", job.dbName), - zap.String("table", job.tblName), - zap.String("statement", sqlStr), - zap.Error(err)) - job.stmts = []string{sqlStr} - } else { - job.stmts = stmts + return errors.Trace(err) } + job.stmts = stmts return worker.appendJob(job) } func (worker *restoreSchemaWorker) makeJobs( dbMetas []*mydump.MDDatabaseMeta, + getDBs func(context.Context) ([]*model.DBInfo, error), getTables func(context.Context, string) ([]*model.TableInfo, error), ) error { defer func() { close(worker.jobCh) worker.quit() }() - var err error + + if len(dbMetas) == 0 { + return nil + } + // 1. restore databases, execute statements concurrency + + dbs, err := getDBs(worker.ctx) + if err != nil { + worker.logger.Warn("get databases from downstream failed", zap.Error(err)) + } + dbSet := make(set.StringSet, len(dbs)) + for _, db := range dbs { + dbSet.Insert(db.Name.L) + } + for _, dbMeta := range dbMetas { + // if downstream already has this database, we can skip ddl job + if dbSet.Exist(strings.ToLower(dbMeta.Name)) { + worker.logger.Info( + "database already exists in downstream, skip processing the source file", + zap.String("db", dbMeta.Name), + ) + continue + } + sql := dbMeta.GetSchema(worker.ctx, worker.store) err = worker.addJob(sql, &schemaJob{ dbName: dbMeta.Name, @@ -654,18 +672,28 @@ func (worker *restoreSchemaWorker) makeJobs( if err != nil { return err } + // 2. restore tables, execute statements concurrency + for _, dbMeta := range dbMetas { // we can ignore error here, and let check failed later if schema not match - tables, _ := getTables(worker.ctx, dbMeta.Name) - tableMap := make(map[string]struct{}) + tables, err := getTables(worker.ctx, dbMeta.Name) + if err != nil { + worker.logger.Warn("get tables from downstream failed", zap.Error(err)) + } + tableSet := make(set.StringSet, len(tables)) for _, t := range tables { - tableMap[t.Name.L] = struct{}{} + tableSet.Insert(t.Name.L) } for _, tblMeta := range dbMeta.Tables { - if _, ok := tableMap[strings.ToLower(tblMeta.Name)]; ok { + if tableSet.Exist(strings.ToLower(tblMeta.Name)) { // we already has this table in TiDB. // we should skip ddl job and let SchemaValid check. + worker.logger.Info( + "table already exists in downstream, skip processing the source file", + zap.String("db", dbMeta.Name), + zap.String("table", tblMeta.Name), + ) continue } else if tblMeta.SchemaFile.FileMeta.Path == "" { return common.ErrSchemaNotExists.GenWithStackByArgs(dbMeta.Name, tblMeta.Name) @@ -740,7 +768,6 @@ loop: var err error if session == nil { session, err = func() (*sql.Conn, error) { - // TODO: support lightning in SQL return worker.db.Conn(worker.ctx) }() if err != nil { @@ -863,7 +890,7 @@ func (rc *Controller) restoreSchema(ctx context.Context) error { for i := 0; i < concurrency; i++ { go worker.doJob() } - err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteTableModels) + err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteDBModels, rc.preInfoGetter.FetchRemoteTableModels) logTask.End(zap.ErrorLevel, err) if err != nil { return err diff --git a/br/pkg/lightning/importer/mock/mock.go b/br/pkg/lightning/importer/mock/mock.go index f24fb073b1208..1351b52381e1f 100644 --- a/br/pkg/lightning/importer/mock/mock.go +++ b/br/pkg/lightning/importer/mock/mock.go @@ -212,6 +212,15 @@ func (t *TargetInfo) SetTableInfo(schemaName string, tableName string, tblInfo * t.dbTblInfoMap[schemaName][tableName] = tblInfo } +// FetchRemoteDBModels implements the TargetInfoGetter interface. +func (t *TargetInfo) FetchRemoteDBModels(_ context.Context) ([]*model.DBInfo, error) { + resultInfos := []*model.DBInfo{} + for dbName := range t.dbTblInfoMap { + resultInfos = append(resultInfos, &model.DBInfo{Name: model.NewCIStr(dbName)}) + } + return resultInfos, nil +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the TargetInfoGetter interface. func (t *TargetInfo) FetchRemoteTableModels(_ context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/importer/restore_schema_test.go b/br/pkg/lightning/importer/restore_schema_test.go index f6dfbc16ecc6d..9d45ff6417ef7 100644 --- a/br/pkg/lightning/importer/restore_schema_test.go +++ b/br/pkg/lightning/importer/restore_schema_test.go @@ -136,6 +136,10 @@ func (s *restoreSchemaSuite) SetupTest() { s.controller, s.ctx = gomock.WithContext(context.Background(), s.T()) mockTargetInfoGetter := mock.NewMockTargetInfoGetter(s.controller) mockBackend := mock.NewMockBackend(s.controller) + mockTargetInfoGetter.EXPECT(). + FetchRemoteDBModels(gomock.Any()). + AnyTimes(). + Return([]*model.DBInfo{{Name: model.NewCIStr("fakedb")}}, nil) mockTargetInfoGetter.EXPECT(). FetchRemoteTableModels(gomock.Any(), gomock.Any()). AnyTimes(). diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index cb1d1a484229e..cc69e8e84738b 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -295,6 +295,21 @@ func (mr *MockTargetInfoGetterMockRecorder) CheckRequirements(arg0, arg1 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckRequirements", reflect.TypeOf((*MockTargetInfoGetter)(nil).CheckRequirements), arg0, arg1) } +// FetchRemoteDBModels mocks base method. +func (m *MockTargetInfoGetter) FetchRemoteDBModels(arg0 context.Context) ([]*model.DBInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchRemoteDBModels", arg0) + ret0, _ := ret[0].([]*model.DBInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchRemoteDBModels indicates an expected call of FetchRemoteDBModels. +func (mr *MockTargetInfoGetterMockRecorder) FetchRemoteDBModels(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteDBModels", reflect.TypeOf((*MockTargetInfoGetter)(nil).FetchRemoteDBModels), arg0) +} + // FetchRemoteTableModels mocks base method. func (m *MockTargetInfoGetter) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { m.ctrl.T.Helper() diff --git a/br/tests/lightning_character_sets/run.sh b/br/tests/lightning_character_sets/run.sh index 3049c53502b22..88059c6ed2495 100755 --- a/br/tests/lightning_character_sets/run.sh +++ b/br/tests/lightning_character_sets/run.sh @@ -80,6 +80,8 @@ check_contains 's: 5291' # test about unsupported charset in UTF-8 encoding dump files # test local backend run_lightning --config "$CUR/greek.toml" -d "$CUR/greek" 2>&1 | grep -q "Unknown character set: 'greek'" +# check TiDB does not receive the DDL +check_not_contains "greek" $TEST_DIR/tidb.log run_sql 'DROP DATABASE IF EXISTS charsets;' run_sql 'CREATE DATABASE charsets;' run_sql 'CREATE TABLE charsets.greek (c VARCHAR(20) PRIMARY KEY);' diff --git a/br/tests/run.sh b/br/tests/run.sh index f3ff0c5dcbd33..a70cd28ffa6f1 100755 --- a/br/tests/run.sh +++ b/br/tests/run.sh @@ -19,6 +19,7 @@ CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) export PATH="$PATH:$CUR/../../bin:$CUR/../bin:$CUR/_utils" export TEST_DIR=/tmp/backup_restore_test export COV_DIR="/tmp/group_cover" +mkdir -p $COV_DIR || true source $CUR/_utils/run_services # Create COV_DIR if not exists