diff --git a/dm/pkg/shardddl/optimism/lock.go b/dm/pkg/shardddl/optimism/lock.go index 5c39444d57f..bf96c1e6912 100644 --- a/dm/pkg/shardddl/optimism/lock.go +++ b/dm/pkg/shardddl/optimism/lock.go @@ -889,8 +889,8 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab log.L().Info("found conflict for DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable), log.ShortError(tableErr)) - if idempotent { - log.L().Info("return conflict DDL for idempotent DDL", zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable)) + if idempotent || l.noConflictWithOneNormalTable(source, schema, table, prevTable, postTable) { + log.L().Info("directly return conflict DDL", zap.Bool("idempotent", idempotent), zap.String("source", source), zap.String("schema", schema), zap.String("table", table), zap.Stringer("prevTable", prevTable), zap.Stringer("postTable", postTable)) l.tables[source][schema][table] = postTable l.finalTables[source][schema][table] = postTable return true, ConflictNone @@ -902,15 +902,6 @@ func (l *Lock) trySyncForOneDDL(source, schema, table string, prevTable, postTab l.addConflictTable(source, schema, table, postTable) l.finalTables[source][schema][table] = postTable - // if more than one conflict tables and this conflict DDL has no conflict with normal tables - // e.g. tb1,tb2 put ddl1(rename a to b); tb1 put ddl2(rename c to d); tb2 crash and reput ddl1(rename a to b) - // now tb2's ddl1 is a conflict DDL but has no conflict with normal tables - if l.multipleConflictTables() && l.noConflictWithNormalTables(source, schema, table, postTable) { - l.removeConflictTable(source, schema, table) - l.tables[source][schema][table] = postTable - return true, ConflictNone - } - // if any conflict happened between conflict DDLs, return error // e.g. tb1: "ALTER TABLE RENAME a TO b", tb2: "ALTER TABLE RENAME c TO d" if !l.noConflictForConflictTables() { @@ -1085,33 +1076,39 @@ func (l *Lock) allFinalTableLarger() bool { return l.allTableLarger(finalTables) } -// judge whether a conflict DDL has no conflict with all normal tables. -func (l *Lock) noConflictWithNormalTables(source, schema, table string, postTable schemacmp.Table) bool { - // revert conflict tables and final tables - currentConflictTables := l.conflictTables - currentFinalTables := l.finalTables - defer func() { - l.conflictTables = currentConflictTables - l.finalTables = currentFinalTables - }() - - // reset conflict tables and final tables - l.conflictTables = make(map[string]map[string]map[string]schemacmp.Table) - l.finalTables = make(map[string]map[string]map[string]schemacmp.Table) +// jude a conflict ddl is no conflict with at least one normal table. +func (l *Lock) noConflictWithOneNormalTable(callerSource, callerSchema, callerTable string, prevTable, postTable schemacmp.Table) bool { for source, schemaTables := range l.tables { - l.finalTables[source] = make(map[string]map[string]schemacmp.Table) for schema, tables := range schemaTables { - l.finalTables[source][schema] = make(map[string]schemacmp.Table) for table, ti := range tables { - l.finalTables[source][schema][table] = ti + if source == callerSource && schema == callerSchema && table == callerTable { + continue + } + + // judge joined no error + joined, err := postTable.Join(ti) + if err != nil { + continue + } + + // judge this normal table is smaller(same as allTableSmaller) + if _, err = joined.Compare(prevTable); err == nil { + continue + } + + // judge this normal table is larger(same as allTableLarger) + if joined, err = prevTable.Join(ti); err != nil { + joined = ti + } + if cmp, err := joined.Compare(postTable); err != nil || cmp < 0 { + continue + } + + return true } } } - // update for current conflict DDL - l.addConflictTable(source, schema, table, postTable) - l.finalTables[source][schema][table] = postTable - - return l.noConflictForFinalTables() + return false } // judge whether all conflict tables has no conflict. @@ -1202,19 +1199,3 @@ func (l *Lock) redirectForConflictTables(callerSource, callerSchema, callerTable } return nil } - -// multipleConflictTables check whether a lock has multiple conflict tables. -func (l *Lock) multipleConflictTables() bool { - cnt := 0 - for _, schemaTables := range l.conflictTables { - for _, tables := range schemaTables { - for range tables { - cnt++ - if cnt > 1 { - return true - } - } - } - } - return false -} diff --git a/dm/pkg/shardddl/optimism/lock_test.go b/dm/pkg/shardddl/optimism/lock_test.go index bdea3ae752c..b47ad7002a3 100644 --- a/dm/pkg/shardddl/optimism/lock_test.go +++ b/dm/pkg/shardddl/optimism/lock_test.go @@ -2583,7 +2583,7 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) { // tb1: rename id to a l.addConflictTable(source, schema, table1, t9) l.finalTables[source][schema][table1] = t9 - c.Assert(l.noConflictWithNormalTables(source, schema, table1, t1), IsFalse) + c.Assert(l.noConflictWithOneNormalTable(source, schema, table1, t1, t9), IsFalse) c.Assert(l.allConflictTableSmaller(), IsTrue) c.Assert(l.allConflictTableLarger(), IsTrue) c.Assert(l.allFinalTableSmaller(), IsFalse) @@ -2592,13 +2592,13 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) { l.tables[source][schema][table2] = t0 l.addConflictTable(source, schema, table2, t1) l.finalTables[source][schema][table2] = t1 - c.Assert(l.noConflictWithNormalTables(source, schema, table2, t1), IsTrue) + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue) l.removeConflictTable(source, schema, table2) l.tables[source][schema][table2] = t1 // tb2: rename id to a l.addConflictTable(source, schema, table2, t9) l.finalTables[source][schema][table2] = t9 - c.Assert(l.noConflictWithNormalTables(source, schema, table2, t9), IsFalse) + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t1, t9), IsFalse) c.Assert(l.allConflictTableSmaller(), IsTrue) c.Assert(l.allConflictTableLarger(), IsTrue) c.Assert(l.allFinalTableSmaller(), IsTrue) @@ -2614,6 +2614,58 @@ func (t *testLock) TestAllTableSmallerLarger(c *C) { c.Assert(l.tables[source][schema][table2], DeepEquals, t0) } +func (t *testLock) TestNoConflictWithOneNormalTable(c *C) { + var ( + source = "source" + schema = "schema" + table1 = "table1" + table2 = "table2" + p = parser.New() + se = mock.NewContext() + tblID int64 = 111 + ti0 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, col int)`) + ti1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, new_col int)`) + ti2 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, col varchar(4))`) + ti3 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, a int, new_col2 int)`) + ti4 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, b int, new_col int)`) + t0 = schemacmp.Encode(ti0) + t1 = schemacmp.Encode(ti1) + t2 = schemacmp.Encode(ti2) + t3 = schemacmp.Encode(ti3) + t4 = schemacmp.Encode(ti4) + ) + l := &Lock{ + tables: map[string]map[string]map[string]schemacmp.Table{ + source: { + schema: {table1: t0, table2: t0}, + }, + }, + } + + // table1 nothing happened. + // table2 rename column + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsFalse) + + // mock table1 rename column already + l.tables[source][schema][table1] = t1 + // table2 rename column + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue) + // table2 modify column + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t2), IsFalse) + // table2 different rename + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t3), IsFalse) + + // mock table1 rename another column already + l.tables[source][schema][table1] = t4 + // same results + // table2 rename column + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t1), IsTrue) + // table2 modify column + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t2), IsFalse) + // table2 different rename + c.Assert(l.noConflictWithOneNormalTable(source, schema, table2, t0, t3), IsFalse) +} + func checkRedirectOp(c *C, task, source, schema, table string) bool { ops, _, err := GetAllOperations(etcdTestCli) c.Assert(err, IsNil)