This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

bug-fix: use table-info-before always and fix bug for recover lock in optimistic #1518

Merged: 22 commits, Mar 26, 2021
Changes from 3 commits
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
@@ -372,6 +372,7 @@ ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high
ErrMasterBoundChanging,[code=38052:class=dm-master:scope=internal:level=low], "Message: source bound is changed too frequently, last old bound %s:, new bound %s, Workaround: Please try again later"
ErrMasterFailToImportFromV10x,[code=38053:class=dm-master:scope=internal:level=high], "Message: fail to import DM cluster from v1.0.x, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation."
ErrMasterInconsistentOptimisticDDLsAndInfo,[code=38054:class=dm-master:scope=internal:level=high], "Message: inconsistent count of optimistic ddls and table infos, ddls: %d, table info: %d"
+ErrMasterOptimisticTableInfoBeforeNotExist,[code=38055:class=dm-master:scope=internal:level=high], "Message: table info before not exist in optimistic ddls: %v"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
6 changes: 6 additions & 0 deletions errors.toml
@@ -2242,6 +2242,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

+[error.DM-dm-master-38055]
+message = "table info before not exist in optimistic ddls: %v"
+description = ""
+workaround = ""
+tags = ["internal", "high"]

[error.DM-dm-worker-40001]
message = "parse dm-worker config flag set"
description = ""
5 changes: 5 additions & 0 deletions pkg/shardddl/optimism/keeper.go
@@ -17,6 +17,7 @@ import (
"sort"
"sync"

"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

@@ -45,6 +46,10 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e
lk.mu.Lock()
defer lk.mu.Unlock()

+if info.TableInfoBefore == nil {
+	return "", nil, terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(info.DDLs)
+}

if l, ok = lk.locks[lockID]; !ok {
lk.locks[lockID] = NewLock(lockID, info.Task, info.DownSchema, info.DownTable, info.TableInfoBefore, tts)
l = lk.locks[lockID]
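A rough sketch of a unit test that would exercise the new guard above — the constructor and the Info field names are taken from the surrounding diff but should be treated as assumptions, and the bare `testing` style is only for brevity; this is a sketch, not code from the PR:

```go
package optimism

import (
	"testing"

	"github.com/pingcap/dm/pkg/terror"
)

// sketch: TrySync should fail fast when an Info carries no TableInfoBefore,
// instead of building a lock from a nil schema snapshot.
func TestTrySyncWithoutTableInfoBefore(t *testing.T) {
	lk := NewLockKeeper()
	info := Info{
		Task:       "test",
		Source:     "mysql-replica-01",
		UpSchema:   "db",
		UpTable:    "tbl",
		DownSchema: "db",
		DownTable:  "tbl",
		DDLs:       []string{"ALTER TABLE db.tbl ADD COLUMN c1 INT"},
		// TableInfoBefore deliberately left nil.
	}
	_, _, err := lk.TrySync(info, nil)
	if !terror.ErrMasterOptimisticTableInfoBeforeNotExist.Equal(err) {
		t.Fatalf("expected ErrMasterOptimisticTableInfoBeforeNotExist, got %v", err)
	}
}
```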
15 changes: 10 additions & 5 deletions pkg/shardddl/optimism/lock.go
@@ -72,7 +72,7 @@ func NewLock(ID, task, downSchema, downTable string, ti *model.TableInfo, tts []
synced: true,
versions: make(map[string]map[string]map[string]int64),
}
-l.addTables(tts)
+l.addTables(l.joined, tts)
Contributor:
What if tts contains the other tables whose schema didn't reach l.joined?

Collaborator Author:
Here l.joined equals tableInfoBefore. If it's a new lock (first table), tableInfoBefore = l.joined. Other, newer tables will be added in L163.

Contributor:
The tts here is from https://github.com/pingcap/dm/blob/master/dm/master/shardddl/optimist.go#L267. The optimist gets all source tables from etcd, so this tts may contain more than just the table in this info, I think.

Collaborator Author:
em... That's right, I will fix it

metrics.ReportDDLPending(task, metrics.DDLPendingNone, metrics.DDLPendingSynced)

return l
@@ -149,13 +149,18 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro
return ddls, terror.ErrMasterInconsistentOptimisticDDLsAndInfo.Generate(len(ddls), len(newTIs))
}

+// should not happen
+if info.TableInfoBefore == nil {
+	return ddls, terror.ErrMasterOptimisticTableInfoBeforeNotExist.Generate(ddls)
+}
+oldTable := schemacmp.Encode(info.TableInfoBefore)
// handle the case where <callerSource, callerSchema, callerTable>
// is not in old source tables and current new source tables.
// duplicate append is not a problem.
tts = append(tts, newTargetTable(l.Task, callerSource, l.DownSchema, l.DownTable,
map[string]map[string]struct{}{callerSchema: {callerTable: struct{}{}}}))
// add any new source tables.
-l.addTables(tts)
+l.addTables(oldTable, tts)
if val, ok := l.versions[callerSource][callerSchema][callerTable]; !ok || val < infoVersion {
l.versions[callerSource][callerSchema][callerTable] = infoVersion
}
Collaborator:
Now we change l.tables via l.receiveTable, so at line 179 oldJoined is no longer the result of "joining every l.tables" 🤔

Collaborator Author:
Do you mean joining all the tables to compute the old joined?

Collaborator (@lance6716, Mar 23, 2021):
I'm just not sure whether correctness depends on "the old joined should be the joined result of all l.tables", or on "the old joined of the previous TrySync and the new joined of the next TrySync should stay consistent".

Collaborator Author (@GMHDBJD, Mar 23, 2021):
If we assume the user runs start-task with the same table info in all upstreams, tableInfoBefore should be the same as oldJoined or oldTable. Otherwise, the oldJoined is not correct; do we keep it and hope a later join returns an error? cc @lichunzhu

Contributor:
If dm-master/dm-worker restarts, the table info in the upstreams is more likely to be different. What's the problem here? I don't understand this clearly.

Collaborator Author:
But if tableInfoBefore does not equal l.tables[source][schema][table], that means l.tables[source][schema][table] and oldJoined may be wrong. Do we need to join all the tables with tableInfoBefore as oldJoined?

Contributor:
> But if tableInfoBefore does not equal l.tables[source][schema][table], that means l.tables[source][schema][table] and oldJoined may be wrong. Do we need to join all the tables with tableInfoBefore as oldJoined?

I thought about this situation carefully. Could you help me check whether I'm wrong?

When this happens, it means we are adding a table whose schema is not equal to the current joined table info. When we init all the tables, there are three situations:

  1. This task is new. The pre-check will ensure the correctness of the tables' schemas.
  2. This DM cluster recovers from a failure. The lock is synced now and all the tables have the same schema. If we add a table with a different schema, we should report an error.
  3. This DM cluster recovers from a failure. The lock is unsynced now. Some workers didn't put their lock Info into etcd before the restart. These tables' l.tables will be set to l.joined at first, and if that differs from the tableInfoBefore received later, I think it's acceptable. Am I right?

Collaborator Author (@GMHDBJD, Mar 24, 2021):
So you mean that if we get an info for a table whose table info we have already received, and the schemas are different, we should report an error?

Contributor (@lichunzhu, Mar 24, 2021):
Yes. But I don't think we should report an error in situation 3.

Collaborator Author (@GMHDBJD, Mar 24, 2021):
> Some workers didn't put their lock Info into etcd before the restart. These tables' l.tables will be set to l.joined at first

That means the table hasn't been received yet, i.e. l.received[x][x][x] == false.

In the unit tests we have some idempotent TrySync calls; how do we deal with that case? 🤔
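A small, self-contained sketch of the rule being discussed in this thread — not the merged code; `received` plays the role of the l.received flag mentioned above, and plain strings stand in for real table infos:

```go
package main

import "fmt"

// toy model: only a table whose info was actually received before should
// trigger an error on a schema mismatch; a table seeded from the joined
// schema after a restart is simply overwritten with its table-info-before.
type toyTable struct {
	schema   string
	received bool
}

func applyTableInfoBefore(t *toyTable, before string) error {
	if !t.received {
		// situation 3: a placeholder seeded from l.joined; accept the real schema.
		t.schema, t.received = before, true
		return nil
	}
	if t.schema != before {
		// situation 2: a previously received schema disagrees with table-info-before.
		return fmt.Errorf("table info before %q conflicts with received schema %q", before, t.schema)
	}
	return nil
}

func main() {
	seeded := &toyTable{schema: "t(a int)", received: false}
	fmt.Println(applyTableInfoBefore(seeded, "t(a int, b int)")) // <nil>: overwritten

	rcv := &toyTable{schema: "t(a int)", received: true}
	fmt.Println(applyTableInfoBefore(rcv, "t(a int, b int)")) // error: conflict
}
```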

@@ -205,6 +210,7 @@ func (l *Lock) TrySync(info Info, tts []TargetTable) (newDDLs []string, err erro
// special case: if the DDL does not affect the schema at all, assume it is
// idempotent and just execute the DDL directly.
// if any real conflicts after joined exist, they will be detected by the following steps.
+// this often happens when executing `CREATE TABLE` statement
var cmp int
if cmp, err = nextTable.Compare(oldJoined); err == nil && cmp == 0 {
newDDLs = append(newDDLs, ddls[idx])
@@ -465,7 +471,7 @@ func (l *Lock) tryRevertDone(source, schema, table string) {
}

// addTables adds any not-existing tables into the lock.
-func (l *Lock) addTables(tts []TargetTable) {
+func (l *Lock) addTables(tb schemacmp.Table, tts []TargetTable) {
for _, tt := range tts {
if _, ok := l.tables[tt.Source]; !ok {
l.tables[tt.Source] = make(map[string]map[string]schemacmp.Table)
@@ -480,8 +486,7 @@ func (l *Lock) addTables(tts []TargetTable) {
}
for table := range tables {
if _, ok := l.tables[tt.Source][schema][table]; !ok {
-// NOTE: the newly added table uses the current table info.
-l.tables[tt.Source][schema][table] = l.joined
+l.tables[tt.Source][schema][table] = tb
l.done[tt.Source][schema][table] = false
l.versions[tt.Source][schema][table] = 0
log.L().Info("table added to the lock", zap.String("lock", l.ID),
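To summarize the behavioural change in addTables: before this PR a newly observed table was always seeded with the joined schema, while after it the caller chooses the seed — l.joined when the lock is created, and the info's table-info-before (oldTable) during TrySync. A self-contained toy sketch of that pattern, with plain strings standing in for schemacmp.Table:

```go
package main

import "fmt"

// toy model of the new addTables: the seed schema is supplied by the caller
// instead of always being the joined schema.
type toyLock struct {
	joined string
	tables map[string]string // "source.schema.table" -> seeded schema
}

func (l *toyLock) addTables(seed string, keys []string) {
	for _, k := range keys {
		if _, ok := l.tables[k]; !ok {
			l.tables[k] = seed // previously this was always l.joined
		}
	}
}

func main() {
	l := &toyLock{joined: "t(a int)", tables: map[string]string{}}
	// new lock: seed with the joined schema (== table-info-before of the first table).
	l.addTables(l.joined, []string{"src1.db.t"})
	// a later TrySync: seed newly seen tables with that info's table-info-before.
	l.addTables("t(a int, b int)", []string{"src2.db.t"})
	fmt.Println(l.tables) // map[src1.db.t:t(a int) src2.db.t:t(a int, b int)]
}
```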