Skip to content

Commit

Permalink
shardddl/optimism: add synced as struct member (pingcap#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jul 24, 2020
1 parent a90c5f1 commit e6ae03a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Lock struct {
// upstream source ID -> upstream schema name -> upstream table name -> table info.
// if all of them are the same, then we call the lock `synced`.
tables map[string]map[string]map[string]schemacmp.Table
synced bool

// whether DDLs operations have done (execute the shard DDL) to the downstream.
// if all of them have done and have the same schema, then we call the lock `resolved`.
Expand All @@ -62,6 +63,7 @@ func NewLock(ID, task, downSchema, downTable string, ti *model.TableInfo, tts []
joined: schemacmp.Encode(ti),
tables: make(map[string]map[string]map[string]schemacmp.Table),
done: make(map[string]map[string]map[string]bool),
synced: true,
}
l.addTables(tts)
return l
Expand Down Expand Up @@ -116,6 +118,11 @@ func (l *Lock) TrySync(callerSource, callerSchema, callerTable string,
log.L().Info("update table info", zap.String("lock", l.ID), zap.String("source", callerSource), zap.String("schema", callerSchema), zap.String("table", callerTable),
zap.Stringer("from", oldTable), zap.Stringer("to", newTable), zap.Strings("ddls", ddls))

defer func() {
_, remain := l.syncStatus()
l.synced = remain == 0
}()

// special case: if the DDL does not affect the schema at all, assume it is
// idempotent and just execute the DDL directly.
// if any real conflicts after joined exist, they will be detected by the following steps.
Expand Down Expand Up @@ -229,6 +236,8 @@ func (l *Lock) TryRemoveTable(source, schema, table string) bool {
}

delete(l.tables[source][schema], table)
_, remain := l.syncStatus()
l.synced = remain == 0
delete(l.done[source][schema], table)
log.L().Info("table removed from the lock", zap.String("lock", l.ID),
zap.String("source", source), zap.String("schema", schema), zap.String("table", table),
Expand Down
7 changes: 7 additions & 0 deletions pkg/shardddl/optimism/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) {
synced, remain := l.IsSynced()
c.Assert(synced, Equals, syncedCount == tableCount)
c.Assert(remain, Equals, tableCount-syncedCount)
c.Assert(synced, Equals, l.synced)
}
}
}
Expand Down Expand Up @@ -137,6 +138,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) {
synced, remain := l.IsSynced()
c.Assert(synced, IsFalse)
c.Assert(remain, Equals, tableCount-1)
c.Assert(synced, Equals, l.synced)
cmp, err := l.tables[sources[0]][dbs[0]][tbls[0]].Compare(l.tables[sources[0]][dbs[0]][tbls[1]])
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 1)
Expand All @@ -157,6 +159,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) {
synced, remain = l.IsSynced()
c.Assert(synced, IsFalse)
c.Assert(remain, Equals, tableCount-2)
c.Assert(synced, Equals, l.synced)
cmp, err = l.tables[sources[0]][dbs[0]][tbls[0]].Compare(l.tables[sources[0]][dbs[0]][tbls[1]])
c.Assert(err, IsNil)
c.Assert(cmp, Equals, 0)
Expand Down Expand Up @@ -192,6 +195,7 @@ func (t *testLock) TestLockTrySyncNormal(c *C) {
DDLs, err = l.TrySync(source, db, tbl, DDLs3, ti3, tts)
c.Assert(err, IsNil)
synced, remain = l.IsSynced()
c.Assert(synced, Equals, l.synced)
if syncedCount == tableCount {
c.Assert(DDLs, DeepEquals, DDLs3)
c.Assert(synced, IsTrue)
Expand Down Expand Up @@ -315,6 +319,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) {
c.Assert(err, IsNil)
c.Assert(DDLs, DeepEquals, DDLs1)
synced, remain := l.IsSynced()
c.Assert(synced, Equals, l.synced)
c.Assert(synced, IsFalse)
c.Assert(remain, Equals, 1)

Expand All @@ -330,6 +335,7 @@ func (t *testLock) TestLockTrySyncIndex(c *C) {
c.Assert(err, IsNil)
c.Assert(DDLs, DeepEquals, []string{}) // no DDLs returned
synced, remain = l.IsSynced()
c.Assert(synced, Equals, l.synced)
c.Assert(synced, IsFalse)
c.Assert(remain, Equals, 1)

Expand Down Expand Up @@ -1088,6 +1094,7 @@ func (t *testLock) trySyncForAllTablesLarger(c *C, l *Lock,

func (t *testLock) checkLockSynced(c *C, l *Lock) {
synced, remain := l.IsSynced()
c.Assert(synced, Equals, l.synced)
c.Assert(synced, IsTrue)
c.Assert(remain, Equals, 0)

Expand Down

0 comments on commit e6ae03a

Please sign in to comment.