Skip to content

Commit

Permalink
sink/mq: fix empty value in open protocol (#2615) (#2621)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 27, 2021
1 parent 844ba0a commit ebf7710
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
// Some transactions could generate empty row change event, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events
if len(e.Columns) == 0 && len(e.PreColumns) == 0 {
return EncoderNoOperation, nil
}
keyMsg, valueMsg := rowEventToMqMessage(e)
key, err := keyMsg.Encode()
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,28 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) {
c.Check(sum, check.Equals, 10000)
}

func (s *batchSuite) TestEmptyMessage(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)

emptyEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{},
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(emptyEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages := encoder.Build()
c.Assert(messages, check.HasLen, 0)
}

func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) {
defer testleak.AfterTest(c)()
s.testBatchCodec(c, func() EventBatchEncoder {
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
},
StartTs: 100,
CommitTs: 120,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}
err = sink.EmitRowChangedEvents(ctx, row)
c.Assert(err, check.IsNil)
Expand Down

0 comments on commit ebf7710

Please sign in to comment.