Skip to content

Commit

Permalink
This is an automated cherry-pick of #3747
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Dec 8, 2021
1 parent 81948a8 commit 279cb04
Show file tree
Hide file tree
Showing 16 changed files with 627 additions and 81 deletions.
3 changes: 2 additions & 1 deletion cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (a *AvroEventBatchEncoder) SetTimeZone(tz *time.Location) {
// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
mqMessage := NewMQMessage(ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)
mqMessage := NewMQMessage(config.ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)

if !e.IsDelete() {
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz)
Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto" // nolint:staticcheck
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
canal "github.com/pingcap/ticdc/proto/canal"
mm "github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -373,7 +374,7 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

return newDDLMQMessage(ProtocolCanal, nil, b, e), nil
return newDDLMQMessage(config.ProtocolCanal, nil, b, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -391,7 +392,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if err != nil {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
20 changes: 19 additions & 1 deletion cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
canal "github.com/pingcap/ticdc/proto/canal"
"go.uber.org/zap"
Expand Down Expand Up @@ -184,7 +185,20 @@ func (c *CanalFlatEventBatchEncoder) newFlatMessageForDDL(e *model.DDLEvent) *ca

// EncodeCheckpointEvent is no-op
func (c *CanalFlatEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
<<<<<<< HEAD
return nil, nil
=======
if !c.enableTiDBExtension {
return nil, nil
}

msg := c.newFlatMessage4CheckpointEvent(ts)
value, err := json.Marshal(msg)
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
}
return newResolvedMQMessage(config.ProtocolCanalJSON, nil, value, ts), nil
>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747))
}

// AppendRowChangedEvent implements the interface EventBatchEncoder
Expand Down Expand Up @@ -222,7 +236,7 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
}
return newDDLMQMessage(ProtocolCanalJSON, nil, value, e), nil
return newDDLMQMessage(config.ProtocolCanalJSON, nil, value, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -237,7 +251,11 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
<<<<<<< HEAD
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
=======
ret[i] = NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable())
>>>>>>> bedf904e7 (config(ticdc): Fix old value configuration check for maxwell protocol (#3747))
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink/codec/craft"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

Expand All @@ -38,15 +39,15 @@ type CraftEventBatchEncoder struct {

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
return newResolvedMQMessage(ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil
return newResolvedMQMessage(config.ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil
}

func (e *CraftEventBatchEncoder) flush() {
headers := e.rowChangedBuffer.GetHeaders()
ts := headers.GetTs(0)
schema := headers.GetSchema(0)
table := headers.GetTable(0)
e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table))
e.messageBuf = append(e.messageBuf, NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table))
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
Expand All @@ -65,7 +66,7 @@ func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult,

// EncodeDDLEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
return newDDLMQMessage(ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
}

// Build implements the EventBatchEncoder interface
Expand Down
58 changes: 12 additions & 46 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package codec
import (
"context"
"encoding/binary"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/security"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -63,7 +63,7 @@ type MQMessage struct {
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol Protocol // protocol
Protocol config.Protocol // protocol
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
Expand All @@ -83,17 +83,17 @@ func (m *MQMessage) PhysicalTime() time.Time {
return oracle.GetTimeFromTS(m.Ts)
}

func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table)
}

func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessage {
func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage {
return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved, nil, nil)
}

// NewMQMessage should be used when creating a MQMessage struct.
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Expand Down Expand Up @@ -143,58 +143,24 @@ const (
EncoderNeedSyncWrite
)

// Protocol is the protocol of the mq message
type Protocol int

// Enum types of the Protocol
const (
ProtocolDefault Protocol = iota
ProtocolCanal
ProtocolAvro
ProtocolMaxwell
ProtocolCanalJSON
ProtocolCraft
)

// FromString converts the protocol from string to Protocol enum type
func (p *Protocol) FromString(protocol string) {
switch strings.ToLower(protocol) {
case "default":
*p = ProtocolDefault
case "canal":
*p = ProtocolCanal
case "avro":
*p = ProtocolAvro
case "maxwell":
*p = ProtocolMaxwell
case "canal-json":
*p = ProtocolCanalJSON
case "craft":
*p = ProtocolCraft
default:
*p = ProtocolDefault
log.Warn("can't support codec protocol, using default protocol", zap.String("protocol", protocol))
}
}

type EncoderBuilder interface {
Build(ctx context.Context) (EventBatchEncoder, error)
}

// NewEventBatchEncoderBuilder returns an EncoderBuilder
func NewEventBatchEncoderBuilder(p Protocol, credential *security.Credential, opts map[string]string) (EncoderBuilder, error) {
func NewEventBatchEncoderBuilder(p config.Protocol, credential *security.Credential, opts map[string]string) (EncoderBuilder, error) {
switch p {
case ProtocolDefault:
case config.ProtocolDefault:
return newJSONEventBatchEncoderBuilder(opts), nil
case ProtocolCanal:
case config.ProtocolCanal:
return newCanalEventBatchEncoderBuilder(opts), nil
case ProtocolAvro:
case config.ProtocolAvro:
return newAvroEventBatchEncoderBuilder(credential, opts)
case ProtocolMaxwell:
case config.ProtocolMaxwell:
return newMaxwellEventBatchEncoderBuilder(opts), nil
case ProtocolCanalJSON:
case config.ProtocolCanalJSON:
return newCanalFlatEventBatchEncoderBuilder(opts), nil
case ProtocolCraft:
case config.ProtocolCraft:
return newCraftEventBatchEncoderBuilder(opts), nil
default:
log.Warn("unknown codec protocol value of EventBatchEncoder, use open-protocol as the default", zap.Int("protocol_value", int(p)))
Expand Down
13 changes: 7 additions & 6 deletions cdc/sink/codec/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package codec
import (
"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/util/testleak"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -54,15 +55,15 @@ func (s *codecInterfaceSuite) TestCreate(c *check.C) {
CommitTs: 5678,
}

msg := NewMQMessage(ProtocolDefault, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MqMessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table)
msg := NewMQMessage(config.ProtocolDefault, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MqMessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table)

c.Assert(msg.Key, check.BytesEquals, []byte("key1"))
c.Assert(msg.Value, check.BytesEquals, []byte("value1"))
c.Assert(msg.Ts, check.Equals, rowEvent.CommitTs)
c.Assert(msg.Type, check.Equals, model.MqMessageTypeRow)
c.Assert(*msg.Schema, check.Equals, rowEvent.Table.Schema)
c.Assert(*msg.Table, check.Equals, rowEvent.Table.Table)
c.Assert(msg.Protocol, check.Equals, ProtocolDefault)
c.Assert(msg.Protocol, check.Equals, config.ProtocolDefault)

job := &timodel.Job{
ID: 1071,
Expand Down Expand Up @@ -100,21 +101,21 @@ func (s *codecInterfaceSuite) TestCreate(c *check.C) {
ddlEvent := &model.DDLEvent{}
ddlEvent.FromJob(job, preTableInfo)

msg = newDDLMQMessage(ProtocolMaxwell, nil, []byte("value1"), ddlEvent)
msg = newDDLMQMessage(config.ProtocolMaxwell, nil, []byte("value1"), ddlEvent)
c.Assert(msg.Key, check.IsNil)
c.Assert(msg.Value, check.BytesEquals, []byte("value1"))
c.Assert(msg.Ts, check.Equals, ddlEvent.CommitTs)
c.Assert(msg.Type, check.Equals, model.MqMessageTypeDDL)
c.Assert(*msg.Schema, check.Equals, ddlEvent.TableInfo.Schema)
c.Assert(*msg.Table, check.Equals, ddlEvent.TableInfo.Table)
c.Assert(msg.Protocol, check.Equals, ProtocolMaxwell)
c.Assert(msg.Protocol, check.Equals, config.ProtocolMaxwell)

msg = newResolvedMQMessage(ProtocolCanal, []byte("key1"), nil, 1234)
msg = newResolvedMQMessage(config.ProtocolCanal, []byte("key1"), nil, 1234)
c.Assert(msg.Key, check.BytesEquals, []byte("key1"))
c.Assert(msg.Value, check.IsNil)
c.Assert(msg.Ts, check.Equals, uint64(1234))
c.Assert(msg.Type, check.Equals, model.MqMessageTypeResolved)
c.Assert(msg.Schema, check.IsNil)
c.Assert(msg.Table, check.IsNil)
c.Assert(msg.Protocol, check.Equals, ProtocolCanal)
c.Assert(msg.Protocol, check.Equals, config.ProtocolCanal)
}
12 changes: 6 additions & 6 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/zap"

"github.com/pingcap/ticdc/cdc/model"
)

const (
Expand Down Expand Up @@ -398,7 +398,7 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er
valueBuf := new(bytes.Buffer)
valueBuf.Write(valueLenByte[:])

ret := newResolvedMQMessage(ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), ts)
ret := newResolvedMQMessage(config.ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), ts)
return ret, nil
}

Expand Down Expand Up @@ -442,7 +442,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

d.messageBuf = append(d.messageBuf, NewMQMessage(ProtocolDefault, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolDefault, versionHead, nil, 0, model.MqMessageTypeRow, nil, nil))
d.curBatchSize = 0
}

Expand Down Expand Up @@ -501,7 +501,7 @@ func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, e
valueBuf.Write(valueLenByte[:])
valueBuf.Write(value)

ret := newDDLMQMessage(ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), e)
ret := newDDLMQMessage(config.ProtocolDefault, keyBuf.Bytes(), valueBuf.Bytes(), e)
return ret, nil
}

Expand All @@ -512,7 +512,7 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
return nil
}
/* there could be multiple types of event encoded within a single message which means the type is not sure */
ret := NewMQMessage(ProtocolDefault, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil)
ret := NewMQMessage(config.ProtocolDefault, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeUnknown, nil, nil)
return []*MQMessage{ret}
}

Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
model2 "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -287,7 +288,7 @@ func (d *MaxwellEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage
return nil, errors.Trace(err)
}

return newDDLMQMessage(ProtocolMaxwell, key, value, e), nil
return newDDLMQMessage(config.ProtocolMaxwell, key, value, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -296,7 +297,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
return nil
}

ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
d.Reset()
return []*MQMessage{ret}
}
Expand Down
Loading

0 comments on commit 279cb04

Please sign in to comment.