Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimist: return ConflictNone if a conflict DDL has no conflict with at lease one normal table #5414

Merged
merged 7 commits into from
May 17, 2022
77 changes: 29 additions & 48 deletions dm/pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,8 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab

log.L().Info("found conflict for DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable), log.ShortError(tableErr))

if idempotent {
log.L().Info("return conflict DDL for idempotent DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
if idempotent || l.noConflictWithOneNormalTable(source, schema, table, prevTable, postTable) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idempotent means the DDL has already been executed in this table before.
noConflictWithOneNormalTable means the DDL has already been executed in at least one other table before.

log.L().Info("directly return conflict DDL", zap.Bool("idempotent", idempotent), zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable))
l.tables[source][schema][table] = postTable
l.finalTables[source][schema][table] = postTable
return true, ConflictNone
Expand All @@ -902,15 +902,6 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab
l.addConflictTable(source, schema, table, postTable)
l.finalTables[source][schema][table] = postTable

// if more than one conflict tables and this conflict DDL has no conflict with normal tables
// e.g. tb1,tb2 put ddl1(rename a to b); tb1 put ddl2(rename c to d); tb2 crash and reput ddl1(rename a to b)
// now tb2's ddl1 is a conflict DDL but has no conflict with normal tables
if l.multipleConflictTables() && l.noConflictWithNormalTables(source, schema, table, postTable) {
l.removeConflictTable(source, schema, table)
l.tables[source][schema][table] = postTable
return true, ConflictNone
}

// if any conflict happened between conflict DDLs, return error
// e.g. tb1: "ALTER TABLE RENAME a TO b", tb2: "ALTER TABLE RENAME c TO d"
if !l.noConflictForConflictTables() {
Expand Down Expand Up @@ -1085,33 +1076,39 @@ func (l *Lock) allFinalTableLarger() bool {
return l.allTableLarger(finalTables)
}

// judge whether a conflict DDL has no conflict with all normal tables.
func (l *Lock) noConflictWithNormalTables(source, schema, table string, postTable schemacmp.Table) bool {
// revert conflict tables and final tables
currentConflictTables := l.conflictTables
currentFinalTables := l.finalTables
defer func() {
l.conflictTables = currentConflictTables
l.finalTables = currentFinalTables
}()

// reset conflict tables and final tables
l.conflictTables = make(map[string]map[string]map[string]schemacmp.Table)
l.finalTables = make(map[string]map[string]map[string]schemacmp.Table)
// jude a conflict ddl is no conflict with at least one normal table.
func (l *Lock) noConflictWithOneNormalTable(callerSource, callerSchema, callerTable string, prevTable, postTable schemacmp.Table) bool {
for source, schemaTables := range l.tables {
l.finalTables[source] = make(map[string]map[string]schemacmp.Table)
for schema, tables := range schemaTables {
l.finalTables[source][schema] = make(map[string]schemacmp.Table)
for table, ti := range tables {
l.finalTables[source][schema][table] = ti
if source == callerSource && schema == callerSchema && table == callerTable {
continue
}

// judge joined no error
joined, err := postTable.Join(ti)
if err != nil {
continue
}

// judge this normal table is smaller(same as allTableSmaller)
if _, err = joined.Compare(prevTable); err == nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy from

if _, err = joined.Compare(ti); err == nil {

continue
}

// judge this normal table is larger(same as allTableLarger)
if joined, err = prevTable.Join(ti); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy from

joined, err := ti.Join(finalTi)

joined = ti
}
if cmp, err := joined.Compare(postTable); err != nil || cmp < 0 {
continue
}

return true
}
}
}
// update for current conflict DDL
l.addConflictTable(source, schema, table, postTable)
l.finalTables[source][schema][table] = postTable

return l.noConflictForFinalTables()
return false
}

// judge whether all conflict tables has no conflict.
Expand Down Expand Up @@ -1202,19 +1199,3 @@ func (l *Lock) redirectForConflictTables(callerSource, callerSchema, callerTable
}
return nil
}

// multipleConflictTables check whether a lock has multiple conflict tables.
func (l *Lock) multipleConflictTables() bool {
cnt := 0
for _, schemaTables := range l.conflictTables {
for _, tables := range schemaTables {
for range tables {
cnt++
if cnt > 1 {
return true
}
}
}
}
return false
}
58 changes: 55 additions & 3 deletions dm/pkg/shardddl/optimism/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2583,7 +2583,7 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) {
// tb1: rename id to a
l.addConflictTable(source, schema, table1, t9)
l.finalTables[source][schema][table1] = t9
c.Assert(l.noConflictWithNormalTables(source, schema, table1, t1), IsFalse)
c.Assert(l.noConflictWithOneNormalTable(source, schema, table1, t1, t9), IsFalse)
c.Assert(l.allConflictTableSmaller(), IsTrue)
c.Assert(l.allConflictTableLarger(), IsTrue)
c.Assert(l.allFinalTableSmaller(), IsFalse)
Expand All @@ -2592,13 +2592,13 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) {
l.tables[source][schema][table2] = t0
l.addConflictTable(source, schema, table2, t1)
l.finalTables[source][schema][table2] = t1
c.Assert(l.noConflictWithNormalTables(source, schema, table2, t1), IsTrue)
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue)
l.removeConflictTable(source, schema, table2)
l.tables[source][schema][table2] = t1
// tb2: rename id to a
l.addConflictTable(source, schema, table2, t9)
l.finalTables[source][schema][table2] = t9
c.Assert(l.noConflictWithNormalTables(source, schema, table2, t9), IsFalse)
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t1, t9), IsFalse)
c.Assert(l.allConflictTableSmaller(), IsTrue)
c.Assert(l.allConflictTableLarger(), IsTrue)
c.Assert(l.allFinalTableSmaller(), IsTrue)
Expand All @@ -2614,6 +2614,58 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) {
c.Assert(l.tables[source][schema][table2], DeepEquals, t0)
}

func (t *testLock) TestNoConflictWithOneNormalTable(c *C) {
var (
source = "source"
schema = "schema"
table1 = "table1"
table2 = "table2"
p = parser.New()
se = mock.NewContext()
tblID int64 = 111
ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, col int)`)
ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, new_col int)`)
ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, col varchar(4))`)
ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, new_col2 int)`)
ti4 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, b int, new_col int)`)
t0 = schemacmp.Encode(ti0)
t1 = schemacmp.Encode(ti1)
t2 = schemacmp.Encode(ti2)
t3 = schemacmp.Encode(ti3)
t4 = schemacmp.Encode(ti4)
)
l := &Lock{
tables: map[string]map[string]map[string]schemacmp.Table{
source: {
schema: {table1: t0, table2: t0},
},
},
}

// table1 nothing happened.
// table2 rename column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsFalse)

// mock table1 rename column already
l.tables[source][schema][table1] = t1
// table2 rename column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue)
// table2 modify column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t2), IsFalse)
// table2 different rename
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t3), IsFalse)

// mock table1 rename another column already
l.tables[source][schema][table1] = t4
// same results
// table2 rename column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue)
// table2 modify column
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t2), IsFalse)
// table2 different rename
c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t3), IsFalse)
}

func checkRedirectOp(c *C, task, source, schema, table string) bool {
ops, _, err := GetAllOperations(etcdTestCli)
c.Assert(err, IsNil)
Expand Down