Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: fix too much version when initializing table infos #1237

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type SyncerConfig struct {
EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"`
DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"`
LoadTableInfos bool `toml:"load-table-infos" json:"load-table-infos"`
LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in-compatible change? might need tell customer about this if they uses old parameter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this parameter is never released. I have discussed with frank before and forget to change this.


// v2 filter rules
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
Expand Down Expand Up @@ -253,7 +253,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
fs.BoolVar(&cfg.SyncerCfg.LoadTableInfos, "load-table-infos", false, "load table infos")
fs.BoolVar(&cfg.SyncerCfg.LoadSchemaSnapshot, "load-schema-snapshot", false, "init drainer schema info through pd meta interface, need to make sure checkpoint ts is not garbage collected in upstream")
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
Expand Down
4 changes: 2 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
defer tiStore.Close()

var jobs []*model.Job
if cfg.LoadTableInfos {
if cfg.LoadSchemaSnapshot {
jobs, err = loadTableInfos(tiStore, cp.TS())
} else {
jobs, err = loadHistoryDDLJobs(tiStore)
Expand Down Expand Up @@ -281,7 +281,7 @@ func (s *Server) Start() error {
}
})

if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadTableInfos {
if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadSchemaSnapshot {
s.tg.GoNoPanic("gc_safepoint", func() {
defer func() { go s.Close() }()
pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)
Expand Down
27 changes: 13 additions & 14 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,22 @@ func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
if len(tableInfos) == 0 {
continue
}
for _, tableInfo := range tableInfos {
log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
jobs = append(jobs, mockCreateTableJob(tableInfo, dbinfo.ID, version))
version++
}
jobs = append(jobs, &model.Job{
Type: model.ActionCreateTables,
State: model.JobStateDone,
SchemaID: dbinfo.ID,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: version,
MultipleTableInfos: tableInfos,
},
})
version++
}
return jobs, nil
}
Expand Down Expand Up @@ -379,15 +390,3 @@ func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
},
}
}

func mockCreateTableJob(tableInfo *model.TableInfo, schemaID, schemaVersion int64) *model.Job {
return &model.Job{
Type: model.ActionCreateTable,
State: model.JobStateDone,
SchemaID: schemaID,
BinlogInfo: &model.HistoryInfo{
SchemaVersion: schemaVersion,
TableInfo: tableInfo,
},
}
}