Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

bug-fix: use table-info-before always and fix bug for recover lock in optimistic (#1518) #1536

Merged
merged 1 commit into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high
ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table-info-before not exist in optimistic ddls: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down
1 change: 1 addition & 0 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false)
)

st1.AddTable("foo-1", "bar-1", schema, table)
_, err = optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, check.IsNil)
_, err = optimism.PutInfo(etcdTestCli, info1)
Expand Down
84 changes: 70 additions & 14 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/schemacmp"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/optimism"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Optimist is used to coordinate the shard DDL migration in optimism mode.
Expand Down Expand Up @@ -255,29 +257,83 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
return revSource, revInfo, revOperation, nil
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
// construct locks based on the shard DDL info.
for task, ifTask := range ifm {
// sortInfos sort all infos by revision
func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) []optimism.Info {
infos := make([]optimism.Info, 0, len(ifm))

for _, ifTask := range ifm {
for _, ifSource := range ifTask {
for _, ifSchema := range ifSource {
for _, info := range ifSchema {
tts := o.tk.FindTables(task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
infos = append(infos, info)
}
}
}
}

// sort according to the Revision
sort.Slice(infos, func(i, j int) bool {
return infos[i].Revision < infos[j].Revision
})
return infos
}

// buildLockJoinedAndTTS build joined table and target table slice for lock by history infos
func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]map[string]optimism.Info) (map[string]schemacmp.Table, map[string][]optimism.TargetTable) {
lockJoined := make(map[string]schemacmp.Table)
lockTTS := make(map[string][]optimism.TargetTable)

for _, taskInfos := range ifm {
for _, sourceInfos := range taskInfos {
for _, schemaInfos := range sourceInfos {
for _, info := range schemaInfos {
lockID := utils.GenDDLLockID(info.Task, info.DownSchema, info.DownTable)
if joined, ok := lockJoined[lockID]; !ok {
lockJoined[lockID] = schemacmp.Encode(info.TableInfoBefore)
} else {
newJoined, err := joined.Join(schemacmp.Encode(info.TableInfoBefore))
// ignore error, will report it in TrySync later
if err != nil {
o.logger.Error(fmt.Sprintf("fail to join table info %s with %s, lockID: %s in recover lock", joined, newJoined, lockID), log.ShortError(err))
} else {
lockJoined[lockID] = newJoined
}
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
if err != nil {
return err
if _, ok := lockTTS[lockID]; !ok {
lockTTS[lockID] = o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
}
}
}
}
}
return lockJoined, lockTTS
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
// construct joined table based on the shard DDL info.
o.logger.Info("build lock joined and tts")
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
// build lock and restore table info
o.logger.Info("rebuild locks and tables")
o.lk.RebuildLocksAndTables(ifm, lockJoined, lockTTS)
// sort infos by revision
infos := sortInfos(ifm)

for _, info := range infos {
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
if err != nil {
return err
}
}

// update the done status of the lock.
for _, opTask := range opm {
Expand Down
130 changes: 128 additions & 2 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/schemacmp"
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -148,6 +149,7 @@ func (t *testOptimist) TestOptimist(c *C) {
t.testOptimist(c, noRestart)
t.testOptimist(c, restartOnly)
t.testOptimist(c, restartNewInstance)
t.testSortInfos(c)
}

func (t *testOptimist) testOptimist(c *C, restart int) {
Expand Down Expand Up @@ -656,14 +658,13 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
tblID int64 = 111
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"}
DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c1 DATETIME"}
DDLs3 = []string{"ALTER TABLE bar DROP COLUMN c1"}
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 DATETIME)`)
ti3 = ti0
i1 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i2 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs2, ti0, []*model.TableInfo{ti2})
i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs3, ti2, []*model.TableInfo{ti3})
i3 = optimism.NewInfo(task, source1, "foo", "bar-2", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti3})
)

st1.AddTable("foo", "bar-1", downSchema, downTable)
Expand Down Expand Up @@ -714,6 +715,7 @@ func (t *testOptimist) TestOptimistLockConflict(c *C) {
c.Assert(len(errCh), Equals, 0)

// PUT i3, no conflict now.
// case for handle-error replace
rev3, err := optimism.PutInfo(etcdTestCli, i3)
c.Assert(err, IsNil)
// wait operation for i3 become available.
Expand Down Expand Up @@ -1029,3 +1031,127 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) {
c.Assert(err, IsNil)
c.Assert(is.TableInfo, DeepEquals, ti1) // the init schema is ti1 now.
}

func (t *testOptimist) testSortInfos(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
task = "test-optimist-init-schema"
sources = []string{"mysql-replica-1", "mysql-replica-2"}
upSchema = "foo"
upTables = []string{"bar-1", "bar-2"}
downSchema = "foo"
downTable = "bar"

p = parser.New()
se = mock.NewContext()
tblID int64 = 111
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"}
DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"}
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 INT)`)
i11 = optimism.NewInfo(task, sources[0], upSchema, upTables[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i12 = optimism.NewInfo(task, sources[0], upSchema, upTables[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, sources[1], upSchema, upTables[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2})
)

rev1, err := optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)
ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos := sortInfos(ifm)
c.Assert(len(infos), Equals, 1)
i11.Version = 1
i11.Revision = rev1
c.Assert(infos[0], DeepEquals, i11)

rev2, err := optimism.PutInfo(etcdTestCli, i12)
c.Assert(err, IsNil)
ifm, _, err = optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos = sortInfos(ifm)
c.Assert(len(infos), Equals, 2)
i11.Version = 1
i11.Revision = rev1
i12.Version = 1
i12.Revision = rev2
c.Assert(infos[0], DeepEquals, i11)
c.Assert(infos[1], DeepEquals, i12)

rev3, err := optimism.PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
rev4, err := optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)
ifm, _, err = optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos = sortInfos(ifm)
c.Assert(len(infos), Equals, 3)

i11.Version = 2
i11.Revision = rev4
i12.Version = 1
i12.Revision = rev2
i21.Version = 1
i21.Revision = rev3
c.Assert(infos[0], DeepEquals, i12)
c.Assert(infos[1], DeepEquals, i21)
c.Assert(infos[2], DeepEquals, i11)
}

func (t *testOptimist) TestBuildLockJoinedAndTable(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
logger = log.L()
o = NewOptimist(&logger)
task = "task"
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
downSchema = "db"
downTable = "tbl"
st1 = optimism.NewSourceTables(task, source1)
st2 = optimism.NewSourceTables(task, source2)
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
DDLs2 = []string{"ALTER TABLE bar DROP COLUMN c1"}
p = parser.New()
se = mock.NewContext()
tblID int64 = 111
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 INT, c2 INT)`)
ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c2 INT)`)

i11 = optimism.NewInfo(task, source1, "foo", "bar-1", downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, source2, "foo", "bar-1", downSchema, downTable, DDLs2, ti2, []*model.TableInfo{ti3})
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

st1.AddTable("db", "tbl-1", downSchema, downTable)
st2.AddTable("db", "tbl-1", downSchema, downTable)

c.Assert(o.Start(ctx, etcdTestCli), IsNil)
_, err := optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, IsNil)
_, err = optimism.PutSourceTables(etcdTestCli, st2)
c.Assert(err, IsNil)

_, err = optimism.PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
_, err = optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)

ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)

lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
c.Assert(len(lockJoined), Equals, 1)
c.Assert(len(lockTTS), Equals, 1)
joined, ok := lockJoined[utils.GenDDLLockID(task, downSchema, downTable)]
c.Assert(ok, IsTrue)
cmp, err := joined.Compare(schemacmp.Encode(ti2))
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
}
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2242,6 +2242,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38055]
message = "table-info-before not exist in optimistic ddls: %v"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
Expand Down
5 changes: 5 additions & 0 deletions pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Info struct {
// only set it when get/watch from etcd
Version int64 `json:"-"`

// only set it when get from etcd
// use for sort infos in recoverlock
Revision int64 `json:"-"`

// use to resolve conflict
IgnoreConflict bool `json:"ignore-conflict"`
}
Expand Down Expand Up @@ -132,6 +136,7 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[stri
return nil, 0, err2
}
info.Version = kv.Version
info.Revision = kv.ModRevision

if _, ok := ifm[info.Task]; !ok {
ifm[info.Task] = make(map[string]map[string]map[string]Info)
Expand Down
4 changes: 4 additions & 0 deletions pkg/shardddl/optimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1][source1][upSchema], HasLen, 1)
i11WithVer := i11
i11WithVer.Version = 2
i11WithVer.Revision = ifm[task1][source1][upSchema][upTable].Revision
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)

// put another key and get again with 2 info.
Expand All @@ -141,6 +142,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
i12WithVer := i12
i12WithVer.Version = 1
i12WithVer.Revision = ifm[task1][source2][upSchema][upTable].Revision
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer)

// start the watcher.
Expand Down Expand Up @@ -210,8 +212,10 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm, HasKey, task1)
c.Assert(ifm, HasKey, task2)
c.Assert(ifm[task1], HasLen, 1)
i11WithVer.Revision = ifm[task1][source1][upSchema][upTable].Revision
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
c.Assert(ifm[task2], HasLen, 1)
i21WithVer.Revision = ifm[task2][source1][upSchema][upTable].Revision
c.Assert(ifm[task2][source1][upSchema][upTable], DeepEquals, i21WithVer)

// watch the deletion for i12.
Expand Down
Loading