Skip to content

Commit

Permalink
Merge pull request #627 from redHJ/sqlmetafix
Browse files Browse the repository at this point in the history
meta fix
  • Loading branch information
wonderflow authored Jul 23, 2018
2 parents 8646fcf + 2e100c7 commit 4cdcbdd
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 42 deletions.
49 changes: 31 additions & 18 deletions reader/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (syncDBRecords *SyncDBRecords) SetTableRecords(db string, tableRecords Tabl
syncDBRecords.mutex.Unlock()
}

func (syncDBRecords *SyncDBRecords) SetTableInfo(db, table string, tableInfo TableInfo) {
syncDBRecords.mutex.Lock()
syncDBRecords.records.SetTableInfo(db, table, tableInfo)
syncDBRecords.mutex.Unlock()
}

func (syncDBRecords *SyncDBRecords) GetTableRecords(db string) TableRecords {
syncDBRecords.mutex.RLock()
tableRecords := syncDBRecords.records.GetTableRecords(db)
Expand Down Expand Up @@ -177,6 +183,18 @@ func (dbRecords *DBRecords) SetTableRecords(db string, tableRecords TableRecords
(*dbRecords)[db] = tableRecords
}

func (dbRecords *DBRecords) SetTableInfo(db, table string, tableInfo TableInfo) {
if *dbRecords == nil {
*dbRecords = make(DBRecords)
}
tableRecords := (*dbRecords)[db]
if tableRecords == nil {
tableRecords = make(TableRecords)
}
tableRecords[table] = tableInfo
(*dbRecords)[db] = tableRecords
}

func (dbRecords *DBRecords) GetTableRecords(db string) TableRecords {
if *dbRecords != nil {
return (*dbRecords)[db]
Expand Down Expand Up @@ -489,17 +507,17 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l
return lastDB, lastTable, true
}

omitDoneDBRecords = true
recordsDoneLength := len(recordsDone)
if recordsDoneLength <= 0 {
return lastDB, lastTable, true
}

omitDoneDBRecords = false
for idx, record := range recordsDone {
tmpDBRecords := TrimeList(strings.Split(record, sqlOffsetConnector))
if int64(len(tmpDBRecords)) != 2 {
log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), record)
return lastDB, lastTable, true
continue
}

database := tmpDBRecords[0]
Expand All @@ -512,16 +530,17 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l
tmpTablesRecords := TrimeList(strings.Split(tmpDBRecords[1], "@"))
if int64(len(tmpTablesRecords)) < 1 {
log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), tmpDBRecords)
return lastDB, lastTable, true
continue
}

for idx, tableRecord := range tmpTablesRecords {
tableRecordArr := strings.Split(tableRecord, ",")
if int64(len(tableRecordArr)) != 4 {
log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), tableRecord)
return lastDB, lastTable, true
continue
}

omitDoneDBRecords = false
size, err := strconv.ParseInt(tableRecordArr[1], 10, 64)
if err != nil {
log.Errorf("Runner[%v] %v -meta file sql is out of date %v or parse size err %v, omit this offset", meta.RunnerName, meta.MetaFile(), tableRecordArr[1], err)
Expand All @@ -544,13 +563,15 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l
}
}

dbRecords.SetTableRecords(database, tableRecords)
if len(tableRecords) != 0 {
dbRecords.SetTableRecords(database, tableRecords)
}
if idx == recordsDoneLength-1 {
lastDB = database
}
}

return lastDB, lastTable, false
return lastDB, lastTable, omitDoneDBRecords
}

func convertMagic(magic string, now time.Time) (ret string) {
Expand Down Expand Up @@ -1159,7 +1180,6 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
exit := false
var tableName string
var readSize int64
tmpTablesRecords := r.syncRecords.GetTableRecords(curDB)
for !exit {
if r.rawsqls == "" && idx < tablesLen {
tableName = tables[idx]
Expand All @@ -1174,9 +1194,8 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR
}

if r.rawsqls == "" {
tmpTablesRecords.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1})
r.syncRecords.SetTableRecords(curDB, tmpTablesRecords)
r.doneRecords.SetTableRecords(curDB, tmpTablesRecords)
r.syncRecords.SetTableInfo(curDB, tableName, TableInfo{size: readSize, offset: -1})
r.doneRecords.SetTableInfo(curDB, tableName, TableInfo{size: readSize, offset: -1})
recordTablesDone.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1})
}

Expand Down Expand Up @@ -1498,17 +1517,12 @@ func (r *Reader) SyncMeta() {
dbRecords := r.syncRecords.GetDBRecords()

for database, tablesRecord := range dbRecords {
var tablesRecordStr string
for table, tableInfo := range tablesRecord {
tablesRecordStr += table + "," +
all += database + sqlOffsetConnector + table + "," +
strconv.FormatInt(tableInfo.size, 10) + "," +
strconv.FormatInt(tableInfo.offset, 10) + "," +
now + "@"
}
if tablesRecordStr == "" {
continue
now + "@" + "\n"
}
all += database + sqlOffsetConnector + tablesRecordStr + "\n"
}

if len(all) <= 0 {
Expand All @@ -1518,7 +1532,6 @@ func (r *Reader) SyncMeta() {

if err := WriteRecordsFile(r.meta.DoneFilePath, all); err != nil {
log.Errorf("Runner[%v] %v SyncMeta error %v", r.meta.RunnerName, r.Name(), err)
return
}
r.syncRecords.Reset()
return
Expand Down
Loading

0 comments on commit 4cdcbdd

Please sign in to comment.