diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index 3cbae8a97cdb2..55d01247702cb 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -127,6 +127,10 @@ type CheckCtx struct { // TargetInfoGetter defines the interfaces to get target information. type TargetInfoGetter interface { + // FetchRemoteDBModels obtains the models of all databases. Currently, only + // the database name is filled. + FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) + // FetchRemoteTableModels obtains the models of all tables given the schema // name. The returned table info does not need to be precise if the encoder, // is not requiring them, but must at least fill in the following fields for diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 8f41ca6102244..dbcb70f315504 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -271,6 +271,11 @@ func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdHTTPCli pdhttp.Client) b } } +// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface. +func (g *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return tikv.FetchRemoteDBModelsFromTLS(ctx, g.tls) +} + // FetchRemoteTableModels obtains the models of all tables given the schema name. // It implements the `TargetInfoGetter` interface. func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 67d648d0608a3..e26ebd42bac19 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -138,6 +138,38 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter { } } +// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface. 
+func (b *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
+	results := []*model.DBInfo{}
+	logger := log.FromContext(ctx)
+	s := common.SQLWithRetry{
+		DB:     b.db,
+		Logger: logger,
+	}
+	err := s.Transact(ctx, "fetch db models", func(c context.Context, tx *sql.Tx) error {
+		results = results[:0] // start clean in case Transact re-runs this closure (retry behaviour assumed, confirm)
+
+		rows, e := tx.QueryContext(c, "SHOW DATABASES") // bind the query to the context so cancellation/deadlines apply
+		if e != nil {
+			return e
+		}
+		defer rows.Close()
+
+		for rows.Next() {
+			var dbName string
+			if e := rows.Scan(&dbName); e != nil {
+				return e
+			}
+			dbInfo := &model.DBInfo{
+				Name: model.NewCIStr(dbName),
+			}
+			results = append(results, dbInfo)
+		}
+		return rows.Err()
+	})
+	return results, err
+}
+
 // FetchRemoteTableModels obtains the models of all tables given the schema name.
 // It implements the `backend.TargetInfoGetter` interface.
 // TODO: refactor
diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go
index 114bd642b36a3..cf327da85bc23 100644
--- a/br/pkg/lightning/importer/get_pre_info.go
+++ b/br/pkg/lightning/importer/get_pre_info.go
@@ -90,6 +90,8 @@ type PreImportInfoGetter interface {
 
 // TargetInfoGetter defines the operations to get information from target.
 type TargetInfoGetter interface {
+	// FetchRemoteDBModels fetches the database structures from the remote target.
+	FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)
 	// FetchRemoteTableModels fetches the table structures from the remote target.
 	FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
 	// CheckVersionRequirements performs the check whether the target satisfies the version requirements.
@@ -155,6 +157,11 @@ func NewTargetInfoGetterImpl(
 	}, nil
 }
 
+// FetchRemoteDBModels implements TargetInfoGetter.
+func (g *TargetInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return g.backend.FetchRemoteDBModels(ctx) +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the TargetInfoGetter interface. func (g *TargetInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { @@ -787,6 +794,12 @@ func (p *PreImportInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName s return p.targetInfoGetter.IsTableEmpty(ctx, schemaName, tableName) } +// FetchRemoteDBModels fetches the database structures from the remote target. +// It implements the PreImportInfoGetter interface. +func (p *PreImportInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return p.targetInfoGetter.FetchRemoteDBModels(ctx) +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the PreImportInfoGetter interface. func (p *PreImportInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index 57eb49e0e8d25..1cba2b3473735 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -625,29 +625,47 @@ type restoreSchemaWorker struct { func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error { stmts, err := createIfNotExistsStmt(worker.parser, sqlStr, job.dbName, job.tblName) if err != nil { - worker.logger.Warn("failed to rewrite statement, will use raw input instead", - zap.String("db", job.dbName), - zap.String("table", job.tblName), - zap.String("statement", sqlStr), - zap.Error(err)) - job.stmts = []string{sqlStr} - } else { - job.stmts = stmts + return errors.Trace(err) } + job.stmts = stmts return worker.appendJob(job) } func (worker *restoreSchemaWorker) makeJobs( dbMetas []*mydump.MDDatabaseMeta, + 
getDBs func(context.Context) ([]*model.DBInfo, error), getTables func(context.Context, string) ([]*model.TableInfo, error), ) error { defer func() { close(worker.jobCh) worker.quit() }() - var err error + + if len(dbMetas) == 0 { + return nil + } + // 1. restore databases, execute statements concurrency + + dbs, err := getDBs(worker.ctx) + if err != nil { + worker.logger.Warn("get databases from downstream failed", zap.Error(err)) + } + dbSet := make(set.StringSet, len(dbs)) + for _, db := range dbs { + dbSet.Insert(db.Name.L) + } + for _, dbMeta := range dbMetas { + // if downstream already has this database, we can skip ddl job + if dbSet.Exist(strings.ToLower(dbMeta.Name)) { + worker.logger.Info( + "database already exists in downstream, skip processing the source file", + zap.String("db", dbMeta.Name), + ) + continue + } + sql := dbMeta.GetSchema(worker.ctx, worker.store) err = worker.addJob(sql, &schemaJob{ dbName: dbMeta.Name, @@ -662,18 +680,28 @@ func (worker *restoreSchemaWorker) makeJobs( if err != nil { return err } + // 2. restore tables, execute statements concurrency + for _, dbMeta := range dbMetas { // we can ignore error here, and let check failed later if schema not match - tables, _ := getTables(worker.ctx, dbMeta.Name) - tableMap := make(map[string]struct{}) + tables, err := getTables(worker.ctx, dbMeta.Name) + if err != nil { + worker.logger.Warn("get tables from downstream failed", zap.Error(err)) + } + tableSet := make(set.StringSet, len(tables)) for _, t := range tables { - tableMap[t.Name.L] = struct{}{} + tableSet.Insert(t.Name.L) } for _, tblMeta := range dbMeta.Tables { - if _, ok := tableMap[strings.ToLower(tblMeta.Name)]; ok { + if tableSet.Exist(strings.ToLower(tblMeta.Name)) { // we already has this table in TiDB. // we should skip ddl job and let SchemaValid check. 
+ worker.logger.Info( + "table already exists in downstream, skip processing the source file", + zap.String("db", dbMeta.Name), + zap.String("table", tblMeta.Name), + ) continue } else if tblMeta.SchemaFile.FileMeta.Path == "" { return common.ErrSchemaNotExists.GenWithStackByArgs(dbMeta.Name, tblMeta.Name) @@ -748,7 +776,6 @@ loop: var err error if session == nil { session, err = func() (*sql.Conn, error) { - // TODO: support lightning in SQL return worker.db.Conn(worker.ctx) }() if err != nil { @@ -871,7 +898,7 @@ func (rc *Controller) restoreSchema(ctx context.Context) error { for i := 0; i < concurrency; i++ { go worker.doJob() } - err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteTableModels) + err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteDBModels, rc.preInfoGetter.FetchRemoteTableModels) logTask.End(zap.ErrorLevel, err) if err != nil { return err diff --git a/br/pkg/lightning/importer/mock/mock.go b/br/pkg/lightning/importer/mock/mock.go index 27605efb2db50..024c252cdcf3e 100644 --- a/br/pkg/lightning/importer/mock/mock.go +++ b/br/pkg/lightning/importer/mock/mock.go @@ -212,6 +212,15 @@ func (t *TargetInfo) SetTableInfo(schemaName string, tableName string, tblInfo * t.dbTblInfoMap[schemaName][tableName] = tblInfo } +// FetchRemoteDBModels implements the TargetInfoGetter interface. +func (t *TargetInfo) FetchRemoteDBModels(_ context.Context) ([]*model.DBInfo, error) { + resultInfos := []*model.DBInfo{} + for dbName := range t.dbTblInfoMap { + resultInfos = append(resultInfos, &model.DBInfo{Name: model.NewCIStr(dbName)}) + } + return resultInfos, nil +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the TargetInfoGetter interface. 
func (t *TargetInfo) FetchRemoteTableModels(_ context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/importer/restore_schema_test.go b/br/pkg/lightning/importer/restore_schema_test.go index f6dfbc16ecc6d..9d45ff6417ef7 100644 --- a/br/pkg/lightning/importer/restore_schema_test.go +++ b/br/pkg/lightning/importer/restore_schema_test.go @@ -136,6 +136,10 @@ func (s *restoreSchemaSuite) SetupTest() { s.controller, s.ctx = gomock.WithContext(context.Background(), s.T()) mockTargetInfoGetter := mock.NewMockTargetInfoGetter(s.controller) mockBackend := mock.NewMockBackend(s.controller) + mockTargetInfoGetter.EXPECT(). + FetchRemoteDBModels(gomock.Any()). + AnyTimes(). + Return([]*model.DBInfo{{Name: model.NewCIStr("fakedb")}}, nil) mockTargetInfoGetter.EXPECT(). FetchRemoteTableModels(gomock.Any(), gomock.Any()). AnyTimes(). diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index 66c829552f5f7..6612fc2b170fd 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -299,6 +299,21 @@ func (mr *MockTargetInfoGetterMockRecorder) CheckRequirements(arg0, arg1 any) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckRequirements", reflect.TypeOf((*MockTargetInfoGetter)(nil).CheckRequirements), arg0, arg1) } +// FetchRemoteDBModels mocks base method. +func (m *MockTargetInfoGetter) FetchRemoteDBModels(arg0 context.Context) ([]*model.DBInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchRemoteDBModels", arg0) + ret0, _ := ret[0].([]*model.DBInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchRemoteDBModels indicates an expected call of FetchRemoteDBModels. +func (mr *MockTargetInfoGetterMockRecorder) FetchRemoteDBModels(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteDBModels", reflect.TypeOf((*MockTargetInfoGetter)(nil).FetchRemoteDBModels), arg0) +} + // FetchRemoteTableModels mocks base method. 
func (m *MockTargetInfoGetter) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { m.ctrl.T.Helper() diff --git a/br/pkg/mock/encode.go b/br/pkg/mock/encode.go index 0470325d353c9..4c466b549fec9 100644 --- a/br/pkg/mock/encode.go +++ b/br/pkg/mock/encode.go @@ -157,20 +157,6 @@ func (mr *MockRowsMockRecorder) Clear() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clear", reflect.TypeOf((*MockRows)(nil).Clear)) } -// SplitIntoChunks mocks base method. -func (m *MockRows) SplitIntoChunks(arg0 int) []encode.Rows { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SplitIntoChunks", arg0) - ret0, _ := ret[0].([]encode.Rows) - return ret0 -} - -// SplitIntoChunks indicates an expected call of SplitIntoChunks. -func (mr *MockRowsMockRecorder) SplitIntoChunks(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SplitIntoChunks", reflect.TypeOf((*MockRows)(nil).SplitIntoChunks), arg0) -} - // MockRow is a mock of Row interface. 
type MockRow struct { ctrl *gomock.Controller diff --git a/br/tests/lightning_character_sets/run.sh b/br/tests/lightning_character_sets/run.sh index 3049c53502b22..88059c6ed2495 100755 --- a/br/tests/lightning_character_sets/run.sh +++ b/br/tests/lightning_character_sets/run.sh @@ -80,6 +80,8 @@ check_contains 's: 5291' # test about unsupported charset in UTF-8 encoding dump files # test local backend run_lightning --config "$CUR/greek.toml" -d "$CUR/greek" 2>&1 | grep -q "Unknown character set: 'greek'" +# check TiDB does not receive the DDL +check_not_contains "greek" $TEST_DIR/tidb.log run_sql 'DROP DATABASE IF EXISTS charsets;' run_sql 'CREATE DATABASE charsets;' run_sql 'CREATE TABLE charsets.greek (c VARCHAR(20) PRIMARY KEY);' diff --git a/br/tests/run.sh b/br/tests/run.sh index f3ff0c5dcbd33..a70cd28ffa6f1 100755 --- a/br/tests/run.sh +++ b/br/tests/run.sh @@ -19,6 +19,7 @@ CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) export PATH="$PATH:$CUR/../../bin:$CUR/../bin:$CUR/_utils" export TEST_DIR=/tmp/backup_restore_test export COV_DIR="/tmp/group_cover" +mkdir -p $COV_DIR || true source $CUR/_utils/run_services # Create COV_DIR if not exists