Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: let ignore columns be compatible with tidb backend #27850

Merged
merged 32 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cf67771
let ignore columns be compatible with tidb backend
glorv Sep 7, 2021
7241543
Merge branch 'master' into fix-ignore-cols
glorv Sep 7, 2021
c8e1f7f
fix ut
glorv Sep 7, 2021
7ded42f
Merge branch 'fix-ignore-cols' of ssh://github.com/glorv/tidb into fi…
glorv Sep 7, 2021
c677ba0
Merge branch 'master' into fix-ignore-cols
glorv Sep 7, 2021
ddec21a
fix
glorv Sep 8, 2021
263d7ed
Merge branch 'fix-ignore-cols' of ssh://github.com/glorv/tidb into fi…
glorv Sep 8, 2021
466bd14
Merge branch 'master' into fix-ignore-cols
glorv Sep 8, 2021
c66cdf5
fmt code
glorv Sep 8, 2021
62ab23e
Merge branch 'master' into fix-ignore-cols
glorv Sep 8, 2021
6fc3880
Merge branch 'master' into fix-ignore-cols
glorv Sep 28, 2021
5ecb9c6
Merge branch 'master' of ssh://github.com/pingcap/tidb into fix-ignor…
glorv Nov 2, 2021
59dd643
Merge branch 'master' into fix-ignore-cols
3pointer Nov 19, 2021
9d22c6c
fix generated columns
glorv Nov 19, 2021
e594e8d
Merge branch 'master' into fix-ignore-cols
glorv Nov 19, 2021
5f241ed
fix test
glorv Nov 19, 2021
bd4cbb5
fix generated columns
glorv Nov 19, 2021
0c35b75
Merge branch 'master' into fix-ignore-cols
glorv Nov 19, 2021
1cd4967
fix build
glorv Nov 19, 2021
4af3a35
fix
glorv Nov 22, 2021
b024206
Merge branch 'master' into fix-ignore-cols
glorv Nov 22, 2021
9fe66e2
do not check tableHasAutoID fro tidb backend
glorv Nov 22, 2021
110ef36
Merge branch 'master' into fix-ignore-cols
glorv Nov 22, 2021
50addca
Merge branch 'master' into fix-ignore-cols
glorv Nov 22, 2021
afb8cf7
slow down write speed for lightning_distributed_import
glorv Nov 23, 2021
88ba4bc
Merge branch 'master' of ssh://github.com/pingcap/tidb into fix-ignor…
glorv Nov 23, 2021
5459b6a
fix unit test
glorv Nov 23, 2021
80d22a0
fix test
glorv Nov 23, 2021
a149d7f
Merge branch 'master' into fix-ignore-cols
glorv Nov 23, 2021
8935105
Merge branch 'master' of ssh://github.com/pingcap/tidb into fix-ignor…
glorv Nov 23, 2021
7c10715
Merge branch 'fix-ignore-cols' of ssh://github.com/glorv/tidb into fi…
glorv Nov 23, 2021
58f035d
rollback change
glorv Nov 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type tidbEncoder struct {
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
// the max index used in this chunk, due to the ignore-columns config, we can't
// directly check the total column count, so we fall back to only check that
// the there are enough columns.
columnCnt int
}

Expand Down Expand Up @@ -284,22 +287,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnMaxIdx := -1
columnIdx := make([]int, len(columnPermutation))
for i := 0; i < len(columnPermutation); i++ {
columnIdx[i] = -1
}
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
if idx > columnMaxIdx {
columnMaxIdx = idx
}
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
enc.columnCnt = columnMaxIdx + 1
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
// column permutation with default, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
if len(row) < enc.columnCnt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain why len(row) should be greater than or equal to columnCnt(which is actual columenMaxIdx + 1)?

Suppose a table has 5 columns, namely a, b, c, d, e. We have a SQL statement in a source file like insert into t(a, d, c) values(xxx, xxx, xxx). Then the columnPermutation should be (0, -1, 2, 1, -1, -1), columnCnt = columnMaxIdx + 1 = 4. len(row) is 3 which is smaller than columnCnt, so in this scenario, data cannot be imported correctly.

Is my understanding correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gozssky In this case, the columnMaxIdx should be 2, so columnCnt = 3.

This check is used to handle there are ignored columns. Giving a source file insert into t(a, d, c, f) values(xxx, xxx, xxx, xx), and ignore field f in lightning's config, this is still a valid source file. Then len(row) (4) will bigger than columnCnt (3).

BTW, there may be case that row count is smaller than column count if user manually create table schema has extra column with default value, e.g. updated timestamp default current_timestamp, then current lightning will result in an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I mistook i for idx 😂.

logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", kv.RowArrayMarshaler(row)))
return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
Expand All @@ -308,8 +316,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
cnt := 0
for i, field := range row {
if i != 0 {
if enc.columnIdx[i] < 0 {
continue
}
if cnt > 0 {
encoded.WriteByte(',')
}
datum := field
Expand All @@ -321,6 +333,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
)
return nil, err
}
cnt++
}
encoded.WriteByte(')')
return tidbRow{
Expand Down Expand Up @@ -569,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, extra
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
Expand All @@ -585,8 +598,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
Expand Down Expand Up @@ -615,6 +628,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
FieldType: types.FieldType{
Flag: flag,
},
GeneratedExprString: generationExpr,
})
curColOffset++
}
Expand Down
30 changes: 17 additions & 13 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *mysqlSuite) TearDownTest(c *C) {

func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
s.mockDB.
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
Expand All @@ -102,6 +102,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
perms = append(perms, i)
}
perms = append(perms, -1)
// skip column a,c due to ignore-columns
perms[0] = -1
perms[2] = -1
encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890})
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
Expand Down Expand Up @@ -254,10 +257,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "int(10)", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "int(10)", "", "auto_increment"))
s.mockDB.ExpectCommit()

bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig()))
Expand Down Expand Up @@ -286,10 +289,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}).
AddRow("test", "t", "id", int64(1)))
Expand Down Expand Up @@ -321,10 +324,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
Expand Down Expand Up @@ -356,10 +359,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "1 + 2", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
Expand All @@ -382,6 +385,7 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag,
},
GeneratedExprString: "1 + 2",
},
},
},
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ type IgnoreColumns struct {
Columns []string `toml:"columns" json:"columns"`
}

func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
columnMap := make(map[string]struct{}, len(ic.Columns))
for _, c := range ic.Columns {
columnMap[c] = struct{}{}
}
return columnMap
}

// GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex.
func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) {
if !caseSensitive {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning_server_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) {

errCh := make(chan error)
cfg := config.NewConfig()
cfg.TiDB.DistSQLScanConcurrency = 4
err := cfg.LoadFromGlobal(s.lightning.globalCfg)
require.NoError(t, err)
go func() {
Expand Down
7 changes: 2 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
return msgs, nil
}

igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}
igCols := igCol.ColumnsMap()

colCountFromTiDB := len(info.Core.Columns)
core := info.Core
Expand Down Expand Up @@ -996,7 +993,7 @@ outloop:
case nil:
if !initializedColumns {
if len(columnPermutation) == 0 {
columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo)
columnPermutation, err = createColumnPermutation(columnNames, igCols.ColumnsMap(), tableInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 32 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.Columns)
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -2306,6 +2306,16 @@ func (cr *chunkRestore) encodeLoop(

pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
// filteredColumns is column names that excluded ignored columns
// WARN: this might be not correct when different SQL statements contains different fields,
// but since ColumnPermutation also depends on the hypothesis that the columns in one source file is the same
// so this should be ok.
var filteredColumns []string
ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive)
if err1 != nil {
err = err1
return
}
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
Expand Down Expand Up @@ -2336,6 +2346,26 @@ func (cr *chunkRestore) encodeLoop(
return
}
}
filteredColumns = columnNames
if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 {
filteredColumns = make([]string, 0, len(columnNames))
ignoreColsMap := ignoreColumns.ColumnsMap()
if len(columnNames) > 0 {
for _, c := range columnNames {
if _, ok := ignoreColsMap[c]; !ok {
filteredColumns = append(filteredColumns, c)
}
}
} else {
// init column names by table schema
// after filtered out some columns, we must explicitly set the columns for TiDB backend
for _, col := range t.tableInfo.Core.Columns {
if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok {
filteredColumns = append(filteredColumns, col.Name.O)
}
}
}
}
initializedColumns = true
}
case io.EOF:
Expand Down Expand Up @@ -2369,7 +2399,7 @@ func (cr *chunkRestore) encodeLoop(
continue
}

kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
kvSize += kvs.Size()
failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
kvSize += uint64(val.(int))
Expand Down
Loading