Skip to content

Commit

Permalink
codec(ticdc): avro encode DDL event (pingcap#10736)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Mar 7, 2024
1 parent da0656d commit 5df9142
Show file tree
Hide file tree
Showing 3 changed files with 522 additions and 6 deletions.
77 changes: 71 additions & 6 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/linkedin/goavro/v2"
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -149,18 +150,33 @@ func (a *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)
return nil, nil
}

type ddlEvent struct {
Query string `json:"query"`
Type timodel.ActionType `json:"type"`
Schema string `json:"schema"`
Table string `json:"table"`
CommitTs uint64 `json:"commitTs"`
}

// EncodeDDLEvent only encode DDL event if the watermark event is enabled
// it's only used for the testing purpose.
func (a *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
if a.config.EnableTiDBExtension && a.config.AvroEnableWatermark {
buf := new(bytes.Buffer)
data := []interface{}{ddlByte, e.Query}
for _, v := range data {
err := binary.Write(buf, binary.BigEndian, v)
if err != nil {
return nil, cerror.WrapError(cerror.ErrAvroToEnvelopeError, err)
}
_ = binary.Write(buf, binary.BigEndian, ddlByte)

event := &ddlEvent{
Query: e.Query,
Type: e.Type,
Schema: e.TableInfo.TableName.Schema,
Table: e.TableInfo.TableName.Table,
CommitTs: e.CommitTs,
}
data, err := json.Marshal(event)
if err != nil {
return nil, cerror.WrapError(cerror.ErrAvroToEnvelopeError, err)
}
buf.Write(data)

value := buf.Bytes()
return common.NewDDLMsg(config.ProtocolAvro, nil, value, e), nil
Expand Down Expand Up @@ -389,6 +405,55 @@ func sanitizeTopic(name string) string {
return strings.ReplaceAll(name, ".", replacementChar)
}

func flagFromTiDBType(tp string) model.ColumnFlagType {
var flag model.ColumnFlagType
if strings.Contains(tp, "UNSIGNED") {
flag.SetIsUnsigned()
}
return flag
}

func mysqlTypeFromTiDBType(tidbType string) byte {
var result byte
switch tidbType {
case "INT", "INT UNSIGNED":
result = mysql.TypeLong
case "BIGINT", "BIGINT UNSIGNED":
result = mysql.TypeLonglong
case "FLOAT":
result = mysql.TypeFloat
case "DOUBLE":
result = mysql.TypeDouble
case "BIT":
result = mysql.TypeBit
case "DECIMAL":
result = mysql.TypeNewDecimal
case "TEXT":
result = mysql.TypeVarchar
case "BLOB":
result = mysql.TypeLongBlob
case "ENUM":
result = mysql.TypeEnum
case "SET":
result = mysql.TypeSet
case "JSON":
result = mysql.TypeJSON
case "DATE":
result = mysql.TypeDate
case "DATETIME":
result = mysql.TypeDatetime
case "TIMESTAMP":
result = mysql.TypeTimestamp
case "TIME":
result = mysql.TypeDuration
case "YEAR":
result = mysql.TypeYear
default:
log.Panic("this should not happen, unknown TiDB type", zap.String("type", tidbType))
}
return result
}

// https://github.com/debezium/debezium/blob/9f7ede0e0695f012c6c4e715e96aed85eecf6b5f \
// /debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/ \
// MySqlAntlrDdlParser.java#L374
Expand Down
87 changes: 87 additions & 0 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/linkedin/goavro/v2"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -1006,3 +1007,89 @@ func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
require.Equal(t, expected, count, "expected one callback be called")
}
}

func TestEncodeCheckpoint(t *testing.T) {
t.Parallel()

config := &common.Config{
EnableTiDBExtension: true,
AvroEnableWatermark: true,
}

encoder := &BatchEncoder{
namespace: model.DefaultNamespace,
result: make([]*common.Message, 0, 1),
config: config,
}

message, err := encoder.EncodeCheckpointEvent(446266400629063682)
require.NoError(t, err)
require.NotNil(t, message)

topic := "test-topic"
decoder := NewDecoder(config, nil, topic)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

messageType, exist, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, exist)
require.Equal(t, model.MessageTypeResolved, messageType)

obtained, err := decoder.NextResolvedEvent()
require.NoError(t, err)
require.Equal(t, uint64(446266400629063682), obtained)
}

func TestEncodeDDLEvent(t *testing.T) {
t.Parallel()

config := &common.Config{
EnableTiDBExtension: true,
AvroEnableWatermark: true,
}

encoder := &BatchEncoder{
namespace: model.DefaultNamespace,
result: make([]*common.Message, 0, 1),
config: config,
}

message, err := encoder.EncodeDDLEvent(&model.DDLEvent{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "test",
Table: "t1",
TableID: 0,
IsPartition: false,
},
},
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
})
require.NoError(t, err)
require.NotNil(t, message)

topic := "test-topic"
decoder := NewDecoder(config, nil, topic)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

messageType, exist, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, exist)
require.Equal(t, model.MessageTypeDDL, messageType)

decodedEvent, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedEvent)
require.Equal(t, uint64(1030), decodedEvent.CommitTs)
require.Equal(t, timodel.ActionAddColumn, decodedEvent.Type)
require.Equal(t, "ALTER TABLE test.t1 ADD COLUMN a int", decodedEvent.Query)
require.Equal(t, "test", decodedEvent.TableInfo.TableName.Schema)
require.Equal(t, "t1", decodedEvent.TableInfo.TableName.Table)
require.Equal(t, int64(0), decodedEvent.TableInfo.TableName.TableID)
require.False(t, decodedEvent.TableInfo.TableName.IsPartition)
}
Loading

0 comments on commit 5df9142

Please sign in to comment.