From c61426375900b0bf87f0175aeb7e9b3a08d43977 Mon Sep 17 00:00:00 2001
From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com>
Date: Fri, 26 Mar 2021 10:56:54 +0800
Subject: [PATCH] cherry pick #1518 to release-2.0

Signed-off-by: ti-srebot
---
 _utils/terror_gen/errors_release.txt | 1 +
 dm/master/server_test.go | 1 +
 dm/master/shardddl/optimist.go | 84 +++++--
 dm/master/shardddl/optimist_test.go | 130 ++++++++++-
 errors.toml | 6 +
 pkg/shardddl/optimism/info.go | 5 +
 pkg/shardddl/optimism/info_test.go | 4 +
 pkg/shardddl/optimism/keeper.go | 33 ++-
 pkg/shardddl/optimism/keeper_test.go | 62 +++++
 pkg/shardddl/optimism/lock.go | 33 ++-
 pkg/shardddl/optimism/lock_test.go | 328 +++++++++++++++++----------
 pkg/shardddl/optimism/ops_test.go | 2 +
 pkg/terror/error_list.go | 2 +
 syncer/schema.go | 3 +-
 syncer/shardddl/optimist_test.go | 2 +
 tests/shardddl1/run.sh | 65 ++++++
 16 files changed, 610 insertions(+), 151 deletions(-)

diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index 1c8f20f246..9f1e7158b1 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -372,6 +372,7 @@ ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high
 ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
 ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
 ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
+ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
 ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
 ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
 ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
diff --git a/dm/master/server_test.go b/dm/master/server_test.go index b56f1d9bf0..e2a9fba7e6 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -637,6 +637,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false) ) + st1.AddTable("foo-1", "bar-1", schema, table) _, err = optimism.PutSourceTables(etcdTestCli, st1) c.Assert(err, check.IsNil) _, err = optimism.PutInfo(etcdTestCli, info1) diff --git a/dm/master/shardddl/optimist.go b/dm/master/shardddl/optimist.go index 70067ed91e..edf7099be8 100644 --- a/dm/master/shardddl/optimist.go +++ b/dm/master/shardddl/optimist.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb-tools/pkg/schemacmp" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -33,6 +34,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/shardddl/optimism" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) // Optimist is used to coordinate the shard DDL migration in optimism mode. @@ -255,29 +257,83 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e return revSource, revInfo, revOperation, nil } -// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. -func (o *Optimist) recoverLocks( - ifm map[string]map[string]map[string]map[string]optimism.Info, - opm map[string]map[string]map[string]map[string]optimism.Operation) error { - // construct locks based on the shard DDL info. - for task, ifTask := range ifm { +// sortInfos sort all infos by revision +func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) []optimism.Info { + infos := make([]optimism.Info, 0, len(ifm)) + + for _, ifTask := range ifm { for _, ifSource := range ifTask { for _, ifSchema := range ifSource { for _, info := range ifSchema { - tts := o.tk.FindTables(task, info.DownSchema, info.DownTable) - _, _, err := o.lk.TrySync(info, tts) - if err != nil { - return err + infos = append(infos, info) + } + } + } + } + + // sort according to the Revision + sort.Slice(infos, func(i, j int) bool { + return infos[i].Revision < infos[j].Revision + }) + return infos +} + +// buildLockJoinedAndTTS build joined table and target table slice for lock by history infos +func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]map[string]optimism.Info) (map[string]schemacmp.Table, map[string][]optimism.TargetTable) { + lockJoined := make(map[string]schemacmp.Table) + lockTTS := make(map[string][]optimism.TargetTable) + + for _, taskInfos := range ifm { + for _, sourceInfos := range taskInfos { + for _, schemaInfos := range sourceInfos { + for _, info := range schemaInfos { + lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable) + if joined, ok := lockJoined[lockID]; !ok { + lockJoined[lockID] = schemacmp.Encode(info.TableInfoBefore) + } else { + newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore)) + // ignore error, will report it in TrySync later + if err != nil { + o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err)) + } else { + lockJoined[lockID] = newJoined + } } - // never mark the lock operation from `done` to `not-done` when recovering. 
- err = o.handleLock(info, tts, true) - if err != nil { - return err + if _, ok := lockTTS[lockID]; !ok { + lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable) } } } } } + return lockJoined, lockTTS +} + +// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. +func (o *Optimist) recoverLocks( + ifm map[string]map[string]map[string]map[string]optimism.Info, + opm map[string]map[string]map[string]map[string]optimism.Operation) error { + // construct joined table based on the shard DDL info. + o.logger.Info("build lock joined and tts") + lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm) + // build lock and restore table info + o.logger.Info("rebuild locks and tables") + o.lk.RebuildLocksAndTables(ifm, lockJoined, lockTTS) + // sort infos by revision + infos := sortInfos(ifm) + + for _, info := range infos { + tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable) + _, _, err := o.lk.TrySync(info, tts) + if err != nil { + return err + } + // never mark the lock operation from `done` to `not-done` when recovering. + err = o.handleLock(info, tts, true) + if err != nil { + return err + } + } // update the done status of the lock. for _, opTask := range opm { diff --git a/dm/master/shardddl/optimist_test.go b/dm/master/shardddl/optimist_test.go index a8c73ed3bb..05e859144f 100644 --- a/dm/master/shardddl/optimist_test.go +++ b/dm/master/shardddl/optimist_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb-tools/pkg/schemacmp" tiddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/mock" @@ -148,6 +149,7 @@ func (t *testOptimist) TestOptimist(c *C) { t.testOptimist(c, noRestart) t.testOptimist(c, restartOnly) t.testOptimist(c, restartNewInstance) + t.testSortInfos(c) } func (t *testOptimist) testOptimist(c *C, restart int) { @@ -656,14 +658,13 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) { tblID int64 = 111 DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"} DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c1 DATETIME"} - DDLs3 = []string{"ALTER TABLE bar DROP COLUMN c1"} ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`) ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`) ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 DATETIME)`) ti3 = ti0 i1 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}) i2 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2}) - i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3}) + i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti3}) ) st1.AddTable("foo", "bar-1", downSchema, downTable) @@ -714,6 +715,7 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) { c.Assert(len(errCh), Equals, 0) // PUT i3, no conflict now. + // case for handle-error replace rev3, err := optimism.PutInfo(etcdTestCli, i3) c.Assert(err, IsNil) // wait operation for i3 become available. @@ -1029,3 +1031,127 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) { c.Assert(err, IsNil) c.Assert(is.TableInfo, DeepEquals, ti1) // the init schema is ti1 now. 
} + +func (t *testOptimist) testSortInfos(c *C) { + defer clearOptimistTestSourceInfoOperation(c) + + var ( + task = "test-optimist-init-schema" + sources = []string{"mysql-replica-1", "mysql-replica-2"} + upSchema = "foo" + upTables = []string{"bar-1", "bar-2"} + downSchema = "foo" + downTable = "bar" + + p = parser.New() + se = mock.NewContext() + tblID int64 = 111 + DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"} + DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"} + ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`) + ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`) + ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 INT)`) + i11 = optimism.NewInfo(task, sources[0], upSchema, upTables[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}) + i12 = optimism.NewInfo(task, sources[0], upSchema, upTables[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}) + i21 = optimism.NewInfo(task, sources[1], upSchema, upTables[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}) + ) + + rev1, err := optimism.PutInfo(etcdTestCli, i11) + c.Assert(err, IsNil) + ifm, _, err := optimism.GetAllInfo(etcdTestCli) + c.Assert(err, IsNil) + infos := sortInfos(ifm) + c.Assert(len(infos), Equals, 1) + i11.Version = 1 + i11.Revision = rev1 + c.Assert(infos[0], DeepEquals, i11) + + rev2, err := optimism.PutInfo(etcdTestCli, i12) + c.Assert(err, IsNil) + ifm, _, err = optimism.GetAllInfo(etcdTestCli) + c.Assert(err, IsNil) + infos = sortInfos(ifm) + c.Assert(len(infos), Equals, 2) + i11.Version = 1 + i11.Revision = rev1 + i12.Version = 1 + i12.Revision = rev2 + c.Assert(infos[0], DeepEquals, i11) + c.Assert(infos[1], DeepEquals, i12) + + rev3, err := optimism.PutInfo(etcdTestCli, i21) + c.Assert(err, IsNil) + rev4, err := optimism.PutInfo(etcdTestCli, i11) + c.Assert(err, IsNil) + ifm, _, err = optimism.GetAllInfo(etcdTestCli) + c.Assert(err, IsNil) + infos = sortInfos(ifm) + c.Assert(len(infos), Equals, 3) + + i11.Version = 2 + i11.Revision = rev4 + i12.Version = 1 + i12.Revision = rev2 + i21.Version = 1 + i21.Revision = rev3 + c.Assert(infos[0], DeepEquals, i12) + c.Assert(infos[1], DeepEquals, i21) + c.Assert(infos[2], DeepEquals, i11) +} + +func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) { + defer clearOptimistTestSourceInfoOperation(c) + + var ( + logger = log.L() + o = NewOptimist(&logger) + task = "task" + source1 = "mysql-replica-1" + source2 = "mysql-replica-2" + downSchema = "db" + downTable = "tbl" + st1 = optimism.NewSourceTables(task, source1) + st2 = optimism.NewSourceTables(task, source2) + DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} + DDLs2 = []string{"ALTER TABLE bar DROP COLUMN c1"} + p = parser.New() + se = mock.NewContext() + tblID int64 = 111 + ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`) + ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`) + ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`) + ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c2 INT)`) + + i11 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}) + i21 = optimism.NewInfo(task, source2, "foo", "bar-1", downSchema, downTable, DDLs2, ti2, []*model.TableInfo{ti3}) + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + st1.AddTable("db", 
"tbl-1", downSchema, downTable) + st2.AddTable("db", "tbl-1", downSchema, downTable) + + c.Assert(o.Start(ctx, etcdTestCli), IsNil) + _, err := optimism.PutSourceTables(etcdTestCli, st1) + c.Assert(err, IsNil) + _, err = optimism.PutSourceTables(etcdTestCli, st2) + c.Assert(err, IsNil) + + _, err = optimism.PutInfo(etcdTestCli, i21) + c.Assert(err, IsNil) + _, err = optimism.PutInfo(etcdTestCli, i11) + c.Assert(err, IsNil) + + ifm, _, err := optimism.GetAllInfo(etcdTestCli) + c.Assert(err, IsNil) + + lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm) + c.Assert(len(lockJoined), Equals, 1) + c.Assert(len(lockTTS), Equals, 1) + joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)] + c.Assert(ok, IsTrue) + cmp, err := joined.Compare(schemacmp.Encode(ti2)) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) +} diff --git a/errors.toml b/errors.toml index 4a77f15f7e..cee9992a9c 100644 --- a/errors.toml +++ b/errors.toml @@ -2242,6 +2242,12 @@ description = "" workaround = "" tags = ["internal", "high"] +[error.DM-dm-master-38055] +message = "table-info-before not exist in optimistic ddls: %v" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-dm-worker-40001] message = "parse dm-worker config flag set" description = "" diff --git a/pkg/shardddl/optimism/info.go b/pkg/shardddl/optimism/info.go index 38c11918d6..0e07d31e94 100644 --- a/pkg/shardddl/optimism/info.go +++ b/pkg/shardddl/optimism/info.go @@ -54,6 +54,10 @@ type Info struct { // only set it when get/watch from etcd Version int64 `json:"-"` + // only set it when get from etcd + // use for sort infos in recoverlock + Revision int64 `json:"-"` + // use to resolve conflict IgnoreConflict bool `json:"ignore-conflict"` } @@ -132,6 +136,7 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[stri return nil, 0, err2 } info.Version = kv.Version + info.Revision = kv.ModRevision if _, ok := ifm[info.Task]; !ok { ifm[info.Task] = make(map[string]map[string]map[string]Info) diff --git a/pkg/shardddl/optimism/info_test.go b/pkg/shardddl/optimism/info_test.go index 12b824c5bb..4b8836c1e2 100644 --- a/pkg/shardddl/optimism/info_test.go +++ b/pkg/shardddl/optimism/info_test.go @@ -128,6 +128,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { c.Assert(ifm[task1][source1][upSchema], HasLen, 1) i11WithVer := i11 i11WithVer.Version = 2 + i11WithVer.Revision = ifm[task1][source1][upSchema][upTable].Revision c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer) // put another key and get again with 2 info. @@ -141,6 +142,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer) i12WithVer := i12 i12WithVer.Version = 1 + i12WithVer.Revision = ifm[task1][source2][upSchema][upTable].Revision c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer) // start the watcher. @@ -210,8 +212,10 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { c.Assert(ifm, HasKey, task1) c.Assert(ifm, HasKey, task2) c.Assert(ifm[task1], HasLen, 1) + i11WithVer.Revision = ifm[task1][source1][upSchema][upTable].Revision c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer) c.Assert(ifm[task2], HasLen, 1) + i21WithVer.Revision = ifm[task2][source1][upSchema][upTable].Revision c.Assert(ifm[task2][source1][upSchema][upTable], DeepEquals, i21WithVer) // watch the deletion for i12. 
diff --git a/pkg/shardddl/optimism/keeper.go b/pkg/shardddl/optimism/keeper.go index 5c6a9ec67e..26c514cc75 100644 --- a/pkg/shardddl/optimism/keeper.go +++ b/pkg/shardddl/optimism/keeper.go @@ -17,7 +17,9 @@ import ( "sort" "sync" + "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/tidb-tools/pkg/schemacmp" ) // LockKeeper used to keep and handle DDL lock conveniently. @@ -34,6 +36,31 @@ func NewLockKeeper() *LockKeeper { } } +// RebuildLocksAndTables rebuild the locks and tables +func (lk *LockKeeper) RebuildLocksAndTables( + ifm map[string]map[string]map[string]map[string]Info, + lockJoined map[string]schemacmp.Table, + lockTTS map[string][]TargetTable) { + var ( + lock *Lock + ok bool + ) + for _, taskInfos := range ifm { + for _, sourceInfos := range taskInfos { + for _, schemaInfos := range sourceInfos { + for _, info := range schemaInfos { + lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable) + if lock, ok = lk.locks[lockID]; !ok { + lk.locks[lockID] = NewLock(lockID, info.Task, info.DownSchema, info.DownTable, lockJoined[lockID], lockTTS[lockID]) + lock = lk.locks[lockID] + } + lock.tables[info.Source][info.UpSchema][info.UpTable] = schemacmp.Encode(info.TableInfoBefore) + } + } + } + } +} + // TrySync tries to sync the lock. func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, error) { var ( @@ -45,8 +72,12 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e lk.mu.Lock() defer lk.mu.Unlock() + if info.TableInfoBefore == nil { + return "", nil, terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(info.DDLs) + } + if l, ok = lk.locks[lockID]; !ok { - lk.locks[lockID] = NewLock(lockID, info.Task, info.DownSchema, info.DownTable, info.TableInfoBefore, tts) + lk.locks[lockID] = NewLock(lockID, info.Task, info.DownSchema, info.DownTable, schemacmp.Encode(info.TableInfoBefore), tts) l = lk.locks[lockID] } diff --git a/pkg/shardddl/optimism/keeper_test.go b/pkg/shardddl/optimism/keeper_test.go index e1d62423c3..f45a850970 100644 --- a/pkg/shardddl/optimism/keeper_test.go +++ b/pkg/shardddl/optimism/keeper_test.go @@ -15,8 +15,10 @@ package optimism import ( . 
"github.com/pingcap/check" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/parser" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-tools/pkg/schemacmp" "github.com/pingcap/tidb/util/mock" ) @@ -391,3 +393,63 @@ func (t *testKeeper) TestTargetTablesForTask(c *C) { }), }) } + +func (t *testKeeper) TestRebuildLocksAndTables(c *C) { + var ( + lk = NewLockKeeper() + task = "task" + source1 = "mysql-replica-1" + source2 = "mysql-replica-2" + upSchema = "foo" + upTable = "bar" + downSchema = "db" + downTable = "tbl" + DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} + DDLs2 = []string{"ALTER TABLE bar DROP COLUMN c1"} + p = parser.New() + se = mock.NewContext() + tblID int64 = 111 + ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`) + ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`) + ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`) + ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c2 INT)`) + + i11 = NewInfo(task, source1, upSchema, upTable, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}) + i21 = NewInfo(task, source2, upSchema, upTable, downSchema, downTable, DDLs2, ti2, []*model.TableInfo{ti3}) + + tts = []TargetTable{ + newTargetTable(task, source1, downSchema, downTable, map[string]map[string]struct{}{upSchema: {upTable: struct{}{}}}), + newTargetTable(task, source2, downSchema, downTable, map[string]map[string]struct{}{upSchema: {upTable: struct{}{}}}), + } + + lockID = utils.GenDDLLockID(task, downSchema, downTable) + + ifm = map[string]map[string]map[string]map[string]Info{ + task: { + source1: {upSchema: {upTable: i11}}, + source2: {upSchema: {upTable: i21}}, + }, + } + lockJoined = map[string]schemacmp.Table{ + lockID: schemacmp.Encode(ti2), + } + lockTTS = map[string][]TargetTable{ + lockID: tts, + } + ) + + lk.RebuildLocksAndTables(ifm, lockJoined, lockTTS) + locks := lk.Locks() + c.Assert(len(locks), Equals, 1) + lock, ok := locks[lockID] + c.Assert(ok, IsTrue) + cmp, err := lock.Joined().Compare(schemacmp.Encode(ti2)) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) + cmp, err = lock.tables[source1][upSchema][upTable].Compare(schemacmp.Encode(ti0)) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) + cmp, err = lock.tables[source2][upSchema][upTable].Compare(schemacmp.Encode(ti2)) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) +} diff --git a/pkg/shardddl/optimism/lock.go b/pkg/shardddl/optimism/lock.go index 51f6946b3e..3472b9e872 100644 --- a/pkg/shardddl/optimism/lock.go +++ b/pkg/shardddl/optimism/lock.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/schemacmp" "go.uber.org/zap" @@ -45,6 +44,7 @@ type Lock struct { // upstream source ID -> upstream schema name -> upstream table name -> table info. // if all of them are the same, then we call the lock `synced`. tables map[string]map[string]map[string]schemacmp.Table + synced bool // whether DDLs operations have done (execute the shard DDL) to the downstream. @@ -60,13 +60,13 @@ type Lock struct { // NewLock creates a new Lock instance. // NOTE: we MUST give the initial table info when creating the lock now. 
-func NewLock(ID, task, downSchema, downTable string, ti *model.TableInfo, tts []TargetTable) *Lock { +func NewLock(ID, task, downSchema, downTable string, joined schemacmp.Table, tts []TargetTable) *Lock { l := &Lock{ ID: ID, Task: task, DownSchema: downSchema, DownTable: downTable, - joined: schemacmp.Encode(ti), + joined: joined, tables: make(map[string]map[string]map[string]schemacmp.Table), done: make(map[string]map[string]map[string]bool), synced: true, @@ -100,6 +100,8 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro newTIs = info.TableInfosAfter infoVersion = info.Version ignoreConflict = info.IgnoreConflict + oldSynced = l.synced + emptyDDLs = []string{} ) l.mu.Lock() defer func() { @@ -111,7 +113,6 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro l.mu.Unlock() }() - oldSynced := l.synced defer func() { _, remain := l.syncStatus() l.synced = remain == 0 @@ -149,6 +150,10 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro return ddls, terror.ErrMasterInconsistentOptimisticDDLsAndInfo.Generate(len(ddls), len(newTIs)) } + // should not happen + if info.TableInfoBefore == nil { + return ddls, terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(ddls) + } // handle the case where // is not in old source tables and current new source tables. // duplicate append is not a problem. @@ -160,12 +165,7 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro l.versions[callerSource][callerSchema][callerTable] = infoVersion } - var emptyDDLs = []string{} - prevTable := l.tables[callerSource][callerSchema][callerTable] - oldJoined := l.joined - lastTableInfo := schemacmp.Encode(newTIs[len(newTIs)-1]) - defer func() { // only update table info if no error or ignore conflict if ignoreConflict || err == nil { @@ -175,6 +175,19 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro } }() + prevTable := schemacmp.Encode(info.TableInfoBefore) + // if preTable not equal table in master, we always use preTable + // this often happens when an info TrySync twice, e.g. worker restart/resume task + if cmp, err := prevTable.Compare(l.tables[callerSource][callerSchema][callerTable]); err != nil || cmp != 0 { + l.tables[callerSource][callerSchema][callerTable] = prevTable + prevJoined, err := joinTable(prevTable) + if err != nil { + return emptyDDLs, err + } + l.joined = prevJoined + } + oldJoined := l.joined + lastJoined, err := joinTable(lastTableInfo) if err != nil { return emptyDDLs, err @@ -205,6 +218,7 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro // special case: if the DDL does not affect the schema at all, assume it is // idempotent and just execute the DDL directly. // if any real conflicts after joined exist, they will be detected by the following steps. + // this often happens when executing `CREATE TABLE` statement var cmp int if cmp, err = nextTable.Compare(oldJoined); err == nil && cmp == 0 { newDDLs = append(newDDLs, ddls[idx]) @@ -480,7 +494,6 @@ func (l *Lock) addTables(tts []TargetTable) { } for table := range tables { if _, ok := l.tables[tt.Source][schema][table]; !ok { - // NOTE: the newly added table uses the current table info. 
l.tables[tt.Source][schema][table] = l.joined l.done[tt.Source][schema][table] = false l.versions[tt.Source][schema][table] = 0 diff --git a/pkg/shardddl/optimism/lock_test.go b/pkg/shardddl/optimism/lock_test.go index 44b39f315b..fb864b531b 100644 --- a/pkg/shardddl/optimism/lock_test.go +++ b/pkg/shardddl/optimism/lock_test.go @@ -14,11 +14,14 @@ package optimism import ( + "testing" + . "github.com/pingcap/check" "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/schemacmp" "github.com/pingcap/tidb/util/mock" + "go.etcd.io/etcd/integration" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -28,6 +31,15 @@ type testLock struct{} var _ = Suite(&testLock{}) +func TestLock(t *testing.T) { + mockCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer mockCluster.Terminate(t) + + etcdTestCli = mockCluster.RandClient() + + TestingT(t) +} + func (t *testLock) SetUpSuite(c *C) { c.Assert(log.InitLogger(&log.Config{}), IsNil) } @@ -65,7 +77,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { newTargetTable(task, sources[1], downSchema, downTable, tables), } - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ sources[0]: { @@ -100,7 +112,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { for _, db := range dbs { for _, tbl := range tbls { - info := newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -119,7 +131,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { t.checkLockNoDone(c, l) // CASE: TrySync again after synced is idempotent. - info := newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -129,7 +141,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { // CASE: need to add more than one DDL to reach the desired schema (schema become larger). // add two columns for one table. - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2_1, ti2}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2_1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -139,7 +151,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(ready[sources[0]][dbs[0]][tbls[1]], IsFalse) // TrySync again is idempotent (more than one DDL). - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2_1, ti2}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2_1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -149,7 +161,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(ready[sources[0]][dbs[0]][tbls[1]], IsFalse) // add only the first column for another table. 
- info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[0:1], nil, []*model.TableInfo{ti2_1}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[0:1], ti1, []*model.TableInfo{ti2_1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2[0:1]) @@ -165,17 +177,18 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(err, IsNil) c.Assert(cmp, Equals, 1) - // TrySync again (only the first DDL). - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[0:1], nil, []*model.TableInfo{ti2_1}, vers) + // TrySync again is idempotent + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[0:1], ti1, []*model.TableInfo{ti2_1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) - c.Assert(DDLs, DeepEquals, []string{}) // NOTE: special case, joined has larger schema. + c.Assert(DDLs, DeepEquals, DDLs2[0:1]) c.Assert(l.versions, DeepEquals, vers) ready = l.Ready() + c.Assert(ready[sources[0]][dbs[0]][tbls[0]], IsTrue) c.Assert(ready[sources[0]][dbs[0]][tbls[1]], IsFalse) // add the second column for another table. - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[1:2], nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[1:2], ti2_1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2[1:2]) @@ -191,24 +204,13 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(cmp, Equals, 0) // Try again (for the second DDL). - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[1:2], nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs2[1:2], ti2_1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2[1:2]) c.Assert(l.versions, DeepEquals, vers) - // try add columns for all tables to reach the same schema. - resultDDLs := map[string]map[string]map[string][]string{ - sources[0]: { - dbs[0]: {tbls[0]: DDLs2[1:], tbls[1]: DDLs2[1:]}, - dbs[1]: {tbls[0]: DDLs2, tbls[1]: DDLs2}, - }, - sources[1]: { - dbs[0]: {tbls[0]: DDLs2, tbls[1]: DDLs2}, - dbs[1]: {tbls[0]: DDLs2, tbls[1]: DDLs2}, - }, - } - t.trySyncForAllTablesLarger(c, l, DDLs2, []*model.TableInfo{ti2_1, ti2}, tts, vers, resultDDLs) + t.trySyncForAllTablesLarger(c, l, DDLs2, ti1, []*model.TableInfo{ti2_1, ti2}, tts, vers) t.checkLockSynced(c, l) t.checkLockNoDone(c, l) @@ -230,7 +232,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { for _, db := range dbs { for _, tbl := range tbls { syncedCount++ - info = newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs3, nil, []*model.TableInfo{ti3}, vers) + info = newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(l.versions, DeepEquals, vers) @@ -253,7 +255,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { // CASE: need to drop more than one DDL to reach the desired schema (schema become smaller). // drop two columns for one table. 
- info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs4, nil, []*model.TableInfo{ti4_1, ti4}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs4, ti3, []*model.TableInfo{ti4_1, ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) @@ -263,17 +265,17 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(ready[sources[0]][dbs[0]][tbls[1]], IsTrue) // TrySync again is idempotent. - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs4, nil, []*model.TableInfo{ti4_1, ti4}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs4, ti3, []*model.TableInfo{ti4_1, ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) - c.Assert(DDLs, DeepEquals, DDLs4[:1]) + c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) ready = l.Ready() c.Assert(ready[sources[0]][dbs[0]][tbls[0]], IsFalse) c.Assert(ready[sources[0]][dbs[0]][tbls[1]], IsTrue) // drop only the first column for another table. - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[0:1], nil, []*model.TableInfo{ti4_1}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[0:1], ti3, []*model.TableInfo{ti4_1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) @@ -286,14 +288,14 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(cmp, Equals, -1) // TrySync again (only the first DDL). - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[0:1], nil, []*model.TableInfo{ti4_1}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[0:1], ti3, []*model.TableInfo{ti4_1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) // drop the second column for another table. - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[1:2], nil, []*model.TableInfo{ti4}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[1:2], ti4_1, []*model.TableInfo{ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) @@ -306,7 +308,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { c.Assert(cmp, Equals, 0) // TrySync again (for the second DDL). - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[1:2], nil, []*model.TableInfo{ti4}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[1], downSchema, downTable, DDLs4[1:2], ti4_1, []*model.TableInfo{ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) @@ -318,7 +320,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) { for schema, tables := range schemaTables { for table, synced2 := range tables { if synced2 { // do not `TrySync` again for previous two (un-synced now). 
- info = newInfoWithVersion(task, source, schema, table, downSchema, downTable, DDLs4, nil, []*model.TableInfo{ti4_1, ti4}, vers) + info = newInfoWithVersion(task, source, schema, table, downSchema, downTable, DDLs4, ti3, []*model.TableInfo{ti4_1, ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(l.versions, DeepEquals, vers) @@ -360,7 +362,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) { newTargetTable(task, source, downSchema, downTable, tables), } - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -375,7 +377,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) { // try sync for one table, `DROP INDEX` returned directly (to make schema become more compatible). // `DROP INDEX` is handled like `ADD COLUMN`. - info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -386,7 +388,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) { c.Assert(remain, Equals, 1) // try sync for another table, also got `DROP INDEX` now. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -395,7 +397,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) { // try sync for one table, `ADD INDEX` not returned directly (to keep the schema more compatible). // `ADD INDEX` is handled like `DROP COLUMN`. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) // no DDLs returned @@ -406,7 +408,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) { c.Assert(remain, Equals, 1) // try sync for another table, got `ADD INDEX` now. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -439,7 +441,7 @@ func (t *testLock) TestLockTrySyncNullNotNull(c *C) { newTargetTable(task, source, downSchema, downTable, tables), } - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -454,28 +456,28 @@ func (t *testLock) TestLockTrySyncNullNotNull(c *C) { for i := 0; i < 2; i++ { // two round // try sync for one table, from `NULL` to `NOT NULL`, no DDLs returned. 
- info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, []string{}) c.Assert(l.versions, DeepEquals, vers) // try sync for another table, DDLs returned. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) c.Assert(l.versions, DeepEquals, vers) // try sync for one table, from `NOT NULL` to `NULL`, DDLs returned. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) c.Assert(l.versions, DeepEquals, vers) // try sync for another table, from `NOT NULL` to `NULL`, DDLs, returned. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -506,7 +508,7 @@ func (t *testLock) TestLockTrySyncIntBigint(c *C) { newTargetTable(task, source, downSchema, downTable, tables), } - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -520,14 +522,14 @@ func (t *testLock) TestLockTrySyncIntBigint(c *C) { t.checkLockNoDone(c, l) // try sync for one table, from `INT` to `BIGINT`, DDLs returned. - info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) c.Assert(l.versions, DeepEquals, vers) // try sync for another table, DDLs returned. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -557,7 +559,7 @@ func (t *testLock) TestLockTrySyncNoDiff(c *C) { newTargetTable(task, source, downSchema, downTable, tables), } - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -571,7 +573,7 @@ func (t *testLock) TestLockTrySyncNoDiff(c *C) { t.checkLockNoDone(c, l) // try sync for one table. 
- info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue) c.Assert(DDLs, DeepEquals, []string{}) @@ -599,7 +601,7 @@ func (t *testLock) TestLockTrySyncNewTable(c *C) { tables = map[string]map[string]struct{}{db1: {tbl1: struct{}{}}} tts = []TargetTable{newTargetTable(task, source1, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source1: { db1: {tbl1: 0}, @@ -615,7 +617,7 @@ func (t *testLock) TestLockTrySyncNewTable(c *C) { t.checkLockNoDone(c, l) // TrySync for a new table as the caller. - info := newInfoWithVersion(task, source2, db2, tbl2, downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source2, db2, tbl2, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -631,7 +633,7 @@ func (t *testLock) TestLockTrySyncNewTable(c *C) { c.Assert(ready[source2][db2][tbl2], IsTrue) // TrySync for two new tables as extra sources. - // we treat all newly added sources as synced. + // newly added work table use tableInfoBefore as table info tts = append(tts, newTargetTable(task, source1, downSchema, downTable, map[string]map[string]struct{}{db1: {tbl2: struct{}{}}}), newTargetTable(task, source2, downTable, downTable, map[string]map[string]struct{}{db2: {tbl1: struct{}{}}}), @@ -639,7 +641,24 @@ func (t *testLock) TestLockTrySyncNewTable(c *C) { vers[source1][db1][tbl2] = 0 vers[source2][db2][tbl1] = 0 - info = newInfoWithVersion(task, source1, db1, tbl1, downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, source1, db1, tbl1, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) + DDLs, err = l.TrySync(info, tts) + c.Assert(err, IsNil) + c.Assert(DDLs, DeepEquals, DDLs1) + c.Assert(l.versions, DeepEquals, vers) + + ready = l.Ready() + c.Assert(ready, HasLen, 2) + c.Assert(ready[source1], HasLen, 1) + c.Assert(ready[source1][db1], HasLen, 2) + c.Assert(ready[source1][db1][tbl1], IsTrue) + c.Assert(ready[source1][db1][tbl2], IsTrue) + c.Assert(ready[source2], HasLen, 1) + c.Assert(ready[source2][db2], HasLen, 2) + c.Assert(ready[source2][db2][tbl1], IsTrue) + c.Assert(ready[source2][db2][tbl2], IsTrue) + + info = newInfoWithVersion(task, source1, db1, tbl2, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -692,7 +711,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { tables = map[string]map[string]struct{}{db: {tbls[0]: struct{}{}, tbls[1]: struct{}{}}} tts = []TargetTable{newTargetTable(task, source, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -707,7 +726,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { // CASE: revert for single DDL. // TrySync for one table. 
- info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -723,7 +742,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { c.Assert(cmp, Equals, -1) // revert for the table, become synced again. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -733,7 +752,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { // CASE: revert for multiple DDLs. // TrySync for one table. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs3, nil, []*model.TableInfo{ti4, ti3}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs3, ti0, []*model.TableInfo{ti4, ti3}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs3) @@ -749,7 +768,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { c.Assert(cmp, Equals, -1) // revert part of the DDLs. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs4, nil, []*model.TableInfo{ti4}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs4, ti3, []*model.TableInfo{ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs4) @@ -764,7 +783,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { c.Assert(cmp, Equals, -1) // revert the reset part of the DDLs. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs5, nil, []*model.TableInfo{ti5}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs5, ti4, []*model.TableInfo{ti5}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs5) @@ -774,7 +793,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { // CASE: revert part of multiple DDLs. // TrySync for one table. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs6, nil, []*model.TableInfo{ti7, ti6}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs6, ti0, []*model.TableInfo{ti7, ti6}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs6) @@ -789,7 +808,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { c.Assert(cmp, Equals, -1) // revert part of the DDLs. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs7, nil, []*model.TableInfo{ti7}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs7, ti3, []*model.TableInfo{ti7}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs7) @@ -804,7 +823,7 @@ func (t *testLock) TestLockTrySyncRevert(c *C) { c.Assert(cmp, Equals, -1) // TrySync for another table. 
- info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs8, nil, []*model.TableInfo{ti8}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs8, ti0, []*model.TableInfo{ti8}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs8) @@ -839,7 +858,7 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { tables = map[string]map[string]struct{}{db: {tbls[0]: struct{}{}, tbls[1]: struct{}{}}} tts = []TargetTable{newTargetTable(task, source, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -853,7 +872,7 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { t.checkLockNoDone(c, l) // TrySync for the first table, construct the joined schema. - info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -869,7 +888,7 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { c.Assert(cmp, Equals, -1) // TrySync for the second table with another schema (add two columns, one of them will cause conflict). - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2_1, ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2_1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue) c.Assert(DDLs, DeepEquals, []string{}) @@ -882,7 +901,7 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { c.Assert(ready[source][db][tbls[1]], IsFalse) // TrySync for the first table to resolve the conflict. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs3, nil, []*model.TableInfo{ti3}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs3, ti1, []*model.TableInfo{ti3}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs3) @@ -898,7 +917,7 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { c.Assert(cmp, Equals, 0) // TrySync for the second table, succeed now - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2_1, ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2_1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -910,7 +929,7 @@ func (t *testLock) TestLockTrySyncConflictNonIntrusive(c *C) { c.Assert(ready[source][db][tbls[1]], IsTrue) // TrySync for the first table. 
- info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs4, nil, []*model.TableInfo{ti4_1, ti4}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs4, ti0, []*model.TableInfo{ti4_1, ti4}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs4) @@ -933,19 +952,17 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { tblID int64 = 111 DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"} DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c1 DATETIME", "ALTER TABLE bar ADD COLUMN c2 INT"} - DDLs3 = []string{"ALTER TABLE bar DROP COLUMN c2"} - DDLs4 = []string{"ALTER TABLE bar DROP COLUMN c1"} + DDLs3 = []string{"ALTER TABLE bar ADD COLUMN c1 DATETIME"} ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`) ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`) ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 DATETIME, c2 INT)`) ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 DATETIME)`) - ti4 = ti0 DDLs5 = []string{"ALTER TABLE bar ADD COLUMN c2 TEXT"} DDLs6 = []string{"ALTER TABLE bar ADD COLUMN c2 DATETIME", "ALTER TABLE bar ADD COLUMN c3 INT"} - DDLs7 = []string{"ALTER TABLE bar DROP COLUMN c2"} - DDLs8_1 = []string{"ALTER TABLE bar ADD COLUMN c3 INT"} - DDLs8_2 = []string{"ALTER TABLE bar ADD COLUMN c2 TEXT"} + DDLs7 = []string{"ALTER TABLE bar ADD COLUMN c3 INT"} + DDLs8_1 = DDLs7 + DDLs8_2 = DDLs5 ti5 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 TEXT)`) ti6 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 DATETIME, c3 INT)`) ti6_1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 DATETIME)`) @@ -954,7 +971,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { tables = map[string]map[string]struct{}{db: {tbls[0]: struct{}{}, tbls[1]: struct{}{}}} tts = []TargetTable{newTargetTable(task, source, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -969,7 +986,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { // CASE: conflict happen, revert all changes to resolve the conflict. // TrySync for the first table, construct the joined schema. - info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -985,7 +1002,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(cmp, Equals, -1) // TrySync for the second table with another schema (add two columns, one of them will cause conflict). 
- info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti3, ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti3, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue) c.Assert(DDLs, DeepEquals, []string{}) @@ -998,7 +1015,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(ready[source][db][tbls[1]], IsFalse) // TrySync again. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti3, ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti3, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue) c.Assert(DDLs, DeepEquals, []string{}) @@ -1007,8 +1024,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(err, IsNil) c.Assert(cmp, Equals, -1) - // TrySync for the second table to drop the non-conflict column, the conflict should still exist. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs3, nil, []*model.TableInfo{ti3}, vers) + // TrySync for the second table to replace a new ddl without non-conflict column, the conflict should still exist. + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs3, ti0, []*model.TableInfo{ti3}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue) c.Assert(DDLs, DeepEquals, []string{}) @@ -1019,20 +1036,8 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { ready = l.Ready() c.Assert(ready[source][db][tbls[1]], IsFalse) - // TrySync for the second table to drop the conflict column, the conflict should be resolved. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs4, nil, []*model.TableInfo{ti4}, vers) - DDLs, err = l.TrySync(info, tts) - c.Assert(err, IsNil) - c.Assert(DDLs, DeepEquals, []string{}) - c.Assert(l.versions, DeepEquals, vers) - cmp, err = l.tables[source][db][tbls[1]].Compare(l.Joined()) - c.Assert(err, IsNil) - c.Assert(cmp, Equals, -1) - ready = l.Ready() - c.Assert(ready[source][db][tbls[1]], IsFalse) - // TrySync for the second table as we did for the first table, the lock should be synced. - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs) @@ -1045,7 +1050,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { // CASE: conflict happen, revert part of changes to resolve the conflict. // TrySync for the first table, construct the joined schema. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs5, nil, []*model.TableInfo{ti5}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs5, ti1, []*model.TableInfo{ti5}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs5) @@ -1061,7 +1066,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) { c.Assert(cmp, Equals, -1) // TrySync for the second table with another schema (add two columns, one of them will cause conflict). 
- info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs6, nil, []*model.TableInfo{ti6_1, ti6}, vers)
+ info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs6, ti1, []*model.TableInfo{ti6_1, ti6}, vers)
 DDLs, err = l.TrySync(info, tts)
 c.Assert(terror.ErrShardDDLOptimismTrySyncFail.Equal(err), IsTrue)
 c.Assert(DDLs, DeepEquals, []string{})
@@ -1072,12 +1077,12 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
 ready = l.Ready()
 c.Assert(ready[source][db][tbls[1]], IsFalse)

- // TrySync for the second table to drop the conflict column, the conflict should be resolved.
+ // TrySync for the second table with a replacement DDL that no longer has the conflicting column; the conflict should be resolved,
 // but both of tables are not synced now.
- info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs7, nil, []*model.TableInfo{ti7}, vers)
+ info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs7, ti1, []*model.TableInfo{ti7}, vers)
 DDLs, err = l.TrySync(info, tts)
 c.Assert(err, IsNil)
- c.Assert(DDLs, DeepEquals, DDLs7) // special case: these DDLs should not be replicated to the downstream.
+ c.Assert(DDLs, DeepEquals, DDLs7)
 c.Assert(l.versions, DeepEquals, vers)
 ready = l.Ready()
 c.Assert(ready[source][db][tbls[0]], IsFalse)
@@ -1090,7 +1095,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
 c.Assert(cmp, Equals, -1)

 // TrySync for the first table to become synced.
- info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs8_1, nil, []*model.TableInfo{ti8}, vers)
+ info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs8_1, ti5, []*model.TableInfo{ti8}, vers)
 DDLs, err = l.TrySync(info, tts)
 c.Assert(err, IsNil)
 c.Assert(DDLs, DeepEquals, DDLs8_1)
@@ -1098,7 +1103,7 @@ func (t *testLock) TestLockTrySyncConflictIntrusive(c *C) {
 c.Assert(ready[source][db][tbls[0]], IsTrue)

 // TrySync for the second table to become synced.
- info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs8_2, nil, []*model.TableInfo{ti8}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs8_2, ti7, []*model.TableInfo{ti8}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs8_2) @@ -1144,7 +1149,7 @@ func (t *testLock) TestLockTrySyncMultipleChangeDDL(c *C) { newTargetTable(task, sources[1], downSchema, downTable, tables), } - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ sources[0]: { @@ -1163,12 +1168,12 @@ func (t *testLock) TestLockTrySyncMultipleChangeDDL(c *C) { t.checkLockNoDone(c, l) // inconsistent ddls and table infos - info := newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1[:1], nil, []*model.TableInfo{ti1_1, ti1}, vers) + info := newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1[:1], ti0, []*model.TableInfo{ti1_1, ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(DDLs, DeepEquals, DDLs1[:1]) c.Assert(terror.ErrMasterInconsistentOptimisticDDLsAndInfo.Equal(err), IsTrue) - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(DDLs, DeepEquals, DDLs1) c.Assert(terror.ErrMasterInconsistentOptimisticDDLsAndInfo.Equal(err), IsTrue) @@ -1191,7 +1196,7 @@ func (t *testLock) TestLockTrySyncMultipleChangeDDL(c *C) { for _, source := range sources { for _, db := range dbs { for _, tbl := range tbls { - info = newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1_1, ti1}, vers) + info = newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1_1, ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, resultDDLs1[source][db][tbl]) @@ -1210,7 +1215,7 @@ func (t *testLock) TestLockTrySyncMultipleChangeDDL(c *C) { // CASE: TrySync again after synced is idempotent. // both ddl will sync again - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1_1, ti1}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1_1, ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -1233,7 +1238,7 @@ func (t *testLock) TestLockTrySyncMultipleChangeDDL(c *C) { for _, source := range sources { for _, db := range dbs { for _, tbl := range tbls { - info = newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2_1, ti2}, vers) + info = newInfoWithVersion(task, source, db, tbl, downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2_1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, resultDDLs2[source][db][tbl]) @@ -1251,11 +1256,10 @@ func (t *testLock) TestLockTrySyncMultipleChangeDDL(c *C) { t.checkLockNoDone(c, l) // CASE: TrySync again after synced is idempotent. 
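Before the idempotency case continues below, a note on the "inconsistent ddls and table infos" hunk above: it feeds TrySync one DDL with two post-DDL table infos and expects ErrMasterInconsistentOptimisticDDLsAndInfo. A hedged sketch of the kind of validation involved, including the table-info-before check this patch introduces; checkDDLInfos is hypothetical, and the TableInfosAfter field name is assumed from the constructor's final []*model.TableInfo argument:

package sketch

import (
	"github.com/pingcap/dm/pkg/shardddl/optimism"
	"github.com/pingcap/dm/pkg/terror"
)

// checkDDLInfos is a hedged sketch of the consistency checks the failing
// TrySync calls above exercise; the real checks live in the master's optimism code.
func checkDDLInfos(info optimism.Info) error {
	if info.TableInfoBefore == nil {
		// the new error added by this patch
		return terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(info.DDLs)
	}
	if len(info.DDLs) != len(info.TableInfosAfter) {
		// one post-DDL table info is expected per DDL statement
		return terror.ErrMasterInconsistentOptimisticDDLsAndInfo.Generate(len(info.DDLs), len(info.TableInfosAfter))
	}
	return nil
}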
- // only the second ddl(ADD COLUMN) will sync, the first one(DROP COLUMN) will not sync since oldJoined==newJoined - info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2_1, ti2}, vers) + info = newInfoWithVersion(task, sources[0], dbs[0], tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2_1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) - c.Assert(DDLs, DeepEquals, DDLs2[1:]) + c.Assert(DDLs, DeepEquals, DDLs2) c.Assert(l.versions, DeepEquals, vers) t.checkLockSynced(c, l) t.checkLockNoDone(c, l) @@ -1282,7 +1286,7 @@ func (t *testLock) TestTryRemoveTable(c *C) { tables = map[string]map[string]struct{}{db: {tbl1: struct{}{}, tbl2: struct{}{}}} tts = []TargetTable{newTargetTable(task, source, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -1297,7 +1301,7 @@ func (t *testLock) TestTryRemoveTable(c *C) { // CASE: remove a table as normal. // TrySync for the first table. - info := newInfoWithVersion(task, source, db, tbl1, downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbl1, downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -1322,7 +1326,7 @@ func (t *testLock) TestTryRemoveTable(c *C) { // CASE: remove a table will not rebuild joined schema now. // TrySync to add the second back. vers[source][db][tbl2] = 0 - info = newInfoWithVersion(task, source, db, tbl2, downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbl2, downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -1372,7 +1376,7 @@ func (t *testLock) TestLockTryMarkDone(c *C) { tables = map[string]map[string]struct{}{db: {tbls[0]: struct{}{}, tbls[1]: struct{}{}}} tts = []TargetTable{newTargetTable(task, source, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -1387,7 +1391,7 @@ func (t *testLock) TestLockTryMarkDone(c *C) { c.Assert(l.IsResolved(), IsFalse) // TrySync for the first table, no table has done the DDLs operation. - info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -1402,7 +1406,7 @@ func (t *testLock) TestLockTryMarkDone(c *C) { c.Assert(l.IsResolved(), IsFalse) // TrySync for the second table, the joined schema become larger. 
- info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti1, ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti1, ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs2) @@ -1421,7 +1425,7 @@ func (t *testLock) TestLockTryMarkDone(c *C) { c.Assert(l.IsResolved(), IsFalse) // TrySync for the first table, all tables become synced. - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs3, nil, []*model.TableInfo{ti3}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs3, ti1, []*model.TableInfo{ti3}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs3) @@ -1477,7 +1481,7 @@ func (t *testLock) TestAddDifferentFieldLenColumns(c *C) { tables = map[string]map[string]struct{}{db: {tbls[0]: struct{}{}, tbls[1]: struct{}{}}} tts = []TargetTable{newTargetTable(task, source, downSchema, downTable, tables)} - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) vers = map[string]map[string]map[string]int64{ source: { @@ -1495,7 +1499,7 @@ func (t *testLock) TestAddDifferentFieldLenColumns(c *C) { c.Assert(l.IsResolved(), IsFalse) // TrySync for the first table, no table has done the DDLs operation. - info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err := l.TrySync(info, tts) c.Assert(err, IsNil) c.Assert(DDLs, DeepEquals, DDLs1) @@ -1504,18 +1508,18 @@ func (t *testLock) TestAddDifferentFieldLenColumns(c *C) { c.Assert(l.IsResolved(), IsFalse) // TrySync for the second table, add a table with a larger field length - info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}, vers) + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, ErrorMatches, ".*add columns with different field lengths.*") c.Assert(DDLs, DeepEquals, DDLs2) c.Assert(l.versions, DeepEquals, vers) // case 2: add a column with a smaller field length - l = NewLock(ID, task, downSchema, downTable, ti0, tts) + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) // TrySync for the first table, no table has done the DDLs operation. 
vers[source][db][tbls[0]]-- - info = NewInfo(task, source, db, tbls[1], downSchema, downTable, DDLs2, nil, []*model.TableInfo{ti2}) + info = NewInfo(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2}) info.Version = vers[source][db][tbls[1]] DDLs, err = l.TrySync(info, tts) c.Assert(err, IsNil) @@ -1525,7 +1529,7 @@ func (t *testLock) TestAddDifferentFieldLenColumns(c *C) { c.Assert(l.IsResolved(), IsFalse) // TrySync for the second table, add a table with a smaller field length - info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, nil, []*model.TableInfo{ti1}, vers) + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) DDLs, err = l.TrySync(info, tts) c.Assert(err, ErrorMatches, ".*add columns with different field lengths.*") c.Assert(DDLs, DeepEquals, DDLs1) @@ -1533,14 +1537,14 @@ func (t *testLock) TestAddDifferentFieldLenColumns(c *C) { } func (t *testLock) trySyncForAllTablesLarger(c *C, l *Lock, - DDLs []string, tis []*model.TableInfo, tts []TargetTable, vers map[string]map[string]map[string]int64, resultDDLs map[string]map[string]map[string][]string) { + DDLs []string, tableInfoBefore *model.TableInfo, tis []*model.TableInfo, tts []TargetTable, vers map[string]map[string]map[string]int64) { for source, schemaTables := range l.Ready() { for schema, tables := range schemaTables { for table := range tables { - info := newInfoWithVersion(l.Task, source, schema, table, l.DownSchema, l.DownTable, DDLs, nil, tis, vers) + info := newInfoWithVersion(l.Task, source, schema, table, l.DownSchema, l.DownTable, DDLs, tableInfoBefore, tis, vers) DDLs2, err := l.TrySync(info, tts) c.Assert(err, IsNil) - c.Assert(DDLs2, DeepEquals, resultDDLs[source][schema][table]) + c.Assert(DDLs2, DeepEquals, DDLs) } } } @@ -1580,3 +1584,81 @@ func newInfoWithVersion(task, source, upSchema, upTable, downSchema, downTable s info.Version = vers[source][upSchema][upTable] return info } + +func (t *testLock) TestLockTrySyncDifferentIndex(c *C) { + var ( + ID = "test_lock_try_sync_index-`foo`.`bar`" + task = "test_lock_try_sync_index" + source = "mysql-replica-1" + downSchema = "db" + downTable = "bar" + db = "db" + tbls = []string{"bar1", "bar2"} + p = parser.New() + se = mock.NewContext() + tblID int64 = 111 + DDLs1 = []string{"ALTER TABLE bar DROP INDEX idx_c1"} + DDLs2 = []string{"ALTER TABLE bar ADD INDEX new_idx(c1)"} + ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, UNIQUE INDEX idx_c1(c1))`) + ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`) + ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, INDEX new_idx(c1))`) + tables = map[string]map[string]struct{}{ + db: {tbls[0]: struct{}{}, tbls[1]: struct{}{}}, + } + tts = []TargetTable{ + newTargetTable(task, source, downSchema, downTable, tables), + } + + l = NewLock(ID, task, downSchema, downTable, schemacmp.Encode(ti0), tts) + + vers = map[string]map[string]map[string]int64{ + source: { + db: {tbls[0]: 0, tbls[1]: 0}, + }, + } + ) + + // the initial status is synced. + t.checkLockSynced(c, l) + t.checkLockNoDone(c, l) + + // try sync for one table, `DROP INDEX` returned directly (to make schema become more compatible). + // `DROP INDEX` is handled like `ADD COLUMN`. 
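Before the first TrySync of this new test below, the rule stated in the two comments above can be summarized as follows. This is a hedged, much-simplified model; decide is hypothetical and the real decision is made inside Lock.TrySync:

// decide models the optimistic coordination rule only; it is not the code in
// pkg/shardddl/optimism/lock.go. Schema-enlarging changes (ADD COLUMN, DROP INDEX)
// are returned to the worker immediately, while schema-shrinking changes
// (DROP COLUMN, ADD INDEX) are withheld until every sharded table has applied them.
func decide(enlargesJoinedSchema, allTablesCaughtUp bool, ddls []string) []string {
	if enlargesJoinedSchema || allTablesCaughtUp {
		return ddls
	}
	return []string{} // matches the "no DDLs returned" assertions in the ADD INDEX steps below
}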
+ info := newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1}, vers) + DDLs, err := l.TrySync(info, tts) + c.Assert(err, IsNil) + c.Assert(DDLs, DeepEquals, DDLs1) + c.Assert(l.versions, DeepEquals, vers) + synced, remain := l.IsSynced() + c.Assert(synced, Equals, l.synced) + c.Assert(synced, IsFalse) + c.Assert(remain, Equals, 1) + + cmp, err := l.tables[source][db][tbls[1]].Compare(schemacmp.Encode(ti0)) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) + + // try sync ADD another INDEX for another table + // `ADD INDEX` is handled like `DROP COLUMN`. + info = newInfoWithVersion(task, source, db, tbls[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) + DDLs, err = l.TrySync(info, tts) + c.Assert(err, IsNil) + c.Assert(DDLs, DeepEquals, []string{}) // no DDLs returned + c.Assert(l.versions, DeepEquals, vers) + synced, remain = l.IsSynced() + c.Assert(synced, Equals, l.synced) + c.Assert(synced, IsFalse) + c.Assert(remain, Equals, 1) + + cmp, err = l.tables[source][db][tbls[0]].Compare(l.joined) + c.Assert(err, IsNil) + c.Assert(cmp, Equals, 0) + + // try sync ADD INDEX for first table + info = newInfoWithVersion(task, source, db, tbls[0], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2}, vers) + DDLs, err = l.TrySync(info, tts) + c.Assert(err, IsNil) + c.Assert(DDLs, DeepEquals, DDLs2) + c.Assert(l.versions, DeepEquals, vers) + t.checkLockSynced(c, l) +} diff --git a/pkg/shardddl/optimism/ops_test.go b/pkg/shardddl/optimism/ops_test.go index f6f4794e81..7758fd9b8c 100644 --- a/pkg/shardddl/optimism/ops_test.go +++ b/pkg/shardddl/optimism/ops_test.go @@ -41,6 +41,7 @@ func (t *testForEtcd) TestDeleteInfosOperationsSchema(c *C) { c.Assert(ifm, HasLen, 1) infoWithVer := info infoWithVer.Version = 1 + infoWithVer.Revision = ifm[task][source][upSchema][upTable].Revision c.Assert(ifm[task][source][upSchema][upTable], DeepEquals, infoWithVer) // put operation. @@ -133,6 +134,7 @@ func (t *testForEtcd) TestSourceTablesInfo(c *C) { c.Assert(ifm[task][source][upSchema], HasLen, 1) i11WithVer := i11 i11WithVer.Version = 1 + i11WithVer.Revision = ifm[task][source][upSchema][upTable].Revision c.Assert(ifm[task][source][upSchema][upTable], DeepEquals, i11WithVer) // put/update source tables and delete info. 
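The two one-line additions to ops_test.go above pin the new Info.Revision field when infos are read back from etcd. A hedged sketch of how that field is presumably populated; infoFromKV is hypothetical, and filling Revision from the etcd ModRevision is an assumption about the implementation (the patch sorts recovered infos by this value):

package sketch

import (
	"encoding/json"

	"go.etcd.io/etcd/mvcc/mvccpb"

	"github.com/pingcap/dm/pkg/shardddl/optimism"
)

// infoFromKV is a hedged sketch, not the code in pkg/shardddl/optimism/info.go:
// decode one etcd key-value into an Info and remember the revision it was
// written at, so lock recovery can replay infos in the order they were put.
func infoFromKV(kv *mvccpb.KeyValue) (optimism.Info, error) {
	var info optimism.Info
	if err := json.Unmarshal(kv.Value, &info); err != nil {
		return info, err
	}
	info.Revision = kv.ModRevision
	return info, nil
}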
diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 7e1b0a5122..aedf21aa2d 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -469,6 +469,7 @@ const ( codeMasterBoundChanging codeMasterFailToImportFromV10x codeMasterInconsistentOptimistDDLsAndInfo + codeMasterOptimisticTableInfobeforeNotExist ) // DM-worker error code @@ -1059,6 +1060,7 @@ var ( ErrMasterFailToImportFromV10x = New(codeMasterFailToImportFromV10x, ClassDMMaster, ScopeInternal, LevelHigh, "fail to import DM cluster from v1.0.x", "Please confirm that you have not violated any restrictions in the upgrade documentation.") ErrMasterInconsistentOptimisticDDLsAndInfo = New(codeMasterInconsistentOptimistDDLsAndInfo, ClassDMMaster, ScopeInternal, LevelHigh, "inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d", "") + ErrMasterOptimisticTableInfoBeforeNotExist = New(codeMasterOptimisticTableInfobeforeNotExist, ClassDMMaster, ScopeInternal, LevelHigh, "table-info-before not exist in optimistic ddls: %v", "") // DM-worker error ErrWorkerParseFlagSet = New(codeWorkerParseFlagSet, ClassDMWorker, ScopeInternal, LevelMedium, "parse dm-worker config flag set", "") diff --git a/syncer/schema.go b/syncer/schema.go index 6d0d3526fd..3cc8a5b88e 100644 --- a/syncer/schema.go +++ b/syncer/schema.go @@ -102,7 +102,8 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR break } downSchema, downTable := s.renameShardingSchema(req.Database, req.Table) - info := s.optimist.ConstructInfo(req.Database, req.Table, downSchema, downTable, []string{""}, nil, []*model.TableInfo{ti}) + // use new table info as tableInfoBefore, we can also use the origin table from schemaTracker + info := s.optimist.ConstructInfo(req.Database, req.Table, downSchema, downTable, []string{""}, ti, []*model.TableInfo{ti}) info.IgnoreConflict = true log.L().Info("sync info with operate-schema", zap.Stringer("info", info)) _, err = s.optimist.PutInfo(info) diff --git a/syncer/shardddl/optimist_test.go b/syncer/shardddl/optimist_test.go index 689f2d884f..4e7811e04f 100644 --- a/syncer/shardddl/optimist_test.go +++ b/syncer/shardddl/optimist_test.go @@ -146,6 +146,7 @@ func (t *testOptimist) TestOptimist(c *C) { c.Assert(ifm[task][source][info1.UpSchema], HasLen, 1) info1WithVer := info1 info1WithVer.Version = 1 + info1WithVer.Revision = ifm[task][source][info1.UpSchema][info1.UpTable].Revision c.Assert(ifm[task][source][info1.UpSchema][info1.UpTable], DeepEquals, info1WithVer) opc := op1c opc.Done = true @@ -169,6 +170,7 @@ func (t *testOptimist) TestOptimist(c *C) { c.Assert(err, IsNil) infoCreateWithVer := infoCreate infoCreateWithVer.Version = 1 + infoCreateWithVer.Revision = ifm[task][source][infoCreate.UpSchema][infoCreate.UpTable].Revision c.Assert(ifm[task][source][infoCreate.UpSchema][infoCreate.UpTable], DeepEquals, infoCreateWithVer) c.Assert(o.tables.Tables[infoCreate.DownSchema][infoCreate.DownTable][infoCreate.UpSchema], HasKey, infoCreate.UpTable) diff --git a/tests/shardddl1/run.sh b/tests/shardddl1/run.sh index d2597ae75b..b306aec8fc 100644 --- a/tests/shardddl1/run.sh +++ b/tests/shardddl1/run.sh @@ -589,6 +589,70 @@ function DM_RENAME_COLUMN_OPTIMISTIC() { "clean_table" "optimistic" } +function DM_RECOVER_LOCK_CASE() { + # tb1(a,b) tb2(a,b) + run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1);" + run_sql_source2 "insert into ${shardddl1}.${tb2} values(2,2);" + + # tb1(a,b,c); tb2(a,b) + run_sql_source1 "alter table ${shardddl1}.${tb1} add column c varchar(10);" 
+ run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,3,'aaa');" + check_log_contain_with_retry "putted a shard DDL.*tb1.*ALTER TABLE .* ADD COLUMN" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + # tb1(a,b,c); tb2(a) + run_sql_source2 "alter table ${shardddl1}.${tb2} drop column b;" + check_log_contain_with_retry "putted a shard DDL.*tb2.*ALTER TABLE .* DROP COLUMN" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + echo "restart dm-master" + ps aux | grep dm-master |awk '{print $2}'|xargs kill || true + check_port_offline $MASTER_PORT 20 + sleep 2 + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(4,'bbb');" + run_sql_source2 "insert into ${shardddl1}.${tb2} values(5);" + check_log_contain_with_retry "putted a shard DDL.*tb1.*ALTER TABLE .* DROP COLUMN" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + # tb1(a,c); tb2(a,b) + run_sql_source2 "alter table ${shardddl1}.${tb2} add column b int;" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(6,'ccc');" + run_sql_source2 "insert into ${shardddl1}.${tb2} values(7,7);" + check_log_contain_with_retry "putted a shard DDL.*tb2.*ALTER TABLE .* ADD COLUMN" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + # recover lock, tb1's info: (a,b,c)->(a,c); tb2's info: (a)->(a,b) + # joined(a,b,c); tb1(a,b,c); tb2(a) + # TrySync tb1: joined(a,b,c); tb1(a,c); tb2(a) + # TrySync tb2: joined(a,c); tb1(a,c); tb2(a,b) + echo "restart dm-master" + ps aux | grep dm-master |awk '{print $2}'|xargs kill || true + check_port_offline $MASTER_PORT 20 + sleep 2 + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + run_sql_source1 "insert into ${shardddl1}.${tb1} values(8,'eee');" + run_sql_source2 "insert into ${shardddl1}.${tb2} values(9,9);" + + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b int;" + run_sql_source2 "alter table ${shardddl1}.${tb2} add column c varchar(10) after a;" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(10,'fff',10);" + run_sql_source2 "insert into ${shardddl1}.${tb2} values(11,'ggg',11);" + + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "show-ddl-locks" \ + "no DDL lock exists" 1 +} + +function DM_RECOVER_LOCK() { + run_case RECOVER_LOCK "double-source-optimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int) DEFAULT CHARSET=latin1;\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb2} (a int primary key, b int) DEFAULT CHARSET=latin1;\"" \ + "clean_table" "optimistic" +} + function run() { init_cluster init_database @@ -603,6 +667,7 @@ function run() { sleep 1 done DM_RENAME_COLUMN_OPTIMISTIC + DM_RECOVER_LOCK } cleanup_data $shardddl
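DM_RECOVER_LOCK_CASE restarts dm-master twice while the two sources hold different schemas, then expects `show-ddl-locks` to report nothing once both tables converge. In rough, hedged form, the recovery flow it exercises replays the persisted infos in etcd-revision order against locks rebuilt from each table-info-before. replayInfos below is an illustrative sketch only; the method names follow their use in the pre-existing recovery code, the signatures may differ after this patch, and error handling is simplified:

package sketch

import (
	"github.com/pingcap/dm/pkg/shardddl/optimism"
)

// replayInfos re-applies recovered shard DDL infos to the lock keeper after a
// master restart; infos are assumed to be already sorted by etcd revision.
func replayInfos(lk *optimism.LockKeeper, tk *optimism.TableKeeper, infos []optimism.Info) {
	for _, info := range infos {
		tts := tk.FindTables(info.Task, info.DownSchema, info.DownTable)
		// TrySync re-joins each info into its lock; conflicts found while
		// replaying are reported again when the workers resubmit their DDLs.
		_, _, _ = lk.TrySync(info, tts)
	}
}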