diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index c2cecc367b4..205d363e438 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -223,24 +223,7 @@ func (m *ddlManager) tick( } for _, event := range events { - // TODO: find a better place to do this check - // check if the ddl event is belong to an ineligible table. - // If so, we should ignore it. - if !filter.IsSchemaDDL(event.Type) { - ignore, err := m.schema. - IsIneligibleTable(ctx, event.TableInfo.TableName.TableID, event.CommitTs) - if err != nil { - return nil, nil, errors.Trace(err) - } - if ignore { - log.Warn("ignore the DDL event of ineligible table", - zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event)) - continue - } - } - tableName := event.TableInfo.TableName - // Add all valid DDL events to the pendingDDLs. m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event) } diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 5cac79a8633..009256bcf7b 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -16,6 +16,7 @@ package puller import ( "context" "encoding/json" + "fmt" "sync" "sync/atomic" "time" @@ -26,6 +27,7 @@ import ( timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/kv/sharedconn" "github.com/pingcap/tiflow/cdc/model" @@ -204,8 +206,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model if job != nil { skip, err := p.handleJob(job) if err != nil { - return cerror.WrapError(cerror.ErrHandleDDLFailed, - err, job.Query, job.StartTS, job.StartTS) + return err } if skip { return nil @@ -353,7 +354,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) { return true, nil } - return 
true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } switch job.Type { @@ -369,7 +371,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { zap.Uint64("startTs", job.StartTS), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), zap.Error(err)) - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } case timodel.ActionRenameTable: oldTable, ok := snap.PhysicalTableByID(job.TableID) @@ -377,7 +380,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { // 1. If we can not find the old table, and the new table name is in filter rule, return error. discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) if !discard { - return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } log.Warn("skip rename table ddl since cannot found the old table info", zap.String("namespace", p.changefeedID.Namespace), @@ -394,16 +397,16 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table) skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) if err != nil { - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. 
if skipByOldTableName { if !skipByNewTableName { - return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } return true, nil } - log.Info("ddl puller receive rename table ddl job", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), @@ -426,12 +429,52 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { err = p.schemaStorage.HandleDDLJob(job) if err != nil { - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } - p.setResolvedTs(job.BinlogInfo.FinishedTS) p.schemaVersion = job.BinlogInfo.SchemaVersion - return false, nil + + return p.checkIneligibleTableDDL(snap, job) +} + +// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL. +// 1. If it is not a table DDL, we shouldn't check it. +// 2. If the table after the DDL is ineligible: +// a. If the table is not exist before the DDL, we should ignore the DDL. +// b. If the table is ineligible before the DDL, we should ignore the DDL. +// c. If the table is eligible before the DDL, we should return an error. +func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) { + if filter.IsSchemaDDL(job.Type) { + return false, nil + } + + ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID) + if !ineligible { + return false, nil + } + + // If the table is not in the snapshot before the DDL, + // we should ignore the DDL. + _, exist := snapBefore.PhysicalTableByID(job.TableID) + if !exist { + return true, nil + } + + // If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL. 
+ // If so, we should return an error to inform the user that it is a + // dangerous operation and should be handled manually. + isBeforeIneligible := snapBefore.IsIneligibleTableID(job.TableID) + if isBeforeIneligible { + log.Warn("ignore the DDL event of ineligible table", + zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job)) + return true, nil + } + return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+ "it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+ "please pause the changefeed and update the `force-replicate=true` "+ "in the changefeed configuration, "+ "then resume the changefeed.", job.Query)) } // handleRenameTables gets all the tables that are renamed diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index 7eb766955db..2b08ddd3da8 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -166,23 +166,23 @@ func TestHandleRenameTable(t *testing.T) { inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t1(id int)") + job = helper.DDL2Job("create table test1.t1(id int primary key)") remainTables[0] = job.TableID inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t2(id int)") + job = helper.DDL2Job("create table test1.t2(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t3(id int)") + job = helper.DDL2Job("create table test1.t3(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, 
job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t5(id int)") + job = helper.DDL2Job("create table test1.t5(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -192,7 +192,7 @@ func TestHandleRenameTable(t *testing.T) { inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table ignore1.a(id int)") + job = helper.DDL2Job("create table ignore1.a(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -207,7 +207,7 @@ func TestHandleRenameTable(t *testing.T) { } { - _ = helper.DDL2Job("create table test1.t6(id int)") + _ = helper.DDL2Job("create table test1.t6(id int primary key)") job := helper.DDL2Job("rename table test1.t2 to test1.t22, test1.t6 to test1.t66") skip, err := ddlJobPullerImpl.handleRenameTables(job) require.Error(t, err) @@ -224,17 +224,17 @@ func TestHandleRenameTable(t *testing.T) { inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t1(id int)") + job = helper.DDL2Job("create table test2.t1(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t2(id int)") + job = helper.DDL2Job("create table test2.t2(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t3(id int)") + job = helper.DDL2Job("create table test2.t3(id int primary key)") inputDDL(t, 
ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -252,13 +252,13 @@ func TestHandleRenameTable(t *testing.T) { inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table Test3.t1(id int)") + job = helper.DDL2Job("create table Test3.t1(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // skip this table - job = helper.DDL2Job("create table Test3.t2(id int)") + job = helper.DDL2Job("create table Test3.t2(id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -274,34 +274,34 @@ func TestHandleRenameTable(t *testing.T) { // test rename table { - job := helper.DDL2Job("create table test1.t99 (id int)") + job := helper.DDL2Job("create table test1.t99 (id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // this ddl should be skipped - job = helper.DDL2Job("create table test1.t1000 (id int)") + job = helper.DDL2Job("create table test1.t1000 (id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // this ddl should be skipped - job = helper.DDL2Job("create table test1.t888 (id int)") + job = helper.DDL2Job("create table test1.t888 (id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t20230808 (id int)") + job = helper.DDL2Job("create 
table test1.t20230808 (id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t202308081 (id int)") + job = helper.DDL2Job("create table test1.t202308081 (id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t202308082 (id int)") + job = helper.DDL2Job("create table test1.t202308082 (id int primary key)") inputDDL(t, ddlJobPullerImpl, job) inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) @@ -313,9 +313,8 @@ func TestHandleRenameTable(t *testing.T) { // since test1.t100 is in filter rule, replicate it job = helper.DDL2Job("rename table test1.t1000 to test1.t100") - skip, err = ddlJobPullerImpl.handleJob(job) + _, err = ddlJobPullerImpl.handleJob(job) require.Error(t, err) - require.True(t, skip) require.Contains(t, err.Error(), fmt.Sprintf("table's old name is not in filter rule, and its new name in filter rule "+ "table id '%d', ddl query: [%s], it's an unexpected behavior, "+ "if you want to replicate this table, please add its old name to filter rule.", job.TableID, job.Query)) @@ -337,9 +336,8 @@ func TestHandleRenameTable(t *testing.T) { // but now it will throw an error since schema ignore1 are not in schemaStorage // ref: https://github.com/pingcap/tiflow/issues/9488 job = helper.DDL2Job("rename table test1.t202308081 to ignore1.ignore1, test1.t202308082 to ignore1.dongmen") - skip, err = ddlJobPullerImpl.handleJob(job) + _, err = ddlJobPullerImpl.handleJob(job) require.NotNil(t, err) - require.True(t, skip) require.Contains(t, err.Error(), "ErrSnapshotSchemaNotFound") } } @@ -398,7 +396,7 @@ func TestHandleJob(t *testing.T) { // test create table { - job := 
helper.DDL2Job("create table test1.t1(id int) partition by range(id) (partition p0 values less than (10))") + job := helper.DDL2Job("create table test1.t1(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err := ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) @@ -408,7 +406,7 @@ func TestHandleJob(t *testing.T) { require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.testStartTs(id int)") + job = helper.DDL2Job("create table test1.testStartTs(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) @@ -419,23 +417,23 @@ func TestHandleJob(t *testing.T) { require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.t2(id int)") + job = helper.DDL2Job("create table test1.t2(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.t3(id int)") + job = helper.DDL2Job("create table test1.t3(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) - job = helper.DDL2Job("create table test1.t4(id int) partition by range(id) (partition p0 values less than (10))") + job = helper.DDL2Job("create table test1.t4(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) // make sure no schema not found error - job = helper.DDL2Job("create table test3.t1(id int) partition by range(id) (partition p0 values less than (10))") + job = helper.DDL2Job("create table test3.t1(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) @@ -598,7 +596,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 5, State: timodel.JobStateDone, 
BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 2, FinishedTS: 18}, - Query: "create table test.t1(id int)", + Query: "create table test.t1(id int primary key)", }) inputDDL(t, ddlJobPullerImpl, &timodel.Job{ ID: 1, @@ -606,7 +604,7 @@ StartTS: 5, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, FinishedTS: 16}, - Query: "create table t2(id int)", + Query: "create table t2(id int primary key)", }) resolvedTs, ddl = p.PopFrontDDL() require.Equal(t, resolvedTs, uint64(15)) @@ -631,7 +629,7 @@ StartTS: 20, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 25}, - Query: "create table t3(id int)", + Query: "create table t3(id int primary key)", }) inputDDL(t, ddlJobPullerImpl, &timodel.Job{ @@ -640,7 +638,7 @@ StartTS: 20, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 25}, - Query: "create table t3(id int)", + Query: "create table t3(id int primary key)", }) inputTs(t, ddlJobPullerImpl, 30) @@ -663,7 +661,7 @@ StartTS: 20, State: timodel.JobStateCancelled, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 6, FinishedTS: 36}, - Query: "create table t4(id int)", + Query: "create table t4(id int primary key)", }) inputTs(t, ddlJobPullerImpl, 40) @@ -742,3 +740,64 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(200)) require.Nil(t, err) } + +func TestCheckIneligibleTableDDL(t *testing.T) { + ddlJobPuller, helper := newMockDDLJobPuller(t, true) + defer helper.Close() + + startTs := uint64(10) + ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl) + ddlJobPullerImpl.setResolvedTs(startTs) + + cfg := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(cfg, "") + require.NoError(t, err) + ddlJobPullerImpl.filter = f + + ddl := 
helper.DDL2Job("CREATE DATABASE test1") + skip, err := ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + // case 1: create a table only has a primary key and drop it, expect an error. + // It is because the table is not eligible after the drop primary key DDL. + ddl = helper.DDL2Job(`CREATE TABLE test1.t1 ( + id INT PRIMARY KEY /*T![clustered_index] NONCLUSTERED */, + name VARCHAR(255), + email VARCHAR(255) UNIQUE + );`) + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + ddl = helper.DDL2Job("ALTER TABLE test1.t1 DROP PRIMARY KEY;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.Error(t, err) + require.False(t, skip) + require.Contains(t, err.Error(), "An eligible table become ineligible after DDL") + + // case 2: create a table has a primary key and another not null unique key, + // and drop the primary key, expect no error. + // It is because the table is still eligible after the drop primary key DDL. + ddl = helper.DDL2Job(`CREATE TABLE test1.t2 ( + id INT PRIMARY KEY /*T![clustered_index] NONCLUSTERED */, + name VARCHAR(255), + email VARCHAR(255) NOT NULL UNIQUE + );`) + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + ddl = helper.DDL2Job("ALTER TABLE test1.t2 DROP PRIMARY KEY;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + // case 3: continue to drop the unique key, expect an error. + // It is because the table is not eligible after the drop unique key DDL. + ddl = helper.DDL2Job("ALTER TABLE test1.t2 DROP INDEX email;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.Error(t, err) + require.False(t, skip) + require.Contains(t, err.Error(), "An eligible table become ineligible after DDL") +}