Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1518 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
GMHDBJD authored and ti-srebot committed Mar 26, 2021
1 parent 3407f19 commit c614263
Show file tree
Hide file tree
Showing 16 changed files with 610 additions and 151 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high
ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down
1 change: 1 addition & 0 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false)
)

st1.AddTable("foo-1", "bar-1", schema, table)
_, err = optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, check.IsNil)
_, err = optimism.PutInfo(etcdTestCli, info1)
Expand Down
84 changes: 70 additions & 14 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/schemacmp"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/optimism"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Optimist is used to coordinate the shard DDL migration in optimism mode.
Expand Down Expand Up @@ -255,29 +257,83 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
return revSource, revInfo, revOperation, nil
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
// construct locks based on the shard DDL info.
for task, ifTask := range ifm {
// sortInfos sort all infos by revision
func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) []optimism.Info {
infos := make([]optimism.Info, 0, len(ifm))

for _, ifTask := range ifm {
for _, ifSource := range ifTask {
for _, ifSchema := range ifSource {
for _, info := range ifSchema {
tts := o.tk.FindTables(task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
infos = append(infos, info)
}
}
}
}

// sort according to the Revision
sort.Slice(infos, func(i, j int) bool {
return infos[i].Revision < infos[j].Revision
})
return infos
}

// buildLockJoinedAndTTS build joined table and target table slice for lock by history infos
func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]map[string]optimism.Info) (map[string]schemacmp.Table, map[string][]optimism.TargetTable) {
lockJoined := make(map[string]schemacmp.Table)
lockTTS := make(map[string][]optimism.TargetTable)

for _, taskInfos := range ifm {
for _, sourceInfos := range taskInfos {
for _, schemaInfos := range sourceInfos {
for _, info := range schemaInfos {
lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable)
if joined, ok := lockJoined[lockID]; !ok {
lockJoined[lockID] = schemacmp.Encode(info.TableInfoBefore)
} else {
newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore))
// ignore error, will report it in TrySync later
if err != nil {
o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err))
} else {
lockJoined[lockID] = newJoined
}
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
if err != nil {
return err
if _, ok := lockTTS[lockID]; !ok {
lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
}
}
}
}
}
return lockJoined, lockTTS
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
// construct joined table based on the shard DDL info.
o.logger.Info("build lock joined and tts")
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
// build lock and restore table info
o.logger.Info("rebuild locks and tables")
o.lk.RebuildLocksAndTables(ifm, lockJoined, lockTTS)
// sort infos by revision
infos := sortInfos(ifm)

for _, info := range infos {
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
if err != nil {
return err
}
}

// update the done status of the lock.
for _, opTask := range opm {
Expand Down
130 changes: 128 additions & 2 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/schemacmp"
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -148,6 +149,7 @@ func (t *testOptimist) TestOptimist(c *C) {
t.testOptimist(c, noRestart)
t.testOptimist(c, restartOnly)
t.testOptimist(c, restartNewInstance)
t.testSortInfos(c)
}

func (t *testOptimist) testOptimist(c *C, restart int) {
Expand Down Expand Up @@ -656,14 +658,13 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
tblID int64 = 111
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"}
DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c1 DATETIME"}
DDLs3 = []string{"ALTER TABLE bar DROP COLUMN c1"}
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 DATETIME)`)
ti3 = ti0
i1 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i2 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2})
i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3})
i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti3})
)

st1.AddTable("foo", "bar-1", downSchema, downTable)
Expand Down Expand Up @@ -714,6 +715,7 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
c.Assert(len(errCh), Equals, 0)

// PUT i3, no conflict now.
// case for handle-error replace
rev3, err := optimism.PutInfo(etcdTestCli, i3)
c.Assert(err, IsNil)
// wait operation for i3 become available.
Expand Down Expand Up @@ -1029,3 +1031,127 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) {
c.Assert(err, IsNil)
c.Assert(is.TableInfo, DeepEquals, ti1) // the init schema is ti1 now.
}

func (t *testOptimist) testSortInfos(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
task = "test-optimist-init-schema"
sources = []string{"mysql-replica-1", "mysql-replica-2"}
upSchema = "foo"
upTables = []string{"bar-1", "bar-2"}
downSchema = "foo"
downTable = "bar"

p = parser.New()
se = mock.NewContext()
tblID int64 = 111
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"}
DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"}
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 INT)`)
i11 = optimism.NewInfo(task, sources[0], upSchema, upTables[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i12 = optimism.NewInfo(task, sources[0], upSchema, upTables[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, sources[1], upSchema, upTables[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2})
)

rev1, err := optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)
ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos := sortInfos(ifm)
c.Assert(len(infos), Equals, 1)
i11.Version = 1
i11.Revision = rev1
c.Assert(infos[0], DeepEquals, i11)

rev2, err := optimism.PutInfo(etcdTestCli, i12)
c.Assert(err, IsNil)
ifm, _, err = optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos = sortInfos(ifm)
c.Assert(len(infos), Equals, 2)
i11.Version = 1
i11.Revision = rev1
i12.Version = 1
i12.Revision = rev2
c.Assert(infos[0], DeepEquals, i11)
c.Assert(infos[1], DeepEquals, i12)

rev3, err := optimism.PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
rev4, err := optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)
ifm, _, err = optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos = sortInfos(ifm)
c.Assert(len(infos), Equals, 3)

i11.Version = 2
i11.Revision = rev4
i12.Version = 1
i12.Revision = rev2
i21.Version = 1
i21.Revision = rev3
c.Assert(infos[0], DeepEquals, i12)
c.Assert(infos[1], DeepEquals, i21)
c.Assert(infos[2], DeepEquals, i11)
}

func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
logger = log.L()
o = NewOptimist(&logger)
task = "task"
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
downSchema = "db"
downTable = "tbl"
st1 = optimism.NewSourceTables(task, source1)
st2 = optimism.NewSourceTables(task, source2)
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
DDLs2 = []string{"ALTER TABLE bar DROP COLUMN c1"}
p = parser.New()
se = mock.NewContext()
tblID int64 = 111
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`)
ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c2 INT)`)

i11 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, source2, "foo", "bar-1", downSchema, downTable, DDLs2, ti2, []*model.TableInfo{ti3})
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

st1.AddTable("db", "tbl-1", downSchema, downTable)
st2.AddTable("db", "tbl-1", downSchema, downTable)

c.Assert(o.Start(ctx, etcdTestCli), IsNil)
_, err := optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, IsNil)
_, err = optimism.PutSourceTables(etcdTestCli, st2)
c.Assert(err, IsNil)

_, err = optimism.PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
_, err = optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)

ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)

lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
c.Assert(len(lockJoined), Equals, 1)
c.Assert(len(lockTTS), Equals, 1)
joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)]
c.Assert(ok, IsTrue)
cmp, err := joined.Compare(schemacmp.Encode(ti2))
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
}
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2242,6 +2242,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38055]
message = "table-info-before not exist in optimistic ddls: %v"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
Expand Down
5 changes: 5 additions & 0 deletions pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Info struct {
// only set it when get/watch from etcd
Version int64 `json:"-"`

// only set it when get from etcd
// use for sort infos in recoverlock
Revision int64 `json:"-"`

// use to resolve conflict
IgnoreConflict bool `json:"ignore-conflict"`
}
Expand Down Expand Up @@ -132,6 +136,7 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[stri
return nil, 0, err2
}
info.Version = kv.Version
info.Revision = kv.ModRevision

if _, ok := ifm[info.Task]; !ok {
ifm[info.Task] = make(map[string]map[string]map[string]Info)
Expand Down
4 changes: 4 additions & 0 deletions pkg/shardddl/optimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1][source1][upSchema], HasLen, 1)
i11WithVer := i11
i11WithVer.Version = 2
i11WithVer.Revision = ifm[task1][source1][upSchema][upTable].Revision
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)

// put another key and get again with 2 info.
Expand All @@ -141,6 +142,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
i12WithVer := i12
i12WithVer.Version = 1
i12WithVer.Revision = ifm[task1][source2][upSchema][upTable].Revision
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer)

// start the watcher.
Expand Down Expand Up @@ -210,8 +212,10 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm, HasKey, task1)
c.Assert(ifm, HasKey, task2)
c.Assert(ifm[task1], HasLen, 1)
i11WithVer.Revision = ifm[task1][source1][upSchema][upTable].Revision
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
c.Assert(ifm[task2], HasLen, 1)
i21WithVer.Revision = ifm[task2][source1][upSchema][upTable].Revision
c.Assert(ifm[task2][source1][upSchema][upTable], DeepEquals, i21WithVer)

// watch the deletion for i12.
Expand Down
Loading

0 comments on commit c614263

Please sign in to comment.