From d1149b7bc07d214f75643f30382761588b90d8df Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 24 Sep 2020 11:24:09 +0800 Subject: [PATCH] sink: fix Canal sink bugs (#966) --- cdc/sink/codec/canal.go | 26 +++++++++++++++++++------- cdc/sink/codec/canal_test.go | 6 ++++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 39ce8a40ee7..d3092889944 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -216,7 +216,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowData, error) { var columns []*canal.Column for _, column := range e.Columns { - if e == nil { + if column == nil { continue } c, err := b.buildColumn(column, column.Name, !e.IsDelete()) @@ -227,7 +227,7 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa } var preColumns []*canal.Column for _, column := range e.PreColumns { - if e == nil { + if column == nil { continue } c, err := b.buildColumn(column, column.Name, !e.IsDelete()) @@ -374,16 +374,22 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, // Build implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) Build() []*MQMessage { + if len(d.messages.Messages) == 0 { + return nil + } + err := d.refreshPacketBody() if err != nil { log.Fatal("Error when generating Canal packet", zap.Error(err)) } + value, err := proto.Marshal(d.packet) if err != nil { log.Fatal("Error when serializing Canal packet", zap.Error(err)) } ret := NewMQMessage(nil, value, 0) d.messages.Reset() + d.resetPacket() return []*MQMessage{ret} } @@ -414,24 +420,30 @@ func (d *CanalEventBatchEncoder) refreshPacketBody() error { if newSize > oldSize { // resize packet body slice d.packet.Body = append(d.packet.Body, make([]byte, newSize-oldSize)...) + } else { + d.packet.Body = d.packet.Body[:newSize] } - _, err := d.messages.MarshalToSizedBuffer(d.packet.Body[:newSize]) + + _, err := d.messages.MarshalToSizedBuffer(d.packet.Body) return err } -// NewCanalEventBatchEncoder creates a new CanalEventBatchEncoder. -func NewCanalEventBatchEncoder() EventBatchEncoder { - p := &canal.Packet{ +func (d *CanalEventBatchEncoder) resetPacket() { + d.packet = &canal.Packet{ VersionPresent: &canal.Packet_Version{ Version: CanalPacketVersion, }, Type: canal.PacketType_MESSAGES, } +} +// NewCanalEventBatchEncoder creates a new CanalEventBatchEncoder. +func NewCanalEventBatchEncoder() EventBatchEncoder { encoder := &CanalEventBatchEncoder{ messages: &canal.Messages{}, - packet: p, entryBuilder: NewCanalEntryBuilder(), } + + encoder.resetPacket() return encoder } diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index 9a5bb66d114..024d25770d8 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -91,6 +91,12 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) { } size := encoder.Size() res := encoder.Build() + + if len(cs) == 0 { + c.Assert(res, check.IsNil) + continue + } + c.Assert(res, check.HasLen, 1) c.Assert(res[0].Key, check.IsNil) c.Assert(len(res[0].Value), check.Equals, size)