diff --git a/lightning/glue/glue.go b/lightning/glue/glue.go index 6caf38e70..dfcf39d4f 100644 --- a/lightning/glue/glue.go +++ b/lightning/glue/glue.go @@ -19,12 +19,15 @@ import ( "errors" "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-lightning/lightning/checkpoints" "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/log" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" ) type Glue interface { @@ -33,7 +36,7 @@ type Glue interface { GetDB() (*sql.DB, error) GetParser() *parser.Parser GetTables(context.Context, string) ([]*model.TableInfo, error) - GetSession() (checkpoints.Session, error) + GetSession(context.Context) (checkpoints.Session, error) OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.CheckpointsDB, error) // Record is used to report some information (key, value) to host TiDB, including progress, stage currently Record(string, uint64) @@ -48,6 +51,39 @@ type SQLExecutor interface { Close() } +// sqlConnSession implement checkpoints.Session used only for lighting itself +type sqlConnSession struct { + checkpoints.Session + conn *sql.Conn +} + +func (session *sqlConnSession) Close() { + session.conn.Close() +} + +func (session *sqlConnSession) Execute(ctx context.Context, sql string) ([]sqlexec.RecordSet, error) { + _, err := session.conn.ExecContext(ctx, sql) + return nil, err +} + +func (session *sqlConnSession) CommitTxn(context.Context) error { + return errors.New("sqlConnSession doesn't have a valid CommitTxn implementation") +} + +func (session *sqlConnSession) RollbackTxn(context.Context) {} + +func (session *sqlConnSession) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error) { + return 0, 0, nil, errors.New("sqlConnSession doesn't have a valid PrepareStmt implementation") +} + +func (session *sqlConnSession) ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error) { + return nil, errors.New("sqlConnSession doesn't have a valid ExecutePreparedStmt implementation") +} + +func (session *sqlConnSession) DropPreparedStmt(stmtID uint32) error { + return errors.New("sqlConnSession doesn't have a valid DropPreparedStmt implementation") +} + type ExternalTiDBGlue struct { db *sql.DB parser *parser.Parser @@ -125,8 +161,12 @@ func (e ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo return nil, errors.New("ExternalTiDBGlue doesn't have a valid GetTables function") } -func (e ExternalTiDBGlue) GetSession() (checkpoints.Session, error) { - return nil, errors.New("ExternalTiDBGlue doesn't have a valid GetSession function") +func (e *ExternalTiDBGlue) GetSession(ctx context.Context) (checkpoints.Session, error) { + conn, err := e.db.Conn(ctx) + if err != nil { + return nil, err + } + return &sqlConnSession{conn: conn}, nil } func (e *ExternalTiDBGlue) OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (checkpoints.CheckpointsDB, error) { diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index f4ee5e699..b4c66023d 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -26,10 +26,12 @@ import ( "github.com/pingcap/br/pkg/pdutil" "github.com/pingcap/br/pkg/storage" + "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/failpoint" sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-lightning/lightning/checkpoints" "github.com/pingcap/tidb-lightning/lightning/glue" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" @@ -303,48 +305,257 @@ outside: return errors.Trace(err) } -func (rc *RestoreController) restoreSchema(ctx context.Context) error { - if !rc.cfg.Mydumper.NoSchema { - if rc.tidbGlue.OwnsSQLExecutor() { - db, err := DBFromConfig(rc.cfg.TiDB) - if err != nil { - return errors.Annotate(err, "connect to tidb failed") - } - defer db.Close() - db.ExecContext(ctx, "SET SQL_MODE = ?", rc.cfg.TiDB.StrSQLMode) - } +type schemaStmtType int - for _, dbMeta := range rc.dbMetas { - task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore table schema") +func (stmtType schemaStmtType) String() string { + switch stmtType { + case schemaCreateDatabase: + return "restore database schema" + case schemaCreateTable: + return "restore table schema" + case schemaCreateView: + return "restore view schema" + } + return "unknown statement of schema" +} - tablesSchema := make(map[string]string) - for _, tblMeta := range dbMeta.Tables { - tablesSchema[tblMeta.Name] = tblMeta.GetSchema(ctx, rc.store) - } - err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, tablesSchema) +const ( + schemaCreateDatabase schemaStmtType = iota + schemaCreateTable + schemaCreateView +) - task.End(zap.ErrorLevel, err) - if err != nil { - return errors.Annotatef(err, "restore table schema %s failed", dbMeta.Name) +type schemaJob struct { + dbName string + tblName string // empty for create db jobs + stmtType schemaStmtType + stmts []*schemaStmt +} + +type schemaStmt struct { + sql string +} + +type restoreSchemaWorker struct { + ctx context.Context + quit context.CancelFunc + jobCh chan *schemaJob + errCh chan error + wg sync.WaitGroup + glue glue.Glue + store storage.ExternalStorage +} + +func (worker *restoreSchemaWorker) makeJobs(dbMetas []*mydump.MDDatabaseMeta) error { + defer func() { + close(worker.jobCh) + worker.quit() + }() + var err error + // 1. restore databases, execute statements concurrency + for _, dbMeta := range dbMetas { + restoreSchemaJob := &schemaJob{ + dbName: dbMeta.Name, + stmtType: schemaCreateDatabase, + stmts: make([]*schemaStmt, 0, 1), + } + restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{ + sql: createDatabaseIfNotExistStmt(dbMeta.Name), + }) + err = worker.appendJob(restoreSchemaJob) + if err != nil { + return err + } + } + err = worker.wait() + if err != nil { + return err + } + // 2. restore tables, execute statements concurrency + for _, dbMeta := range dbMetas { + for _, tblMeta := range dbMeta.Tables { + sql := tblMeta.GetSchema(worker.ctx, worker.store) + if sql != "" { + stmts, err := createTableIfNotExistsStmt(worker.glue.GetParser(), sql, dbMeta.Name, tblMeta.Name) + if err != nil { + return err + } + restoreSchemaJob := &schemaJob{ + dbName: dbMeta.Name, + tblName: tblMeta.Name, + stmtType: schemaCreateTable, + stmts: make([]*schemaStmt, 0, len(stmts)), + } + for _, sql := range stmts { + restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{ + sql: sql, + }) + } + err = worker.appendJob(restoreSchemaJob) + if err != nil { + return err + } } } - - // restore views. Since views can cross database we must restore views after all table schemas are restored. - for _, dbMeta := range rc.dbMetas { - if len(dbMeta.Views) > 0 { - task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore view schema") - viewsSchema := make(map[string]string) - for _, viewMeta := range dbMeta.Views { - viewsSchema[viewMeta.Name] = viewMeta.GetSchema(ctx, rc.store) + } + err = worker.wait() + if err != nil { + return err + } + // 3. restore views. Since views can cross database we must restore views after all table schemas are restored. + for _, dbMeta := range dbMetas { + for _, viewMeta := range dbMeta.Views { + sql := viewMeta.GetSchema(worker.ctx, worker.store) + if sql != "" { + stmts, err := createTableIfNotExistsStmt(worker.glue.GetParser(), sql, dbMeta.Name, viewMeta.Name) + if err != nil { + return err + } + restoreSchemaJob := &schemaJob{ + dbName: dbMeta.Name, + tblName: viewMeta.Name, + stmtType: schemaCreateView, + stmts: make([]*schemaStmt, 0, len(stmts)), + } + for _, sql := range stmts { + restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{ + sql: sql, + }) + } + err = worker.appendJob(restoreSchemaJob) + if err != nil { + return err } - err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, viewsSchema) + // we don't support restore views concurrency, cauz it maybe will raise a error + err = worker.wait() + if err != nil { + return err + } + } + } + } + return nil +} +func (worker *restoreSchemaWorker) doJob() { + var session checkpoints.Session + defer func() { + if session != nil { + session.Close() + } + }() +loop: + for { + select { + case <-worker.ctx.Done(): + // don't `return` or throw `worker.ctx.Err()`here, + // if we `return`, we can't mark cancelled jobs as done, + // if we `throw(worker.ctx.Err())`, it will be blocked to death + break loop + case job := <-worker.jobCh: + if job == nil { + // successful exit + return + } + var err error + if session == nil { + session, err = worker.glue.GetSession(worker.ctx) + if err != nil { + worker.wg.Done() + worker.throw(err) + // don't return + break loop + } + } + logger := log.With(zap.String("db", job.dbName), zap.String("table", job.tblName)) + for _, stmt := range job.stmts { + task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql)) + _, err = session.Execute(worker.ctx, stmt.sql) task.End(zap.ErrorLevel, err) if err != nil { - return errors.Annotatef(err, "restore view schema %s failed", dbMeta.Name) + err = errors.Annotatef(err, "%s %s failed", job.stmtType.String(), common.UniqueTable(job.dbName, job.tblName)) + worker.wg.Done() + worker.throw(err) + // don't return + break loop } } + worker.wg.Done() + } + } + // mark the cancelled job as `Done`, a little tricky, + // cauz we need make sure `worker.wg.Wait()` wouldn't blocked forever + for range worker.jobCh { + worker.wg.Done() + } +} + +func (worker *restoreSchemaWorker) wait() error { + // avoid to `worker.wg.Wait()` blocked forever when all `doJob`'s goroutine exited. + // don't worry about goroutine below, it never become a zombie, + // cauz we have mechanism to clean cancelled jobs in `worker.jobCh`. + // means whole jobs has been send to `worker.jobCh` would be done. + waitCh := make(chan struct{}) + go func() { + worker.wg.Wait() + close(waitCh) + }() + select { + case err := <-worker.errCh: + return err + case <-worker.ctx.Done(): + return worker.ctx.Err() + case <-waitCh: + return nil + } +} + +func (worker *restoreSchemaWorker) throw(err error) { + select { + case <-worker.ctx.Done(): + // don't throw `worker.ctx.Err()` again, it will be blocked to death. + return + case worker.errCh <- err: + worker.quit() + } +} +func (worker *restoreSchemaWorker) appendJob(job *schemaJob) error { + worker.wg.Add(1) + select { + case err := <-worker.errCh: + // cancel the job + worker.wg.Done() + return err + case <-worker.ctx.Done(): + // cancel the job + worker.wg.Done() + return worker.ctx.Err() + case worker.jobCh <- job: + return nil + } +} + +func (rc *RestoreController) restoreSchema(ctx context.Context) error { + if !rc.cfg.Mydumper.NoSchema { + logTask := log.L().Begin(zap.InfoLevel, "restore all schema") + concurrency := utils.MinInt(rc.cfg.App.RegionConcurrency, 8) + childCtx, cancel := context.WithCancel(ctx) + worker := restoreSchemaWorker{ + ctx: childCtx, + quit: cancel, + jobCh: make(chan *schemaJob, concurrency), + errCh: make(chan error), + glue: rc.tidbGlue, + store: rc.store, + } + for i := 0; i < concurrency; i++ { + go worker.doJob() + } + err := worker.makeJobs(rc.dbMetas) + logTask.End(zap.ErrorLevel, err) + if err != nil { + return err } } getTableFunc := rc.backend.FetchRemoteTableModels diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 2160337d2..c552c152e 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -1118,3 +1118,157 @@ func (s *chunkRestoreSuite) TestRestore(c *C) { c.Assert(err, IsNil) c.Assert(saveCpCh, HasLen, 2) } + +var _ = Suite(&restoreSchemaSuite{}) + +type restoreSchemaSuite struct { + ctx context.Context + rc *RestoreController + controller *gomock.Controller +} + +func (s *restoreSchemaSuite) SetUpSuite(c *C) { + ctx := context.Background() + fakeDataDir := c.MkDir() + store, err := storage.NewLocalStorage(fakeDataDir) + c.Assert(err, IsNil) + // restore database schema file + fakeDBName := "fakedb" + // please follow the `mydump.defaultFileRouteRules`, matches files like '{schema}-schema-create.sql' + fakeFileName := fmt.Sprintf("%s-schema-create.sql", fakeDBName) + err = store.Write(ctx, fakeFileName, []byte(fmt.Sprintf("CREATE DATABASE %s;", fakeDBName))) + c.Assert(err, IsNil) + // restore table schema files + fakeTableFilesCount := 8 + for i := 1; i <= fakeTableFilesCount; i++ { + fakeTableName := fmt.Sprintf("tbl%d", i) + // please follow the `mydump.defaultFileRouteRules`, matches files like '{schema}.{table}-schema.sql' + fakeFileName := fmt.Sprintf("%s.%s-schema.sql", fakeDBName, fakeTableName) + fakeFileContent := []byte(fmt.Sprintf("CREATE TABLE %s(i TINYINT);", fakeTableName)) + err = store.Write(ctx, fakeFileName, fakeFileContent) + c.Assert(err, IsNil) + } + // restore view schema files + fakeViewFilesCount := 8 + for i := 1; i <= fakeViewFilesCount; i++ { + fakeViewName := fmt.Sprintf("tbl%d", i) + // please follow the `mydump.defaultFileRouteRules`, matches files like '{schema}.{table}-schema-view.sql' + fakeFileName := fmt.Sprintf("%s.%s-schema-view.sql", fakeDBName, fakeViewName) + fakeFileContent := []byte(fmt.Sprintf("CREATE ALGORITHM=UNDEFINED VIEW `%s` (`i`) AS SELECT `i` FROM `%s`.`%s`;", fakeViewName, fakeDBName, fmt.Sprintf("tbl%d", i))) + err = store.Write(ctx, fakeFileName, fakeFileContent) + c.Assert(err, IsNil) + } + config := config.NewConfig() + config.Mydumper.NoSchema = false + config.Mydumper.DefaultFileRules = true + config.Mydumper.CharacterSet = "utf8mb4" + config.App.RegionConcurrency = 8 + mydumpLoader, err := mydump.NewMyDumpLoaderWithStore(ctx, config, store) + c.Assert(err, IsNil) + s.rc = &RestoreController{ + cfg: config, + store: store, + dbMetas: mydumpLoader.GetDatabases(), + checkpointsDB: &checkpoints.NullCheckpointsDB{}, + } +} + +func (s *restoreSchemaSuite) SetUpTest(c *C) { + s.controller, s.ctx = gomock.WithContext(context.Background(), c) + mockBackend := mock.NewMockBackend(s.controller) + // We don't care the execute results of those + mockBackend.EXPECT(). + FetchRemoteTableModels(gomock.Any(), gomock.Any()). + AnyTimes(). + Return(make([]*model.TableInfo, 0), nil) + s.rc.backend = kv.MakeBackend(mockBackend) + mockSQLExecutor := mock.NewMockSQLExecutor(s.controller) + mockSQLExecutor.EXPECT(). + ExecuteWithLog(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + Return(nil) + mockSession := mock.NewMockSession(s.controller) + mockSession.EXPECT(). + Close(). + AnyTimes(). + Return() + mockSession.EXPECT(). + Execute(gomock.Any(), gomock.Any()). + AnyTimes(). + Return(nil, nil) + mockTiDBGlue := mock.NewMockGlue(s.controller) + mockTiDBGlue.EXPECT(). + GetSQLExecutor(). + AnyTimes(). + Return(mockSQLExecutor) + mockTiDBGlue.EXPECT(). + GetSession(gomock.Any()). + AnyTimes(). + Return(mockSession, nil) + mockTiDBGlue.EXPECT(). + OwnsSQLExecutor(). + AnyTimes(). + Return(true) + parser := parser.New() + mockTiDBGlue.EXPECT(). + GetParser(). + AnyTimes(). + Return(parser) + s.rc.tidbGlue = mockTiDBGlue +} + +func (s *restoreSchemaSuite) TearDownTest(c *C) { + s.rc.Close() + s.controller.Finish() +} + +func (s *restoreSchemaSuite) TestRestoreSchemaSuccessful(c *C) { + err := s.rc.restoreSchema(s.ctx) + c.Assert(err, IsNil) +} + +func (s *restoreSchemaSuite) TestRestoreSchemaFailed(c *C) { + injectErr := errors.New("Somthing wrong") + mockSession := mock.NewMockSession(s.controller) + mockSession.EXPECT(). + Close(). + AnyTimes(). + Return() + mockSession.EXPECT(). + Execute(gomock.Any(), gomock.Any()). + AnyTimes(). + Return(nil, injectErr) + mockTiDBGlue := mock.NewMockGlue(s.controller) + mockTiDBGlue.EXPECT(). + GetSession(gomock.Any()). + AnyTimes(). + Return(mockSession, nil) + s.rc.tidbGlue = mockTiDBGlue + err := s.rc.restoreSchema(s.ctx) + c.Assert(err, NotNil) + c.Assert(errors.ErrorEqual(err, injectErr), IsTrue) +} + +func (s *restoreSchemaSuite) TestRestoreSchemaContextCancel(c *C) { + childCtx, cancel := context.WithCancel(s.ctx) + mockSession := mock.NewMockSession(s.controller) + mockSession.EXPECT(). + Close(). + AnyTimes(). + Return() + mockSession.EXPECT(). + Execute(gomock.Any(), gomock.Any()). + AnyTimes(). + Do(func(context.Context, string) { cancel() }). + Return(nil, nil) + mockTiDBGlue := mock.NewMockGlue(s.controller) + mockTiDBGlue.EXPECT(). + GetSession(gomock.Any()). + AnyTimes(). + Return(mockSession, nil) + s.rc.tidbGlue = mockTiDBGlue + err := s.rc.restoreSchema(childCtx) + cancel() + c.Assert(err, NotNil) + c.Assert(err, Equals, childCtx.Err()) +} diff --git a/lightning/restore/tidb.go b/lightning/restore/tidb.go index 570d9c706..33a1773eb 100644 --- a/lightning/restore/tidb.go +++ b/lightning/restore/tidb.go @@ -184,6 +184,13 @@ loopCreate: return errors.Trace(err) } +func createDatabaseIfNotExistStmt(dbName string) string { + var createDatabase strings.Builder + createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ") + common.WriteMySQLIdentifier(&createDatabase, dbName) + return createDatabase.String() +} + func createTableIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) { stmts, _, err := p.Parse(createTable, "", "") if err != nil { diff --git a/mock/glue.go b/mock/glue.go new file mode 100644 index 000000000..3a09b4a7d --- /dev/null +++ b/mock/glue.go @@ -0,0 +1,234 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb-lightning/lightning/glue/glue.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + sql "database/sql" + gomock "github.com/golang/mock/gomock" + parser "github.com/pingcap/parser" + model "github.com/pingcap/parser/model" + checkpoints "github.com/pingcap/tidb-lightning/lightning/checkpoints" + config "github.com/pingcap/tidb-lightning/lightning/config" + glue "github.com/pingcap/tidb-lightning/lightning/glue" + log "github.com/pingcap/tidb-lightning/lightning/log" + reflect "reflect" +) + +// MockGlue is a mock of Glue interface +type MockGlue struct { + ctrl *gomock.Controller + recorder *MockGlueMockRecorder +} + +// MockGlueMockRecorder is the mock recorder for MockGlue +type MockGlueMockRecorder struct { + mock *MockGlue +} + +// NewMockGlue creates a new mock instance +func NewMockGlue(ctrl *gomock.Controller) *MockGlue { + mock := &MockGlue{ctrl: ctrl} + mock.recorder = &MockGlueMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockGlue) EXPECT() *MockGlueMockRecorder { + return m.recorder +} + +// OwnsSQLExecutor mocks base method +func (m *MockGlue) OwnsSQLExecutor() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OwnsSQLExecutor") + ret0, _ := ret[0].(bool) + return ret0 +} + +// OwnsSQLExecutor indicates an expected call of OwnsSQLExecutor +func (mr *MockGlueMockRecorder) OwnsSQLExecutor() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OwnsSQLExecutor", reflect.TypeOf((*MockGlue)(nil).OwnsSQLExecutor)) +} + +// GetSQLExecutor mocks base method +func (m *MockGlue) GetSQLExecutor() glue.SQLExecutor { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSQLExecutor") + ret0, _ := ret[0].(glue.SQLExecutor) + return ret0 +} + +// GetSQLExecutor indicates an expected call of GetSQLExecutor +func (mr *MockGlueMockRecorder) GetSQLExecutor() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSQLExecutor", reflect.TypeOf((*MockGlue)(nil).GetSQLExecutor)) +} + +// GetDB mocks base method +func (m *MockGlue) GetDB() (*sql.DB, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDB") + ret0, _ := ret[0].(*sql.DB) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetDB indicates an expected call of GetDB +func (mr *MockGlueMockRecorder) GetDB() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDB", reflect.TypeOf((*MockGlue)(nil).GetDB)) +} + +// GetParser mocks base method +func (m *MockGlue) GetParser() *parser.Parser { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetParser") + ret0, _ := ret[0].(*parser.Parser) + return ret0 +} + +// GetParser indicates an expected call of GetParser +func (mr *MockGlueMockRecorder) GetParser() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetParser", reflect.TypeOf((*MockGlue)(nil).GetParser)) +} + +// GetTables mocks base method +func (m *MockGlue) GetTables(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTables", arg0, arg1) + ret0, _ := ret[0].([]*model.TableInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTables indicates an expected call of GetTables +func (mr *MockGlueMockRecorder) GetTables(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTables", reflect.TypeOf((*MockGlue)(nil).GetTables), arg0, arg1) +} + +// GetSession mocks base method +func (m *MockGlue) GetSession(arg0 context.Context) (checkpoints.Session, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSession", arg0) + ret0, _ := ret[0].(checkpoints.Session) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSession indicates an expected call of GetSession +func (mr *MockGlueMockRecorder) GetSession(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSession", reflect.TypeOf((*MockGlue)(nil).GetSession), arg0) +} + +// OpenCheckpointsDB mocks base method +func (m *MockGlue) OpenCheckpointsDB(arg0 context.Context, arg1 *config.Config) (checkpoints.CheckpointsDB, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OpenCheckpointsDB", arg0, arg1) + ret0, _ := ret[0].(checkpoints.CheckpointsDB) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OpenCheckpointsDB indicates an expected call of OpenCheckpointsDB +func (mr *MockGlueMockRecorder) OpenCheckpointsDB(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenCheckpointsDB", reflect.TypeOf((*MockGlue)(nil).OpenCheckpointsDB), arg0, arg1) +} + +// Record mocks base method +func (m *MockGlue) Record(arg0 string, arg1 uint64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Record", arg0, arg1) +} + +// Record indicates an expected call of Record +func (mr *MockGlueMockRecorder) Record(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Record", reflect.TypeOf((*MockGlue)(nil).Record), arg0, arg1) +} + +// MockSQLExecutor is a mock of SQLExecutor interface +type MockSQLExecutor struct { + ctrl *gomock.Controller + recorder *MockSQLExecutorMockRecorder +} + +// MockSQLExecutorMockRecorder is the mock recorder for MockSQLExecutor +type MockSQLExecutorMockRecorder struct { + mock *MockSQLExecutor +} + +// NewMockSQLExecutor creates a new mock instance +func NewMockSQLExecutor(ctrl *gomock.Controller) *MockSQLExecutor { + mock := &MockSQLExecutor{ctrl: ctrl} + mock.recorder = &MockSQLExecutorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockSQLExecutor) EXPECT() *MockSQLExecutorMockRecorder { + return m.recorder +} + +// ExecuteWithLog mocks base method +func (m *MockSQLExecutor) ExecuteWithLog(ctx context.Context, query, purpose string, logger log.Logger) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ExecuteWithLog", ctx, query, purpose, logger) + ret0, _ := ret[0].(error) + return ret0 +} + +// ExecuteWithLog indicates an expected call of ExecuteWithLog +func (mr *MockSQLExecutorMockRecorder) ExecuteWithLog(ctx, query, purpose, logger interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecuteWithLog", reflect.TypeOf((*MockSQLExecutor)(nil).ExecuteWithLog), ctx, query, purpose, logger) +} + +// ObtainStringWithLog mocks base method +func (m *MockSQLExecutor) ObtainStringWithLog(ctx context.Context, query, purpose string, logger log.Logger) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ObtainStringWithLog", ctx, query, purpose, logger) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ObtainStringWithLog indicates an expected call of ObtainStringWithLog +func (mr *MockSQLExecutorMockRecorder) ObtainStringWithLog(ctx, query, purpose, logger interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObtainStringWithLog", reflect.TypeOf((*MockSQLExecutor)(nil).ObtainStringWithLog), ctx, query, purpose, logger) +} + +// QueryStringsWithLog mocks base method +func (m *MockSQLExecutor) QueryStringsWithLog(ctx context.Context, query, purpose string, logger log.Logger) ([][]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryStringsWithLog", ctx, query, purpose, logger) + ret0, _ := ret[0].([][]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QueryStringsWithLog indicates an expected call of QueryStringsWithLog +func (mr *MockSQLExecutorMockRecorder) QueryStringsWithLog(ctx, query, purpose, logger interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryStringsWithLog", reflect.TypeOf((*MockSQLExecutor)(nil).QueryStringsWithLog), ctx, query, purpose, logger) +} + +// Close mocks base method +func (m *MockSQLExecutor) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close +func (mr *MockSQLExecutorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSQLExecutor)(nil).Close)) +} diff --git a/mock/glue_checkpoint.go b/mock/glue_checkpoint.go new file mode 100644 index 000000000..f9ba9a28f --- /dev/null +++ b/mock/glue_checkpoint.go @@ -0,0 +1,136 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb-lightning/lightning/checkpoints/glue_checkpoint.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + gomock "github.com/golang/mock/gomock" + ast "github.com/pingcap/parser/ast" + types "github.com/pingcap/tidb/types" + sqlexec "github.com/pingcap/tidb/util/sqlexec" + reflect "reflect" +) + +// MockSession is a mock of Session interface +type MockSession struct { + ctrl *gomock.Controller + recorder *MockSessionMockRecorder +} + +// MockSessionMockRecorder is the mock recorder for MockSession +type MockSessionMockRecorder struct { + mock *MockSession +} + +// NewMockSession creates a new mock instance +func NewMockSession(ctrl *gomock.Controller) *MockSession { + mock := &MockSession{ctrl: ctrl} + mock.recorder = &MockSessionMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockSession) EXPECT() *MockSessionMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockSession) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close +func (mr *MockSessionMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSession)(nil).Close)) +} + +// Execute mocks base method +func (m *MockSession) Execute(arg0 context.Context, arg1 string) ([]sqlexec.RecordSet, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Execute", arg0, arg1) + ret0, _ := ret[0].([]sqlexec.RecordSet) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Execute indicates an expected call of Execute +func (mr *MockSessionMockRecorder) Execute(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Execute", reflect.TypeOf((*MockSession)(nil).Execute), arg0, arg1) +} + +// CommitTxn mocks base method +func (m *MockSession) CommitTxn(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CommitTxn", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CommitTxn indicates an expected call of CommitTxn +func (mr *MockSessionMockRecorder) CommitTxn(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommitTxn", reflect.TypeOf((*MockSession)(nil).CommitTxn), arg0) +} + +// RollbackTxn mocks base method +func (m *MockSession) RollbackTxn(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RollbackTxn", arg0) +} + +// RollbackTxn indicates an expected call of RollbackTxn +func (mr *MockSessionMockRecorder) RollbackTxn(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RollbackTxn", reflect.TypeOf((*MockSession)(nil).RollbackTxn), arg0) +} + +// PrepareStmt mocks base method +func (m *MockSession) PrepareStmt(sql string) (uint32, int, []*ast.ResultField, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PrepareStmt", sql) + ret0, _ := ret[0].(uint32) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].([]*ast.ResultField) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 +} + +// PrepareStmt indicates an expected call of PrepareStmt +func (mr *MockSessionMockRecorder) PrepareStmt(sql interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareStmt", reflect.TypeOf((*MockSession)(nil).PrepareStmt), sql) +} + +// ExecutePreparedStmt mocks base method +func (m *MockSession) ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ExecutePreparedStmt", ctx, stmtID, param) + ret0, _ := ret[0].(sqlexec.RecordSet) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ExecutePreparedStmt indicates an expected call of ExecutePreparedStmt +func (mr *MockSessionMockRecorder) ExecutePreparedStmt(ctx, stmtID, param interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecutePreparedStmt", reflect.TypeOf((*MockSession)(nil).ExecutePreparedStmt), ctx, stmtID, param) +} + +// DropPreparedStmt mocks base method +func (m *MockSession) DropPreparedStmt(stmtID uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DropPreparedStmt", stmtID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DropPreparedStmt indicates an expected call of DropPreparedStmt +func (mr *MockSessionMockRecorder) DropPreparedStmt(stmtID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropPreparedStmt", reflect.TypeOf((*MockSession)(nil).DropPreparedStmt), stmtID) +}