Skip to content

Commit

Permalink
adjust ut
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu committed May 6, 2022
1 parent 59c8c6b commit 71357b5
Show file tree
Hide file tree
Showing 13 changed files with 2,149 additions and 275 deletions.
55 changes: 23 additions & 32 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/avro"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// AvroEventBatchEncoder converts the events to binary Avro data
type AvroEventBatchEncoder struct {
keySchemaManager *avro.AvroSchemaManager
valueSchemaManager *avro.AvroSchemaManager
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
resultBuf []*MQMessage

enableTiDBExtension bool
Expand Down Expand Up @@ -136,8 +135,8 @@ func (a *AvroEventBatchEncoder) avroEncode(ctx context.Context, e *model.RowChan
var cols []*model.Column
var colInfos []rowcodec.ColInfo
var enableTiDBExtension bool
var schemaManager *avro.AvroSchemaManager
var operation string = ""
var schemaManager *AvroSchemaManager
var operation string
if key {
cols, colInfos = e.HandleKeyColInfos()
enableTiDBExtension = false
Expand All @@ -149,9 +148,11 @@ func (a *AvroEventBatchEncoder) avroEncode(ctx context.Context, e *model.RowChan
schemaManager = a.valueSchemaManager
if e.IsInsert() {
operation = "c"
}
if e.IsUpdate() {
} else if e.IsUpdate() {
operation = "u"
} else {
log.Panic("unknown operation", zap.Reflect("rowChangedEvent", e))
return nil, cerror.ErrAvroEncodeFailed.GenWithStack("unknown operation")
}
}

Expand Down Expand Up @@ -193,7 +194,7 @@ type avroSchemaTop struct {
Fields []map[string]interface{} `json:"fields"`
}

const TIDB_TYPE = "tidb.type"
const TIDB_TYPE = "tidbType"

var type2TiDBType = map[byte]string{
mysql.TypeTiny: "INT",
Expand Down Expand Up @@ -272,11 +273,11 @@ func rowToAvroSchema(fqdn string, columnInfo []*model.Column, colInfos []rowcode
if enableTiDBExtension {
top.Fields = append(top.Fields,
map[string]interface{}{
"name": "tidb.op",
"name": "tidbOp",
"type": "string",
},
map[string]interface{}{
"name": "tidb.commitTs",
"name": "tidbCommitTs",
"type": "long",
},
)
Expand All @@ -290,7 +291,7 @@ func rowToAvroSchema(fqdn string, columnInfo []*model.Column, colInfos []rowcode
return string(str), nil
}

func rowToAvroData(cols []*model.Column, colInfos []rowcodec.ColInfo, commitTs uint64, operation string, enableTiDBExtension bool, decimalHandlingMode string) (interface{}, error) {
func rowToAvroData(cols []*model.Column, colInfos []rowcodec.ColInfo, commitTs uint64, operation string, enableTiDBExtension bool, decimalHandlingMode string) (map[string]interface{}, error) {
ret := make(map[string]interface{}, len(cols))
for i, col := range cols {
if col == nil {
Expand All @@ -310,8 +311,8 @@ func rowToAvroData(cols []*model.Column, colInfos []rowcodec.ColInfo, commitTs u
}

if enableTiDBExtension {
ret["tidb.op"] = operation
ret["tidb.commitTs"] = commitTs
ret["tidbOp"] = operation
ret["tidbCommitTs"] = int64(commitTs)
}

return ret, nil
Expand Down Expand Up @@ -447,16 +448,16 @@ func columnToAvroData(col *model.Column, ft *types.FieldType, decimalHandlingMod
if col.Flag.IsUnsigned() {
return int64(col.Value.(uint64)), "long", nil
}
return col.Value.(int64), "int", nil
return int32(col.Value.(int64)), "int", nil
case mysql.TypeLonglong:
if col.Flag.IsUnsigned() {
return col.Value.(uint64), "long", nil
return int64(col.Value.(uint64)), "long", nil
}
return col.Value.(int64), "long", nil
case mysql.TypeFloat, mysql.TypeDouble:
return col.Value.(float64), "double", nil
case mysql.TypeBit:
return types.NewBinaryLiteralFromUint(col.Value.(uint64), -1), "bytes", nil
return []byte(types.NewBinaryLiteralFromUint(col.Value.(uint64), -1)), "bytes", nil
case mysql.TypeNewDecimal:
if decimalHandlingMode == "precise" {
v, succ := new(big.Rat).SetString(col.Value.(string))
Expand All @@ -469,19 +470,9 @@ func columnToAvroData(col *model.Column, ft *types.FieldType, decimalHandlingMod
}
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
if col.Flag.IsBinary() {
switch val := col.Value.(type) {
case string:
return []byte(val), "bytes", nil
case []byte:
return val, "bytes", nil
}
return col.Value, "bytes", nil
} else {
switch val := col.Value.(type) {
case string:
return val, "string", nil
case []byte:
return string(val), "string", nil
}
return string(col.Value.([]byte)), "string", nil
}
log.Panic("Avro could not process text-like type", zap.Reflect("col", col))
return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown datum value")
Expand Down Expand Up @@ -525,8 +516,8 @@ func (r *avroEncodeResult) toEnvelope() ([]byte, error) {

type avroEventBatchEncoderBuilder struct {
config *Config
keySchemaManager *avro.AvroSchemaManager
valueSchemaManager *avro.AvroSchemaManager
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
}

const (
Expand All @@ -536,12 +527,12 @@ const (

func newAvroEventBatchEncoderBuilder(config *Config) (EncoderBuilder, error) {
ctx := context.Background()
keySchemaManager, err := avro.NewAvroSchemaManager(ctx, nil, config.avroSchemaRegistry, keySchemaSuffix)
keySchemaManager, err := NewAvroSchemaManager(ctx, nil, config.avroSchemaRegistry, keySchemaSuffix)
if err != nil {
return nil, errors.Trace(err)
}

valueSchemaManager, err := avro.NewAvroSchemaManager(ctx, nil, config.avroSchemaRegistry, valueSchemaSuffix)
valueSchemaManager, err := NewAvroSchemaManager(ctx, nil, config.avroSchemaRegistry, valueSchemaSuffix)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 71357b5

Please sign in to comment.