diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go
index 1b86ee482f362..e89eb1767fb3b 100644
--- a/br/pkg/lightning/restore/check_info.go
+++ b/br/pkg/lightning/restore/check_info.go
@@ -41,7 +41,9 @@ import (
 	"github.com/pingcap/tidb/br/pkg/version"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
+	"github.com/pingcap/tidb/types"
 
 	"github.com/tikv/pd/server/api"
 	pdconfig "github.com/tikv/pd/server/config"
@@ -577,7 +579,7 @@ func hasDefault(col *model.ColumnInfo) bool {
 		col.IsGenerated() || mysql.HasAutoIncrementFlag(col.Flag)
 }
 
-func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydump.SourceFileMeta) (cols []string, colCnt int, err error) {
+func (rc *Controller) readFirstRow(ctx context.Context, dataFileMeta mydump.SourceFileMeta) (cols []string, row []types.Datum, err error) {
 	var reader storage.ReadSeekCloser
 	if dataFileMeta.Type == mydump.SourceTypeParquet {
 		reader, err = mydump.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize)
@@ -585,7 +587,7 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydu
 		reader, err = rc.store.Open(ctx, dataFileMeta.Path)
 	}
 	if err != nil {
-		return nil, 0, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	var parser mydump.Parser
@@ -596,18 +598,18 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydu
 		// Create a utf8mb4 convertor to encode and decode data with the charset of CSV files.
 		charsetConvertor, err := mydump.NewCharsetConvertor(rc.cfg.Mydumper.DataCharacterSet, rc.cfg.Mydumper.DataInvalidCharReplace)
 		if err != nil {
-			return nil, 0, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 		parser, err = mydump.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader, charsetConvertor)
 		if err != nil {
-			return nil, 0, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 	case mydump.SourceTypeSQL:
 		parser = mydump.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers)
 	case mydump.SourceTypeParquet:
 		parser, err = mydump.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path)
 		if err != nil {
-			return nil, 0, errors.Trace(err)
+			return nil, nil, errors.Trace(err)
 		}
 	default:
 		panic(fmt.Sprintf("unknown file type '%s'", dataFileMeta.Type))
@@ -616,13 +618,18 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydu
 
 	err = parser.ReadRow()
 	if err != nil && errors.Cause(err) != io.EOF {
-		return nil, 0, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
-	return parser.Columns(), len(parser.LastRow().Row), nil
+	return parser.Columns(), parser.LastRow().Row, nil
 }
 
 // SchemaIsValid checks the import file and cluster schema is match.
 func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) {
+	if len(tableInfo.DataFiles) == 0 {
+		log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
+		return nil, nil
+	}
+
 	msgs := make([]string, 0)
 	info, ok := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name]
 	if !ok {
@@ -640,11 +647,6 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
 		igCols[col] = struct{}{}
 	}
 
-	if len(tableInfo.DataFiles) == 0 {
-		log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
-		return nil, nil
-	}
-
 	colCountFromTiDB := len(info.Core.Columns)
 	core := info.Core
 	defaultCols := make(map[string]struct{})
@@ -658,86 +660,264 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
 	defaultCols[model.ExtraHandleName.String()] = struct{}{}
 
 	// only check the first file of this table.
-	if len(tableInfo.DataFiles) > 0 {
-		dataFile := tableInfo.DataFiles[0]
-		log.L().Info("datafile to check", zap.String("db", tableInfo.DB),
-			zap.String("table", tableInfo.Name), zap.String("path", dataFile.FileMeta.Path))
-		// get columns name from data file.
-		dataFileMeta := dataFile.FileMeta
-
-		if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet {
-			msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String()))
-			return msgs, nil
-		}
-		colsFromDataFile, colCountFromDataFile, err := rc.readColumnsAndCount(ctx, dataFileMeta)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		if colsFromDataFile == nil && colCountFromDataFile == 0 {
-			log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
-			return msgs, nil
+	dataFile := tableInfo.DataFiles[0]
+	log.L().Info("datafile to check", zap.String("db", tableInfo.DB),
+		zap.String("table", tableInfo.Name), zap.String("path", dataFile.FileMeta.Path))
+	// get column names from the data file.
+	dataFileMeta := dataFile.FileMeta
+
+	if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet {
+		msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String()))
+		return msgs, nil
+	}
+	colsFromDataFile, row, err := rc.readFirstRow(ctx, dataFileMeta)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if colsFromDataFile == nil && len(row) == 0 {
+		log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
+		return msgs, nil
+	}
+
+	if colsFromDataFile == nil {
+		// when there are no column names in the data file, we must insert data in order,
+		// so the trailing columns must either be ignorable or have a default value.
+		for i := len(row); i < colCountFromTiDB; i++ {
+			if _, ok := defaultCols[core.Columns[i].Name.L]; !ok {
+				msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` has %d columns, "+
+					"but the data file has %d columns and column %s is missing a default value; "+
+					"please give the column a default value to skip this check",
+					tableInfo.DB, tableInfo.Name, colCountFromTiDB, len(row), core.Columns[i].Name.L))
+			}
 		}
+		return msgs, nil
+	}
 
-	if colsFromDataFile == nil {
-		// when there is no columns name in data file. we must insert data in order.
-		// so the last several columns either can be ignored or has a default value.
-		for i := colCountFromDataFile; i < colCountFromTiDB; i++ {
-			if _, ok := defaultCols[core.Columns[i].Name.L]; !ok {
-				msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` has %d columns,"+
-					"and data file has %d columns, but column %s are missing the default value,"+
-					"please give column a default value to skip this check",
-					tableInfo.DB, tableInfo.Name, colCountFromTiDB, colCountFromDataFile, core.Columns[i].Name.L))
-			}
+	// compare column names and make sure
+	// 1. the TiDB table info contains all of the data file's columns (besides the ignored columns)
+	// 2. those columns not present in the data file always have a default value.
+	colMap := make(map[string]struct{})
+	for col := range igCols {
+		colMap[col] = struct{}{}
+	}
+	for _, col := range core.Columns {
+		if _, ok := colMap[col.Name.L]; ok {
+			// this column of the TiDB table is ignored,
+			// so we need to ensure it has a default value.
+			if _, hasDefault := defaultCols[col.Name.L]; !hasDefault {
+				msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s`'s column %s cannot be ignored, "+
+					"because it doesn't have a default value, please set tables.ignoreColumns properly",
+					tableInfo.DB, tableInfo.Name, col.Name.L))
+			}
 		} else {
-			// compare column names and make sure
-			// 1. TiDB table info has data file's all columns(besides ignore columns)
-			// 2. Those columns not introduced in data file always have a default value.
-			colMap := make(map[string]struct{})
-			for col := range igCols {
-				colMap[col] = struct{}{}
+			colMap[col.Name.L] = struct{}{}
+		}
+	}
-			for _, col := range core.Columns {
-				if _, ok := colMap[col.Name.L]; ok {
-					// tidb's column is ignored
-					// we need ensure this column has the default value.
-					if _, hasDefault := defaultCols[col.Name.L]; !hasDefault {
-						msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s`'s column %s cannot be ignored,"+
-							"because it doesn't hava a default value, please set tables.ignoreColumns properly",
-							tableInfo.DB, tableInfo.Name, col.Name.L))
-					}
-				} else {
-					colMap[col.Name.L] = struct{}{}
-				}
-			}
+	// tidb_rowid can be ignored in check
+	colMap[model.ExtraHandleName.String()] = struct{}{}
+	for _, col := range colsFromDataFile {
+		if _, ok := colMap[col]; !ok {
+			checkMsg := "please check table schema"
+			if dataFileMeta.Type == mydump.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header {
+				checkMsg += " and csv file header"
 			}
-			// tidb_rowid can be ignored in check
-			colMap[model.ExtraHandleName.String()] = struct{}{}
-			for _, col := range colsFromDataFile {
-				if _, ok := colMap[col]; !ok {
-					checkMsg := "please check table schema"
-					if dataFileMeta.Type == mydump.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header {
-						checkMsg += " and csv file header"
+			msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have column %s, "+
+				"%s or use tables.ignoreColumns to ignore %s",
+				tableInfo.DB, tableInfo.Name, col, checkMsg, col))
+		} else {
+			// remove column for next iteration
+			delete(colMap, col)
+		}
+	}
+	// the columns remaining in colMap are not present in the data file; they must have a default value.
+	for col := range colMap {
+		if _, ok := defaultCols[col]; ok {
+			continue
+		}
+		msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have the default value for %s, "+
+			"please give a default value for %s, choose another column to ignore, or add this column to the data file",
+			tableInfo.DB, tableInfo.Name, col, col))
+	}
+	return msgs, nil
+}
+
+// checkCSVHeader tries to check whether the csv header config is consistent with the source csv files by:
+// 1. pick one table with two CSV files and a unique/primary key
+// 2. read the first row of those two CSV files
+// 3. check whether the content of those first rows is compatible with the table schema, and whether the
+//    two rows are identical, to determine if the first rows are header rows.
+func (rc *Controller) checkCSVHeader(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta) error {
+	// if the config sets header = true but the source files actually contain no header,
+	// the earlier SchemaIsValid check already reports an error for that case, so we don't need to check it again here.
+	if rc.cfg.Mydumper.CSV.Header {
+		return nil
+	}
+	var (
+		tableMeta    *mydump.MDTableMeta
+		csvCount     int
+		hasUniqueIdx bool
+	)
+	// only check one table's source files for better performance. The checked table is chosen based on the following two factors:
+	// 1. it contains at least 1 csv source file, 2 is preferable
+	// 2. its table schema contains a primary key or unique key
+	// if the two factors can't both be satisfied, the first one has the higher priority
+outer:
+	for _, dbMeta := range dbMetas {
+		for _, tblMeta := range dbMeta.Tables {
+			if len(tblMeta.DataFiles) == 0 {
+				continue
 			}
+			tableHasUniqueIdx := false
+			tableCSVCount := 0
+			for _, f := range tblMeta.DataFiles {
+				if f.FileMeta.Type == mydump.SourceTypeCSV {
+					tableCSVCount++
+					if tableCSVCount >= 2 {
+						break
 					}
+				}
+			}
+			if tableCSVCount == 0 {
+				continue
+			}
+
+			info := rc.dbInfos[tblMeta.DB].Tables[tblMeta.Name]
+			for _, idx := range info.Core.Indices {
+				if idx.Primary || idx.Unique {
+					tableHasUniqueIdx = true
 				}
+			}
+
+			if tableCSVCount >= 2 && tableHasUniqueIdx {
+				tableMeta = tblMeta
+				csvCount = tableCSVCount
+				hasUniqueIdx = tableHasUniqueIdx
+				// if a perfect table source is found, we can stop checking more tables
+				break outer
+			}
+			if tableCSVCount > csvCount || (tableCSVCount == csvCount && !hasUniqueIdx && tableHasUniqueIdx) {
+				tableMeta = tblMeta
+				csvCount = tableCSVCount
+				hasUniqueIdx = tableHasUniqueIdx
 			}
 		}
 	}
-	return msgs, nil
+
+	if tableMeta == nil {
+		return nil
+	}
+
+	var rows [][]types.Datum
+	for _, f := range tableMeta.DataFiles {
+		if f.FileMeta.Type != mydump.SourceTypeCSV {
+			continue
+		}
+		_, row, err := rc.readFirstRow(ctx, f.FileMeta)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if len(row) > 0 {
+			rows = append(rows, row)
+		}
+		// only check at most two of all the files
+		if len(rows) >= 2 {
+			break
+		}
+	}
+	if len(rows) == 0 {
+		return nil
+	} else if len(rows) >= 2 {
+		// if the first rows of the two source files are not the same, they should not be header lines
+		// NOTE: though lightning's logic allows different source files to contain different columns or a
+		// different column order, here we only check whether they are exactly the same because this is the common case.
+		if len(rows[0]) != len(rows[1]) {
+			return nil
+		}
+
+		for i := 0; i < len(rows[0]); i++ {
+			if rows[0][i].GetString() != rows[1][i].GetString() {
+				return nil
+			}
+		}
+	}
+
+	// check if some fields are unique and not ignored
+	// if at least one field appears in a unique key, we can be sure something is wrong:
+	// either the first rows are header lines, or the data is duplicated.
+	tableInfo := rc.dbInfos[tableMeta.DB].Tables[tableMeta.Name]
+	tableFields := make(map[string]struct{})
+	uniqueIdxFields := make(map[string]struct{})
+	ignoreColumns, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableMeta.DB, tableMeta.Name, rc.cfg.Mydumper.CaseSensitive)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	ignoreColsSet := make(map[string]struct{})
+	for _, col := range ignoreColumns.Columns {
+		ignoreColsSet[col] = struct{}{}
+	}
+	for _, idx := range tableInfo.Core.Indices {
+		if !idx.Unique && !idx.Primary {
+			continue
+		}
+		for _, col := range idx.Columns {
+			if _, ok := ignoreColsSet[col.Name.L]; !ok {
+				uniqueIdxFields[col.Name.L] = struct{}{}
+			}
+		}
+	}
+	for _, f := range tableInfo.Core.Columns {
+		tableFields[f.Name.L] = struct{}{}
+	}
+	if common.TableHasAutoRowID(tableInfo.Core) {
+		tableFields[model.ExtraHandleName.L] = struct{}{}
+	}
+	hasUniqueField := false
+	for _, d := range rows[0] {
+		val := strings.ToLower(d.GetString())
+		if _, ok := tableFields[val]; !ok {
+			return nil
+		}
+		if _, ok := uniqueIdxFields[val]; ok {
+			hasUniqueField = true
+			break
+		}
+	}
+
+	msg := fmt.Sprintf("source csv files contain a header row but `mydumper.csv.header` is false, checked table is `%s`.`%s`",
+		tableMeta.DB, tableMeta.Name)
+	level := Warn
+	if hasUniqueField && len(rows) > 1 {
+		level = Critical
+	} else if !checkFieldCompatibility(tableInfo.Core, ignoreColsSet, rows[0]) {
+		// if there is only 1 csv file or there is no unique key, try to check whether all columns are compatible with the string values
+		level = Critical
+	}
+	rc.checkTemplate.Collect(level, false, msg)
+
+	return nil
+}
+
+func checkFieldCompatibility(tbl *model.TableInfo, ignoreCols map[string]struct{}, values []types.Datum) bool {
+	se := kv.NewSession(&kv.SessionOptions{
+		SQLMode: mysql.ModeStrictTransTables,
+	})
+	for i, col := range tbl.Columns {
+		// do not check ignored columns
+		if _, ok := ignoreCols[col.Name.L]; ok {
+			continue
+		}
+		if i >= len(values) {
+			break
+		}
+		_, err := table.CastValue(se, values[i], col, true, false)
+		if err != nil {
+			log.L().Error("field value is not consistent with column type", zap.String("value", values[i].GetString()),
+				zap.Any("column_info", col), zap.Error(err))
+			return false
+		}
+	}
+
+	return true
 }
 
 func (rc *Controller) sampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
diff --git a/br/pkg/lightning/restore/check_info_test.go b/br/pkg/lightning/restore/check_info_test.go
new file mode 100644
index 0000000000000..e1dd939d9c2b1
--- /dev/null
+++ b/br/pkg/lightning/restore/check_info_test.go
@@ -0,0 +1,405 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package restore
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	. "github.com/pingcap/check"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
+	"github.com/pingcap/tidb/br/pkg/lightning/config"
+	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
+	"github.com/pingcap/tidb/br/pkg/lightning/worker"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/ddl"
+	"github.com/pingcap/tidb/parser"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	tmock "github.com/pingcap/tidb/util/mock"
+)
+
+var _ = Suite(&checkInfoSuite{})
+
+type checkInfoSuite struct{}
+
+const passed CheckType = "pass"
+
+func (s *checkInfoSuite) TestCheckCSVHeader(c *C) {
+	dir := c.MkDir()
+	ctx := context.Background()
+	mockStore, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+
+	type tableSource struct {
+		Name    string
+		SQL     string
+		Sources []string
+	}
+
+	cases := []struct {
+		ignoreColumns []*config.IgnoreColumns
+		// the expected check level; `passed` means the check passes
+		level   CheckType
+		Sources map[string][]*tableSource
+	}{
+		{
+			nil,
+			passed,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8))",
+						[]string{
+							"aa,b\r\n",
+						},
+					},
+				},
+			},
+		},
+		{
+			nil,
+			passed,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8))",
+						[]string{
+							"a,b\r\ntest1,test2\r\n",
+							"aa,b\r\n",
+						},
+					},
+				},
+			},
+		},
+		{
+			nil,
+			Warn,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8))",
+						[]string{
+							"a,b\r\n",
+						},
+					},
+				},
+			},
+		},
+		{
+			nil,
+			Warn,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8))",
+						[]string{
+							"a,b\r\ntest1,test2\r\n",
+							"a,b\r\ntest3,test4\n",
+						},
+					},
+				},
+			},
+		},
+		{
+			nil,
+			Warn,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8), PRIMARY KEY (`a`))",
+						[]string{
+							"a,b\r\ntest1,test2\r\n",
+						},
+					},
+				},
+			},
+		},
+		{
+			nil,
+			Critical,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8), PRIMARY KEY (`a`))",
+						[]string{
+							"a,b\r\ntest1,test2\r\n",
+							"a,b\r\ntest3,test4\r\n",
+						},
+					},
+				},
+			},
+		},
+		// ignore primary key, should still be Warn
+		{
+			[]*config.IgnoreColumns{
+				{
+					DB:      "db",
+					Table:   "tbl1",
+					Columns: []string{"a"},
+				},
+			},
+			Warn,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8), PRIMARY KEY (`a`))",
+						[]string{
+							"a,b\r\ntest1,test2\r\n",
+							"a,b\r\ntest3,test4\r\n",
+						},
+					},
+				},
+			},
+		},
+		// ignore primary key, but has other unique key
+		{
+			[]*config.IgnoreColumns{
+				{
+					DB:      "db",
+					Table:   "tbl1",
+					Columns: []string{"a"},
+				},
+			},
+			Critical,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8), PRIMARY KEY (`a`), unique key uk (`b`))",
+						[]string{
+							"a,b\r\ntest1,test2\r\n",
+							"a,b\r\ntest3,test4\r\n",
+						},
+					},
+				},
+			},
+		},
+		// ignore primary key, no other unique key
+		{
+			[]*config.IgnoreColumns{
+				{
+					DB:      "db",
+					Table:   "tbl1",
+					Columns: []string{"a"},
+				},
+			},
+			Warn,
+			map[string][]*tableSource{
+				"db": {
+					{
+						"tbl1",
+						"create table tbl1 (a varchar(16), b varchar(8), PRIMARY KEY (`a`), KEY idx_b (`b`))",
+						[]string{
"a,b\r\ntest1,test2\r\n", + "a,b\r\ntest3,test4\r\n", + }, + }, + }, + }, + }, + // non unique key, but data type inconsistent + { + nil, + Critical, + map[string][]*tableSource{ + "db": { + { + "tbl1", + "create table tbl1 (a bigint, b varchar(8));", + []string{ + "a,b\r\ntest1,test2\r\n", + "a,b\r\ntest3,test4\r\n", + }, + }, + }, + }, + }, + // non unique key, but ignore inconsistent field + { + []*config.IgnoreColumns{ + { + DB: "db", + Table: "tbl1", + Columns: []string{"a"}, + }, + }, + Warn, + map[string][]*tableSource{ + "db": { + { + "tbl1", + "create table tbl1 (a bigint, b varchar(8));", + []string{ + "a,b\r\ntest1,test2\r\n", + "a,b\r\ntest3,test4\r\n", + }, + }, + }, + }, + }, + // multiple tables, test the choose priority + { + nil, + Critical, + map[string][]*tableSource{ + "db": { + { + "tbl1", + "create table tbl1 (a varchar(8), b varchar(8));", + []string{ + "a,b\r\ntest1,test2\r\n", + }, + }, + { + "tbl2", + "create table tbl1 (a varchar(8) primary key, b varchar(8));", + []string{ + "a,b\r\ntest1,test2\r\n", + "a,b\r\ntest3,test4\r\n", + }, + }, + }, + }, + }, + { + nil, + Critical, + map[string][]*tableSource{ + "db": { + { + "tbl1", + "create table tbl1 (a varchar(8), b varchar(8));", + []string{ + "a,b\r\ntest1,test2\r\n", + }, + }, + }, + "db2": { + { + "tbl2", + "create table tbl1 (a bigint, b varchar(8));", + []string{ + "a,b\r\ntest1,test2\r\n", + "a,b\r\ntest3,test4\r\n", + }, + }, + }, + }, + }, + } + + cfg := &config.Config{ + Mydumper: config.MydumperRuntime{ + ReadBlockSize: config.ReadBlockSize, + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Header: false, + NotNull: false, + Null: `\N`, + BackslashEscape: true, + TrimLastSep: false, + }, + }, + } + rc := &Controller{ + cfg: cfg, + store: mockStore, + ioWorkers: worker.NewPool(context.Background(), 1, "io"), + } + + p := parser.New() + p.SetSQLMode(mysql.ModeANSIQuotes) + se := tmock.NewContext() + + for _, ca := range cases { + rc.checkTemplate = NewSimpleTemplate() + cfg.Mydumper.IgnoreColumns = ca.ignoreColumns + rc.dbInfos = make(map[string]*checkpoints.TidbDBInfo) + + dbMetas := make([]*mydump.MDDatabaseMeta, 0) + for db, tbls := range ca.Sources { + tblMetas := make([]*mydump.MDTableMeta, 0, len(tbls)) + dbInfo := &checkpoints.TidbDBInfo{ + Name: db, + Tables: make(map[string]*checkpoints.TidbTableInfo), + } + rc.dbInfos[db] = dbInfo + + for _, tbl := range tbls { + node, err := p.ParseOneStmt(tbl.SQL, "", "") + c.Assert(err, IsNil) + core, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 0xabcdef) + c.Assert(err, IsNil) + core.State = model.StatePublic + dbInfo.Tables[tbl.Name] = &checkpoints.TidbTableInfo{ + ID: core.ID, + DB: db, + Name: tbl.Name, + Core: core, + } + + fileInfos := make([]mydump.FileInfo, 0, len(tbl.Sources)) + for i, s := range tbl.Sources { + fileName := fmt.Sprintf("%s.%s.%d.csv", db, tbl.Name, i) + err = os.WriteFile(filepath.Join(dir, fileName), []byte(s), 0o644) + c.Assert(err, IsNil) + fileInfos = append(fileInfos, mydump.FileInfo{ + FileMeta: mydump.SourceFileMeta{ + Path: fileName, + Type: mydump.SourceTypeCSV, + FileSize: int64(len(s)), + }, + }) + } + tblMetas = append(tblMetas, &mydump.MDTableMeta{ + DB: db, + Name: tbl.Name, + DataFiles: fileInfos, + }) + } + dbMetas = append(dbMetas, &mydump.MDDatabaseMeta{ + Name: db, + Tables: tblMetas, + }) + } + + err := rc.checkCSVHeader(ctx, dbMetas) + c.Assert(err, IsNil) + if ca.level != passed { + c.Assert(rc.checkTemplate.FailedCount(ca.level), Equals, 1) + } + } + +} diff --git 
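The table-selection loop in checkCSVHeader is the subtle part of this change, and the "multiple tables" test cases above exercise it. As a free-standing illustration, here is a minimal Go sketch of the same priority rule; the names candidate and pickTable are hypothetical and do not appear in the patch. A table with at least two CSV files and a unique/primary key wins outright; otherwise more CSV files win, with a unique key as the tie-breaker.

package main

import "fmt"

// candidate summarizes what checkCSVHeader inspects per table.
type candidate struct {
	name         string
	csvCount     int // the real code stops counting at 2
	hasUniqueIdx bool
}

// pickTable applies the selection priority: a "perfect" candidate
// (>= 2 CSV files plus a unique/primary key) short-circuits the scan;
// otherwise prefer more CSV files, then the presence of a unique key.
func pickTable(tables []candidate) *candidate {
	var best *candidate
	bestCSV, bestUnique := 0, false
	for i := range tables {
		t := &tables[i]
		if t.csvCount == 0 {
			continue // tables without CSV sources cannot be checked
		}
		if t.csvCount >= 2 && t.hasUniqueIdx {
			return t // perfect table source found, stop scanning
		}
		if t.csvCount > bestCSV || (t.csvCount == bestCSV && !bestUnique && t.hasUniqueIdx) {
			best, bestCSV, bestUnique = t, t.csvCount, t.hasUniqueIdx
		}
	}
	return best
}

func main() {
	got := pickTable([]candidate{
		{"tbl1", 1, false},
		{"tbl2", 2, true}, // matches the "multiple tables" test expectation
	})
	fmt.Println(got.name) // tbl2
}
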
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 9d05f1b9a8f61..6e666d1954524 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1968,6 +1968,11 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
 			}
 		}
 	}
+	err = rc.checkCSVHeader(ctx, rc.dbMetas)
+	if err != nil {
+		return err
+	}
+
 	if len(checkPointCriticalMsgs) != 0 {
 		rc.checkTemplate.Collect(Critical, false, strings.Join(checkPointCriticalMsgs, "\n"))
 	} else {
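With this wiring, DataCheck runs the header heuristic as part of the precheck pass. For readers who want the core of that heuristic in isolation, here is a minimal runnable Go sketch under simplifying assumptions: plain strings instead of types.Datum, a hand-built schema, and hypothetical names (looksLikeHeader, tableCols, uniqueCols) that do not appear in the patch.

package main

import (
	"fmt"
	"strings"
)

// looksLikeHeader mirrors the idea in checkCSVHeader: a first row is
// suspicious when every field matches a table column name, and it is
// near-certainly a header (or duplicated data) when one of those
// matched columns belongs to a unique/primary key.
func looksLikeHeader(firstRow []string, tableCols, uniqueCols map[string]struct{}) (isHeader, hitsUniqueKey bool) {
	for _, field := range firstRow {
		name := strings.ToLower(field)
		if _, ok := tableCols[name]; !ok {
			return false, false // a non-column field means the row is data
		}
		if _, ok := uniqueCols[name]; ok {
			hitsUniqueKey = true
		}
	}
	return true, hitsUniqueKey
}

func main() {
	tableCols := map[string]struct{}{"a": {}, "b": {}}
	uniqueCols := map[string]struct{}{"a": {}} // e.g. PRIMARY KEY (`a`)

	// identical first rows read from two CSV files
	header, unique := looksLikeHeader([]string{"a", "b"}, tableCols, uniqueCols)
	fmt.Println(header, unique) // true true -> reported at Critical level

	header, unique = looksLikeHeader([]string{"test1", "test2"}, tableCols, uniqueCols)
	fmt.Println(header, unique) // false false -> no finding
}

In the real check, the Critical level additionally requires two identical first rows or an incompatible field type via checkFieldCompatibility; a single matching row only produces a Warn.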