Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10518
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jan 24, 2024
1 parent a3a7939 commit d7b3829
Show file tree
Hide file tree
Showing 17 changed files with 678 additions and 184 deletions.
148 changes: 148 additions & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,154 @@ func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
return !job.IsDone()
}

<<<<<<< HEAD
=======
// BuildDDLEvents by parsing the DDL job
func (s *schemaStorage) BuildDDLEvents(
ctx context.Context, job *timodel.Job,
) (ddlEvents []*model.DDLEvent, err error) {
switch job.Type {
case timodel.ActionRenameTables:
// The result contains more than one DDLEvent for a rename tables job.
ddlEvents, err = s.buildRenameEvents(ctx, job)
if err != nil {
return nil, errors.Trace(err)
}
default:
// parse preTableInfo
preSnap, err := s.GetSnapshot(ctx, job.BinlogInfo.FinishedTS-1)
if err != nil {
return nil, errors.Trace(err)
}
preTableInfo, err := preSnap.PreTableInfo(job)
if err != nil {
return nil, errors.Trace(err)
}

// parse tableInfo
var tableInfo *model.TableInfo
err = preSnap.FillSchemaName(job)
if err != nil {
log.Error("build DDL event fail", zap.Any("job", job), zap.Error(err))
return nil, errors.Trace(err)
}
// TODO: find a better way to refactor this. For example, drop table job should not
// have table info.
if job.BinlogInfo != nil && job.BinlogInfo.TableInfo != nil {
tableInfo = model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, job.BinlogInfo.TableInfo)

// TODO: remove this after job is fixed by TiDB.
// ref: https://github.com/pingcap/tidb/issues/43819
if job.Type == timodel.ActionExchangeTablePartition {
oldTableInfo, ok := preSnap.PhysicalTableByID(job.BinlogInfo.TableInfo.ID)
if !ok {
return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(job.TableID)
}
tableInfo.SchemaID = oldTableInfo.SchemaID
tableInfo.TableName = oldTableInfo.TableName
}
} else {
// Just retrieve the schema name for a DDL job that does not contain TableInfo.
// Currently supported by cdc are: ActionCreateSchema, ActionDropSchema,
// and ActionModifySchemaCharsetAndCollate.
tableInfo = &model.TableInfo{
TableName: model.TableName{Schema: job.SchemaName},
Version: job.BinlogInfo.FinishedTS,
}
}
event := new(model.DDLEvent)
event.FromJob(job, preTableInfo, tableInfo)
ddlEvents = append(ddlEvents, event)
}
return s.filterDDLEvents(ddlEvents)
}

// TODO: find a better way to refactor this function.
// buildRenameEvents gets a list of DDLEvent from a rename tables DDL job.
func (s *schemaStorage) buildRenameEvents(
ctx context.Context, job *timodel.Job,
) ([]*model.DDLEvent, error) {
var (
oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
newTableNames, oldSchemaNames []*timodel.CIStr
ddlEvents []*model.DDLEvent
)
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs,
&newTableNames, &oldTableIDs, &oldSchemaNames)
if err != nil {
return nil, errors.Trace(err)
}

multiTableInfos := job.BinlogInfo.MultipleTableInfos
if len(multiTableInfos) != len(oldSchemaIDs) ||
len(multiTableInfos) != len(newSchemaIDs) ||
len(multiTableInfos) != len(newTableNames) ||
len(multiTableInfos) != len(oldTableIDs) ||
len(multiTableInfos) != len(oldSchemaNames) {
return nil, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
}

preSnap, err := s.GetSnapshot(ctx, job.BinlogInfo.FinishedTS-1)
if err != nil {
return nil, errors.Trace(err)
}

for i, tableInfo := range multiTableInfos {
newSchema, ok := preSnap.SchemaByID(newSchemaIDs[i])
if !ok {
return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(
newSchemaIDs[i])
}
newSchemaName := newSchema.Name.O
oldSchemaName := oldSchemaNames[i].O
event := new(model.DDLEvent)
preTableInfo, ok := preSnap.PhysicalTableByID(tableInfo.ID)
if !ok {
return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(
job.TableID)
}

tableInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName,
job.BinlogInfo.FinishedTS, tableInfo)
event.FromJobWithArgs(job, preTableInfo, tableInfo, oldSchemaName, newSchemaName)
ddlEvents = append(ddlEvents, event)
}
return ddlEvents, nil
}

// TODO: delete this function after integration test passed.
func (s *schemaStorage) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.DDLEvent, error) {
res := make([]*model.DDLEvent, 0, len(ddlEvents))
for _, event := range ddlEvents {
schemaName := event.TableInfo.TableName.Schema
table := event.TableInfo.TableName.Table
if event.Type == timodel.ActionRenameTable {
schemaName = event.PreTableInfo.TableName.Schema
table = event.PreTableInfo.TableName.Table
}

if s.filter.ShouldDiscardDDL(event.Type, schemaName, table) {
log.Error(
"discarded DDL event should not be sent to owner"+
"please report a bug to TiCDC if you see this log"+
"but it is no harm to your replication",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("query", event.Query),
zap.String("type", event.Type.String()),
zap.String("schema", event.TableInfo.TableName.Schema),
zap.String("table", event.TableInfo.TableName.Table),
zap.Uint64("startTs", event.StartTs),
zap.Uint64("commitTs", event.CommitTs),
)
continue
}
res = append(res, event)
}
return res, nil
}

>>>>>>> 8d3e31b32a (pkg/filter(ticdc): ignore ddl in ddl_manager (#10518))
// MockSchemaStorage is for tests.
type MockSchemaStorage struct {
Resolved uint64
Expand Down
29 changes: 29 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
<<<<<<< HEAD
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
=======
pfilter "github.com/pingcap/tiflow/pkg/filter"
>>>>>>> 8d3e31b32a (pkg/filter(ticdc): ignore ddl in ddl_manager (#10518))
"github.com/pingcap/tiflow/pkg/pdutil"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/sink/observer"
Expand Down Expand Up @@ -123,8 +127,13 @@ type changefeed struct {
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
<<<<<<< HEAD
filter filter.Filter,
) (puller.DDLPuller, error)
=======
filter pfilter.Filter,
) puller.DDLPuller
>>>>>>> 8d3e31b32a (pkg/filter(ticdc): ignore ddl in ddl_manager (#10518))

newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
Expand Down Expand Up @@ -181,8 +190,13 @@ func newChangefeed4Test(
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
<<<<<<< HEAD
filter filter.Filter,
) (puller.DDLPuller, error),
=======
filter pfilter.Filter,
) puller.DDLPuller,
>>>>>>> 8d3e31b32a (pkg/filter(ticdc): ignore ddl in ddl_manager (#10518))
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
Expand Down Expand Up @@ -530,6 +544,7 @@ LOOP2:
}
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())

<<<<<<< HEAD
filter, err := filter.NewFilter(c.state.Info.Config, "")
if err != nil {
return errors.Trace(err)
Expand All @@ -540,6 +555,15 @@ LOOP2:
c.state.Info.Config,
c.id,
filter)
=======
filter, err := pfilter.NewFilter(c.latestInfo.Config, "")
if err != nil {
return errors.Trace(err)
}
c.schema, err = entry.NewSchemaStorage(
c.upstream.KVStorage, ddlStartTs,
c.latestInfo.Config.ForceReplicate, c.id, util.RoleOwner, filter)
>>>>>>> 8d3e31b32a (pkg/filter(ticdc): ignore ddl in ddl_manager (#10518))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -566,6 +590,7 @@ LOOP2:
})
c.ddlSink.run(cancelCtx)

<<<<<<< HEAD
c.ddlPuller, err = c.newDDLPuller(cancelCtx,
c.state.Info.Config,
c.upstream, ddlStartTs,
Expand All @@ -576,6 +601,9 @@ LOOP2:
return errors.Trace(err)
}

=======
c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, filter)
>>>>>>> 8d3e31b32a (pkg/filter(ticdc): ignore ddl in ddl_manager (#10518))
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -619,6 +647,7 @@ LOOP2:
ddlStartTs,
c.state.Status.CheckpointTs,
c.ddlSink,
filter,
c.ddlPuller,
c.schema,
c.redoDDLMgr,
Expand Down
Loading

0 comments on commit d7b3829

Please sign in to comment.