diff --git a/drainer/config.go b/drainer/config.go index e086a3f7a..07b672fa9 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -122,7 +122,7 @@ type SyncerConfig struct { EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"` DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"` EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"` - LoadTableInfos bool `toml:"load-table-infos" json:"load-table-infos"` + LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"` // v2 filter rules CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` @@ -253,7 +253,7 @@ func NewConfig() *Config { fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect") fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality") - fs.BoolVar(&cfg.SyncerCfg.LoadTableInfos, "load-table-infos", false, "load table infos") + fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream") fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size") fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced") fs.StringVar(new(string), "log-rotate", "", "DEPRECATED") diff --git a/drainer/server.go b/drainer/server.go index fc73f5b95..4ca00f0cf 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -202,7 +202,7 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig) defer tiStore.Close() var jobs []*model.Job - if cfg.LoadTableInfos { + if cfg.LoadSchemaSnapshot { jobs, err = loadTableInfos(tiStore, cp.TS()) } else { jobs, err = loadHistoryDDLJobs(tiStore) @@ -281,7 +281,7 @@ func (s *Server) Start() error { } }) - if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadTableInfos { + if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot { s.tg.GoNoPanic("gc_safepoint", func() { defer func() { go s.Close() }() pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security) diff --git a/drainer/util.go b/drainer/util.go index b9d2e4bb8..ff573d1cd 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -214,11 +214,22 @@ func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) { if err != nil { return nil, errors.Trace(err) } + if len(tableInfos) == 0 { + continue + } for _, tableInfo := range tableInfos { log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version)) - jobs = append(jobs, mockCreateTableJob(tableInfo, dbinfo.ID, version)) - version++ } + jobs = append(jobs, &model.Job{ + Type: model.ActionCreateTables, + State: model.JobStateDone, + SchemaID: dbinfo.ID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: version, + MultipleTableInfos: tableInfos, + }, + }) + version++ } return jobs, nil } @@ -379,15 +390,3 @@ func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job { }, } } - -func mockCreateTableJob(tableInfo *model.TableInfo, schemaID, schemaVersion int64) *model.Job { - return &model.Job{ - Type: model.ActionCreateTable, - State: model.JobStateDone, - SchemaID: schemaID, - BinlogInfo: &model.HistoryInfo{ - SchemaVersion: schemaVersion, - TableInfo: tableInfo, - }, - } -}