diff --git a/drainer/syncer.go b/drainer/syncer.go index 423b78b98..d6804b316 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -465,6 +465,12 @@ ForLoop: schema, table string ) sql := b.job.Query + + if b.job.BinlogInfo.SchemaVersion == 0 { + log.Info("skip ddl due to the failed ddl", zap.String("sql", sql), zap.Int64("commit ts", commitTS)) + appendFakeBinlogIfNeeded(nil, commitTS) + continue + } schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion) if err != nil { err = errors.Trace(err) @@ -504,8 +510,7 @@ ForLoop: } } } else { - log.Info("skip ddl by SyncDDL setting to false", zap.String("schema", schema), zap.String("table", table), - zap.String("sql", sql), zap.Int64("commit ts", commitTS)) + log.Info("skip ddl by SyncDDL setting to false", zap.String("sql", sql), zap.Int64("commit ts", commitTS)) // A empty sql force it to evict the downstream table info. if s.cfg.DestDBType == "tidb" || s.cfg.DestDBType == "mysql" || s.cfg.DestDBType == "oracle" { shouldSkip = true diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index d5233fc86..5a4309387 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -168,6 +168,34 @@ func (s *syncerSuite) TestNewSyncer(c *check.C) { job: job, }) + // Add failed ddl + commitTS++ + jobID++ + binlog = &pb.Binlog{ + Tp: pb.BinlogType_Commit, + CommitTs: commitTS, + DdlQuery: []byte("alter table test.test add column a int"), + DdlJobId: jobID, + } + job = &model.Job{ + ID: jobID, + SchemaID: 1, // must be the previous create schema id of `test` + Type: model.ActionAddColumn, + State: model.JobStateSynced, + Query: "create table test.test(id int)", + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 0, + TableInfo: &model.TableInfo{ + ID: testTableID, + Name: model.CIStr{O: "test", L: "test"}, + }, + }, + } + syncer.Add(&binlogItem{ + binlog: binlog, + job: job, + }) + // Add dml commitTS++ binlog = &pb.Binlog{