From 0c1c957b57d42eb4c97b3956db0178a351c3d36e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 31 Jan 2019 13:32:41 +0800 Subject: [PATCH 01/19] syncer: support generated column in DDL --- syncer/ast.go | 8 +++++++ syncer/ddl_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/syncer/ast.go b/syncer/ast.go index c07700315f..b0c4294101 100644 --- a/syncer/ast.go +++ b/syncer/ast.go @@ -122,6 +122,14 @@ func columnOptionsToSQL(options []*ast.ColumnOption) string { sql += fmt.Sprintf(" COMMENT '%s'", comment) case ast.ColumnOptionOnUpdate: // For Timestamp and Datetime only. sql += " ON UPDATE CURRENT_TIMESTAMP" + case ast.ColumnOptionGenerated: + var store string + if opt.Stored { + store = "STORED" + } else { + store = "VIRTUAL" + } + sql += fmt.Sprintf(" GENERATED ALWAYS AS (%s) %s", opt.Expr.Text(), store) case ast.ColumnOptionFulltext: panic("not implemented yet") default: diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index b158959fea..a43fd820a5 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -18,6 +18,7 @@ import ( "database/sql" . 
"github.com/pingcap/check" + "github.com/pingcap/parser/ast" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/dm/dm/config" @@ -86,9 +87,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) { c.Assert(err, IsNil) stmt, err := p.ParseOneStmt(t[0], "", "") c.Assert(err, IsNil) + sql, err := genDDLSQL(t[0], stmt, originTableNameSingle, targetTableNameSingle, true) c.Assert(err, IsNil) c.Assert(sql, Equals, t[2]) + + sql, err = genDDLSQL(t[1], stmt, originTableNameSingle, targetTableNameSingle, true) + c.Assert(err, IsNil) + c.Assert(sql, Equals, t[2]) } testCase = [][]string{ @@ -101,9 +107,14 @@ func (s *testSyncerSuite) TestGenDDLSQL(c *C) { c.Assert(err, IsNil) stmt, err := p.ParseOneStmt(t[0], "", "") c.Assert(err, IsNil) + sql, err := genDDLSQL(t[0], stmt, originTableNameDouble, targetTableNameDouble, true) c.Assert(err, IsNil) c.Assert(sql, Equals, t[2]) + + sql, err = genDDLSQL(t[1], stmt, originTableNameDouble, targetTableNameDouble, true) + c.Assert(err, IsNil) + c.Assert(sql, Equals, t[2]) } } @@ -338,3 +349,46 @@ func (s *testSyncerSuite) TestIgnoreDMLInQuery(c *C) { c.Assert(pr.isDDL, Equals, cs.isDDL) } } + +func (s *testSyncerSuite) TestResolveSQL(c *C) { + testCases := []struct { + sql string + expected string + }{ + { + "ALTER TABLE `test`.`test` ADD COLUMN d int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL", + "ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (c + 1) VIRTUAL", + }, + { + "ALTER TABLE `test`.`test` ADD COLUMN d int(11) AS (1 + 1) STORED", + "ALTER TABLE `test`.`test` ADD COLUMN `d` int(11) GENERATED ALWAYS AS (1 + 1) STORED", + }, + } + + syncer := &Syncer{} + parser, err := utils.GetParser(s.db, false) + c.Assert(err, IsNil) + + for _, tc := range testCases { + ast1, err := parser.ParseOneStmt(tc.sql, "", "") + c.Assert(err, IsNil) + + sqls, _, _, err := syncer.resolveDDLSQL(tc.sql, parser, "") + c.Assert(err, IsNil) + + c.Assert(len(sqls), Equals, 1) + getSQL := sqls[0] + c.Assert(getSQL, Equals, 
tc.expected) + + ast2, err := parser.ParseOneStmt(getSQL, "", "") + c.Assert(err, IsNil) + + // compare parsed ast of the resolved SQL with parsed ast of the origin SQL. + // because text fields are not always same, and the difference of text + // makes no sense to the semantics, we just ignore checking it. + atStmt1 := ast1.(*ast.AlterTableStmt) + atStmt2 := ast2.(*ast.AlterTableStmt) + c.Assert(atStmt1.Table, DeepEquals, atStmt2.Table) + c.Assert(atStmt1.Specs, DeepEquals, atStmt2.Specs) + } +} From c18c5ab259e36a492e1b461a9476b3bbdafb7305 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 31 Jan 2019 14:23:08 +0800 Subject: [PATCH 02/19] syncer: auto remove generated column value in DML --- loader/convert_data.go | 30 +++++- syncer/db.go | 6 ++ syncer/dml.go | 40 +++++++ syncer/syncer.go | 8 +- syncer/syncer_test.go | 150 +++++++++++++++++++++++++- tests/sharding/data/db1.increment.sql | 9 +- tests/sharding/data/db1.prepare.sql | 4 +- tests/sharding/data/db2.increment.sql | 3 + tests/sharding/data/db2.prepare.sql | 8 +- 9 files changed, 245 insertions(+), 13 deletions(-) diff --git a/loader/convert_data.go b/loader/convert_data.go index 19e26c8e8f..366a29461f 100644 --- a/loader/convert_data.go +++ b/loader/convert_data.go @@ -40,6 +40,17 @@ func bytes2str(bs []byte) string { func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([][]string, error) { var s, e, size int var rows = make([][]string, 0, 1024) + var VALUES = []byte("VALUES") + + // If table has generated column, the dumped SQL file has a different `INSERT INTO` line, + // which provides column names except generated column.
such as following: + // INSERT INTO `t1` (`id`,`uid`,`name`,`info`) VALUES + // (1,10001,"Gabriel García Márquez",NULL), + // (2,10002,"Cien años de soledad",NULL); + // otherwise dumped SQL file has content like folloing: + // INSERT INTO `t1` VALUES + // (1,"hello"), + // (2,"world"); for { sql = sql[s:] @@ -56,6 +67,10 @@ func parseInsertStmt(sql []byte, table *tableInfo, columnMapping *cm.Mapping) ([ if sql[e] == '\n' && (sql[e-1] == ',' || sql[e-1] == ';') && sql[e-2] == ')' { break } + if sql[e] == '\n' && e-6 > s && bytes.Compare(sql[e-6:e], VALUES) == 0 { + s = e + 1 + continue + } } if e == size { return nil, errors.New("not found cooresponding ending of sql: ')'") @@ -236,10 +251,21 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error) return nil, errors.Errorf("statement %s for %s/%s is not create table statement", statement, schema, table) } - columns := make([]string, 0, len(ct.Cols)) + var ( + columns = make([]string, 0, len(ct.Cols)) + hasGeneragedCols = false + columnNameFields = "" + ) for _, col := range ct.Cols { + if col.Options[0].Tp == ast.ColumnOptionGenerated { + hasGeneragedCols = true + continue + } columns = append(columns, col.Name.Name.O) } + if hasGeneragedCols { + columnNameFields = "(" + strings.Join(columns, ",") + ")" + } dstSchema, dstTable := fetchMatchedLiteral(r, schema, table) return &tableInfo{ @@ -248,7 +274,7 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error) targetSchema: dstSchema, targetTable: dstTable, columnNameList: columns, - insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` VALUES", dstTable), + insertHeadStmt: fmt.Sprintf("INSERT INTO `%s` %sVALUES", dstTable, columnNameFields), }, nil } diff --git a/syncer/db.go b/syncer/db.go index 99acb1f993..783744f892 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -37,6 +37,7 @@ type column struct { NotNull bool unsigned bool tp string + extra string } type table struct { @@ -390,6 +391,7 @@ func getTableColumns(db 
*Conn, table *table, maxRetry int) error { column.idx = idx column.name = string(data[0]) column.tp = string(data[1]) + column.extra = string(data[5]) if strings.ToLower(string(data[2])) == "no" { column.NotNull = true @@ -456,3 +458,7 @@ func getBinaryLogs(db *sql.DB) ([]binlogSize, error) { } return files, nil } + +func (c *column) isGeneratedColumn() bool { + return strings.Contains(c.extra, "VIRTUAL GENERATED") || strings.Contains(c.extra, "STORED GENERATED") +} diff --git a/syncer/dml.go b/syncer/dml.go index 88a16e2462..2f08f3483c 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -423,3 +423,43 @@ func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]int } return rows, nil } + +func (s *Syncer) pruneGenColumnDML(needPrune bool, genColumnFilter []bool, data [][]interface{}) [][]interface{} { + if !needPrune { + return data + } + + rows := make([][]interface{}, 0, len(data)) + for _, row := range data { + value := make([]interface{}, 0, len(row)) + for i := range row { + if !genColumnFilter[i] { + value = append(value, row[i]) + } + } + rows = append(rows, value) + } + return rows +} + +// generatedColumnFilter iterates column list and returns +// a bool indicates where one or more generated column exists +// a bool slice indicates whether the i-th column is generated column +// a new column slice without generated columns +func generatedColumnFilter(columns []*column) (bool, []bool, []*column) { + var ( + needPrune bool + filters = make([]bool, 0, len(columns)) + filterCols = make([]*column, 0, len(columns)) + ) + for _, c := range columns { + isGenColumn := c.isGeneratedColumn() + filters = append(filters, isGenColumn) + if isGenColumn { + needPrune = true + continue + } + filterCols = append(filterCols, c) + } + return needPrune, filters, filterCols +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 3b9e4d1ece..dcbdb06308 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1138,10 +1138,12 @@ func (s *Syncer) Run(ctx 
context.Context) (err error) { if err != nil { return errors.Trace(err) } + needPrune, genColumnFilter, tblColumns := generatedColumnFilter(table.columns) rows, err := s.mappingDML(originSchema, originTable, columns, ev.Rows) if err != nil { return errors.Trace(err) } + rows = s.pruneGenColumnDML(needPrune, genColumnFilter, rows) var ( applied bool @@ -1161,7 +1163,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { switch e.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: if !applied { - sqls, keys, args, err = genInsertSQLs(table.schema, table.name, rows, table.columns, table.indexColumns) + sqls, keys, args, err = genInsertSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns) if err != nil { return errors.Errorf("gen insert sqls failed: %v, schema: %s, table: %s", errors.Trace(err), table.schema, table.name) } @@ -1184,7 +1186,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: if !applied { - sqls, keys, args, err = genUpdateSQLs(table.schema, table.name, rows, table.columns, table.indexColumns, safeMode.Enable()) + sqls, keys, args, err = genUpdateSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns, safeMode.Enable()) if err != nil { return errors.Errorf("gen update sqls failed: %v, schema: %s, table: %s", err, table.schema, table.name) } @@ -1208,7 +1210,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: if !applied { - sqls, keys, args, err = genDeleteSQLs(table.schema, table.name, rows, table.columns, table.indexColumns) + sqls, keys, args, err = genDeleteSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns) if err != nil { return errors.Errorf("gen delete sqls failed: %v, schema: %s, table: %s", err, 
table.schema, table.name) } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 0158bd4a08..f294acc236 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -135,6 +135,32 @@ func (s *testSyncerSuite) resetMaster() { s.db.Exec("reset master") } +func (s *testSyncerSuite) catchUpBinlog() { + ch := make(chan interface{}) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + for { + ev, _ := s.streamer.GetEvent(ctx) + if ev == nil { + return + } + ch <- struct{}{} + } + }() + + for { + t := time.NewTimer(10 * time.Millisecond) + select { + case <-ch: + t.Stop() + case <-t.C: + cancel() + return + } + } +} + func (s *testSyncerSuite) TestSelectDB(c *C) { s.cfg.BWList = &filter.Rules{ DoDBs: []string{"~^b.*", "s1", "stest"}, @@ -188,6 +214,7 @@ func (s *testSyncerSuite) TestSelectDB(c *C) { c.Assert(r, Equals, res[i]) i++ } + s.catchUpBinlog() } func (s *testSyncerSuite) TestSelectTable(c *C) { @@ -297,6 +324,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { } i++ } + s.catchUpBinlog() } func (s *testSyncerSuite) TestIgnoreDB(c *C) { @@ -353,6 +381,7 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { c.Assert(r, Equals, res[i]) i++ } + s.catchUpBinlog() } func (s *testSyncerSuite) TestIgnoreTable(c *C) { @@ -457,7 +486,7 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { i++ } - + s.catchUpBinlog() } func (s *testSyncerSuite) TestSkipDML(c *C) { @@ -538,6 +567,7 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { } i++ } + s.catchUpBinlog() } func (s *testSyncerSuite) TestColumnMapping(c *C) { @@ -625,6 +655,7 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { } i++ } + s.catchUpBinlog() } func (s *testSyncerSuite) TestTimezone(c *C) { @@ -729,4 +760,121 @@ func (s *testSyncerSuite) TestTimezone(c *C) { for _, sql := range dropSQLs { s.db.Exec(sql) } + s.catchUpBinlog() +} + +func (s *testSyncerSuite) TestGeneratedColumn(c *C) { + s.cfg.BWList = &filter.Rules{ + DoDBs: []string{"~^gctest_.*"}, + } + + createSQLs := 
[]string{ + "create database if not exists gctest_1 DEFAULT CHARSET=utf8mb4", + "create table if not exists gctest_1.t_1(id int, age int, cfg varchar(40), cfg_json json as (cfg) virtual)", + } + + testCases := []struct { + sqls []string + expected []string + args [][]interface{} + }{ + { + []string{ + "insert into gctest_1.t_1(id, age, cfg) values (1, 18, '{}')", + "insert into gctest_1.t_1(id, age, cfg) values (2, 19, '{\"key\": \"value\", \"int\": 123}')", + "insert into gctest_1.t_1(id, age, cfg) values (3, 17, NULL)", + }, + []string{ + "REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", + "REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", + "REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", + }, + [][]interface{}{ + []interface{}{int32(1), int32(18), "{}"}, + []interface{}{int32(2), int32(19), "{\"key\": \"value\", \"int\": 123}"}, + []interface{}{int32(3), int32(17), nil}, + }, + }, + { + []string{ + // This test case will trigger a go-mysql bug, + // will uncomment after go-mysql fixed + "update gctest_1.t_1 set cfg = '{\"a\": 12}', age = 21 where id = 1", + "update gctest_1.t_1 set cfg = '{}' where id = 2 and age = 19", + "update gctest_1.t_1 set age = 20 where cfg is NULL", + }, + []string{ + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? LIMIT 1;", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? LIMIT 1;", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` IS ? 
LIMIT 1;", + }, + [][]interface{}{ + []interface{}{int32(1), int32(21), "{\"a\": 12}", int32(1), int32(18), "{}"}, + []interface{}{int32(2), int32(19), "{}", int32(2), int32(19), "{\"key\": \"value\", \"int\": 123}"}, + []interface{}{int32(3), int32(20), nil, int32(3), int32(17), nil}, + }, + }, + } + + dropSQLs := []string{ + "drop table gctest_1.t_1", + "drop database gctest_1", + } + + for _, sql := range createSQLs { + _, err := s.db.Exec(sql) + c.Assert(err, IsNil) + } + + syncer := NewSyncer(s.cfg) + syncer.cfg.MaxRetry = 1 + // use upstream db as mock downstream + syncer.toDBs = []*Conn{{db: s.db}} + + for _, testCase := range testCases { + for _, sql := range testCase.sqls { + _, err := s.db.Exec(sql) + c.Assert(err, IsNil) + } + idx := 0 + for { + if idx >= len(testCase.sqls) { + break + } + e, err := s.streamer.GetEvent(context.Background()) + c.Assert(err, IsNil) + switch ev := e.Event.(type) { + case *replication.RowsEvent: + table, _, err := syncer.getTable(string(ev.Table.Schema), string(ev.Table.Table)) + c.Assert(err, IsNil) + var ( + sqls []string + args [][]interface{} + ) + + needPrune, genColumnFilter, tblColumns := generatedColumnFilter(table.columns) + rows := syncer.pruneGenColumnDML(needPrune, genColumnFilter, ev.Rows) + switch e.Header.EventType { + case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + sqls, _, args, err = genInsertSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns) + c.Assert(err, IsNil) + c.Assert(sqls[0], Equals, testCase.expected[idx]) + c.Assert(args[0], DeepEquals, testCase.args[idx]) + case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + sqls, _, args, err = genUpdateSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns, false) + c.Assert(err, IsNil) + c.Assert(sqls[0], Equals, testCase.expected[idx]) + c.Assert(args[0], DeepEquals, testCase.args[idx]) + } + idx++ + default: + continue 
+ } + } + } + + for _, sql := range dropSQLs { + s.db.Exec(sql) + } + s.catchUpBinlog() } diff --git a/tests/sharding/data/db1.increment.sql b/tests/sharding/data/db1.increment.sql index 30643a08b6..d5847dfe11 100644 --- a/tests/sharding/data/db1.increment.sql +++ b/tests/sharding/data/db1.increment.sql @@ -4,4 +4,11 @@ update t1 set name = 'Gabriel José de la Concordia García Márquez' where `uid update t1 set name = 'One Hundred Years of Solitude' where name = 'Cien años de soledad'; alter table t1 add column age int; alter table t2 add column age int; -insert into t2 (uid, name, age) values (20004, 'Colonel Aureliano Buendía', 301); +insert into t2 (uid, name, age, info) values (20004, 'Colonel Aureliano Buendía', 301, '{}'); +alter table t2 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; +insert into t1 (uid, name, info) values (10004, 'Buenos Aires', '{"age": 10}'); +insert into t2 (uid, name, info) values (20005, 'Buenos Aires', '{"age": 100}'); +insert into t2 (uid, name, info) values (20006, 'Buenos Aires', '{"age": 1000}'); +alter table t1 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; +insert into t1 (uid, name, info) values (10005, 'Buenos Aires', '{"age": 100}'); +insert into t2 (uid, name, info) values (20007, 'Buenos Aires', '{"age": 200}'); diff --git a/tests/sharding/data/db1.prepare.sql b/tests/sharding/data/db1.prepare.sql index 7e62474799..7d6e378356 100644 --- a/tests/sharding/data/db1.prepare.sql +++ b/tests/sharding/data/db1.prepare.sql @@ -1,7 +1,7 @@ drop database if exists `sharding`; create database `sharding`; use `sharding`; -create table t1 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; -create table t2 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t1 (id bigint auto_increment, uid int, name varchar(80), info varchar(100), primary key (`id`), unique 
key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t2 (id bigint auto_increment, uid int, name varchar(80), info varchar(100), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; insert into t1 (uid, name) values (10001, 'Gabriel García Márquez'), (10002, 'Cien años de soledad'); insert into t2 (uid, name) values (20001, 'José Arcadio Buendía'), (20002, 'Úrsula Iguarán'), (20003, 'José Arcadio'); diff --git a/tests/sharding/data/db2.increment.sql b/tests/sharding/data/db2.increment.sql index e84df8d2e0..3237f915d0 100644 --- a/tests/sharding/data/db2.increment.sql +++ b/tests/sharding/data/db2.increment.sql @@ -4,3 +4,6 @@ alter table t2 add column age int; update t2 set uid = uid + 10000; alter table t3 add column age int; update t3 set age = 1; +alter table t2 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; +update t3 set age = age + 10; +alter table t3 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; diff --git a/tests/sharding/data/db2.prepare.sql b/tests/sharding/data/db2.prepare.sql index 9a2679f7ad..39d71bb6f8 100644 --- a/tests/sharding/data/db2.prepare.sql +++ b/tests/sharding/data/db2.prepare.sql @@ -1,7 +1,7 @@ drop database if exists `sharding`; create database `sharding`; use `sharding`; -create table t2 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; -create table t3 (id bigint auto_increment, uid int, name varchar(80), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; -insert into t2 (uid, name) values (40000, 'Remedios Moscote'), (40001, 'Amaranta'); -insert into t3 (uid, name) values (30001, 'Aureliano José'), (30002, 'Santa Sofía de la Piedad'), (30003, '17 Aurelianos'); +create table t2 (id bigint auto_increment, uid int, name varchar(80), info varchar(100), primary key (`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +create table t3 (id bigint auto_increment, uid int, name varchar(80), info varchar(100), primary key 
(`id`), unique key(`uid`)) DEFAULT CHARSET=utf8mb4; +insert into t2 (uid, name, info) values (40000, 'Remedios Moscote', '{}'), (40001, 'Amaranta', '{"age": 0}'); +insert into t3 (uid, name, info) values (30001, 'Aureliano José', '{}'), (30002, 'Santa Sofía de la Piedad', '{}'), (30003, '17 Aurelianos', NULL); From cc6b6b59ca1830e7ca32fcb257cc906c98a57d36 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 12 Feb 2019 22:32:20 +0800 Subject: [PATCH 03/19] *: fix make check error --- syncer/syncer_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index f294acc236..f82049f32e 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -790,9 +790,9 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", }, [][]interface{}{ - []interface{}{int32(1), int32(18), "{}"}, - []interface{}{int32(2), int32(19), "{\"key\": \"value\", \"int\": 123}"}, - []interface{}{int32(3), int32(17), nil}, + {int32(1), int32(18), "{}"}, + {int32(2), int32(19), "{\"key\": \"value\", \"int\": 123}"}, + {int32(3), int32(17), nil}, }, }, { @@ -809,9 +809,9 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` IS ? 
LIMIT 1;", }, [][]interface{}{ - []interface{}{int32(1), int32(21), "{\"a\": 12}", int32(1), int32(18), "{}"}, - []interface{}{int32(2), int32(19), "{}", int32(2), int32(19), "{\"key\": \"value\", \"int\": 123}"}, - []interface{}{int32(3), int32(20), nil, int32(3), int32(17), nil}, + {int32(1), int32(21), "{\"a\": 12}", int32(1), int32(18), "{}"}, + {int32(2), int32(19), "{}", int32(2), int32(19), "{\"key\": \"value\", \"int\": 123}"}, + {int32(3), int32(20), nil, int32(3), int32(17), nil}, }, }, } From a54efe0c748c293dddf01fae87f3dabe1c311dab Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 12 Feb 2019 22:43:06 +0800 Subject: [PATCH 04/19] test: fix unit test --- loader/convert_data.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loader/convert_data.go b/loader/convert_data.go index 366a29461f..ed17f9cfff 100644 --- a/loader/convert_data.go +++ b/loader/convert_data.go @@ -257,7 +257,7 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error) columnNameFields = "" ) for _, col := range ct.Cols { - if col.Options[0].Tp == ast.ColumnOptionGenerated { + if len(col.Options) > 0 && col.Options[0].Tp == ast.ColumnOptionGenerated { hasGeneragedCols = true continue } From bf5dba9603a389f0fddbd33b747f38295454ea51 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 13 Feb 2019 11:02:58 +0800 Subject: [PATCH 05/19] test: add convert data unit test --- loader/convert_data.go | 6 +++++- loader/convert_data_test.go | 25 +++++++++++++++++++++++++ loader/dumpfile/test1.t3-schema.sql | 9 +++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 loader/dumpfile/test1.t3-schema.sql diff --git a/loader/convert_data.go b/loader/convert_data.go index ed17f9cfff..c4b22b47b5 100644 --- a/loader/convert_data.go +++ b/loader/convert_data.go @@ -264,7 +264,11 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error) columns = append(columns, col.Name.Name.O) } if hasGeneragedCols { - 
columnNameFields = "(" + strings.Join(columns, ",") + ")" + var escapeColumns []string + for _, column := range columns { + escapeColumns = append(escapeColumns, fmt.Sprintf("`%s`", column)) + } + columnNameFields = "(" + strings.Join(escapeColumns, ",") + ") " } dstSchema, dstTable := fetchMatchedLiteral(r, schema, table) diff --git a/loader/convert_data_test.go b/loader/convert_data_test.go index 000ee626c6..b811aba6e5 100644 --- a/loader/convert_data_test.go +++ b/loader/convert_data_test.go @@ -128,3 +128,28 @@ func (t *testConvertDataSuite) TestParseTable(c *C) { c.Assert(err, IsNil) c.Assert(tableInfo, DeepEquals, expectedTableInfo) } + +func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) { + rules := []*router.TableRule{ + {"test*", "t*", "test", "t"}, + } + + expectedTableInfo := &tableInfo{ + sourceSchema: "test1", + sourceTable: "t3", + targetSchema: "test", + targetTable: "t", + columnNameList: []string{ + "id", + "t_json", + }, + insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES", + } + + r, err := router.NewTableRouter(false, rules) + c.Assert(err, IsNil) + + tableInfo, err := parseTable(r, "test1", "t3", "./dumpfile/test1.t3-schema.sql") + c.Assert(err, IsNil) + c.Assert(tableInfo, DeepEquals, expectedTableInfo) +} diff --git a/loader/dumpfile/test1.t3-schema.sql b/loader/dumpfile/test1.t3-schema.sql new file mode 100644 index 0000000000..398f92d6f3 --- /dev/null +++ b/loader/dumpfile/test1.t3-schema.sql @@ -0,0 +1,9 @@ +/*!40101 SET NAMES binary*/; +/*!40014 SET FOREIGN_KEY_CHECKS=0*/; + +CREATE TABLE `binlog_1` ( + `id` bigint(11) NOT NULL AUTO_INCREMENT, + `t_json` VARCHAR(100), + `t_json_gen` json as (`t_json`) VIRTUAL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1; From 13182d9278d567be3c0ee033e248ae591f4784cf Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 13 Feb 2019 11:36:40 +0800 Subject: [PATCH 06/19] test: add loader reassemble ut with generated column --- 
loader/convert_data_test.go | 44 +++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/loader/convert_data_test.go b/loader/convert_data_test.go index b811aba6e5..8917b04570 100644 --- a/loader/convert_data_test.go +++ b/loader/convert_data_test.go @@ -88,6 +88,50 @@ func (t *testConvertDataSuite) TestReassemble(c *C) { } } +func (t *testConvertDataSuite) TestReassembleWithGeneratedColumn(c *C) { + table := &tableInfo{ + sourceSchema: "test2", + sourceTable: "t3", + targetSchema: "test", + targetTable: "t", + columnNameList: []string{ + "id", + "t_json", + }, + insertHeadStmt: "INSERT INTO t (`id`,`t_json`) VALUES", + } + + sql := `INSERT INTO t1 (id,t_json) VALUES +(10,'{}'), +(9,NULL); +(8,'{"a":123}'); +` + + expected := []string{ + "INSERT INTO t (`id`,`t_json`) VALUES(585520728116297738,'{}'),(585520728116297737,NULL),(585520728116297736,'{\"a\":123}');", + "INSERT INTO t (`id`,`t_json`) VALUES(10,'{}'),(9,NULL),(8,'{\"a\":123}');", + } + + rules := []*cm.Rule{ + { + PatternSchema: "test*", + PatternTable: "t*", + TargetColumn: "id", + Expression: cm.PartitionID, + Arguments: []string{"1", "test", "t"}, + }, + } + + for i, r := range rules { + columnMapping, err := cm.NewMapping(false, []*cm.Rule{r}) + c.Assert(err, IsNil) + + query, err := reassemble([]byte(sql), table, columnMapping) + c.Assert(err, IsNil) + c.Assert(query, Equals, expected[i]) + } +} + func (t *testConvertDataSuite) TestParseTable(c *C) { rules := []*router.TableRule{ {"test*", "t*", "test", "t"}, From 9443dfe44ce4143e399691bfc8fe4e3b6674dc7d Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 15:34:29 +0800 Subject: [PATCH 07/19] loader: test all column options for generated column --- loader/convert_data.go | 14 ++++++++++---- loader/dumpfile/test1.t3-schema.sql | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/loader/convert_data.go b/loader/convert_data.go index c4b22b47b5..6307632b95 100644 --- a/loader/convert_data.go +++ 
b/loader/convert_data.go @@ -257,11 +257,17 @@ func parseTable(r *router.Table, schema, table, file string) (*tableInfo, error) columnNameFields = "" ) for _, col := range ct.Cols { - if len(col.Options) > 0 && col.Options[0].Tp == ast.ColumnOptionGenerated { - hasGeneragedCols = true - continue + skip := false + for _, opt := range col.Options { + if opt.Tp == ast.ColumnOptionGenerated { + hasGeneragedCols = true + skip = true + break + } + } + if !skip { + columns = append(columns, col.Name.Name.O) } - columns = append(columns, col.Name.Name.O) } if hasGeneragedCols { var escapeColumns []string diff --git a/loader/dumpfile/test1.t3-schema.sql b/loader/dumpfile/test1.t3-schema.sql index 398f92d6f3..1439d7cdea 100644 --- a/loader/dumpfile/test1.t3-schema.sql +++ b/loader/dumpfile/test1.t3-schema.sql @@ -4,6 +4,6 @@ CREATE TABLE `binlog_1` ( `id` bigint(11) NOT NULL AUTO_INCREMENT, `t_json` VARCHAR(100), - `t_json_gen` json as (`t_json`) VIRTUAL, + `t_json_gen` json comment 'test comment' as (`t_json`) VIRTUAL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1; From 1641de001758ed83ac6f6fa897b2282a6dfb1b5a Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 15:47:25 +0800 Subject: [PATCH 08/19] address comment --- loader/convert_data_test.go | 21 ++++++--------------- syncer/db.go | 17 +++++++++-------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/loader/convert_data_test.go b/loader/convert_data_test.go index 8917b04570..63964a2043 100644 --- a/loader/convert_data_test.go +++ b/loader/convert_data_test.go @@ -100,18 +100,12 @@ func (t *testConvertDataSuite) TestReassembleWithGeneratedColumn(c *C) { }, insertHeadStmt: "INSERT INTO t (`id`,`t_json`) VALUES", } - sql := `INSERT INTO t1 (id,t_json) VALUES (10,'{}'), (9,NULL); (8,'{"a":123}'); ` - - expected := []string{ - "INSERT INTO t (`id`,`t_json`) VALUES(585520728116297738,'{}'),(585520728116297737,NULL),(585520728116297736,'{\"a\":123}');", - "INSERT INTO t 
(`id`,`t_json`) VALUES(10,'{}'),(9,NULL),(8,'{\"a\":123}');", - } - + expected := "INSERT INTO t (`id`,`t_json`) VALUES(585520728116297738,'{}'),(585520728116297737,NULL),(585520728116297736,'{\"a\":123}');" rules := []*cm.Rule{ { PatternSchema: "test*", @@ -122,14 +116,11 @@ func (t *testConvertDataSuite) TestReassembleWithGeneratedColumn(c *C) { }, } - for i, r := range rules { - columnMapping, err := cm.NewMapping(false, []*cm.Rule{r}) - c.Assert(err, IsNil) - - query, err := reassemble([]byte(sql), table, columnMapping) - c.Assert(err, IsNil) - c.Assert(query, Equals, expected[i]) - } + columnMapping, err := cm.NewMapping(false, rules) + c.Assert(err, IsNil) + query, err := reassemble([]byte(sql), table, columnMapping) + c.Assert(err, IsNil) + c.Assert(query, Equals, expected) } func (t *testConvertDataSuite) TestParseTable(c *C) { diff --git a/syncer/db.go b/syncer/db.go index 783744f892..a4a74b3053 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -363,14 +363,15 @@ func getTableColumns(db *Conn, table *table, maxRetry int) error { // Show an example. 
/* mysql> show columns from test.t; - +-------+---------+------+-----+---------+-------+ - | Field | Type | Null | Key | Default | Extra | - +-------+---------+------+-----+---------+-------+ - | a | int(11) | NO | PRI | NULL | | - | b | int(11) | NO | PRI | NULL | | - | c | int(11) | YES | MUL | NULL | | - | d | int(11) | YES | | NULL | | - +-------+---------+------+-----+---------+-------+ + +-------+---------+------+-----+---------+-------------------+ + | Field | Type | Null | Key | Default | Extra | + +-------+---------+------+-----+---------+-------------------+ + | a | int(11) | NO | PRI | NULL | | + | b | int(11) | NO | PRI | NULL | | + | c | int(11) | YES | MUL | NULL | | + | d | int(11) | YES | | NULL | | + | d | json | YES | | NULL | VIRTUAL GENERATED | + +-------+---------+------+-----+---------+-------------------+ */ idx := 0 From 0af60d64439f6695d71d8e30517ea0eb282b11dd Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 15:49:49 +0800 Subject: [PATCH 09/19] address comment --- syncer/ddl_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index a43fd820a5..18fe028dcd 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -350,7 +350,7 @@ func (s *testSyncerSuite) TestIgnoreDMLInQuery(c *C) { } } -func (s *testSyncerSuite) TestResolveSQL(c *C) { +func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) { testCases := []struct { sql string expected string From 5dffffa0069421d5c3d5eec9fe467bf35662b521 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 18:05:02 +0800 Subject: [PATCH 10/19] *: refine index with generated column --- syncer/dml.go | 68 +++++++++++++++++---------- syncer/syncer.go | 10 ++-- syncer/syncer_test.go | 5 +- tests/sharding/data/db1.increment.sql | 4 ++ tests/sharding/data/db2.increment.sql | 5 ++ 5 files changed, 59 insertions(+), 33 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index 2f08f3483c..b972c6d401 100644 --- 
a/syncer/dml.go +++ b/syncer/dml.go @@ -424,42 +424,60 @@ func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]int return rows, nil } -func (s *Syncer) pruneGenColumnDML(needPrune bool, genColumnFilter []bool, data [][]interface{}) [][]interface{} { +// pruneGeneratedColumnDML filters columns list, data and index removing all generated column +func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[string][]*column) ([]*column, [][]interface{}, map[string][]*column) { + var ( + needPrune bool + colIndexfilters = make([]bool, 0, len(columns)) + genColumnNames = make(map[string]bool) + ) + + for _, c := range columns { + isGenColumn := c.isGeneratedColumn() + colIndexfilters = append(colIndexfilters, isGenColumn) + if isGenColumn { + needPrune = true + genColumnNames[c.name] = true + continue + } + } + if !needPrune { - return data + return columns, data, index } - rows := make([][]interface{}, 0, len(data)) + var ( + cols = make([]*column, 0, len(columns)) + rows = make([][]interface{}, 0, len(data)) + idxes = make(map[string][]*column) + ) + + for i := range columns { + if !colIndexfilters[i] { + cols = append(cols, columns[i]) + } + } for _, row := range data { value := make([]interface{}, 0, len(row)) for i := range row { - if !genColumnFilter[i] { + if !colIndexfilters[i] { value = append(value, row[i]) } } rows = append(rows, value) } - return rows -} - -// generatedColumnFilter iterates column list and returns -// a bool indicates where one or more generated column exists -// a bool slice indicates whether the i-th column is generated column -// a new column slice without generated columns -func generatedColumnFilter(columns []*column) (bool, []bool, []*column) { - var ( - needPrune bool - filters = make([]bool, 0, len(columns)) - filterCols = make([]*column, 0, len(columns)) - ) - for _, c := range columns { - isGenColumn := c.isGeneratedColumn() - filters = append(filters, isGenColumn) - if isGenColumn { - 
needPrune = true - continue + for key, keyCols := range index { + hasGenColumn := false + for _, col := range keyCols { + if _, ok := genColumnNames[col.name]; ok { + hasGenColumn = true + break + } + } + if !hasGenColumn { + idxes[key] = keyCols } - filterCols = append(filterCols, c) } - return needPrune, filters, filterCols + + return cols, rows, idxes } diff --git a/syncer/syncer.go b/syncer/syncer.go index dcbdb06308..d66507b51b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1138,12 +1138,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err != nil { return errors.Trace(err) } - needPrune, genColumnFilter, tblColumns := generatedColumnFilter(table.columns) rows, err := s.mappingDML(originSchema, originTable, columns, ev.Rows) if err != nil { return errors.Trace(err) } - rows = s.pruneGenColumnDML(needPrune, genColumnFilter, rows) + tblColumns, rowData, tblIndexColumns := pruneGeneratedColumnDML(table.columns, rows, table.indexColumns) + log.Infof("tblColumns %+v, rowData %+v tblIndexColumns %+v", tblColumns, rowData, tblIndexColumns) var ( applied bool @@ -1163,7 +1163,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { switch e.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: if !applied { - sqls, keys, args, err = genInsertSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns) + sqls, keys, args, err = genInsertSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns) if err != nil { return errors.Errorf("gen insert sqls failed: %v, schema: %s, table: %s", errors.Trace(err), table.schema, table.name) } @@ -1186,7 +1186,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: if !applied { - sqls, keys, args, err = genUpdateSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns, safeMode.Enable()) + sqls, keys, 
args, err = genUpdateSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns, safeMode.Enable()) if err != nil { return errors.Errorf("gen update sqls failed: %v, schema: %s, table: %s", err, table.schema, table.name) } @@ -1210,7 +1210,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: if !applied { - sqls, keys, args, err = genDeleteSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns) + sqls, keys, args, err = genDeleteSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns) if err != nil { return errors.Errorf("gen delete sqls failed: %v, schema: %s, table: %s", err, table.schema, table.name) } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 06f2720593..e3b469204b 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -150,11 +150,10 @@ func (s *testSyncerSuite) catchUpBinlog() { }() for { - t := time.NewTimer(10 * time.Millisecond) select { case <-ch: - t.Stop() - case <-t.C: + // do nothing + case <-time.After(10 * time.Millisecond): cancel() return } diff --git a/tests/sharding/data/db1.increment.sql b/tests/sharding/data/db1.increment.sql index d5847dfe11..1088cdec99 100644 --- a/tests/sharding/data/db1.increment.sql +++ b/tests/sharding/data/db1.increment.sql @@ -10,5 +10,9 @@ insert into t1 (uid, name, info) values (10004, 'Buenos Aires', '{"age": 10}'); insert into t2 (uid, name, info) values (20005, 'Buenos Aires', '{"age": 100}'); insert into t2 (uid, name, info) values (20006, 'Buenos Aires', '{"age": 1000}'); alter table t1 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; +alter table t1 add column id_gen int as (uid + 1); +alter table t2 add column id_gen int as (uid + 1); +alter table t1 add unique (id_gen); +alter table t2 add unique (id_gen); insert into t1 (uid, name, info) values (10005, 'Buenos Aires', '{"age": 100}'); insert into t2 (uid, name, 
info) values (20007, 'Buenos Aires', '{"age": 200}'); diff --git a/tests/sharding/data/db2.increment.sql b/tests/sharding/data/db2.increment.sql index 3237f915d0..9008065f28 100644 --- a/tests/sharding/data/db2.increment.sql +++ b/tests/sharding/data/db2.increment.sql @@ -7,3 +7,8 @@ update t3 set age = 1; alter table t2 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; update t3 set age = age + 10; alter table t3 add column info_json json GENERATED ALWAYS AS (`info`) VIRTUAL; +alter table t3 add column id_gen int as (uid + 1); +alter table t2 add column id_gen int as (uid + 1); +alter table t2 add unique (id_gen); +alter table t3 add unique (id_gen); +update t2 set age = age + 10; From 184137671f5f177a16ee85758ee9ebb994421c1f Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 18:14:12 +0800 Subject: [PATCH 11/19] *: fix unit test --- syncer/dml.go | 5 ++++- syncer/syncer.go | 1 - syncer/syncer_test.go | 7 +++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index b972c6d401..1f6c8863c6 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -424,7 +424,10 @@ func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]int return rows, nil } -// pruneGeneratedColumnDML filters columns list, data and index removing all generated column +// pruneGeneratedColumnDML filters columns list, data and index removing all +// generated column. because generated column is not support setting value +// directly in DML, we must remove generated column from DML, including column +// list, data list and all indexes including generated columns. 
func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[string][]*column) ([]*column, [][]interface{}, map[string][]*column) { var ( needPrune bool diff --git a/syncer/syncer.go b/syncer/syncer.go index d66507b51b..7a519ac7c7 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1143,7 +1143,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return errors.Trace(err) } tblColumns, rowData, tblIndexColumns := pruneGeneratedColumnDML(table.columns, rows, table.indexColumns) - log.Infof("tblColumns %+v, rowData %+v tblIndexColumns %+v", tblColumns, rowData, tblIndexColumns) var ( applied bool diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index e3b469204b..7000cfcd21 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -853,16 +853,15 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { args [][]interface{} ) - needPrune, genColumnFilter, tblColumns := generatedColumnFilter(table.columns) - rows := syncer.pruneGenColumnDML(needPrune, genColumnFilter, ev.Rows) + tblColumns, rowData, tblIndexColumns := pruneGeneratedColumnDML(table.columns, ev.Rows, table.indexColumns) switch e.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: - sqls, _, args, err = genInsertSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns) + sqls, _, args, err = genInsertSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns) c.Assert(err, IsNil) c.Assert(sqls[0], Equals, testCase.expected[idx]) c.Assert(args[0], DeepEquals, testCase.args[idx]) case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: - sqls, _, args, err = genUpdateSQLs(table.schema, table.name, rows, tblColumns, table.indexColumns, false) + sqls, _, args, err = genUpdateSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns, false) c.Assert(err, IsNil) c.Assert(sqls[0], Equals, testCase.expected[idx]) 
c.Assert(args[0], DeepEquals, testCase.args[idx]) From 048078f14ece89424d1786d0ee0f05a2b662b969 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 18:46:41 +0800 Subject: [PATCH 12/19] *: add multi column index in generated column test --- tests/sharding/data/db1.increment.sql | 4 ++++ tests/sharding/data/db2.increment.sql | 3 +++ 2 files changed, 7 insertions(+) diff --git a/tests/sharding/data/db1.increment.sql b/tests/sharding/data/db1.increment.sql index 1088cdec99..82154563c5 100644 --- a/tests/sharding/data/db1.increment.sql +++ b/tests/sharding/data/db1.increment.sql @@ -16,3 +16,7 @@ alter table t1 add unique (id_gen); alter table t2 add unique (id_gen); insert into t1 (uid, name, info) values (10005, 'Buenos Aires', '{"age": 100}'); insert into t2 (uid, name, info) values (20007, 'Buenos Aires', '{"age": 200}'); +alter table t1 add key multi_col_idx(uid, id_gen); +alter table t2 add key multi_col_idx(uid, id_gen); +insert into t1 (uid, name, info) values (10006, 'Buenos Aires', '{"age": 100}'); +insert into t2 (uid, name, info) values (20008, 'Buenos Aires', '{"age": 200}'); diff --git a/tests/sharding/data/db2.increment.sql b/tests/sharding/data/db2.increment.sql index 9008065f28..18eb4f75e0 100644 --- a/tests/sharding/data/db2.increment.sql +++ b/tests/sharding/data/db2.increment.sql @@ -12,3 +12,6 @@ alter table t2 add column id_gen int as (uid + 1); alter table t2 add unique (id_gen); alter table t3 add unique (id_gen); update t2 set age = age + 10; +alter table t2 add key multi_col_idx(uid, id_gen); +alter table t3 add key multi_col_idx(uid, id_gen); +update t3 set age = age + 10; From d3572b79415f6387ef03fd5af1d87da7ce586c39 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 20:50:25 +0800 Subject: [PATCH 13/19] address comment, test index with generated column --- syncer/dml.go | 1 - tests/all_mode/data/db1.increment.sql | 7 ++++++- tests/all_mode/data/db1.prepare.sql | 4 ++-- 3 files changed, 8 insertions(+), 4 
deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index 1f6c8863c6..29c9623172 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -441,7 +441,6 @@ func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[ if isGenColumn { needPrune = true genColumnNames[c.name] = true - continue } } diff --git a/tests/all_mode/data/db1.increment.sql b/tests/all_mode/data/db1.increment.sql index 5c7c37b4e4..c9c3ae129a 100644 --- a/tests/all_mode/data/db1.increment.sql +++ b/tests/all_mode/data/db1.increment.sql @@ -1,4 +1,9 @@ use all_mode; -insert into t1 (name) values ('Eddard Stark'); +insert into t1 (id, name) values (3, 'Eddard Stark'); update t1 set name = 'Arya Stark' where id = 1; update t1 set name = 'Catelyn Stark' where name = 'catelyn'; +alter table t1 add column info json; +alter table t1 add column gen_id int as (info->"$.id"); +alter table t1 add index multi_col(`id`, `gen_id`); +insert into t1 (id, name, info) values (4, 'gentest', '{"id": 123}'); +insert into t1 (id, name, info) values (5, 'gentest', '{"id": 124}'); diff --git a/tests/all_mode/data/db1.prepare.sql b/tests/all_mode/data/db1.prepare.sql index 6ee56a8c65..1557401c64 100644 --- a/tests/all_mode/data/db1.prepare.sql +++ b/tests/all_mode/data/db1.prepare.sql @@ -1,5 +1,5 @@ drop database if exists `all_mode`; create database `all_mode`; use `all_mode`; -create table t1 (id int auto_increment, name varchar(20), primary key (`id`)); -insert into t1 (name) values ('arya'), ('catelyn'); +create table t1 (id int, name varchar(20)); +insert into t1 (id, name) values (1, 'arya'), (2, 'catelyn'); From c42b15186188959df23ac65c429d87b123f49aec Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 21:59:01 +0800 Subject: [PATCH 14/19] *: add generated column information cache --- syncer/dml.go | 86 ++++++++++++++++++++++++++- syncer/syncer.go | 9 ++- syncer/syncer_test.go | 2 +- tests/all_mode/data/db1.increment.sql | 6 ++ 4 files changed, 98 insertions(+), 5 deletions(-) diff 
--git a/syncer/dml.go b/syncer/dml.go index 29c9623172..fdc8a71ec3 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -20,10 +20,62 @@ import ( "strconv" "strings" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/errors" ) +type GenColumnCacheStatus uint8 + +const ( + NotFound GenColumnCacheStatus = iota + hasGenColumn + noGenColumn +) + +type GenColCache struct { + hasGenColumn map[string]bool + columns map[string][]*column + indexes map[string]map[string][]*column + isGenColumn map[string][]bool +} + +func NewGenColCache() *GenColCache { + c := &GenColCache{} + c.reset() + return c +} + +// status returns `NotFound` if a `schema`.`table` has no generated column +// information cached, otherwise returns `hasGenColumn` if cache found and +// it has generated column and returns `noGenColumn` if it has no generated column. +func (c *GenColCache) status(key string) GenColumnCacheStatus { + if val, ok := c.hasGenColumn[key]; !ok { + return NotFound + } else { + if val { + return hasGenColumn + } + } + return noGenColumn +} + +func (c *GenColCache) clearTable(schema, table string) { + key := dbutil.TableName(schema, table) + delete(c.hasGenColumn, key) + delete(c.columns, key) + delete(c.indexes, key) + delete(c.isGenColumn, key) +} + +func (c *GenColCache) reset() { + c.hasGenColumn = make(map[string]bool) + c.columns = make(map[string][]*column) + c.indexes = make(map[string]map[string][]*column) + c.isGenColumn = make(map[string][]bool) +} + func genInsertSQLs(schema string, table string, dataSeq [][]interface{}, columns []*column, indexColumns map[string][]*column) ([]string, [][]string, [][]interface{}, error) { sqls := make([]string, 0, len(dataSeq)) keys := make([][]string, 0, len(dataSeq)) @@ -428,13 +480,36 @@ func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]int // generated column. 
because generated column is not support setting value // directly in DML, we must remove generated column from DML, including column // list, data list and all indexes including generated columns. -func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[string][]*column) ([]*column, [][]interface{}, map[string][]*column) { +func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[string][]*column, schema, table string, cache *GenColCache) ([]*column, [][]interface{}, map[string][]*column, error) { var ( needPrune bool colIndexfilters = make([]bool, 0, len(columns)) genColumnNames = make(map[string]bool) ) + cacheKey := dbutil.TableName(schema, table) + cacheStatus := cache.status(cacheKey) + if cacheStatus == noGenColumn { + return columns, data, index, nil + } + if cacheStatus == hasGenColumn { + rows := make([][]interface{}, 0, len(data)) + filters, ok := cache.isGenColumn[cacheKey] + if !ok { + return nil, nil, nil, errors.NotFoundf("cache key %s in isGenColumn", cacheKey) + } + for _, row := range data { + value := make([]interface{}, 0, len(row)) + for i := range row { + if !filters[i] { + value = append(value, row[i]) + } + } + rows = append(rows, value) + } + return cache.columns[cacheKey], rows, cache.indexes[cacheKey], nil + } + for _, c := range columns { isGenColumn := c.isGeneratedColumn() colIndexfilters = append(colIndexfilters, isGenColumn) @@ -445,7 +520,8 @@ func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[ } if !needPrune { - return columns, data, index + cache.hasGenColumn[cacheKey] = false + return columns, data, index, nil } var ( @@ -480,6 +556,10 @@ func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[ idxes[key] = keyCols } } + cache.hasGenColumn[cacheKey] = true + cache.columns[cacheKey] = cols + cache.indexes[cacheKey] = idxes + cache.isGenColumn[cacheKey] = colIndexfilters - return cols, rows, idxes + return cols, rows, idxes, nil } 
diff --git a/syncer/syncer.go b/syncer/syncer.go index 7a519ac7c7..88c2121c43 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -97,6 +97,7 @@ type Syncer struct { tables map[string]*table // table cache: `target-schema`.`target-table` -> table cacheColumns map[string][]string // table columns cache: `target-schema`.`target-table` -> column names list + genColsCache *GenColCache fromDB *Conn toDBs []*Conn @@ -165,6 +166,7 @@ func NewSyncer(cfg *config.SubTaskConfig) *Syncer { syncer.count.Set(0) syncer.tables = make(map[string]*table) syncer.cacheColumns = make(map[string][]string) + syncer.genColsCache = NewGenColCache() syncer.c = newCausality() syncer.tableRouter, _ = router.NewTableRouter(cfg.CaseSensitive, []*router.TableRule{}) syncer.done = make(chan struct{}) @@ -495,11 +497,13 @@ func (s *Syncer) clearTables(schema, table string) { key := dbutil.TableName(schema, table) delete(s.tables, key) delete(s.cacheColumns, key) + s.genColsCache.clearTable(schema, table) } func (s *Syncer) clearAllTables() { s.tables = make(map[string]*table) s.cacheColumns = make(map[string][]string) + s.genColsCache.reset() } func (s *Syncer) getTableFromDB(db *Conn, schema string, name string) (*table, error) { @@ -1142,7 +1146,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err != nil { return errors.Trace(err) } - tblColumns, rowData, tblIndexColumns := pruneGeneratedColumnDML(table.columns, rows, table.indexColumns) + tblColumns, rowData, tblIndexColumns, err := pruneGeneratedColumnDML(table.columns, rows, table.indexColumns, schemaName, tableName, s.genColsCache) + if err != nil { + return errors.Trace(err) + } var ( applied bool diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 7000cfcd21..17b26622ca 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -853,7 +853,7 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { args [][]interface{} ) - tblColumns, rowData, tblIndexColumns := pruneGeneratedColumnDML(table.columns, 
ev.Rows, table.indexColumns) + tblColumns, rowData, tblIndexColumns, err := pruneGeneratedColumnDML(table.columns, ev.Rows, table.indexColumns, table.schema, table.name, syncer.genColsCache) switch e.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: sqls, _, args, err = genInsertSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns) diff --git a/tests/all_mode/data/db1.increment.sql b/tests/all_mode/data/db1.increment.sql index c9c3ae129a..d9481e5b02 100644 --- a/tests/all_mode/data/db1.increment.sql +++ b/tests/all_mode/data/db1.increment.sql @@ -2,8 +2,14 @@ use all_mode; insert into t1 (id, name) values (3, 'Eddard Stark'); update t1 set name = 'Arya Stark' where id = 1; update t1 set name = 'Catelyn Stark' where name = 'catelyn'; + +-- test multi column index with generated column alter table t1 add column info json; alter table t1 add column gen_id int as (info->"$.id"); alter table t1 add index multi_col(`id`, `gen_id`); insert into t1 (id, name, info) values (4, 'gentest', '{"id": 123}'); insert into t1 (id, name, info) values (5, 'gentest', '{"id": 124}'); + +-- test genColumnCache is reset after ddl +alter table t1 add column info2 varchar(40); +insert into t1 (id, name, info) values (6, 'gentest', '{"id": 125, "test cache": false}'); From 091443cd251562f3af5ab220f15354b857682b7e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 22:12:14 +0800 Subject: [PATCH 15/19] *: fix golint, add more comments --- syncer/dml.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index fdc8a71ec3..478641a6ac 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -26,21 +26,30 @@ import ( "github.com/pingcap/errors" ) -type GenColumnCacheStatus uint8 +type genColumnCacheStatus uint8 const ( - NotFound GenColumnCacheStatus = iota + genColumnNoCache genColumnCacheStatus = iota hasGenColumn noGenColumn ) 
+// GenColCache stores generated column information for all tables type GenColCache struct { + // `schema`.`table` -> whether this table has generated column hasGenColumn map[string]bool - columns map[string][]*column - indexes map[string]map[string][]*column - isGenColumn map[string][]bool + + // `schema`.`table` -> column list + columns map[string][]*column + + // `schema`.`table` -> tableIndex information + indexes map[string]map[string][]*column + + // `schema`.`table` -> a bool slice representing whether it is generated for each column + isGenColumn map[string][]bool } +// NewGenColCache creates a GenColCache. func NewGenColCache() *GenColCache { c := &GenColCache{} c.reset() @@ -51,12 +60,12 @@ func NewGenColCache() *GenColCache { // information cached, otherwise returns `hasGenColumn` if cache found and // it has generated column and returns `noGenColumn` if it has no generated column. func (c *GenColCache) status(key string) GenColumnCacheStatus { - if val, ok := c.hasGenColumn[key]; !ok { - return NotFound - } else { - if val { - return hasGenColumn - } + val, ok := c.hasGenColumn[key] + if !ok { + return genColumnNoCache + } + if val { + return hasGenColumn } return noGenColumn } From 7a027b39e92bbaf18f7a721b210577be3e3f7b0f Mon Sep 17 00:00:00 2001 From: amyangfei Date: Mon, 18 Feb 2019 22:17:26 +0800 Subject: [PATCH 16/19] *: typo fix --- syncer/dml.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/dml.go b/syncer/dml.go index 478641a6ac..05d46b97de 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -59,7 +59,7 @@ func NewGenColCache() *GenColCache { // status returns `NotFound` if a `schema`.`table` has no generated column // information cached, otherwise returns `hasGenColumn` if cache found and // it has generated column and returns `noGenColumn` if it has no generated column. 
-func (c *GenColCache) status(key string) GenColumnCacheStatus { +func (c *GenColCache) status(key string) genColumnCacheStatus { val, ok := c.hasGenColumn[key] if !ok { return genColumnNoCache From cb7dd1ea083257182c273158950c416e06ff3854 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 20 Feb 2019 09:56:11 +0800 Subject: [PATCH 17/19] *: lazy allocate memory --- syncer/dml.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index 05d46b97de..84d8d10709 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -491,13 +491,10 @@ func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]int // list, data list and all indexes including generated columns. func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[string][]*column, schema, table string, cache *GenColCache) ([]*column, [][]interface{}, map[string][]*column, error) { var ( - needPrune bool - colIndexfilters = make([]bool, 0, len(columns)) - genColumnNames = make(map[string]bool) + cacheKey = dbutil.TableName(schema, table) + cacheStatus = cache.status(cacheKey) ) - cacheKey := dbutil.TableName(schema, table) - cacheStatus := cache.status(cacheKey) if cacheStatus == noGenColumn { return columns, data, index, nil } @@ -519,6 +516,12 @@ func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[ return cache.columns[cacheKey], rows, cache.indexes[cacheKey], nil } + var ( + needPrune bool + colIndexfilters = make([]bool, 0, len(columns)) + genColumnNames = make(map[string]bool) + ) + for _, c := range columns { isGenColumn := c.isGeneratedColumn() colIndexfilters = append(colIndexfilters, isGenColumn) From 5464994a84f61a0872373c7ef02a2d9bcecb6357 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 20 Feb 2019 09:59:44 +0800 Subject: [PATCH 18/19] *: add one more error check in unit test --- syncer/syncer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/syncer/syncer_test.go 
b/syncer/syncer_test.go index 17b26622ca..bc8980b0cd 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -854,6 +854,7 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { ) tblColumns, rowData, tblIndexColumns, err := pruneGeneratedColumnDML(table.columns, ev.Rows, table.indexColumns, table.schema, table.name, syncer.genColsCache) + c.Assert(err, IsNil) switch e.Header.EventType { case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: sqls, _, args, err = genInsertSQLs(table.schema, table.name, rowData, tblColumns, tblIndexColumns) From d731ce15b07b2d830b45f668149248ed6a1f56b4 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 20 Feb 2019 15:07:11 +0800 Subject: [PATCH 19/19] *: address comment, test key in map --- syncer/dml.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index 84d8d10709..70ca9145c8 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -500,10 +500,18 @@ func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[ } if cacheStatus == hasGenColumn { rows := make([][]interface{}, 0, len(data)) - filters, ok := cache.isGenColumn[cacheKey] - if !ok { + filters, ok1 := cache.isGenColumn[cacheKey] + if !ok1 { return nil, nil, nil, errors.NotFoundf("cache key %s in isGenColumn", cacheKey) } + cols, ok2 := cache.columns[cacheKey] + if !ok2 { + return nil, nil, nil, errors.NotFoundf("cache key %s in columns", cacheKey) + } + idxes, ok3 := cache.indexes[cacheKey] + if !ok3 { + return nil, nil, nil, errors.NotFoundf("cache key %s in indexes", cacheKey) + } for _, row := range data { value := make([]interface{}, 0, len(row)) for i := range row { @@ -513,7 +521,7 @@ func pruneGeneratedColumnDML(columns []*column, data [][]interface{}, index map[ } rows = append(rows, value) } - return cache.columns[cacheKey], rows, cache.indexes[cacheKey], nil + return cols, rows, idxes, nil } var (