Skip to content

Commit

Permalink
entry (ticdc):remove useless code in entry package (#10505)
Browse files Browse the repository at this point in the history
ref #10457
  • Loading branch information
asddongmen authored Jan 25, 2024
1 parent c736ff5 commit 63ccb9d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 244 deletions.
98 changes: 3 additions & 95 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ var (
)

var (
intLen = 8
tablePrefixLen = len(tablePrefix)
recordPrefixLen = len(recordPrefix)
metaPrefixLen = len(metaPrefix)
prefixTableIDLen = tablePrefixLen + intLen /*tableID*/
prefixRecordIDLen = recordPrefixLen + intLen /*recordID*/
intLen = 8
tablePrefixLen = len(tablePrefix)
prefixTableIDLen = tablePrefixLen + intLen /*tableID*/
)

// MetaType is for data structure meta/data flag.
Expand All @@ -63,36 +60,6 @@ const (
ListData MetaType = 'l'
)

type meta interface {
getType() MetaType
}

type metaHashData struct {
key string
field []byte
}

func (d metaHashData) getType() MetaType {
return HashData
}

type metaListData struct {
key string
index int64
}

func (d metaListData) getType() MetaType {
return ListData
}

type other struct {
tp MetaType
}

func (d other) getType() MetaType {
return d.tp
}

func decodeTableID(key []byte) (rest []byte, tableID int64, err error) {
if len(key) < prefixTableIDLen || !bytes.HasPrefix(key, tablePrefix) {
return nil, 0, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key)
Expand All @@ -105,65 +72,6 @@ func decodeTableID(key []byte) (rest []byte, tableID int64, err error) {
return
}

func decodeRecordID(key []byte) (rest []byte, recordID int64, err error) {
if len(key) < prefixRecordIDLen || !bytes.HasPrefix(key, recordPrefix) {
return nil, 0, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key)
}
key = key[recordPrefixLen:]
rest, recordID, err = codec.DecodeInt(key)
if err != nil {
return nil, 0, cerror.WrapError(cerror.ErrCodecDecode, err)
}
return
}

func decodeMetaKey(ek []byte) (meta, error) {
if !bytes.HasPrefix(ek, metaPrefix) {
return nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(ek)
}

ek = ek[metaPrefixLen:]
ek, rawKey, err := codec.DecodeBytes(ek, nil)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCodecDecode, err)
}
key := string(rawKey)

ek, rawTp, err := codec.DecodeUint(ek)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCodecDecode, err)
}
switch MetaType(rawTp) {
case HashData:
if len(ek) > 0 {
var field []byte
_, field, err = codec.DecodeBytes(ek, nil)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCodecDecode, err)
}
return metaHashData{key: key, field: field}, nil
}
if len(ek) > 0 {
// TODO: warning hash key decode failure
panic("hash key decode failure, should never happen")
}
case ListData:
if len(ek) == 0 {
panic("list key decode failure")
}
var index int64
_, index, err = codec.DecodeInt(ek)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCodecDecode, err)
}
return metaListData{key: key, index: index}, nil
// TODO decode other key
default:
return other{tp: MetaType(rawTp)}, nil
}
return nil, cerror.ErrUnknownMetaType.GenWithStackByArgs(rawTp)
}

// decodeRow decodes a byte slice into datums with an existing row map.
func decodeRow(b []byte, recordID kv.Handle, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
if len(b) == 0 {
Expand Down
53 changes: 18 additions & 35 deletions cdc/entry/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,27 @@
package entry

import (
"encoding/hex"
"testing"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
)

func TestDecodeRecordKey(t *testing.T) {
t.Parallel()
recordPrefix := tablecodec.GenTableRecordPrefix(12345)
key := tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(67890))
key, tableID, err := decodeTableID(key)
require.Nil(t, err)
require.Equal(t, tableID, int64(12345))
key, recordID, err := decodeRecordID(key)
require.Nil(t, err)
require.Equal(t, recordID, int64(67890))
require.Equal(t, len(key), 0)
}

func TestDecodeListData(t *testing.T) {
t.Parallel()
key := []byte("hello")
var index int64 = 3

meta, err := decodeMetaKey(buildMetaKey(key, index))
require.Nil(t, err)
require.Equal(t, meta.getType(), ListData)
list := meta.(metaListData)
require.Equal(t, list.key, string(key))
require.Equal(t, list.index, index)
}

func buildMetaKey(key []byte, index int64) []byte {
ek := make([]byte, 0, len(metaPrefix)+len(key)+36)
ek = append(ek, metaPrefix...)
ek = codec.EncodeBytes(ek, key)
ek = codec.EncodeUint(ek, uint64(ListData))
return codec.EncodeInt(ek, index)
func TestDecodeTableID(t *testing.T) {
/*
"7480000000000000685f728000000000000001"
└─## decode hex key
└─"t\200\000\000\000\000\000\000h_r\200\000\000\000\000\000\000\001"
├─## table prefix
│ └─table: 104
└─## table row key
├─table: 104
└─row: 1
*/
key := "7480000000000000685f728000000000000001"
keyBytes, err := hex.DecodeString(key)
require.NoError(t, err)
tableID, err := DecodeTableID(keyBytes)
require.NoError(t, err)
require.Equal(t, tableID, int64(104))
}
36 changes: 3 additions & 33 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ func (s *schemaStorage) shouldIgnoreTable(t *model.TableInfo) bool {

// IsIneligibleTable returns whether the table is ineligible.
// It uses the snapshot of the given ts to check the table.
// Ineligible means that the table does not have a primary key
// or not null unique key.
func (s *schemaStorage) IsIneligibleTable(
ctx context.Context, tableID model.TableID, ts model.Ts,
) (bool, error) {
Expand Down Expand Up @@ -451,7 +453,7 @@ func (s *schemaStorage) BuildDDLEvents(
event.FromJob(job, preTableInfo, tableInfo)
ddlEvents = append(ddlEvents, event)
}
return s.filterDDLEvents(ddlEvents)
return ddlEvents, nil
}

// TODO: find a better way to refactor this function.
Expand Down Expand Up @@ -507,38 +509,6 @@ func (s *schemaStorage) buildRenameEvents(
return ddlEvents, nil
}

// TODO: delete this function after integration test passed.
func (s *schemaStorage) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.DDLEvent, error) {
res := make([]*model.DDLEvent, 0, len(ddlEvents))
for _, event := range ddlEvents {
schemaName := event.TableInfo.TableName.Schema
table := event.TableInfo.TableName.Table
if event.Type == timodel.ActionRenameTable {
schemaName = event.PreTableInfo.TableName.Schema
table = event.PreTableInfo.TableName.Table
}

if s.filter.ShouldDiscardDDL(event.Type, schemaName, table) {
log.Error(
"discarded DDL event should not be sent to owner"+
"please report a bug to TiCDC if you see this log"+
"but it is no harm to your replication",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("query", event.Query),
zap.String("type", event.Type.String()),
zap.String("schema", event.TableInfo.TableName.Schema),
zap.String("table", event.TableInfo.TableName.Table),
zap.Uint64("startTs", event.StartTs),
zap.Uint64("commitTs", event.CommitTs),
)
continue
}
res = append(res, event)
}
return res, nil
}

// MockSchemaStorage is for tests.
type MockSchemaStorage struct {
Resolved uint64
Expand Down
Loading

0 comments on commit 63ccb9d

Please sign in to comment.