diff --git a/cdc/entry/codec.go b/cdc/entry/codec.go index a804bba1ed8..7224957d4d1 100644 --- a/cdc/entry/codec.go +++ b/cdc/entry/codec.go @@ -35,12 +35,9 @@ var ( ) var ( - intLen = 8 - tablePrefixLen = len(tablePrefix) - recordPrefixLen = len(recordPrefix) - metaPrefixLen = len(metaPrefix) - prefixTableIDLen = tablePrefixLen + intLen /*tableID*/ - prefixRecordIDLen = recordPrefixLen + intLen /*recordID*/ + intLen = 8 + tablePrefixLen = len(tablePrefix) + prefixTableIDLen = tablePrefixLen + intLen /*tableID*/ ) // MetaType is for data structure meta/data flag. @@ -63,36 +60,6 @@ const ( ListData MetaType = 'l' ) -type meta interface { - getType() MetaType -} - -type metaHashData struct { - key string - field []byte -} - -func (d metaHashData) getType() MetaType { - return HashData -} - -type metaListData struct { - key string - index int64 -} - -func (d metaListData) getType() MetaType { - return ListData -} - -type other struct { - tp MetaType -} - -func (d other) getType() MetaType { - return d.tp -} - func decodeTableID(key []byte) (rest []byte, tableID int64, err error) { if len(key) < prefixTableIDLen || !bytes.HasPrefix(key, tablePrefix) { return nil, 0, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key) @@ -105,65 +72,6 @@ func decodeTableID(key []byte) (rest []byte, tableID int64, err error) { return } -func decodeRecordID(key []byte) (rest []byte, recordID int64, err error) { - if len(key) < prefixRecordIDLen || !bytes.HasPrefix(key, recordPrefix) { - return nil, 0, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key) - } - key = key[recordPrefixLen:] - rest, recordID, err = codec.DecodeInt(key) - if err != nil { - return nil, 0, cerror.WrapError(cerror.ErrCodecDecode, err) - } - return -} - -func decodeMetaKey(ek []byte) (meta, error) { - if !bytes.HasPrefix(ek, metaPrefix) { - return nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(ek) - } - - ek = ek[metaPrefixLen:] - ek, rawKey, err := codec.DecodeBytes(ek, nil) - if err != nil { - return nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - key := string(rawKey) - - ek, rawTp, err := codec.DecodeUint(ek) - if err != nil { - return nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - switch MetaType(rawTp) { - case HashData: - if len(ek) > 0 { - var field []byte - _, field, err = codec.DecodeBytes(ek, nil) - if err != nil { - return nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - return metaHashData{key: key, field: field}, nil - } - if len(ek) > 0 { - // TODO: warning hash key decode failure - panic("hash key decode failure, should never happen") - } - case ListData: - if len(ek) == 0 { - panic("list key decode failure") - } - var index int64 - _, index, err = codec.DecodeInt(ek) - if err != nil { - return nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - return metaListData{key: key, index: index}, nil - // TODO decode other key - default: - return other{tp: MetaType(rawTp)}, nil - } - return nil, cerror.ErrUnknownMetaType.GenWithStackByArgs(rawTp) -} - // decodeRow decodes a byte slice into datums with an existing row map. func decodeRow(b []byte, recordID kv.Handle, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) { if len(b) == 0 { diff --git a/cdc/entry/codec_test.go b/cdc/entry/codec_test.go index f888091f278..96232718bb6 100644 --- a/cdc/entry/codec_test.go +++ b/cdc/entry/codec_test.go @@ -14,44 +14,27 @@ package entry import ( + "encoding/hex" "testing" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/tablecodec" - "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" ) -func TestDecodeRecordKey(t *testing.T) { - t.Parallel() - recordPrefix := tablecodec.GenTableRecordPrefix(12345) - key := tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(67890)) - key, tableID, err := decodeTableID(key) - require.Nil(t, err) - require.Equal(t, tableID, int64(12345)) - key, recordID, err := decodeRecordID(key) - require.Nil(t, err) - require.Equal(t, recordID, int64(67890)) - require.Equal(t, len(key), 0) -} - -func TestDecodeListData(t *testing.T) { - t.Parallel() - key := []byte("hello") - var index int64 = 3 - - meta, err := decodeMetaKey(buildMetaKey(key, index)) - require.Nil(t, err) - require.Equal(t, meta.getType(), ListData) - list := meta.(metaListData) - require.Equal(t, list.key, string(key)) - require.Equal(t, list.index, index) -} - -func buildMetaKey(key []byte, index int64) []byte { - ek := make([]byte, 0, len(metaPrefix)+len(key)+36) - ek = append(ek, metaPrefix...) - ek = codec.EncodeBytes(ek, key) - ek = codec.EncodeUint(ek, uint64(ListData)) - return codec.EncodeInt(ek, index) +func TestDecodeTableID(t *testing.T) { + /* + "7480000000000000685f728000000000000001" + └─## decode hex key + └─"t\200\000\000\000\000\000\000h_r\200\000\000\000\000\000\000\001" + ├─## table prefix + │ └─table: 104 + └─## table row key + ├─table: 104 + └─row: 1 + */ + key := "7480000000000000685f728000000000000001" + keyBytes, err := hex.DecodeString(key) + require.NoError(t, err) + tableID, err := DecodeTableID(keyBytes) + require.NoError(t, err) + require.Equal(t, tableID, int64(104)) } diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index dcc8edabcb1..f8e8d4072b3 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -319,6 +319,8 @@ func (s *schemaStorage) shouldIgnoreTable(t *model.TableInfo) bool { // IsIneligibleTable returns whether the table is ineligible. // It uses the snapshot of the given ts to check the table. +// Ineligible means that the table does not have a primary key +// or not null unique key. func (s *schemaStorage) IsIneligibleTable( ctx context.Context, tableID model.TableID, ts model.Ts, ) (bool, error) { @@ -451,7 +453,7 @@ func (s *schemaStorage) BuildDDLEvents( event.FromJob(job, preTableInfo, tableInfo) ddlEvents = append(ddlEvents, event) } - return s.filterDDLEvents(ddlEvents) + return ddlEvents, nil } // TODO: find a better way to refactor this function. @@ -507,38 +509,6 @@ func (s *schemaStorage) buildRenameEvents( return ddlEvents, nil } -// TODO: delete this function after integration test passed. -func (s *schemaStorage) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.DDLEvent, error) { - res := make([]*model.DDLEvent, 0, len(ddlEvents)) - for _, event := range ddlEvents { - schemaName := event.TableInfo.TableName.Schema - table := event.TableInfo.TableName.Table - if event.Type == timodel.ActionRenameTable { - schemaName = event.PreTableInfo.TableName.Schema - table = event.PreTableInfo.TableName.Table - } - - if s.filter.ShouldDiscardDDL(event.Type, schemaName, table) { - log.Error( - "discarded DDL event should not be sent to owner"+ - "please report a bug to TiCDC if you see this log"+ - "but it is no harm to your replication", - zap.String("namespace", s.id.Namespace), - zap.String("changefeed", s.id.ID), - zap.String("query", event.Query), - zap.String("type", event.Type.String()), - zap.String("schema", event.TableInfo.TableName.Schema), - zap.String("table", event.TableInfo.TableName.Table), - zap.Uint64("startTs", event.StartTs), - zap.Uint64("commitTs", event.CommitTs), - ) - continue - } - res = append(res, event) - } - return res, nil -} - // MockSchemaStorage is for tests. type MockSchemaStorage struct { Resolved uint64 diff --git a/cdc/owner/schema_test.go b/cdc/entry/schema_test.go similarity index 85% rename from cdc/owner/schema_test.go rename to cdc/entry/schema_test.go index 13c5b364cce..109141e2b47 100644 --- a/cdc/owner/schema_test.go +++ b/cdc/entry/schema_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package owner +package entry import ( "context" @@ -23,7 +23,6 @@ import ( timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/types" - "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/filter" @@ -32,16 +31,14 @@ import ( "github.com/tikv/client-go/v2/oracle" ) -var dummyChangeFeedID = model.DefaultChangeFeedID("dummy_changefeed") - func TestAllPhysicalTables(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) tableIDs, err := schema.AllPhysicalTables(context.Background(), ver.Ver) @@ -93,13 +90,13 @@ func TestAllPhysicalTables(t *testing.T) { } func TestAllTables(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) tableInfos, err := schema.AllTables(context.Background(), ver.Ver) @@ -132,13 +129,13 @@ func TestAllTables(t *testing.T) { } func TestIsIneligibleTableID(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) // add normal table @@ -190,13 +187,13 @@ func compareEvents(t *testing.T, e1, e2 *model.DDLEvent) { } func TestBuildDDLEventsFromSingleTableDDL(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) // add normal table @@ -265,14 +262,14 @@ func TestBuildDDLEventsFromSingleTableDDL(t *testing.T) { } func TestBuildDDLEventsFromRenameTablesDDL(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) ctx := context.Background() @@ -393,14 +390,14 @@ func TestBuildDDLEventsFromRenameTablesDDL(t *testing.T) { } func TestBuildDDLEventsFromDropTablesDDL(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) // add test.t1 @@ -496,14 +493,14 @@ func TestBuildDDLEventsFromDropTablesDDL(t *testing.T) { } func TestBuildDDLEventsFromDropViewsDDL(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) + helper := NewSchemaTestHelper(t) defer helper.Close() ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) ctx := context.Background() @@ -615,66 +612,3 @@ func TestBuildDDLEventsFromDropViewsDDL(t *testing.T) { }, }) } - -func TestBuildIgnoredDDLJob(t *testing.T) { - helper := entry.NewSchemaTestHelper(t) - defer helper.Close() - - ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) - require.Nil(t, err) - cfg := config.GetDefaultReplicaConfig() - // only replicate ddl event of test.tb1 and test.tb2 - cfg.Filter.Rules = []string{"test.tb1", "test.tb2"} - f, err := filter.NewFilter(cfg, "") - require.Nil(t, err) - schema, err := entry.NewSchemaStorage(helper.Storage(), ver.Ver, - false, dummyChangeFeedID, util.RoleTester, f) - require.Nil(t, err) - ctx := context.Background() - // test case 1: Will not filter out create test.tb1 ddl. - job := helper.DDL2Job("create table test.tb1(id int primary key)") - schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) - events, err := schema.BuildDDLEvents(ctx, job) - require.Nil(t, err) - require.Len(t, events, 1) - require.Nil(t, schema.HandleDDLJob(job)) - - // test case 2: Will not filter out create test.tb2 ddl. - job = helper.DDL2Job("create table test.tb2(id int primary key)") - schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) - events, err = schema.BuildDDLEvents(ctx, job) - require.Nil(t, err) - require.Len(t, events, 1) - require.Nil(t, schema.HandleDDLJob(job)) - - // test case 3: Will not filter out alter test.tb1 ddl. - job = helper.DDL2Job("alter table test.tb1 add age int") - schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) - events, err = schema.BuildDDLEvents(ctx, job) - require.Nil(t, err) - require.Len(t, events, 1) - require.Nil(t, schema.HandleDDLJob(job)) - - // test case 4: Will not filter out alter test.tb2 ddl. - job = helper.DDL2Job("alter table test.tb2 add name char(10)") - schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) - events, err = schema.BuildDDLEvents(ctx, job) - require.Nil(t, err) - require.Len(t, events, 1) - require.Nil(t, schema.HandleDDLJob(job)) - - // test case 5: Will filter create test.tb3 ddl. - job = helper.DDL2Job("create table test.tb3(id int primary key)") - schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) - events, err = schema.BuildDDLEvents(ctx, job) - require.Nil(t, err) - require.Len(t, events, 0) - require.Nil(t, schema.HandleDDLJob(job)) - - // test case 5: Will filter out drop test.tb3 ddl. - job = helper.DDL2Job("alter table test.tb3 add location char(100)") - schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) - events, err = schema.BuildDDLEvents(ctx, job) - require.Nil(t, err) - require.Len(t, events, 0) -} diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index 06bac8f4002..cc7857aa7cf 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -15,11 +15,13 @@ package entry import ( "context" + "encoding/hex" "encoding/json" "strings" "testing" "time" + "github.com/pingcap/log" ticonfig "github.com/pingcap/tidb/pkg/config" tiddl "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" @@ -37,6 +39,7 @@ import ( "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" ) // SchemaTestHelper is a test helper for schema which creates an internal tidb instance to generate DDL jobs with meta information @@ -194,6 +197,7 @@ func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.Ro polymorphicEvent := model.NewPolymorphicEvent(rawKV) err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent) require.NoError(s.t, err) + log.Info("fizz dml event", zap.String("key", hex.EncodeToString(polymorphicEvent.RawKV.Key))) return polymorphicEvent.Row }