Skip to content

Commit

Permalink
entry (ticdc): logs all tables to be replicated when changefeed initi…
Browse files Browse the repository at this point in the history
…alized (#10657)

ref #10457
  • Loading branch information
asddongmen authored Mar 5, 2024
1 parent fcd4bfa commit 237baf0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 10 deletions.
18 changes: 16 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) {

// NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSnapshotFromMeta(
id model.ChangeFeedID,
meta *timeta.Meta,
currentTs uint64,
forceReplicate bool,
Expand All @@ -134,11 +135,12 @@ func NewSingleSnapshotFromMeta(
snap.inner.currentTs = currentTs
return snap, nil
}
return NewSnapshotFromMeta(meta, currentTs, forceReplicate, filter)
return NewSnapshotFromMeta(id, meta, currentTs, forceReplicate, filter)
}

// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
meta *timeta.Meta,
currentTs uint64,
forceReplicate bool,
Expand All @@ -151,7 +153,8 @@ func NewSnapshotFromMeta(
}
// `tag` is used to reverse sort all versions in the generated snapshot.
tag := negative(currentTs)

// record all tables to be replicated for logging use
tables := make([]*model.TableInfo, 0, 1024)
for _, dbinfo := range dbinfos {
if filter.ShouldIgnoreSchema(dbinfo.Name.O) {
log.Debug("ignore database", zap.String("db", dbinfo.Name.O))
Expand Down Expand Up @@ -190,6 +193,8 @@ func NewSnapshotFromMeta(
ineligible := !tableInfo.IsEligible(forceReplicate)
if ineligible {
snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag})
} else {
tables = append(tables, tableInfo)
}
if pi := tableInfo.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
Expand All @@ -204,6 +209,15 @@ func NewSnapshotFromMeta(
}
}
snap.inner.currentTs = currentTs
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%d tables to be replicated: ", len(tables)))
for _, table := range tables {
sb.WriteString(fmt.Sprintf("%s.%s, ", table.TableName.Schema, table.TableName.Table))
}
log.Info("schema snapshot created",
zap.Stringer("changefeed", id),
zap.Uint64("currentTs", currentTs),
zap.String("tables", sb.String()))
return snap, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewSchemaStorage(
snap = schema.NewEmptySnapshot(forceReplicate)
} else {
meta := kv.GetSnapshotMeta(storage, startTs)
snap, err = schema.NewSnapshotFromMeta(meta, startTs, forceReplicate, filter)
snap, err = schema.NewSnapshotFromMeta(id, meta, startTs, forceReplicate, filter)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func TestCreateSnapFromMeta(t *testing.T) {
meta := kv.GetSnapshotMeta(store, ver.Ver)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f)
snap, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta, ver.Ver, false, f)
require.Nil(t, err)
_, ok := snap.TableByName("test", "simple_test1")
require.True(t, ok)
Expand Down Expand Up @@ -731,12 +731,12 @@ func TestExplicitTables(t *testing.T) {
meta1 := kv.GetSnapshotMeta(store, ver1.Ver)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
snap1, err := schema.NewSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */, f)
snap1, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta1, ver1.Ver, true /* forceReplicate */, f)
require.Nil(t, err)
meta2 := kv.GetSnapshotMeta(store, ver2.Ver)
snap2, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */, f)
snap2, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta2, ver2.Ver, false /* forceReplicate */, f)
require.Nil(t, err)
snap3, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */, f)
snap3, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta2, ver2.Ver, true /* forceReplicate */, f)
require.Nil(t, err)

// we don't need to count system tables since TiCDC
Expand Down Expand Up @@ -890,7 +890,7 @@ func TestSchemaStorage(t *testing.T) {
for _, job := range jobs {
ts := job.BinlogInfo.FinishedTS
meta := kv.GetSnapshotMeta(store, ts)
snapFromMeta, err := schema.NewSnapshotFromMeta(meta, ts, false, f)
snapFromMeta, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta, ts, false, f)
require.Nil(t, err)
snapFromSchemaStore, err := schemaStorage.GetSnapshot(ctx, ts)
require.Nil(t, err)
Expand Down Expand Up @@ -972,7 +972,7 @@ func TestHandleKey(t *testing.T) {
meta := kv.GetSnapshotMeta(store, ver.Ver)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f)
snap, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta, ver.Ver, false, f)
require.Nil(t, err)
tb1, ok := snap.TableByName("test", "simple_test1")
require.True(t, ok)
Expand Down
4 changes: 3 additions & 1 deletion cdc/entry/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func VerifyTables(
err error,
) {
meta := kv.GetSnapshotMeta(storage, startTs)
snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */, f)
snap, err := schema.NewSingleSnapshotFromMeta(
model.ChangeFeedID4Test("api", "verifyTable"),
meta, startTs, false /* explicitTables */, f)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
Expand Down

0 comments on commit 237baf0

Please sign in to comment.