From 237baf01aa4ab5b12602a5b454b90ce5075b79cb Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 5 Mar 2024 12:33:34 +0800 Subject: [PATCH] entry (ticdc): logs all tables to be replicated when changefeed initialized (#10657) ref pingcap/tiflow#10457 --- cdc/entry/schema/snapshot.go | 18 ++++++++++++++++-- cdc/entry/schema_storage.go | 2 +- cdc/entry/schema_storage_test.go | 12 ++++++------ cdc/entry/validator.go | 4 +++- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 94a10ae137b..daf9e3078cb 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -123,6 +123,7 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) { // NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta func NewSingleSnapshotFromMeta( + id model.ChangeFeedID, meta *timeta.Meta, currentTs uint64, forceReplicate bool, @@ -134,11 +135,12 @@ func NewSingleSnapshotFromMeta( snap.inner.currentTs = currentTs return snap, nil } - return NewSnapshotFromMeta(meta, currentTs, forceReplicate, filter) + return NewSnapshotFromMeta(id, meta, currentTs, forceReplicate, filter) } // NewSnapshotFromMeta creates a schema snapshot from meta. func NewSnapshotFromMeta( + id model.ChangeFeedID, meta *timeta.Meta, currentTs uint64, forceReplicate bool, @@ -151,7 +153,8 @@ func NewSnapshotFromMeta( } // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) - + // record all tables to be replicated for logging use + tables := make([]*model.TableInfo, 0, 1024) for _, dbinfo := range dbinfos { if filter.ShouldIgnoreSchema(dbinfo.Name.O) { log.Debug("ignore database", zap.String("db", dbinfo.Name.O)) @@ -190,6 +193,8 @@ func NewSnapshotFromMeta( ineligible := !tableInfo.IsEligible(forceReplicate) if ineligible { snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag}) + } else { + tables = append(tables, tableInfo) } if pi := tableInfo.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { @@ -204,6 +209,15 @@ func NewSnapshotFromMeta( } } snap.inner.currentTs = currentTs + var sb strings.Builder + sb.WriteString(fmt.Sprintf("%d tables to be replicated: ", len(tables))) + for _, table := range tables { + sb.WriteString(fmt.Sprintf("%s.%s, ", table.TableName.Schema, table.TableName.Table)) + } + log.Info("schema snapshot created", + zap.Stringer("changefeed", id), + zap.Uint64("currentTs", currentTs), + zap.String("tables", sb.String())) return snap, nil } diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index f8e8d4072b3..a5c68b4dc17 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -100,7 +100,7 @@ func NewSchemaStorage( snap = schema.NewEmptySnapshot(forceReplicate) } else { meta := kv.GetSnapshotMeta(storage, startTs) - snap, err = schema.NewSnapshotFromMeta(meta, startTs, forceReplicate, filter) + snap, err = schema.NewSnapshotFromMeta(id, meta, startTs, forceReplicate, filter) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index ba7cb9926f0..0b0d02d3588 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -694,7 +694,7 @@ func TestCreateSnapFromMeta(t *testing.T) { meta := kv.GetSnapshotMeta(store, ver.Ver) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f) + snap, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta, ver.Ver, false, f) require.Nil(t, err) _, ok := snap.TableByName("test", "simple_test1") require.True(t, ok) @@ -731,12 +731,12 @@ func TestExplicitTables(t *testing.T) { meta1 := kv.GetSnapshotMeta(store, ver1.Ver) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - snap1, err := schema.NewSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */, f) + snap1, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta1, ver1.Ver, true /* forceReplicate */, f) require.Nil(t, err) meta2 := kv.GetSnapshotMeta(store, ver2.Ver) - snap2, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */, f) + snap2, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta2, ver2.Ver, false /* forceReplicate */, f) require.Nil(t, err) - snap3, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */, f) + snap3, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta2, ver2.Ver, true /* forceReplicate */, f) require.Nil(t, err) // we don't need to count system tables since TiCDC @@ -890,7 +890,7 @@ func TestSchemaStorage(t *testing.T) { for _, job := range jobs { ts := job.BinlogInfo.FinishedTS meta := kv.GetSnapshotMeta(store, ts) - snapFromMeta, err := schema.NewSnapshotFromMeta(meta, ts, false, f) + snapFromMeta, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta, ts, false, f) require.Nil(t, err) snapFromSchemaStore, err := schemaStorage.GetSnapshot(ctx, ts) require.Nil(t, err) @@ -972,7 +972,7 @@ func TestHandleKey(t *testing.T) { meta := kv.GetSnapshotMeta(store, ver.Ver) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f) + snap, err := schema.NewSnapshotFromMeta(model.DefaultChangeFeedID("test"), meta, ver.Ver, false, f) require.Nil(t, err) tb1, ok := snap.TableByName("test", "simple_test1") require.True(t, ok) diff --git a/cdc/entry/validator.go b/cdc/entry/validator.go index 3bb19bce561..e19ceb70772 100644 --- a/cdc/entry/validator.go +++ b/cdc/entry/validator.go @@ -34,7 +34,9 @@ func VerifyTables( err error, ) { meta := kv.GetSnapshotMeta(storage, startTs) - snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */, f) + snap, err := schema.NewSingleSnapshotFromMeta( + model.ChangeFeedID4Test("api", "verifyTable"), + meta, startTs, false /* explicitTables */, f) if err != nil { return nil, nil, nil, errors.Trace(err) }