Skip to content

Commit

Permalink
lightning: let ignore columns be compatible with tidb backend (#27850)
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Nov 23, 2021
1 parent 9535f05 commit c687915
Show file tree
Hide file tree
Showing 13 changed files with 339 additions and 80 deletions.
30 changes: 22 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type tidbEncoder struct {
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
// the max index used in this chunk, due to the ignore-columns config, we can't
// directly check the total column count, so we fall back to only check that
// the there are enough columns.
columnCnt int
}

Expand Down Expand Up @@ -284,22 +287,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnMaxIdx := -1
columnIdx := make([]int, len(columnPermutation))
for i := 0; i < len(columnPermutation); i++ {
columnIdx[i] = -1
}
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
if idx > columnMaxIdx {
columnMaxIdx = idx
}
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
enc.columnCnt = columnMaxIdx + 1
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
// column permutation with default, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
if len(row) < enc.columnCnt {
logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", kv.RowArrayMarshaler(row)))
return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
Expand All @@ -308,8 +316,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
cnt := 0
for i, field := range row {
if i != 0 {
if enc.columnIdx[i] < 0 {
continue
}
if cnt > 0 {
encoded.WriteByte(',')
}
datum := field
Expand All @@ -321,6 +333,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
)
return nil, err
}
cnt++
}
encoded.WriteByte(')')
return tidbRow{
Expand Down Expand Up @@ -569,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, extra
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
Expand All @@ -585,8 +598,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
Expand Down Expand Up @@ -615,6 +628,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
FieldType: types.FieldType{
Flag: flag,
},
GeneratedExprString: generationExpr,
})
curColOffset++
}
Expand Down
46 changes: 25 additions & 21 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestWriteRowsReplaceOnDup(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)
s.mockDB.
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
Expand All @@ -96,6 +96,9 @@ func TestWriteRowsReplaceOnDup(t *testing.T) {
perms = append(perms, i)
}
perms = append(perms, -1)
// skip column a,c due to ignore-columns
perms[0] = -1
perms[2] = -1
encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890})
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
Expand All @@ -120,7 +123,7 @@ func TestWriteRowsReplaceOnDup(t *testing.T) {

writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
require.NoError(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -151,7 +154,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "1.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "1.csv", 0)
require.NoError(t, err)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

Expand Down Expand Up @@ -198,7 +201,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "3.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "3.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
Expand Down Expand Up @@ -261,10 +264,10 @@ func TestFetchRemoteTableModels_3_x(t *testing.T) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "int(10)", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "int(10)", "", "auto_increment"))
s.mockDB.ExpectCommit()

bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig()))
Expand Down Expand Up @@ -296,10 +299,10 @@ func TestFetchRemoteTableModels_4_0(t *testing.T) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}).
AddRow("test", "t", "id", int64(1)))
Expand Down Expand Up @@ -334,10 +337,10 @@ func TestFetchRemoteTableModels_4_x_auto_increment(t *testing.T) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
Expand Down Expand Up @@ -372,10 +375,10 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "1 + 2", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
Expand All @@ -398,6 +401,7 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag,
},
GeneratedExprString: "1 + 2",
},
},
},
Expand Down Expand Up @@ -465,35 +469,35 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "7.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "7.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(2),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "8.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "8.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(3),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "9.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "9.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(4),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "10.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "10.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(5),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "11.csv", 0)
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "11.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ type IgnoreColumns struct {
Columns []string `toml:"columns" json:"columns"`
}

func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
columnMap := make(map[string]struct{}, len(ic.Columns))
for _, c := range ic.Columns {
columnMap[c] = struct{}{}
}
return columnMap
}

// GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex.
func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) {
if !caseSensitive {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning_server_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) {

errCh := make(chan error)
cfg := config.NewConfig()
cfg.TiDB.DistSQLScanConcurrency = 4
err := cfg.LoadFromGlobal(s.lightning.globalCfg)
require.NoError(t, err)
go func() {
Expand Down
7 changes: 2 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
return msgs, nil
}

igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}
igCols := igCol.ColumnsMap()

colCountFromTiDB := len(info.Core.Columns)
core := info.Core
Expand Down Expand Up @@ -996,7 +993,7 @@ outloop:
case nil:
if !initializedColumns {
if len(columnPermutation) == 0 {
columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo)
columnPermutation, err = createColumnPermutation(columnNames, igCols.ColumnsMap(), tableInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 32 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,7 +1490,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.Columns)
tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -2306,6 +2306,16 @@ func (cr *chunkRestore) encodeLoop(

pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
// filteredColumns is column names that excluded ignored columns
// WARN: this might be not correct when different SQL statements contains different fields,
// but since ColumnPermutation also depends on the hypothesis that the columns in one source file is the same
// so this should be ok.
var filteredColumns []string
ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive)
if err1 != nil {
err = err1
return
}
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
Expand Down Expand Up @@ -2336,6 +2346,26 @@ func (cr *chunkRestore) encodeLoop(
return
}
}
filteredColumns = columnNames
if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 {
filteredColumns = make([]string, 0, len(columnNames))
ignoreColsMap := ignoreColumns.ColumnsMap()
if len(columnNames) > 0 {
for _, c := range columnNames {
if _, ok := ignoreColsMap[c]; !ok {
filteredColumns = append(filteredColumns, c)
}
}
} else {
// init column names by table schema
// after filtered out some columns, we must explicitly set the columns for TiDB backend
for _, col := range t.tableInfo.Core.Columns {
if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok {
filteredColumns = append(filteredColumns, col.Name.O)
}
}
}
}
initializedColumns = true
}
case io.EOF:
Expand Down Expand Up @@ -2369,7 +2399,7 @@ func (cr *chunkRestore) encodeLoop(
continue
}

kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
kvSize += kvs.Size()
failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
kvSize += uint64(val.(int))
Expand Down
Loading

0 comments on commit c687915

Please sign in to comment.