Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/filter(ticdc): ignore ddl in ddl_manager #10518

Merged
merged 8 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,9 @@ func (s *schemaStorage) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.D
table = event.PreTableInfo.TableName.Table
}

ignored, err := s.filter.ShouldDiscardDDL(event.StartTs, event.Type, schemaName, table, event.Query)
if err != nil {
return nil, errors.Trace(err)
}
if ignored {
if s.filter.ShouldDiscardDDL(event.Type, schemaName, table) {
log.Error(
"ignored DDL event should not be sent to owner"+
"discarded DDL event should not be sent to owner"+
"please report a bug to TiCDC if you see this log"+
"but it is no harm to your replication",
zap.String("namespace", s.id.Namespace),
Expand Down
13 changes: 7 additions & 6 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/pdutil"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/sink/observer"
Expand Down Expand Up @@ -151,7 +151,7 @@ type changefeed struct {
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
filter pfilter.Filter,
) puller.DDLPuller

newSink func(
Expand Down Expand Up @@ -223,7 +223,7 @@ func newChangefeed4Test(
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
filter pfilter.Filter,
) puller.DDLPuller,
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
Expand Down Expand Up @@ -597,13 +597,13 @@ LOOP2:
}
c.barriers.Update(finishBarrier, c.latestInfo.GetTargetTs())

f, err := filter.NewFilter(c.latestInfo.Config, "")
filter, err := pfilter.NewFilter(c.latestInfo.Config, "")
if err != nil {
return errors.Trace(err)
}
c.schema, err = entry.NewSchemaStorage(
c.upstream.KVStorage, ddlStartTs,
c.latestInfo.Config.ForceReplicate, c.id, util.RoleOwner, f)
c.latestInfo.Config.ForceReplicate, c.id, util.RoleOwner, filter)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -630,7 +630,7 @@ LOOP2:
})
c.ddlSink.run(cancelCtx)

c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, f)
c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, filter)
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -669,6 +669,7 @@ LOOP2:
ddlStartTs,
c.latestStatus.CheckpointTs,
c.ddlSink,
filter,
c.ddlPuller,
c.schema,
c.redoDDLMgr,
Expand Down
90 changes: 52 additions & 38 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type ddlManager struct {
redoMetaManager redo.MetaManager
// ddlSink is used to ddlSink DDL events to the downstream
ddlSink DDLSink
filter filter.Filter

// pendingDDLs store the pending DDL events of all tables
// the DDL events in the same table are ordered by commitTs.
Expand All @@ -136,6 +137,7 @@ func newDDLManager(
startTs model.Ts,
checkpointTs model.Ts,
ddlSink DDLSink,
filter filter.Filter,
ddlPuller puller.DDLPuller,
schema entry.SchemaStorage,
redoManager redo.DDLManager,
Expand All @@ -152,6 +154,7 @@ func newDDLManager(
return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
Expand Down Expand Up @@ -208,7 +211,7 @@ func (m *ddlManager) tick(

log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.String("changefeed", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
Expand Down Expand Up @@ -244,8 +247,14 @@ func (m *ddlManager) tick(
// Send DDL events to redo log.
if m.redoDDLManager.Enabled() {
for _, event := range events {
err := m.redoDDLManager.EmitDDLEvent(ctx, event)
skip, _, err := m.shouldSkipDDL(event)
if err != nil {
return nil, nil, errors.Trace(err)
}
if skip {
continue
}
if err := m.redoDDLManager.EmitDDLEvent(ctx, event); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -327,29 +336,34 @@ func (m *ddlManager) shouldExecDDL(nextDDL *model.DDLEvent) bool {
return checkpointReachBarrier && redoCheckpointReachBarrier && redoDDLResolvedTsExceedBarrier
}

func (m *ddlManager) shouldSkipDDL(ddl *model.DDLEvent) (bool, string, error) {
ignored, err := m.filter.ShouldIgnoreDDLEvent(ddl)
if err != nil {
return false, "", errors.Trace(err)
}
if ignored {
return true, "ddl is ignored by event filter rule, skip it", nil
}

// In a BDR mode cluster, TiCDC can receive DDLs from all roles of TiDB.
// However, CDC only executes the DDLs from the TiDB that has BDRRolePrimary role.
if m.BDRMode && ddl.BDRRole != string(ast.BDRRolePrimary) {
return true, "changefeed is in BDRMode and the DDL is not executed by Primary Cluster, skip it", nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second return value looks like it is only used for logging. Can we just print the log here instead of returning it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If redo is enabled, shouldSkipDDL will be called twice, so we use the return value to avoid printing duplicate logs.

}
return false, "", nil
}

// executeDDL executes ddlManager.executingDDL.
func (m *ddlManager) executeDDL(ctx context.Context) error {
if m.executingDDL == nil {
return nil
}

// In a BDR mode cluster, TiCDC can receive DDLs from all roles of TiDB.
// However, CDC only executes the DDLs from the TiDB that has BDRRolePrimary role.
if m.BDRMode && m.executingDDL.BDRRole != string(ast.BDRRolePrimary) {
log.Info("changefeed is in BDRMode and "+
"the DDL is not executed by Primary Cluster, skip it",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
zap.Any("ddlEvent", m.executingDDL),
zap.String("bdrRole", m.executingDDL.BDRRole))
tableName := m.executingDDL.TableInfo.TableName
// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil
m.cleanCache()
skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we decide whether to skip this DDL here, rather than where we append DDLs to pendingDDLs in "ddlManager.tick"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all DDLs must be cached in pendingDDLs until m.checkpointTs == nextDDL.CommitTs. Otherwise, the changefeed's checkpoint cannot be guaranteed to be correct.

if err != nil {
return errors.Trace(err)
}
if skip {
m.cleanCache(cleanMsg)
return nil
}

Expand All @@ -371,23 +385,10 @@ func (m *ddlManager) executeDDL(ctx context.Context) error {

done, err := m.ddlSink.emitDDLEvent(ctx, m.executingDDL)
if err != nil {
return err
return errors.Trace(err)
}
if done {
tableName := m.executingDDL.TableInfo.TableName
log.Info("execute a ddl event successfully",
zap.String("ddl", m.executingDDL.Query),
zap.String("namespace", m.executingDDL.BDRRole),
zap.Uint64("commitTs", m.executingDDL.CommitTs),
zap.Stringer("table", tableName),
)
// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil
m.cleanCache()
m.cleanCache("execute a ddl event successfully")
}
return nil
}
Expand Down Expand Up @@ -555,9 +556,22 @@ func (m *ddlManager) getSnapshotTs() (ts uint64) {
}

// cleanCache cleans the tableInfoCache and physicalTablesCache.
// It should be called after a DDL is applied to schema or a DDL
// is sent to downstream successfully.
func (m *ddlManager) cleanCache() {
// It should be called after a DDL is skipped or sent to downstream successfully.
func (m *ddlManager) cleanCache(msg string) {
tableName := m.executingDDL.TableInfo.TableName
log.Info(msg, zap.String("ddl", m.executingDDL.Query),
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.String("bdrRole", m.executingDDL.BDRRole),
zap.Any("ddlEvent", m.executingDDL))

// Set it to nil first to accelerate GC.
m.pendingDDLs[tableName][0] = nil
m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:]
m.schema.DoGC(m.executingDDL.CommitTs - 1)
m.justSentDDL = m.executingDDL
m.executingDDL = nil

m.tableInfoCache = nil
m.physicalTablesCache = nil
}
Expand Down
1 change: 1 addition & 0 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
startTs,
checkpointTs,
ddlSink,
f,
ddlPuller,
schema,
redo.NewDisabledDDLManager(),
Expand Down
40 changes: 7 additions & 33 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,23 +299,15 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
if !ok {
shouldDiscardOldTable = true
} else {
shouldDiscardOldTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldSchemaNames[i].O, oldTable.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O)
}

newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i])
if !ok {
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
shouldDiscardNewTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, newSchemaName.Name.O, newTableNames[i].O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O)
}

if shouldDiscardOldTable && shouldDiscardNewTable {
Expand Down Expand Up @@ -437,12 +429,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
discard, fErr := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if fErr != nil {
return false, errors.Trace(fErr)
}
if discard {
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
return true, nil
}
return true, errors.Trace(err)
Expand All @@ -467,11 +454,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
discard, err := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if !discard {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
Expand All @@ -487,13 +470,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
}
// since we can find the old table, it must be able to find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skipByOldTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.Query)
if err != nil {
return true, errors.Trace(err)
}
skipByNewTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if err != nil {
return true, errors.Trace(err)
}
Expand All @@ -518,11 +496,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip, err = p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if err != nil {
return false, errors.Trace(err)
}
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
}

if skip {
Expand Down
4 changes: 2 additions & 2 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func TestHandleJob(t *testing.T) {
job = helper.DDL2Job("alter table test1.t1 add column c1 int")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)
require.False(t, skip)

job = helper.DDL2Job("create table test1.testStartTs(id int)")
skip, err = ddlJobPullerImpl.handleJob(job)
Expand All @@ -445,7 +445,7 @@ func TestHandleJob(t *testing.T) {
job.StartTS = 1
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)
require.False(t, skip)

job = helper.DDL2Job("create table test1.t2(id int)")
skip, err = ddlJobPullerImpl.handleJob(job)
Expand Down
27 changes: 19 additions & 8 deletions cmd/filter-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"

timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -79,19 +80,29 @@ func runFilter(cmd *cobra.Command, args []string) {
}
fmt.Printf("Table: %s, Not matched filter rule\n", table)
case "ddl":
startTs := uint64(0)
ddlType := timodel.ActionCreateTable
discard, err := ft.ShouldDiscardDDL(startTs,
ddlType,
tableAndSchema[0],
tableAndSchema[1],
ddl)
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
return
}
ignored, err := ft.ShouldIgnoreDDLEvent(&model.DDLEvent{
StartTs: uint64(0),
Query: ddl,
Type: ddlType,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: tableAndSchema[0],
Table: tableAndSchema[1],
},
},
})
if err != nil {
fmt.Printf("filter ddl error: %s, error: %v\n", ddl, err)
return
}
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
if ignored {
fmt.Printf("DDL: %s, should be ignored by event filter rule\n", ddl)
return
}
fmt.Printf("DDL: %s, should not be discard by event filter rule\n", ddl)
Expand Down
Loading
Loading