This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

bug-fix: use table-info-before always and fix bug for recover lock in optimistic #1518

Merged
merged 22 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
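In short, this PR makes optimistic lock recovery deterministic: `GetAllInfo` now records each info's etcd ModRevision in a new `ReVision` field, `recoverLocks` replays infos sorted by that revision via a new `sortInfos` helper instead of iterating nested Go maps (whose iteration order is randomized), a nil `TableInfoBefore` is rejected with a new error code (38055), and `Lock` tracks which tables have actually reported their own table info in a new `received` map. A minimal, self-contained sketch of the ordering idea (the `info` type and revision values are illustrative, not DM's actual types):

```go
package main

import (
	"fmt"
	"sort"
)

// info is an illustrative stand-in for optimism.Info; Revision plays the
// role of the new ReVision field (the etcd ModRevision captured on read).
type info struct {
	Table    string
	DDL      string
	Revision int64
}

// sortByRevision mirrors the shape of the new sortInfos helper: flatten the
// map, then order by revision so recovery replays infos as they were written.
func sortByRevision(m map[string]info) []info {
	infos := make([]info, 0, len(m))
	for _, in := range m {
		infos = append(infos, in)
	}
	sort.Slice(infos, func(i, j int) bool { return infos[i].Revision < infos[j].Revision })
	return infos
}

func main() {
	m := map[string]info{
		"bar-2": {Table: "bar-2", DDL: "ALTER TABLE bar ADD COLUMN c2 INT", Revision: 7},
		"bar-1": {Table: "bar-1", DDL: "ALTER TABLE bar ADD COLUMN c1 TEXT", Revision: 3},
	}
	// Go map iteration order is randomized; without the sort, c2 could be
	// replayed before c1 during recovery.
	for _, in := range sortByRevision(m) {
		fmt.Println(in.Revision, in.DDL) // always revision 3 before 7
	}
}
```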
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
@@ -372,6 +372,7 @@ ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high
ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table info before not exist in optimistic ddls: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
48 changes: 32 additions & 16 deletions dm/master/shardddl/optimist.go
@@ -255,30 +255,46 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
return revSource, revInfo, revOperation, nil
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
// construct locks based on the shard DDL info.
for task, ifTask := range ifm {
// sortInfos sorts all infos by revision.
func sortInfos(ifm map[string]map[string]map[string]map[string]optimism.Info) []optimism.Info {
infos := make([]optimism.Info, 0, len(ifm))

for _, ifTask := range ifm {
for _, ifSource := range ifTask {
for _, ifSchema := range ifSource {
for _, info := range ifSchema {
tts := o.tk.FindTables(task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
if err != nil {
return err
}
infos = append(infos, info)
}
}
}
}

// sort according to the revision (etcd ModRevision)
sort.Slice(infos, func(i, j int) bool {
return infos[i].ReVision < infos[j].ReVision
})
return infos
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
// construct locks based on the shard DDL info.
infos := sortInfos(ifm)
for _, info := range infos {
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
if err != nil {
return err
}
}

// update the done status of the lock.
for _, opTask := range opm {
for _, opSource := range opTask {
68 changes: 68 additions & 0 deletions dm/master/shardddl/optimist_test.go
Expand Up @@ -148,6 +148,7 @@ func (t *testOptimist) TestOptimist(c *C) {
t.testOptimist(c, noRestart)
t.testOptimist(c, restartOnly)
t.testOptimist(c, restartNewInstance)
t.TestSortInfos(c)
}

func (t *testOptimist) testOptimist(c *C, restart int) {
Expand Down Expand Up @@ -1029,3 +1030,70 @@ func (t *testOptimist) TestOptimistInitSchema(c *C) {
c.Assert(err, IsNil)
c.Assert(is.TableInfo, DeepEquals, ti1) // the init schema is ti1 now.
}

func (t *testOptimist) TestSortInfos(c *C) {
defer clearOptimistTestSourceInfoOperation(c)

var (
task = "test-optimist-init-schema"
sources = []string{"mysql-replica-1", "mysql-replica-2"}
upSchema = "foo"
upTables = []string{"bar-1", "bar-2"}
downSchema = "foo"
downTable = "bar"

p = parser.New()
se = mock.NewContext()
tblID int64 = 111
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 TEXT"}
DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"}
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT, c2 INT)`)
i11 = optimism.NewInfo(task, sources[0], upSchema, upTables[0], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i12 = optimism.NewInfo(task, sources[0], upSchema, upTables[1], downSchema, downTable, DDLs1, ti0, []*model.TableInfo{ti1})
i21 = optimism.NewInfo(task, sources[1], upSchema, upTables[1], downSchema, downTable, DDLs2, ti1, []*model.TableInfo{ti2})
)

rev1, err := optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)
ifm, _, err := optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos := sortInfos(ifm)
c.Assert(len(infos), Equals, 1)
i11.Version = 1
i11.ReVision = rev1
c.Assert(infos[0], DeepEquals, i11)

rev2, err := optimism.PutInfo(etcdTestCli, i12)
c.Assert(err, IsNil)
ifm, _, err = optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos = sortInfos(ifm)
c.Assert(len(infos), Equals, 2)
i11.Version = 1
i11.ReVision = rev1
i12.Version = 1
i12.ReVision = rev2
c.Assert(infos[0], DeepEquals, i11)
c.Assert(infos[1], DeepEquals, i12)

rev3, err := optimism.PutInfo(etcdTestCli, i21)
c.Assert(err, IsNil)
rev4, err := optimism.PutInfo(etcdTestCli, i11)
c.Assert(err, IsNil)
ifm, _, err = optimism.GetAllInfo(etcdTestCli)
c.Assert(err, IsNil)
infos = sortInfos(ifm)
c.Assert(len(infos), Equals, 3)

i11.Version = 2
Collaborator: seems we could replace Info.Version by Revision (could file another PR)

i11.ReVision = rev4
i12.Version = 1
i12.ReVision = rev2
i21.Version = 1
i21.ReVision = rev3
c.Assert(infos[0], DeepEquals, i12)
c.Assert(infos[1], DeepEquals, i21)
c.Assert(infos[2], DeepEquals, i11)
}
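A note on the final assertions: each `PutInfo` bumps etcd's cluster revision, and re-putting i11 after i21 stamps it with a fresh, higher ModRevision, so the sorted order becomes i12 (rev2), i21 (rev3), i11 (rev4). A toy model of that bookkeeping (illustrative revisions only, no real etcd):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	rev := int64(0)
	store := map[string]int64{} // key -> ModRevision of its last write
	put := func(key string) {
		rev++ // every write advances the cluster-wide revision
		store[key] = rev
	}
	put("i11") // rev 1
	put("i12") // rev 2
	put("i21") // rev 3
	put("i11") // rev 4: the re-put overwrites i11's old revision

	keys := make([]string, 0, len(store))
	for k := range store {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return store[keys[i]] < store[keys[j]] })
	fmt.Println(keys) // [i12 i21 i11]
}
```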
6 changes: 6 additions & 0 deletions errors.toml
@@ -2242,6 +2242,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-master-38055]
message = "table info before not exist in optimistic ddls: %v"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
5 changes: 5 additions & 0 deletions pkg/shardddl/optimism/info.go
@@ -54,6 +54,10 @@ type Info struct {
// only set it when get/watch from etcd
Version int64 `json:"-"`

// only set it when getting from etcd
// used to sort infos in recoverLocks
ReVision int64 `json:"-"`

// use to resolve conflict
IgnoreConflict bool `json:"ignore-conflict"`
}
@@ -132,6 +136,7 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[stri
return nil, 0, err2
}
info.Version = kv.Version
info.ReVision = kv.ModRevision

if _, ok := ifm[info.Task]; !ok {
ifm[info.Task] = make(map[string]map[string]map[string]Info)
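The hunk above relies on the difference between two etcd counters: `kv.Version` counts writes to that single key (and resets when the key is deleted), while `kv.ModRevision` is the cluster-wide revision of the key's last write, so only ModRevision gives a total order across different keys. A hedged sketch of reading both (the endpoint and key prefix are placeholders, and the clientv3 import path varies across etcd releases):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3" // import path may differ in newer etcd versions
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// placeholder prefix; DM's actual shard DDL info key prefix may differ
	resp, err := cli.Get(ctx, "/prefix/shardddl-optimism/info/", clientv3.WithPrefix())
	if err != nil {
		panic(err)
	}
	for _, kv := range resp.Kvs {
		// Version: per-key write counter. ModRevision: global revision of the
		// last write to this key, i.e. the value stored into Info.ReVision above.
		fmt.Printf("%s version=%d modRevision=%d\n", kv.Key, kv.Version, kv.ModRevision)
	}
}
```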
4 changes: 4 additions & 0 deletions pkg/shardddl/optimism/info_test.go
@@ -128,6 +128,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1][source1][upSchema], HasLen, 1)
i11WithVer := i11
i11WithVer.Version = 2
i11WithVer.ReVision = ifm[task1][source1][upSchema][upTable].ReVision
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)

// put another key and get again with 2 info.
@@ -141,6 +142,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
i12WithVer := i12
i12WithVer.Version = 1
i12WithVer.ReVision = ifm[task1][source2][upSchema][upTable].ReVision
c.Assert(ifm[task1][source2][upSchema][upTable], DeepEquals, i12WithVer)

// start the watcher.
@@ -210,8 +212,10 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm, HasKey, task1)
c.Assert(ifm, HasKey, task2)
c.Assert(ifm[task1], HasLen, 1)
i11WithVer.ReVision = ifm[task1][source1][upSchema][upTable].ReVision
c.Assert(ifm[task1][source1][upSchema][upTable], DeepEquals, i11WithVer)
c.Assert(ifm[task2], HasLen, 1)
i21WithVer.ReVision = ifm[task2][source1][upSchema][upTable].ReVision
c.Assert(ifm[task2][source1][upSchema][upTable], DeepEquals, i21WithVer)

// watch the deletion for i12.
5 changes: 5 additions & 0 deletions pkg/shardddl/optimism/keeper.go
@@ -17,6 +17,7 @@ import (
"sort"
"sync"

"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

@@ -45,6 +46,10 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e
lk.mu.Lock()
defer lk.mu.Unlock()

if info.TableInfoBefore == nil {
return "", nil, terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(info.DDLs)
}

if l, ok = lk.locks[lockID]; !ok {
lk.locks[lockID] = NewLock(lockID, info.Task, info.DownSchema, info.DownTable, info.TableInfoBefore, tts)
l = lk.locks[lockID]
27 changes: 26 additions & 1 deletion pkg/shardddl/optimism/lock.go
@@ -45,6 +45,13 @@ type Lock struct {
// upstream source ID -> upstream schema name -> upstream table name -> table info.
// if all of them are the same, then we call the lock `synced`.
tables map[string]map[string]map[string]schemacmp.Table

// received records whether the lock has received a table's own info.
// upstream source ID -> upstream schema name -> upstream table name -> bool
// if a table's info hasn't been received yet, we use the joined table instead;
// once the info is received, mark it as received and replace the table info in `tables`.
received map[string]map[string]map[string]bool

synced bool

// whether DDLs operations have done (execute the shard DDL) to the downstream.
@@ -68,6 +75,7 @@ func NewLock(ID, task, downSchema, downTable string, ti *model.TableInfo, tts []
DownTable: downTable,
joined: schemacmp.Encode(ti),
tables: make(map[string]map[string]map[string]schemacmp.Table),
received: make(map[string]map[string]map[string]bool),
done: make(map[string]map[string]map[string]bool),
synced: true,
versions: make(map[string]map[string]map[string]int64),
@@ -149,13 +157,19 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro
return ddls, terror.ErrMasterInconsistentOptimisticDDLsAndInfo.Generate(len(ddls), len(newTIs))
}

// should not happen
if info.TableInfoBefore == nil {
return ddls, terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(ddls)
}
oldTable := schemacmp.Encode(info.TableInfoBefore)
// handle the case where <callerSource, callerSchema, callerTable>
// is not in old source tables and current new source tables.
// duplicate append is not a problem.
tts = append(tts, newTargetTable(l.Task, callerSource, l.DownSchema, l.DownTable,
map[string]map[string]struct{}{callerSchema: {callerTable: struct{}{}}}))
// add any new source tables.
l.addTables(tts)
l.receiveTable(callerSource, callerSchema, callerTable, oldTable)
if val, ok := l.versions[callerSource][callerSchema][callerTable]; !ok || val < infoVersion {
l.versions[callerSource][callerSchema][callerTable] = infoVersion
}
lance6716 (Collaborator): now we changed l.tables in l.receiveTable, so at line 179 oldJoined is no longer the result of "join every l.tables" 🤔

GMHDBJD (Author): Did you mean join all the tables for the old joined?

lance6716 (Collaborator, Mar 23, 2021): I'm just not sure whether the correctness depends on "old joined should be the joined result of all l.tables", or on "the old joined of the previous TrySync and the new joined of the next TrySync should stay consistent".

GMHDBJD (Author, Mar 23, 2021): If we assume the user starts the task with the same table info in all upstreams, tableInfoBefore should be the same as oldJoined or oldTable. Otherwise, oldJoined is not correct, and we may keep it and hope a later join returns an error? cc @lichunzhu

lichunzhu (Contributor): If dm-master/dm-worker restarts, the table info across upstreams is more likely to be different. What's the problem here? I don't understand this clearly.

GMHDBJD (Author): But if tableInfoBefore does not equal l.tables[source][schema][table], that means l.tables[source][schema][table] and oldJoined may be wrong; do we need to join all the tables with tableInfoBefore as oldJoined?

lichunzhu (Contributor):

> But if tableInfoBefore does not equal l.tables[source][schema][table], that means l.tables[source][schema][table] and oldJoined may be wrong; do we need to join all the tables with tableInfoBefore as oldJoined?

I thought carefully about this situation. Could you help me check if I'm wrong?

When this happens, it means we add a table whose schema is not equal to the current joined table info. When we init all the tables, there are three situations:

  1. This task is new. The pre-check will ensure the correctness of the tables' schemas.
  2. This DM cluster recovers from a failure. The lock is synced now and all the tables have the same schema. If we add a table with a different schema, we should report an error.
  3. This DM cluster recovers from a failure. The lock is unsynced now. Some workers didn't put their lock Info into etcd before the restart. These tables' l.tables will be set to l.joined at first, and if that differs from the tableInfoBefore received later, I think it's acceptable. Am I right?

GMHDBJD (Author, Mar 24, 2021): So you mean that if we get an info for a table whose table info we have already received, and the schemas differ, we should report an error?

lichunzhu (Contributor, Mar 24, 2021): Yes. But I don't think we should report an error in situation 3.

GMHDBJD (Author, Mar 24, 2021):

> Some workers didn't put their lock Info into etcd before the restart. These tables' l.tables will be set to l.joined at first

That means the table hasn't been received, i.e. l.received[x][x][x] == false.

In the unit tests we have some idempotent TrySync calls; how do we deal with that case? 🤔
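To make the semantics under discussion concrete, here is a minimal standalone sketch of the `received` bookkeeping (hypothetical simplified types keyed by table name only, not DM's source -> schema -> table maps): a table added to the lock before its own info arrives is seeded with the joined schema as a placeholder, and only the first received info replaces that placeholder; repeated (idempotent) receives leave it unchanged.

```go
package main

import "fmt"

// Simplified stand-ins for Lock.tables / Lock.received.
type lock struct {
	joined   string            // stand-in for the joined schemacmp.Table
	tables   map[string]string // table -> schema snapshot used for joining
	received map[string]bool   // table -> whether its own info has arrived
}

func (l *lock) addTable(name string) {
	if _, ok := l.tables[name]; !ok {
		l.tables[name] = l.joined // placeholder: assume the joined schema
		l.received[name] = false
	}
}

func (l *lock) receiveTable(name, schema string) {
	if !l.received[name] {
		l.received[name] = true
		l.tables[name] = schema // first real info overrides the placeholder
	}
	// later receives are no-ops here; TrySync keeps updating l.tables
	// through its normal join path instead
}

func main() {
	l := &lock{joined: "joined", tables: map[string]string{}, received: map[string]bool{}}
	l.addTable("bar-1")
	fmt.Println(l.tables["bar-1"], l.received["bar-1"]) // joined false
	l.receiveTable("bar-1", "tableInfoBefore")
	fmt.Println(l.tables["bar-1"], l.received["bar-1"]) // tableInfoBefore true
	l.receiveTable("bar-1", "ignored") // idempotent: already received
	fmt.Println(l.tables["bar-1"])     // tableInfoBefore
}
```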

@@ -205,6 +219,7 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro
// special case: if the DDL does not affect the schema at all, assume it is
// idempotent and just execute the DDL directly.
// if any real conflicts after joined exist, they will be detected by the following steps.
// this often happens when executing a `CREATE TABLE` statement
var cmp int
if cmp, err = nextTable.Compare(oldJoined); err == nil && cmp == 0 {
newDDLs = append(newDDLs, ddls[idx])
@@ -319,6 +334,7 @@ func (l *Lock) TryRemoveTable(source, schema, table string) bool {
}

delete(l.tables[source][schema], table)
delete(l.received[source][schema], table)
_, remain := l.syncStatus()
l.synced = remain == 0
delete(l.done[source][schema], table)
@@ -464,24 +480,33 @@ func (l *Lock) tryRevertDone(source, schema, table string) {
l.done[source][schema][table] = false
}

func (l *Lock) receiveTable(callerSource string, callerSchema string, callerTable string, table schemacmp.Table) {
if !l.received[callerSource][callerSchema][callerTable] {
l.received[callerSource][callerSchema][callerTable] = true
l.tables[callerSource][callerSchema][callerTable] = table
}
}

// addTables adds any not-existing tables into the lock.
func (l *Lock) addTables(tts []TargetTable) {
for _, tt := range tts {
if _, ok := l.tables[tt.Source]; !ok {
l.tables[tt.Source] = make(map[string]map[string]schemacmp.Table)
l.received[tt.Source] = make(map[string]map[string]bool)
l.done[tt.Source] = make(map[string]map[string]bool)
l.versions[tt.Source] = make(map[string]map[string]int64)
}
for schema, tables := range tt.UpTables {
if _, ok := l.tables[tt.Source][schema]; !ok {
l.tables[tt.Source][schema] = make(map[string]schemacmp.Table)
l.received[tt.Source][schema] = make(map[string]bool)
l.done[tt.Source][schema] = make(map[string]bool)
l.versions[tt.Source][schema] = make(map[string]int64)
}
for table := range tables {
if _, ok := l.tables[tt.Source][schema][table]; !ok {
// NOTE: the newly added table uses the current table info.
l.tables[tt.Source][schema][table] = l.joined
l.received[tt.Source][schema][table] = false
l.done[tt.Source][schema][table] = false
l.versions[tt.Source][schema][table] = 0
log.L().Info("table added to the lock", zap.String("lock", l.ID),