From 2e100c76519f40a7c3c7f547d638b86e6999a384 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=BA=A2?= Date: Mon, 23 Jul 2018 12:11:50 +0800 Subject: [PATCH] meta fix --- reader/sql/sql.go | 49 ++++++++++------ reader/sql/sql_test.go | 128 +++++++++++++++++++++++++++++++++-------- 2 files changed, 135 insertions(+), 42 deletions(-) diff --git a/reader/sql/sql.go b/reader/sql/sql.go index 98db61539..5af2d07ff 100644 --- a/reader/sql/sql.go +++ b/reader/sql/sql.go @@ -143,6 +143,12 @@ func (syncDBRecords *SyncDBRecords) SetTableRecords(db string, tableRecords Tabl syncDBRecords.mutex.Unlock() } +func (syncDBRecords *SyncDBRecords) SetTableInfo(db, table string, tableInfo TableInfo) { + syncDBRecords.mutex.Lock() + syncDBRecords.records.SetTableInfo(db, table, tableInfo) + syncDBRecords.mutex.Unlock() +} + func (syncDBRecords *SyncDBRecords) GetTableRecords(db string) TableRecords { syncDBRecords.mutex.RLock() tableRecords := syncDBRecords.records.GetTableRecords(db) @@ -177,6 +183,18 @@ func (dbRecords *DBRecords) SetTableRecords(db string, tableRecords TableRecords (*dbRecords)[db] = tableRecords } +func (dbRecords *DBRecords) SetTableInfo(db, table string, tableInfo TableInfo) { + if *dbRecords == nil { + *dbRecords = make(DBRecords) + } + tableRecords := (*dbRecords)[db] + if tableRecords == nil { + tableRecords = make(TableRecords) + } + tableRecords[table] = tableInfo + (*dbRecords)[db] = tableRecords +} + func (dbRecords *DBRecords) GetTableRecords(db string) TableRecords { if *dbRecords != nil { return (*dbRecords)[db] @@ -489,17 +507,17 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l return lastDB, lastTable, true } + omitDoneDBRecords = true recordsDoneLength := len(recordsDone) if recordsDoneLength <= 0 { return lastDB, lastTable, true } - omitDoneDBRecords = false for idx, record := range recordsDone { tmpDBRecords := TrimeList(strings.Split(record, sqlOffsetConnector)) if int64(len(tmpDBRecords)) != 2 { log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), record) - return lastDB, lastTable, true + continue } database := tmpDBRecords[0] @@ -512,16 +530,17 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l tmpTablesRecords := TrimeList(strings.Split(tmpDBRecords[1], "@")) if int64(len(tmpTablesRecords)) < 1 { log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), tmpDBRecords) - return lastDB, lastTable, true + continue } for idx, tableRecord := range tmpTablesRecords { tableRecordArr := strings.Split(tableRecord, ",") if int64(len(tableRecordArr)) != 4 { log.Errorf("Runner[%v] %v -meta records done file is not invalid sql records done file %v, omit meta data", meta.RunnerName, meta.MetaFile(), tableRecord) - return lastDB, lastTable, true + continue } + omitDoneDBRecords = false size, err := strconv.ParseInt(tableRecordArr[1], 10, 64) if err != nil { log.Errorf("Runner[%v] %v -meta file sql is out of date %v or parse size err %v, omit this offset", meta.RunnerName, meta.MetaFile(), tableRecordArr[1], err) @@ -544,13 +563,15 @@ func (dbRecords *SyncDBRecords) restoreRecordsFile(meta *reader.Meta) (lastDB, l } } - dbRecords.SetTableRecords(database, tableRecords) + if len(tableRecords) != 0 { + dbRecords.SetTableRecords(database, tableRecords) + } if idx == recordsDoneLength-1 { lastDB = database } } - return lastDB, lastTable, false + return lastDB, lastTable, omitDoneDBRecords } func convertMagic(magic string, now time.Time) (ret string) { @@ -1159,7 +1180,6 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR exit := false var tableName string var readSize int64 - tmpTablesRecords := r.syncRecords.GetTableRecords(curDB) for !exit { if r.rawsqls == "" && idx < tablesLen { tableName = tables[idx] @@ -1174,9 +1194,8 @@ func (r *Reader) execReadDB(curDB string, now time.Time, recordTablesDone TableR } if r.rawsqls == "" { - tmpTablesRecords.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1}) - r.syncRecords.SetTableRecords(curDB, tmpTablesRecords) - r.doneRecords.SetTableRecords(curDB, tmpTablesRecords) + r.syncRecords.SetTableInfo(curDB, tableName, TableInfo{size: readSize, offset: -1}) + r.doneRecords.SetTableInfo(curDB, tableName, TableInfo{size: readSize, offset: -1}) recordTablesDone.SetTableInfo(tableName, TableInfo{size: readSize, offset: -1}) } @@ -1498,17 +1517,12 @@ func (r *Reader) SyncMeta() { dbRecords := r.syncRecords.GetDBRecords() for database, tablesRecord := range dbRecords { - var tablesRecordStr string for table, tableInfo := range tablesRecord { - tablesRecordStr += table + "," + + all += database + sqlOffsetConnector + table + "," + strconv.FormatInt(tableInfo.size, 10) + "," + strconv.FormatInt(tableInfo.offset, 10) + "," + - now + "@" - } - if tablesRecordStr == "" { - continue + now + "@" + "\n" } - all += database + sqlOffsetConnector + tablesRecordStr + "\n" } if len(all) <= 0 { @@ -1518,7 +1532,6 @@ func (r *Reader) SyncMeta() { if err := WriteRecordsFile(r.meta.DoneFilePath, all); err != nil { log.Errorf("Runner[%v] %v SyncMeta error %v", r.meta.RunnerName, r.Name(), err) - return } r.syncRecords.Reset() return diff --git a/reader/sql/sql_test.go b/reader/sql/sql_test.go index cc31cc75a..7697496c3 100644 --- a/reader/sql/sql_test.go +++ b/reader/sql/sql_test.go @@ -7,6 +7,7 @@ import ( "reflect" "strconv" "strings" + "sync" "testing" "time" @@ -494,7 +495,7 @@ func Test_checkMagic(t *testing.T) { } func TestSQLReader(t *testing.T) { - meta, err := getMeta() + meta, err := getMeta(MetaDir) assert.NoError(t, err) defer os.RemoveAll(MetaDir) database := "TestSQLReaderdatabase" @@ -858,7 +859,7 @@ func Test_equalTime(t *testing.T) { } func Test_isMatchData(t *testing.T) { - meta, err := getMeta() + meta, err := getMeta(MetaDir) assert.NoError(t, err) defer os.RemoveAll(MetaDir) mr := &Reader{ @@ -1274,10 +1275,10 @@ func getContent(readRecords DBRecords) string { return all } -func getMeta() (*reader.Meta, error) { +func getMeta(metaDir string) (*reader.Meta, error) { logkitConf := conf.MapConf{ - reader.KeyMetaPath: MetaDir, - reader.KeyFileDone: MetaDir, + reader.KeyMetaPath: metaDir, + reader.KeyFileDone: metaDir, reader.KeyMode: reader.ModeMySQL, } return reader.NewMetaWithConf(logkitConf) @@ -1285,7 +1286,7 @@ func getMeta() (*reader.Meta, error) { type DataTest struct { database string - createTable string + createTable []string insertData []string } @@ -1306,27 +1307,27 @@ var ( databasesTest = []DataTest{ { database: "Test_MySql20180510", - createTable: "CREATE TABLE runoob_tbl20180510est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + createTable: []string{"CREATE TABLE runoob_tbl20180510est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, insertData: []string{"INSERT INTO runoob_tbl20180510est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, { database: "Test_MySql20170610", - createTable: "CREATE TABLE runoob_tbl20170610est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + createTable: []string{"CREATE TABLE runoob_tbl20170610est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, insertData: []string{"INSERT INTO runoob_tbl20170610est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, { database: "Test_MySql20171210", - createTable: "CREATE TABLE runoob_tbl20171210est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + createTable: []string{"CREATE TABLE runoob_tbl20171210est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, insertData: []string{"INSERT INTO runoob_tbl20171210est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, { database: "Test_MySql20170910", - createTable: "CREATE TABLE runoob_tbl20170910est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + createTable: []string{"CREATE TABLE runoob_tbl20170910est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, insertData: []string{"INSERT INTO runoob_tbl20170910est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, { database: "Test_MySql20180110", - createTable: "CREATE TABLE runoob_tbl20180110est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + createTable: []string{"CREATE TABLE runoob_tbl20180110est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, insertData: []string{"INSERT INTO runoob_tbl20180110est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, } @@ -1334,7 +1335,7 @@ var ( todayDataTests = []DataTest{ { "Test_MySql" + year + month + day, - "CREATE TABLE runoob_tbl" + year + month + day + "est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + []string{"CREATE TABLE runoob_tbl" + year + month + day + "est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, []string{"INSERT INTO runoob_tbl" + year + month + day + "est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, } @@ -1360,7 +1361,7 @@ func TestMySql(t *testing.T) { // test exec on start runnerName := "mr" - mr, err := getMySqlReader(false, false, runnerName, CronInfo{}) + mr, err := getMySqlReader(false, false, false, runnerName, CronInfo{}) defer os.RemoveAll(MetaDir) assert.NoError(t, err) mrData, ok := mr.(reader.DataReader) @@ -1405,7 +1406,7 @@ func TestMySql(t *testing.T) { // test exec on start, sql not empty runnerName = "mrRawSql" - mrRawSql, err := getMySqlReader(false, true, runnerName, CronInfo{}) + mrRawSql, err := getMySqlReader(false, true, false, runnerName, CronInfo{}) assert.NoError(t, err) mrRawSqlData, ok := mrRawSql.(reader.DataReader) if !ok { @@ -1449,7 +1450,7 @@ func TestMySql(t *testing.T) { // test history all runnerName = "mrHistoryAll" - mrHistoryAll, err := getMySqlReader(true, false, runnerName, CronInfo{}) + mrHistoryAll, err := getMySqlReader(true, false, false, runnerName, CronInfo{}) assert.NoError(t, err) mrHistoryAllData, ok := mrHistoryAll.(reader.DataReader) if !ok { @@ -1474,7 +1475,7 @@ func TestMySql(t *testing.T) { mrHistoryAll.Close() // test file done in meta dir - mrHistoryAll2, err := getMySqlReader(true, false, runnerName, CronInfo{}) + mrHistoryAll2, err := getMySqlReader(true, false, false, runnerName, CronInfo{}) assert.NoError(t, err) mrHistoryAllData2, ok := mrHistoryAll2.(reader.DataReader) if !ok { @@ -1503,7 +1504,7 @@ func TestMySql(t *testing.T) { } // cron task, not exec on start runnerName = "mrCron" - mrCron, err := getMySqlReader(false, false, runnerName, CronInfo{true, secondAdd3, true}) + mrCron, err := getMySqlReader(false, false, false, runnerName, CronInfo{true, secondAdd3, true}) assert.NoError(t, err) mrCronData, ok := mrCron.(reader.DataReader) if !ok { @@ -1534,7 +1535,7 @@ func TestMySql(t *testing.T) { } // cron task, exec on start runnerName = "mrCronExecOnStart" - mrCronExecOnStart, err := getMySqlReader(false, false, runnerName, CronInfo{true, secondAdd3, false}) + mrCronExecOnStart, err := getMySqlReader(false, false, false, runnerName, CronInfo{true, secondAdd3, false}) assert.NoError(t, err) mrCronExecOnStartData, ok := mrCronExecOnStart.(reader.DataReader) if !ok { @@ -1559,7 +1560,7 @@ func TestMySql(t *testing.T) { mrCronExecOnStart.SyncMeta() mrCronExecOnStart.Close() - mrCronExecOnStart2, err := getMySqlReader(false, false, runnerName, CronInfo{true, secondAdd3, false}) + mrCronExecOnStart2, err := getMySqlReader(false, false, false, runnerName, CronInfo{true, secondAdd3, false}) assert.NoError(t, err) mrCronExecOnStartData2, ok := mrCronExecOnStart2.(reader.DataReader) if !ok { @@ -1583,9 +1584,55 @@ func TestMySql(t *testing.T) { assert.Equal(t, 0, dataLine) mrCronExecOnStart2.SyncMeta() mrCronExecOnStart2.Close() + + minDataTestsLine, _, err = setSecond() + if err != nil { + t.Errorf("prepare mysql database failed: %v", err) + } + // cron task, exec on start + runnerName = "mrLoopcOnStart" + mrLoopOnStart, err := getMySqlReader(false, false, true, runnerName, CronInfo{false, "", false}) + assert.NoError(t, err) + mrLoopOnStartData, ok := mrLoopOnStart.(reader.DataReader) + if !ok { + t.Error("mysql read should have readdata interface") + } + dataLine = 0 + before = time.Now() + log.Infof("before: %v", before) + for !batchTimeout(before, 5) { + data, bytes, err := mrLoopOnStartData.ReadData() + if err != nil { + t.Error(err) + } + if len(data) <= 0 { + continue + } + assert.Equal(t, int64(36), bytes) + assert.Equal(t, expectData, data) + dataLine++ + } + assert.Equal(t, minDataTestsLine, dataLine) + mrLoopOnStart.SyncMeta() + + meta, err := getMeta(path.Join(MetaDir, runnerName)) + assert.NoError(t, err) + var doneRecords = SyncDBRecords{ + mutex: sync.RWMutex{}, + } + lastDB, lastTable, omitDoneFile := doneRecords.restoreRecordsFile(meta) + assert.False(t, omitDoneFile) + assert.Equal(t, 1, len(doneRecords.records)) + expectDB := "Test_MySql" + year + month + day + assert.Equal(t, 2, len(doneRecords.records.GetTableRecords(expectDB))) + assert.Equal(t, expectDB, lastDB) + assert.NotEmpty(t, lastTable) + + mrLoopOnStart.Close() + } -func getMySqlReader(historyAll, rawsql bool, runnerName string, cronInfo CronInfo) (reader.Reader, error) { +func getMySqlReader(historyAll, rawsql, loop bool, runnerName string, cronInfo CronInfo) (reader.Reader, error) { readerConf := conf.MapConf{ "mysql_database": "Test_MySql@(YYYY)@(MM)@(DD)", "mysql_table": "runoob_tbl@(YYYY)@(MM)@(DD)est", @@ -1614,6 +1661,10 @@ func getMySqlReader(historyAll, rawsql bool, runnerName string, cronInfo CronInf if cronInfo.notExecOnStart { readerConf["mysql_exec_onstart"] = "false" } + if loop { + readerConf["mysql_cron"] = "loop 1s" + readerConf["mysql_table"] = "runoob_tbl@(YYYY)@(MM)@(DD)@(ss)est" + } mr, err := reader.NewReader(readerConf, true) if err != nil { return nil, err @@ -1648,9 +1699,11 @@ func prepareMysql() error { return err } - _, err = db.Exec(dbInfo.createTable) - if err != nil { - return err + for _, createTable := range dbInfo.createTable { + _, err = db.Exec(createTable) + if err != nil { + return err + } } for _, data := range dbInfo.insertData { @@ -1707,7 +1760,7 @@ func setMinute() (int, string, error) { var minDataTests = []DataTest{ { "Test_MySql" + year + month + day + minute, - "CREATE TABLE runoob_tbl" + year + month + day + minute + "est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + []string{"CREATE TABLE runoob_tbl" + year + month + day + minute + "est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, []string{"INSERT INTO runoob_tbl" + year + month + day + minute + "est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, }, } @@ -1722,3 +1775,30 @@ func setMinute() (int, string, error) { } return minDataTestsLine, secondAdd3, nil } + +func setSecond() (int, string, error) { + var ( + nowCron = time.Now() + secondAdd2 = getDateStr((nowCron.Second() + 2) % 60) + secondAdd4 = getDateStr((nowCron.Second() + 4) % 60) + minute = getDateStr(nowCron.Minute()) + ) + var minDataTests = []DataTest{ + { + "Test_MySql" + year + month + day, + []string{"CREATE TABLE runoob_tbl" + year + month + day + secondAdd2 + "est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;", + "CREATE TABLE runoob_tbl" + year + month + day + secondAdd4 + "est(runoob_id INT NOT NULL AUTO_INCREMENT,runoob_title VARCHAR(100) NOT NULL,runoob_author VARCHAR(40) NOT NULL,submission_date DATE,PRIMARY KEY ( runoob_id ))ENGINE=InnoDB DEFAULT CHARSET=utf8;"}, + []string{"INSERT INTO runoob_tbl" + year + month + day + secondAdd2 + "est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());", + "INSERT INTO runoob_tbl" + year + month + day + secondAdd4 + "est (runoob_title, runoob_author, submission_date) VALUES (\"学习 mysql\", \"教程\", NOW());"}, + }, + } + log.Infof("time now cron: %v, minute: %v, secondAdd2: %v, secondAdd4: %v", nowCron, minute, secondAdd2, secondAdd4) + databasesTest = append(databasesTest, minDataTests...) + if err := cleanMysql(); err != nil { + return 0, "", err + } + if err := prepareMysql(); err != nil { + return 0, "", err + } + return 2, "", nil +}