Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复mysql meta文件记录出错可能存在重启后重复读取数据的问题 #627

Merged
merged 1 commit into from
Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions reader/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (syncDBRecords *SyncDBRecords) SetTableRecords(db string, tableRecords Tabl
syncDBRecords.mutex.Unlock()
}

func (syncDBRecords *SyncDBRecords) SetTableInfo(db, table string, tableInfo TableInfo) {
syncDBRecords.mutex.Lock()
syncDBRecords.records.SetTableInfo(db, table, tableInfo)
syncDBRecords.mutex.Unlock()
}

func (syncDBRecords *SyncDBRecords) GetTableRecords(db string) TableRecords {
syncDBRecords.mutex.RLock()
tableRecords := syncDBRecords.records.GetTableRecords(db)
Expand Down Expand Up @@ -177,6 +183,18 @@ func (dbRecords *DBRecords) SetTableRecords(db string, tableRecords TableRecords
(*dbRecords)[db] = tableRecords
}

func (dbRecords *DBRecords) SetTableInfo(db, table string, tableInfo TableInfo) {
if *dbRecords == nil {
*dbRecords = make(DBRecords)
}
tableRecords := (*dbRecords)[db]
if tableRecords == nil {
tableRecords = make(TableRecords)
}
tableRecords[table] = tableInfo
(*dbRecords)[db] = tableRecords
}

func (dbRecords *DBRecords) GetTableRecords(db string) TableRecords {
if *dbRecords != nil {
return (*dbRecords)[db]
Expand Down Expand Up @@ -489,17 +507,17 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l
return lastDB, lastTable, true
}

omitDoneDBRecords = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

默认不是false吗?true的时候你直接return了

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为现在出错的时候会continue,不是直接return true了

recordsDoneLength := len(recordsDone)
if recordsDoneLength <= 0 {
return lastDB, lastTable, true
}

omitDoneDBRecords = false
for idx, record := range recordsDone {
tmpDBRecords := TrimeList(strings.Split(record, sqlOffsetConnector))
if int64(len(tmpDBRecords)) != 2 {
log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), record)
return lastDB, lastTable, true
continue
}

database := tmpDBRecords[0]
Expand All @@ -512,16 +530,17 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l
tmpTablesRecords := TrimeList(strings.Split(tmpDBRecords[1], "@"))
if int64(len(tmpTablesRecords)) < 1 {
log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), tmpDBRecords)
return lastDB, lastTable, true
continue
}

for idx, tableRecord := range tmpTablesRecords {
tableRecordArr := strings.Split(tableRecord, ",")
if int64(len(tableRecordArr)) != 4 {
log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), tableRecord)
return lastDB, lastTable, true
continue
}

omitDoneDBRecords = false
size, err := strconv.ParseInt(tableRecordArr[1], 10, 64)
if err != nil {
log.Errorf("Runner[%v] %v -meta file sql is out of date %v or parse size err %v, omit this offset", meta.RunnerName, meta.MetaFile(), tableRecordArr[1], err)
Expand All @@ -544,13 +563,15 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l
}
}

dbRecords.SetTableRecords(database, tableRecords)
if len(tableRecords) != 0 {
dbRecords.SetTableRecords(database, tableRecords)
}
if idx == recordsDoneLength-1 {
lastDB = database
}
}

return lastDB, lastTable, false
return lastDB, lastTable, omitDoneDBRecords
}

func convertMagic(magic string, now time.Time) (ret string) {
Expand Down Expand Up @@ -1159,7 +1180,6 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
exit := false
var tableName string
var readSize int64
tmpTablesRecords := r.syncRecords.GetTableRecords(curDB)
for !exit {
if r.rawsqls == "" && idx < tablesLen {
tableName = tables[idx]
Expand All @@ -1174,9 +1194,8 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
}

if r.rawsqls == "" {
tmpTablesRecords.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1})
r.syncRecords.SetTableRecords(curDB, tmpTablesRecords)
r.doneRecords.SetTableRecords(curDB, tmpTablesRecords)
r.syncRecords.SetTableInfo(curDB, tableName, TableInfo{size: readSize, offset: -1})
r.doneRecords.SetTableInfo(curDB, tableName, TableInfo{size: readSize, offset: -1})
recordTablesDone.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1})
}

Expand Down Expand Up @@ -1498,17 +1517,12 @@ func (r *Reader) SyncMeta() {
dbRecords := r.syncRecords.GetDBRecords()

for database, tablesRecord := range dbRecords {
var tablesRecordStr string
for table, tableInfo := range tablesRecord {
tablesRecordStr += table + "," +
all += database + sqlOffsetConnector + table + "," +
strconv.FormatInt(tableInfo.size, 10) + "," +
strconv.FormatInt(tableInfo.offset, 10) + "," +
now + "@"
}
if tablesRecordStr == "" {
continue
now + "@" + "\n"
}
all += database + sqlOffsetConnector + tablesRecordStr + "\n"
}

if len(all) <= 0 {
Expand All @@ -1518,7 +1532,6 @@ func (r *Reader) SyncMeta() {

if err := WriteRecordsFile(r.meta.DoneFilePath, all); err != nil {
log.Errorf("Runner[%v] %v SyncMeta error %v", r.meta.RunnerName, r.Name(), err)
return
}
r.syncRecords.Reset()
return
Expand Down
Loading