Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: support generated column in loader and binlog replication #42

Merged
merged 20 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions loader/convert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ func bytes2str(bs []byte) string {
func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([][]string, error) {
var s, e, size int
var rows = make([][]string, 0, 1024)
var VALUES = []byte("VALUES")

// If table has generated column, the dumped SQL file has a different `INSERT INTO` line,
// which provides column names except generated column. such as following:
// INSERT INTO `t1` (`id`,`uid`,`name`,`info`) VALUES
// (1,10001,"Gabriel García Márquez",NULL),
// (2,10002,"Cien años de soledad",NULL);
// otherwise dumped SQL file has content like folloing:
// INSERT INTO `t1` VALUES
// (1,"hello"),
// (2,"world");

for {
sql = sql[s:]
Expand All @@ -56,6 +67,10 @@ func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([
if sql[e] == '\n' && (sql[e-1] == ',' || sql[e-1] == ';') && sql[e-2] == ')' {
break
}
if sql[e] == '\n' && e-6 > s && bytes.Compare(sql[e-6:e], VALUES) == 0 {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
s = e + 1
continue
}
}
if e == size {
return nil, errors.New("not found cooresponding ending of sql: ')'")
Expand Down Expand Up @@ -236,10 +251,25 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error)
return nil, errors.Errorf("statement %s for %s/%s is not create table statement", statement, schema, table)
}

columns := make([]string, 0, len(ct.Cols))
var (
columns = make([]string, 0, len(ct.Cols))
hasGeneragedCols = false
columnNameFields = ""
)
for _, col := range ct.Cols {
if len(col.Options) > 0 && col.Options[0].Tp == ast.ColumnOptionGenerated {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
hasGeneragedCols = true
continue
}
columns = append(columns, col.Name.Name.O)
}
if hasGeneragedCols {
var escapeColumns []string
for _, column := range columns {
escapeColumns = append(escapeColumns, fmt.Sprintf("`%s`", column))
}
columnNameFields = "(" + strings.Join(escapeColumns, ",") + ") "
}

dstSchema, dstTable := fetchMatchedLiteral(r, schema, table)
return &tableInfo{
Expand All @@ -248,7 +278,7 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error)
targetSchema: dstSchema,
targetTable: dstTable,
columnNameList: columns,
insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` VALUES", dstTable),
insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` %sVALUES", dstTable, columnNameFields),
}, nil
}

Expand Down
69 changes: 69 additions & 0 deletions loader/convert_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,50 @@ func (t *testConvertDataSuite) TestReassemble(c *C) {
}
}

func (t *testConvertDataSuite) TestReassembleWithGeneratedColumn(c *C) {
table := &tableInfo{
sourceSchema: "test2",
sourceTable: "t3",
targetSchema: "test",
targetTable: "t",
columnNameList: []string{
"id",
"t_json",
},
insertHeadStmt: "INSERT INTO t (`id`,`t_json`) VALUES",
}

sql := `INSERT INTO t1 (id,t_json) VALUES
(10,'{}'),
(9,NULL);
(8,'{"a":123}');
`

expected := []string{
"INSERT INTO t (`id`,`t_json`) VALUES(585520728116297738,'{}'),(585520728116297737,NULL),(585520728116297736,'{\"a\":123}');",
"INSERT INTO t (`id`,`t_json`) VALUES(10,'{}'),(9,NULL),(8,'{\"a\":123}');",
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

rules := []*cm.Rule{
{
PatternSchema: "test*",
PatternTable: "t*",
TargetColumn: "id",
Expression: cm.PartitionID,
Arguments: []string{"1", "test", "t"},
},
}

for i, r := range rules {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rules only contains one rule,but expected contains two case

columnMapping, err := cm.NewMapping(false, []*cm.Rule{r})
c.Assert(err, IsNil)

query, err := reassemble([]byte(sql), table, columnMapping)
c.Assert(err, IsNil)
c.Assert(query, Equals, expected[i])
}
}

func (t *testConvertDataSuite) TestParseTable(c *C) {
rules := []*router.TableRule{
{"test*", "t*", "test", "t"},
Expand Down Expand Up @@ -128,3 +172,28 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}

func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
rules := []*router.TableRule{
{"test*", "t*", "test", "t"},
}

expectedTableInfo := &tableInfo{
sourceSchema: "test1",
sourceTable: "t3",
targetSchema: "test",
targetTable: "t",
columnNameList: []string{
"id",
"t_json",
},
insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES",
}

r, err := router.NewTableRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(r, "test1", "t3", "./dumpfile/test1.t3-schema.sql")
c.Assert(err, IsNil)
c.Assert(tableInfo, DeepEquals, expectedTableInfo)
}
9 changes: 9 additions & 0 deletions loader/dumpfile/test1.t3-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*!40101 SET NAMES binary*/;
/*!40014 SET FOREIGN_KEY_CHECKS=0*/;

CREATE TABLE `binlog_1` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`t_json` VARCHAR(100),
`t_json_gen` json as (`t_json`) VIRTUAL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
8 changes: 8 additions & 0 deletions syncer/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ func columnOptionsToSQL(options []*ast.ColumnOption) string {
sql += fmt.Sprintf(" COMMENT '%s'", comment)
case ast.ColumnOptionOnUpdate: // For Timestamp and Datetime only.
sql += " ON UPDATE CURRENT_TIMESTAMP"
case ast.ColumnOptionGenerated:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use restore to implement it later

var store string
if opt.Stored {
store = "STORED"
} else {
store = "VIRTUAL"
}
sql += fmt.Sprintf(" GENERATED ALWAYS AS (%s) %s", opt.Expr.Text(), store)
case ast.ColumnOptionFulltext:
panic("not implemented yet")
default:
Expand Down
6 changes: 6 additions & 0 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type column struct {
NotNull bool
unsigned bool
tp string
extra string
}

type table struct {
Expand Down Expand Up @@ -390,6 +391,7 @@ func getTableColumns(db *Conn, table *table, maxRetry int) error {
column.idx = idx
column.name = string(data[0])
column.tp = string(data[1])
column.extra = string(data[5])
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Feb 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update example at L365


if strings.ToLower(string(data[2])) == "no" {
column.NotNull = true
Expand Down Expand Up @@ -456,3 +458,7 @@ func getBinaryLogs(db *sql.DB) ([]binlogSize, error) {
}
return files, nil
}

func (c *column) isGeneratedColumn() bool {
return strings.Contains(c.extra, "VIRTUAL GENERATED") || strings.Contains(c.extra, "STORED GENERATED")
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must extra be uppercase? should we use == strings.TrimSpace(xxx) rather than strings.Contains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to MySQL doc, VIRTUAL GENERATED and STORED GENERATED in Extra field are always uppercase

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strings.Contains is used because there may exist other data in Extra?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid some comment options are in it

Copy link
Contributor Author

@amyangfei amyangfei Feb 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://dev.mysql.com/doc/refman/5.7/en/show-columns.html
In MySQL doc, there are only four Extra fields: auto_increment, on update CURRENT_TIMESTAMP, VIRTUAL GENERATED or VIRTUAL STORED

}
54 changes: 54 additions & 0 deletions syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"

. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -86,9 +87,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) {
c.Assert(err, IsNil)
stmt, err := p.ParseOneStmt(t[0], "", "")
c.Assert(err, IsNil)

sql, err := genDDLSQL(t[0], stmt, originTableNameSingle, targetTableNameSingle, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])

sql, err = genDDLSQL(t[1], stmt, originTableNameSingle, targetTableNameSingle, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])
}

testCase = [][]string{
Expand All @@ -101,9 +107,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) {
c.Assert(err, IsNil)
stmt, err := p.ParseOneStmt(t[0], "", "")
c.Assert(err, IsNil)

sql, err := genDDLSQL(t[0], stmt, originTableNameDouble, targetTableNameDouble, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])

sql, err = genDDLSQL(t[1], stmt, originTableNameDouble, targetTableNameDouble, true)
c.Assert(err, IsNil)
c.Assert(sql, Equals, t[2])
}

}
Expand Down Expand Up @@ -338,3 +349,46 @@ func (s *testSyncerSuite) TestIgnoreDMLInQuery(c *C) {
c.Assert(pr.isDDL, Equals, cs.isDDL)
}
}

func (s *testSyncerSuite) TestResolveSQL(c *C) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is TestResolveGeneratedColumnDDL better?

testCases := []struct {
sql string
expected string
}{
{
"ALTER TABLE `test`.`test` ADD COLUMN d int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL",
"ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL",
},
{
"ALTER TABLE `test`.`test` ADD COLUMN d int(11) AS (1 + 1) STORED",
"ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (1 + 1) STORED",
},
}

syncer := &Syncer{}
parser, err := utils.GetParser(s.db, false)
c.Assert(err, IsNil)

for _, tc := range testCases {
ast1, err := parser.ParseOneStmt(tc.sql, "", "")
c.Assert(err, IsNil)

sqls, _, _, err := syncer.resolveDDLSQL(tc.sql, parser, "")
c.Assert(err, IsNil)

c.Assert(len(sqls), Equals, 1)
getSQL := sqls[0]
c.Assert(getSQL, Equals, tc.expected)

ast2, err := parser.ParseOneStmt(getSQL, "", "")
c.Assert(err, IsNil)

// compare parsed ast of the resoved SQL with parsed ast of the origin SQL.
// because text fields are not always same, and the difference of text
// makes no sense to the semantics, we just ignore checking it.
atStmt1 := ast1.(*ast.AlterTableStmt)
atStmt2 := ast2.(*ast.AlterTableStmt)
c.Assert(atStmt1.Table, DeepEquals, atStmt2.Table)
c.Assert(atStmt1.Specs, DeepEquals, atStmt2.Specs)
}
}
40 changes: 40 additions & 0 deletions syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,3 +423,43 @@ func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]int
}
return rows, nil
}

func (s *Syncer) pruneGenColumnDML(needPrune bool, genColumnFilter []bool, data [][]interface{}) [][]interface{} {
if !needPrune {
return data
}

rows := make([][]interface{}, 0, len(data))
for _, row := range data {
value := make([]interface{}, 0, len(row))
for i := range row {
if !genColumnFilter[i] {
value = append(value, row[i])
}
}
rows = append(rows, value)
}
return rows
}

// generatedColumnFilter iterates column list and returns
// a bool indicates where one or more generated column exists
// a bool slice indicates whether the i-th column is generated column
// a new column slice without generated columns
func generatedColumnFilter(columns []*column) (bool, []bool, []*column) {
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Feb 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would generate column be one of unique columns?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only secondary index is allowed on generated columns.

var (
needPrune bool
filters = make([]bool, 0, len(columns))
filterCols = make([]*column, 0, len(columns))
)
for _, c := range columns {
isGenColumn := c.isGeneratedColumn()
filters = append(filters, isGenColumn)
if isGenColumn {
needPrune = true
continue
}
filterCols = append(filterCols, c)
}
return needPrune, filters, filterCols
}
8 changes: 5 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,10 +1138,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return errors.Trace(err)
}
needPrune, genColumnFilter, tblColumns := generatedColumnFilter(table.columns)
rows, err := s.mappingDML(originSchema, originTable, columns, ev.Rows)
if err != nil {
return errors.Trace(err)
}
rows = s.pruneGenColumnDML(needPrune, genColumnFilter, rows)

var (
applied bool
Expand All @@ -1161,7 +1163,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
if !applied {
sqls, keys, args, err = genInsertSQLs(table.schema, table.name, rows, table.columns, table.indexColumns)
sqls, keys, args, err = genInsertSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns)
if err != nil {
return errors.Errorf("gen insert sqls failed: %v, schema: %s, table: %s", errors.Trace(err), table.schema, table.name)
}
Expand All @@ -1184,7 +1186,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
if !applied {
sqls, keys, args, err = genUpdateSQLs(table.schema, table.name, rows, table.columns, table.indexColumns, safeMode.Enable())
sqls, keys, args, err = genUpdateSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns, safeMode.Enable())
if err != nil {
return errors.Errorf("gen update sqls failed: %v, schema: %s, table: %s", err, table.schema, table.name)
}
Expand All @@ -1208,7 +1210,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
case replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
if !applied {
sqls, keys, args, err = genDeleteSQLs(table.schema, table.name, rows, table.columns, table.indexColumns)
sqls, keys, args, err = genDeleteSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns)
if err != nil {
return errors.Errorf("gen delete sqls failed: %v, schema: %s, table: %s", err, table.schema, table.name)
}
Expand Down
Loading