Skip to content

Commit

Permalink
ignore ddl in ddl_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 22, 2024
1 parent b1c22cf commit 05666a5
Show file tree
Hide file tree
Showing 16 changed files with 382 additions and 234 deletions.
8 changes: 2 additions & 6 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,9 @@ func (s *schemaStorage) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.D
table = event.PreTableInfo.TableName.Table
}

ignored, err := s.filter.ShouldDiscardDDL(event.StartTs, event.Type, schemaName, table, event.Query)
if err != nil {
return nil, errors.Trace(err)
}
if ignored {
if s.filter.ShouldDiscardDDL(event.StartTs, event.Type, schemaName, table) {
log.Error(
"ignored DDL event should not be sent to owner"+
"discarded DDL event should not be sent to owner"+
"please report a bug to TiCDC if you see this log"+
"but it is no harm to your replication",
zap.String("namespace", s.id.Namespace),
Expand Down
7 changes: 4 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,13 +597,13 @@ LOOP2:
}
c.barriers.Update(finishBarrier, c.latestInfo.GetTargetTs())

f, err := filter.NewFilter(c.latestInfo.Config, "")
filter, err := filter.NewFilter(c.latestInfo.Config, "")
if err != nil {
return errors.Trace(err)
}
c.schema, err = entry.NewSchemaStorage(
c.upstream.KVStorage, ddlStartTs,
c.latestInfo.Config.ForceReplicate, c.id, util.RoleOwner, f)
c.latestInfo.Config.ForceReplicate, c.id, util.RoleOwner, filter)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -630,7 +630,7 @@ LOOP2:
})
c.ddlSink.run(cancelCtx)

c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, f)
c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, filter)
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -669,6 +669,7 @@ LOOP2:
ddlStartTs,
c.latestStatus.CheckpointTs,
c.ddlSink,
filter,
c.ddlPuller,
c.schema,
c.redoDDLMgr,
Expand Down
90 changes: 52 additions & 38 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type ddlManager struct {
redoMetaManager redo.MetaManager
// ddlSink is used to ddlSink DDL events to the downstream
ddlSink DDLSink
filter filter.Filter

// pendingDDLs store the pending DDL events of all tables
// the DDL events in the same table are ordered by commitTs.
Expand All @@ -136,6 +137,7 @@ func newDDLManager(
startTs model.Ts,
checkpointTs model.Ts,
ddlSink DDLSink,
filter filter.Filter,
ddlPuller puller.DDLPuller,
schema entry.SchemaStorage,
redoManager redo.DDLManager,
Expand All @@ -152,6 +154,7 @@ func newDDLManager(
return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
Expand Down Expand Up @@ -208,7 +211,7 @@ func (m *ddlManager) tick(

log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.String("changefeed", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
Expand Down Expand Up @@ -244,8 +247,14 @@ func (m *ddlManager) tick(
// Send DDL events to redo log.
if m.redoDDLManager.Enabled() {
for _, event := range events {
err := m.redoDDLManager.EmitDDLEvent(ctx, event)
skip, _, err := m.shouldSkipDDL(event)
if err != nil {
return nil, nil, errors.Trace(err)
}
if skip {
continue
}
if err := m.redoDDLManager.EmitDDLEvent(ctx, event); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -327,29 +336,34 @@ func (m *ddlManager) shouldExecDDL(nextDDL *model.DDLEvent) bool {
return checkpointReachBarrier && redoCheckpointReachBarrier && redoDDLResolvedTsExceedBarrier
}

// shouldSkipDDL decides whether the given DDL event should be skipped.
// It returns the decision, a message explaining why the event is skipped
// (empty when it is not), and any error reported by the event filter.
func (m *ddlManager) shouldSkipDDL(ddl *model.DDLEvent) (bool, string, error) {
	// The event filter is consulted first; a filter error aborts the decision.
	filtered, err := m.filter.ShouldIgnoreDDLEvent(ddl)
	switch {
	case err != nil:
		return false, "", errors.Trace(err)
	case filtered:
		return true, "ddl is ignored by event filter rule, skip it", nil
	}

	// In a BDR mode cluster, TiCDC can receive DDLs from all roles of TiDB.
	// However, CDC only executes the DDLs from the TiDB that has BDRRolePrimary role.
	fromNonPrimary := m.BDRMode && ddl.BDRRole != string(ast.BDRRolePrimary)
	if !fromNonPrimary {
		return false, "", nil
	}
	return true, "changefeed is in BDRMode and the DDL is not executed by Primary Cluster, skip it", nil
}

// executeDDL executes ddlManager.executingDDL.
func (m *ddlManager) executeDDL(ctx context.Context) error {
if m.executingDDL == nil {
return nil
}

// In a BDR mode cluster, TiCDC can receive DDLs from all roles of TiDB.
// However, CDC only executes the DDLs from the TiDB that has BDRRolePrimary role.
if m.BDRMode && m.executingDDL.BDRRole != string(ast.BDRRolePrimary) {
log.Info("changefeed is in BDRMode and "+
"the DDL is not executed by Primary Cluster, skip it",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.Any("ddlEvent", m.executingDDL),
zap.String("bdrRole", m.executingDDL.BDRRole))
tableName := m.executingDDL.TableInfo.TableName
// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil
m.cleanCache()
skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL)
if err != nil {
return errors.Trace(err)
}
if skip {
m.cleanCache(cleanMsg)
return nil
}

Expand All @@ -371,23 +385,10 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {

done, err := m.ddlSink.emitDDLEvent(ctx, m.executingDDL)
if err != nil {
return err
return errors.Trace(err)
}
if done {
tableName := m.executingDDL.TableInfo.TableName
log.Info("execute a ddl event successfully",
zap.String("ddl", m.executingDDL.Query),
zap.String("namespace", m.executingDDL.BDRRole),
zap.Uint64("commitTs", m.executingDDL.CommitTs),
zap.Stringer("table", tableName),
)
// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil
m.cleanCache()
m.cleanCache("execute a ddl event successfully")
}
return nil
}
Expand Down Expand Up @@ -555,9 +556,22 @@ func (m *ddlManager) getSnapshotTs() (ts uint64) {
}

// cleanCache cleans the tableInfoCache and physicalTablesCache.
// It should be called after a DDL is applied to schema or a DDL
// is sent to downstream successfully.
func (m *ddlManager) cleanCache() {
// It should be called after a DDL is skipped or sent to downstream successfully.
func (m *ddlManager) cleanCache(msg string) {
tableName := m.executingDDL.TableInfo.TableName
log.Info(msg, zap.String("ddl", m.executingDDL.Query),
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.String("bdrRole", m.executingDDL.BDRRole),
zap.Any("ddlEvent", m.executingDDL))

// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil

m.tableInfoCache = nil
m.physicalTablesCache = nil
}
Expand Down
1 change: 1 addition & 0 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
startTs,
checkpointTs,
ddlSink,
f,
ddlPuller,
schema,
redo.NewDisabledDDLManager(),
Expand Down
42 changes: 9 additions & 33 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,23 +299,17 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
if !ok {
shouldDiscardOldTable = true
} else {
shouldDiscardOldTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldSchemaNames[i].O, oldTable.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldSchemaNames[i].O, oldTable.Name.O)
}

newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i])
if !ok {
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
shouldDiscardNewTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, newSchemaName.Name.O, newTableNames[i].O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, newSchemaName.Name.O, newTableNames[i].O)
}

if shouldDiscardOldTable && shouldDiscardNewTable {
Expand Down Expand Up @@ -437,12 +431,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
discard, fErr := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if fErr != nil {
return false, errors.Trace(fErr)
}
if discard {
if p.filter.ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName) {
return true, nil
}
return true, errors.Trace(err)
Expand All @@ -467,11 +456,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
discard, err := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
discard := p.filter.ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if !discard {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
Expand All @@ -487,13 +472,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
}
// since we can find the old table, it must be able to find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skipByOldTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.Query)
if err != nil {
return true, errors.Trace(err)
}
skipByNewTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
skipByOldTableName := p.filter.ShouldDiscardDDL(job.StartTS, job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if err != nil {
return true, errors.Trace(err)
}
Expand All @@ -518,11 +498,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip, err = p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if err != nil {
return false, errors.Trace(err)
}
skip = p.filter.ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName)
}

if skip {
Expand Down
25 changes: 20 additions & 5 deletions cmd/filter-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"

timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -81,17 +82,31 @@ func runFilter(cmd *cobra.Command, args []string) {
case "ddl":
startTs := uint64(0)
ddlType := timodel.ActionCreateTable
discard, err := ft.ShouldDiscardDDL(startTs,
discard := ft.ShouldDiscardDDL(startTs,
ddlType,
tableAndSchema[0],
tableAndSchema[1],
ddl)
tableAndSchema[1])
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
return
}
ignored, err := ft.ShouldIgnoreDDLEvent(&model.DDLEvent{
StartTs: startTs,
Query: ddl,
Type: ddlType,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: tableAndSchema[0],
Table: tableAndSchema[1],
},
},
})
if err != nil {
fmt.Printf("filter ddl error: %s, error: %v\n", ddl, err)
return
}
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
if ignored {
fmt.Printf("DDL: %s, should be ignored by event filter rule\n", ddl)
return
}
fmt.Printf("DDL: %s, should not be discard by event filter rule\n", ddl)
Expand Down
34 changes: 16 additions & 18 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ var allowDDLList = []timodel.ActionType{
type Filter interface {
// ShouldIgnoreDMLEvent returns true and nil if the DML event should be ignored.
ShouldIgnoreDMLEvent(dml *model.RowChangedEvent, rawRow model.RowChangedDatums, tableInfo *model.TableInfo) (bool, error)
// ShouldIgnoreDDLEvent returns true if the DDL event should be ignored.
// If a ddl is ignored, it will be applied to cdc's schema storage,
// but will not be sent to downstream.
ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (bool, error)
// ShouldDiscardDDL returns true if this DDL should be discarded.
// If a ddl is discarded, it will neither be applied to cdc's schema storage
// nor sent to downstream.
ShouldDiscardDDL(startTs uint64, ddlType timodel.ActionType, schema, table, query string) (bool, error)
ShouldDiscardDDL(startTs uint64, ddlType timodel.ActionType, schema, table string) bool
// ShouldIgnoreTable returns true if the table should be ignored.
ShouldIgnoreTable(schema, table string) bool
// ShouldIgnoreSchema returns true if the schema should be ignored.
Expand Down Expand Up @@ -149,28 +153,22 @@ func (f *filter) ShouldIgnoreDMLEvent(
// ShouldDiscardDDL returns true if this DDL should be discarded.
// If a ddl is discarded, it will neither be applied to cdc's schema storage
// nor sent to downstream.
func (f *filter) ShouldDiscardDDL(startTs uint64, ddlType timodel.ActionType, schema, table, query string) (discard bool, err error) {
discard = !isAllowedDDL(ddlType)
if discard {
return
}

discard = f.shouldIgnoreStartTs(startTs)
if discard {
return
func (f *filter) ShouldDiscardDDL(startTs uint64, ddlType timodel.ActionType, schema, table string) bool {
if !isAllowedDDL(ddlType) || f.shouldIgnoreStartTs(startTs) {
return true
}

if IsSchemaDDL(ddlType) {
discard = !f.tableFilter.MatchSchema(schema)
} else {
discard = f.ShouldIgnoreTable(schema, table)
}

if discard {
return
return f.ShouldIgnoreSchema(schema)
}
return f.ShouldIgnoreTable(schema, table)
}

return f.sqlEventFilter.shouldSkipDDL(ddlType, schema, table, query)
// ShouldIgnoreDDLEvent reports whether the DDL event should be ignored.
// An ignored ddl is still applied to cdc's schema storage,
// but it is not sent to downstream.
// The decision is delegated to the SQL event filter.
func (f *filter) ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (discard bool, err error) {
	discard, err = f.sqlEventFilter.shouldSkipDDL(ddl)
	return discard, err
}

// ShouldIgnoreTable returns true if the specified table should be ignored by this changefeed.
Expand Down
Loading

0 comments on commit 05666a5

Please sign in to comment.