This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer/: fix flush checkpoint for ddls for situations where IsSharding is false (#587) #624

Merged 11 commits on Apr 27, 2020
1 change: 1 addition & 0 deletions dm/master/status_test.go
@@ -42,6 +42,7 @@ func (t *testHTTPServer) startServer(c *check.C) {
c.Assert(err, check.IsNil)
}()

time.Sleep(time.Second) // wait for the server to fully start
err := t.waitUntilServerOnline()
c.Assert(err, check.IsNil)
}
30 changes: 22 additions & 8 deletions syncer/job.go
@@ -59,9 +59,11 @@ func (t opType) String() string {
}

type job struct {
tp opType
sourceSchema string
sourceTable string
tp opType
// a DDL in ShardOptimistic or ShardPessimistic mode affects only one table at a time, but in normal
// mode there is no such limit, so a single job may need to update multiple tables.
// SQL example: drop table `s1`.`t1`, `s2`.`t2`.
sourceTbl map[string][]string
targetSchema string
targetTable string
sql string
@@ -89,8 +91,7 @@ func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql
}
return &job{
tp: tp,
sourceSchema: sourceSchema,
sourceTable: sourceTable,
sourceTbl: map[string][]string{sourceSchema: {sourceTable}},
targetSchema: targetSchema,
targetTable: targetTable,
sql: sql,
@@ -104,7 +105,10 @@ func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql
}
}

func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, pos, cmdPos mysql.Position, currentGtidSet gtid.Set, ddlExecItem *DDLExecItem, traceID string) *job {
// newDDLJob creates a new DDL job.
// When cfg.ShardMode == "", ddlInfo == nil and sourceTbls != nil; sourceTbls records the tables affected by the DDL.
// When cfg.ShardMode == ShardOptimistic || ShardPessimistic, ddlInfo != nil and sourceTbls == nil.
func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, pos, cmdPos mysql.Position, currentGtidSet gtid.Set, ddlExecItem *DDLExecItem, traceID string, sourceTbls map[string]map[string]struct{}) *job {
var gs gtid.Set
if currentGtidSet != nil {
gs = currentGtidSet.Clone()
@@ -120,10 +124,20 @@ func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, pos, cmdPos mysql.Positi
}

if ddlInfo != nil {
j.sourceSchema = ddlInfo.tableNames[0][0].Schema
j.sourceTable = ddlInfo.tableNames[0][0].Name
j.sourceTbl = map[string][]string{ddlInfo.tableNames[0][0].Schema: {ddlInfo.tableNames[0][0].Name}}
j.targetSchema = ddlInfo.tableNames[1][0].Schema
j.targetTable = ddlInfo.tableNames[1][0].Name
} else if sourceTbls != nil {
sourceTbl := make(map[string][]string, len(sourceTbls))
for schema, tbMap := range sourceTbls {
if len(tbMap) > 0 {
sourceTbl[schema] = make([]string, 0, len(tbMap))
}
for name := range tbMap {
sourceTbl[schema] = append(sourceTbl[schema], name)
}
}
j.sourceTbl = sourceTbl
}

if ddlExecItem != nil && ddlExecItem.req != nil {
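To make the new `sourceTbl` field concrete, here is a standalone illustration (not part of this PR; the `main` wrapper and literal values are made up) of the shape change: in non-sharding mode `handleQueryEvent` collects the affected tables into `sourceTbls` (db name -> set of table names), and `newDDLJob` collapses that set into the job's `sourceTbl` (db name -> slice of table names).

```go
package main

import "fmt"

func main() {
	// suppose a query event touched s1.t1 and s2.t2: this is the shape
	// handleQueryEvent passes to newDDLJob via sourceTbls
	sourceTbls := map[string]map[string]struct{}{
		"s1": {"t1": {}},
		"s2": {"t2": {}},
	}

	// the collapse performed inside newDDLJob: set -> slice per schema
	sourceTbl := make(map[string][]string, len(sourceTbls))
	for schema, tbMap := range sourceTbls {
		if len(tbMap) > 0 {
			sourceTbl[schema] = make([]string, 0, len(tbMap))
		}
		for name := range tbMap {
			sourceTbl[schema] = append(sourceTbl[schema], name)
		}
	}

	fmt.Println(sourceTbl) // map[s1:[t1] s2:[t2]]
}
```

This mirrors the collapse loop added to `newDDLJob` above; a schema whose table set is empty simply produces no entry.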
2 changes: 1 addition & 1 deletion syncer/job_test.go
@@ -97,7 +97,7 @@ func (t *testJobSuite) TestJob(c *C) {
newJob(insert, "test", "t1", "test", "t1", "insert into test.t1 values(?)", []interface{}{1}, "1", mysql.Position{}, mysql.Position{}, nil, ""),
"tp: insert, sql: insert into test.t1 values(?), args: [1], key: 1, ddls: [], last_pos: (, 0), current_pos: (, 0), gtid:<nil>",
}, {
newDDLJob(ddlInfo, []string{"create database test"}, mysql.Position{}, mysql.Position{}, nil, ddlExecItem, ""),
newDDLJob(ddlInfo, []string{"create database test"}, mysql.Position{}, mysql.Position{}, nil, ddlExecItem, "", nil),
"tp: ddl, sql: , args: [], key: , ddls: [create database test], last_pos: (, 0), current_pos: (, 0), gtid:<nil>",
}, {
newXIDJob(mysql.Position{}, mysql.Position{}, nil, ""),
43 changes: 32 additions & 11 deletions syncer/syncer.go
@@ -801,17 +801,31 @@ func (s *Syncer) addJob(job *job) error {

switch job.tp {
case ddl:
failpoint.Inject("ExitAfterDDLBeforeFlush", func() {
s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ExitAfterDDLBeforeFlush"))
utils.OsExit(1)
})
// only save checkpoint for DDL and XID (see above)
s.saveGlobalPoint(job.pos)
if len(job.sourceSchema) > 0 {
s.checkpoint.SaveTablePoint(job.sourceSchema, job.sourceTable, job.pos)
for sourceSchema, tbs := range job.sourceTbl {
if len(sourceSchema) == 0 {
continue
}
for _, sourceTable := range tbs {
s.checkpoint.SaveTablePoint(sourceSchema, sourceTable, job.pos)
}
}
// reset sharding group after checkpoint saved
s.resetShardingGroup(job.targetSchema, job.targetTable)
case insert, update, del:
// save job's current pos for DML events
if len(job.sourceSchema) > 0 {
s.checkpoint.SaveTablePoint(job.sourceSchema, job.sourceTable, job.currentPos)
for sourceSchema, tbs := range job.sourceTbl {
if len(sourceSchema) == 0 {
continue
}
for _, sourceTable := range tbs {
s.checkpoint.SaveTablePoint(sourceSchema, sourceTable, job.currentPos)
}
}
}

@@ -1653,6 +1667,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
var (
ddlInfo *shardingDDLInfo
needHandleDDLs []string
sourceTbls = make(map[string]map[string]struct{}) // db name -> tb name
targetTbls = make(map[string]*filter.Table)
)
for _, sql := range sqls {
@@ -1712,6 +1727,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
}

needHandleDDLs = append(needHandleDDLs, sqlDDL)
// TODO: current table checkpoints will be deleted in track ddls, but created and updated in flush checkpoints,
// we should use a better mechanism to combine these operations
recordSourceTbls(sourceTbls, stmt, tableNames[0][0])
targetTbls[tableNames[1][0].String()] = tableNames[1][0]
}

@@ -1729,6 +1747,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
*ec.traceID = traceEvent.Base.TraceID
}

// flush previous DMLs and the checkpoint when we need to handle the DDL.
// NOTE: do this flush before operating on shard groups, otherwise a table may be skipped because of `UnresolvedTables`.
if err := s.flushJobs(); err != nil {
return err
}
if !s.cfg.IsSharding {
s.tctx.L().Info("start to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
// try apply SQL operator before addJob. now, one query event only has one DDL job, if updating to multi DDL jobs, refine this.
@@ -1741,7 +1764,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
needHandleDDLs = appliedSQLs // maybe nil
}

job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID)
job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID, sourceTbls)
err = s.addJobFunc(job)
if err != nil {
return err
@@ -1753,14 +1776,12 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))

for _, tbl := range targetTbls {
s.clearTables(tbl.Schema, tbl.Name)
// save checkpoint of each table
s.checkpoint.SaveTablePoint(tbl.Schema, tbl.Name, *ec.currentPos)
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))

for _, table := range onlineDDLTableNames {
s.tctx.L().Info("finish online ddl and clear online ddl metadata in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.String("schema", table.Schema), zap.String("table", table.Name))
err = s.onlineDDL.Finish(ec.tctx, table.Schema, table.Name)
@@ -1861,7 +1882,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
}

// Don't send new DDLInfo to dm-master until all local sql jobs finished
s.jobWg.Wait()
// since jobWg has already been waited on inside flushJobs above, we don't need to wait here anymore

// NOTE: if we need singleton Syncer (without dm-master) to support sharding DDL sync
// we should add another config item to differ, and do not save DDLInfo, and not wait for ddlExecInfo
@@ -1929,7 +1950,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info("replace ddls to preset ddls by sql operator in shard mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Stringer("start position", startPos), log.WrapStringerField("end position", ec.currentPos))
needHandleDDLs = appliedSQLs // maybe nil
}
job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, ddlExecItem, *ec.traceID)
job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, ddlExecItem, *ec.traceID, nil)
err = s.addJobFunc(job)
if err != nil {
return err
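For orientation, a condensed sketch (my own summary, not code from this PR; the wrapper name `handleDDLNormalMode` and the omitted error paths are assumptions) of the order the non-sharding DDL path follows after this change: flush pending DML jobs and the checkpoint first, then enqueue the DDL job carrying `sourceTbls`, and save the target-table points only after the DDL has been applied downstream.

```go
// condensed sketch of the non-sharding DDL path; handleDDLNormalMode is a
// hypothetical wrapper, the calls inside mirror the diff above
func (s *Syncer) handleDDLNormalMode(ddls []string, sourceTbls map[string]map[string]struct{},
	targetTbls map[string]*filter.Table, ec eventContext) error {
	// 1. flush previous DMLs and the checkpoint before touching shard groups
	if err := s.flushJobs(); err != nil {
		return err
	}

	// 2. enqueue the DDL job; sourceTbls lets addJob save a table point for
	//    every affected source table instead of a single schema/table pair
	job := newDDLJob(nil, ddls, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID, sourceTbls)
	if err := s.addJobFunc(job); err != nil {
		return err
	}

	// 3. after the DDL is executed downstream, clear cached table info and
	//    save the checkpoint of each target table
	for _, tbl := range targetTbls {
		s.clearTables(tbl.Schema, tbl.Name)
		s.checkpoint.SaveTablePoint(tbl.Schema, tbl.Name, *ec.currentPos)
	}
	return nil
}
```

In the sharding path the same up-front flush is also why the explicit `s.jobWg.Wait()` could be dropped.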
43 changes: 41 additions & 2 deletions syncer/syncer_test.go
@@ -218,6 +218,12 @@ func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser.
return utils.GetParser(db, false)
}

func (s *testSyncerSuite) mockCheckPointMeta(checkPointMock sqlmock.Sqlmock) {
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()
}

func (s *testSyncerSuite) mockCheckPointCreate(checkPointMock sqlmock.Sqlmock) {
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
@@ -1262,8 +1268,11 @@ func (s *testSyncerSuite) TestSharding(c *C) {
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"))

// mock checkpoint db after create db table1 table2
s.mockCheckPointMeta(checkPointMock)
s.mockCheckPointCreate(checkPointMock)
s.mockCheckPointMeta(checkPointMock)
s.mockCheckPointCreate(checkPointMock)
s.mockCheckPointMeta(checkPointMock)
s.mockCheckPointCreate(checkPointMock)

// mock downstream db result
@@ -1286,6 +1295,21 @@ func (s *testSyncerSuite) TestSharding(c *C) {
"Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"},
).AddRow("st", 0, "PRIMARY", 1, "id", "A", 0, null, null, null, "BTREE", "", ""))

// before first ddl, we only flush global checkpoint
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()
// before second ddl, we flush the saved table checkpoint t1
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()
// after ddl is synced, we flush global checkpoint and saved table point t2
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()

// mock expect sql
for i, expectSQL := range _case.expectSQLS {
mock.ExpectBegin()
@@ -1299,7 +1323,6 @@
sqlmock.NewRows([]string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name",
"Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"},
).AddRow("st", 0, "PRIMARY", 1, "id", "A", 0, null, null, null, "BTREE", "", ""))
s.mockCheckPointFlush(checkPointMock)
} else {
// change insert to replace because of safe mode
mock.ExpectExec(expectSQL.sql).WithArgs(expectSQL.args...).WillReturnResult(sqlmock.NewResult(1, 1))
@@ -1363,7 +1386,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
// 3. check the generated jobs
// 4. update config, add route rules, and update syncer
// 5. execute some SQLs and then check the generated jobs

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(context.Background())
@@ -1462,14 +1484,27 @@ func (s *testSyncerSuite) TestRun(c *C) {
go syncer.Process(ctx, resultCh)

expectJobs1 := []*expectJob{
// now every DDL job is preceded by a flush job
{
flush,
"",
nil,
}, {
ddl,
"CREATE DATABASE IF NOT EXISTS `test_1`",
nil,
}, {
flush,
"",
nil,
}, {
ddl,
"CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))",
nil,
}, {
flush,
"",
nil,
}, {
ddl,
"CREATE TABLE IF NOT EXISTS `test_1`.`t_2` (`id` INT PRIMARY KEY,`name` VARCHAR(24))",
@@ -1478,6 +1513,10 @@ func (s *testSyncerSuite) TestRun(c *C) {
insert,
"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)",
[]interface{}{int64(580981944116838401), "a"},
}, {
flush,
"",
nil,
}, {
ddl,
"ALTER TABLE `test_1`.`t_1` ADD INDEX `index1`(`name`)",
25 changes: 25 additions & 0 deletions syncer/util.go
@@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/terror"
@@ -95,3 +96,27 @@ func getDBConfigFromEnv() config.DBConfig {
Port: port,
}
}

// recordSourceTbls records the tables whose checkpoints need to be flushed
func recordSourceTbls(sourceTbls map[string]map[string]struct{}, stmt ast.StmtNode, table *filter.Table) {
schema, name := table.Schema, table.Name
switch stmt.(type) {
// checkpoints of the tables related to these DDLs will be deleted when tracking the DDL,
// so we shouldn't flush those checkpoints
case *ast.DropDatabaseStmt:
delete(sourceTbls, schema)
case *ast.DropTableStmt:
if _, ok := sourceTbls[schema]; ok {
delete(sourceTbls[schema], name)
}
// these DDLs don't update the schema tracker, so there is no need to record their tables
case *ast.LockTablesStmt, *ast.UnlockTablesStmt, *ast.CleanupTableLockStmt, *ast.TruncateTableStmt:
break
// for other DDLs, record the table so its schema tracker info is flushed into the checkpoint
default:
if _, ok := sourceTbls[schema]; !ok {
sourceTbls[schema] = make(map[string]struct{})
}
sourceTbls[schema][name] = struct{}{}
}
}
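As a quick reference (my own sketch, assuming it sits in the `syncer` package next to `recordSourceTbls`; the helper name `collectTablesToFlush` is made up), this is how the classification plays out when the DDLs of one query event are fed through it, mirroring the loop in `handleQueryEvent`:

```go
package syncer

import (
	"github.com/pingcap/parser"
	"github.com/pingcap/tidb-tools/pkg/filter"
)

// collectTablesToFlush parses each DDL and records its source table,
// mimicking how handleQueryEvent drives recordSourceTbls.
func collectTablesToFlush(sqls []string, tables []*filter.Table) (map[string]map[string]struct{}, error) {
	p := parser.New()
	sourceTbls := make(map[string]map[string]struct{})
	for i, sql := range sqls {
		stmt, err := p.ParseOneStmt(sql, "", "")
		if err != nil {
			return nil, err
		}
		recordSourceTbls(sourceTbls, stmt, tables[i])
	}
	// e.g. CREATE TABLE `a`.`b`   -> sourceTbls["a"]["b"] added (flushed later)
	//      DROP TABLE `a`.`b`     -> entry "b" removed (its checkpoint is deleted while tracking the DDL)
	//      TRUNCATE TABLE `a`.`c` -> unchanged (schema tracker not updated)
	//      DROP DATABASE `a`      -> schema "a" removed entirely
	return sourceTbls, nil
}
```

The table-driven test in `util_test.go` below walks through the same transitions with concrete assertions.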
24 changes: 24 additions & 0 deletions syncer/util_test.go
@@ -18,6 +18,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
_ "github.com/pingcap/tidb/types/parser_driver"
)

@@ -123,3 +124,26 @@ func (t *testUtilSuite) TestTableNameResultSet(c *C) {
c.Assert(schema, Equals, "test")
c.Assert(table, Equals, "t1")
}

func (t *testUtilSuite) TestRecordSourceTbls(c *C) {
sourceTbls := make(map[string]map[string]struct{})

recordSourceTbls(sourceTbls, &ast.CreateDatabaseStmt{}, &filter.Table{Schema: "a", Name: ""})
c.Assert(sourceTbls, HasKey, "a")
c.Assert(sourceTbls["a"], HasKey, "")

recordSourceTbls(sourceTbls, &ast.CreateTableStmt{}, &filter.Table{Schema: "a", Name: "b"})
c.Assert(sourceTbls, HasKey, "a")
c.Assert(sourceTbls["a"], HasKey, "b")

recordSourceTbls(sourceTbls, &ast.DropTableStmt{}, &filter.Table{Schema: "a", Name: "b"})
_, ok := sourceTbls["a"]["b"]
c.Assert(ok, IsFalse)

recordSourceTbls(sourceTbls, &ast.CreateTableStmt{}, &filter.Table{Schema: "a", Name: "c"})
c.Assert(sourceTbls, HasKey, "a")
c.Assert(sourceTbls["a"], HasKey, "c")

recordSourceTbls(sourceTbls, &ast.DropDatabaseStmt{}, &filter.Table{Schema: "a", Name: ""})
c.Assert(sourceTbls, HasLen, 0)
}
3 changes: 3 additions & 0 deletions tests/sequence_sharding/data/db1.increment.sql
@@ -20,3 +20,6 @@ update t2 set d = 200;
update t1 set c = 101;
update t2 set c = 102;
insert into t1 (uid,name,c) values(100004,'VALUES',191472878),(100005,'jAPlnzXli',1091218279);

alter table t1 add column f int;
alter table t2 add column f int;
4 changes: 4 additions & 0 deletions tests/sequence_sharding/data/db1.increment2.sql
@@ -0,0 +1,4 @@
use `sharding_seq`;
insert into t1 (uid,name,c,d,e,f) values (15, "i", 15, 15, 15, 15);
alter table t1 drop column f;
alter table t2 drop column f;
4 changes: 4 additions & 0 deletions tests/sequence_sharding/data/db2.increment.sql
@@ -25,3 +25,7 @@ update t4 set d = 200;
update t4 set uid=uid+100000;
insert into t2 (uid,name,c) values(300003,'nvWgBf',73),(300004,'nD1000',4029);
insert into t3 (uid,name,c) values(400004,'1000',1000);

alter table t2 add column f int;
alter table t3 add column f int;
alter table t4 add column f int;
5 changes: 5 additions & 0 deletions tests/sequence_sharding/data/db2.increment2.sql
@@ -0,0 +1,5 @@
use `sharding_seq`;
insert into t2 (uid,name,c,d,e,f) values (16, "j", 16, 16, 16, 16);
alter table t2 drop column f;
alter table t3 drop column f;
alter table t4 drop column f;