diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 1eac1a015d7..d72e3ccf224 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -395,6 +395,12 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er // AppendRowChangedEvent implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) { + // Some transactions could generate empty row change event, such as + // begin; insert into t (id) values (1); delete from t where id=1; commit; + // Just ignore these row changed events + if len(e.Columns) == 0 && len(e.PreColumns) == 0 { + return EncoderNoOperation, nil + } keyMsg, valueMsg := rowEventToMqMessage(e) key, err := keyMsg.Encode() if err != nil { diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 5c898227a21..f84afd16e80 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -304,6 +304,28 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) { c.Check(sum, check.Equals, 10000) } +func (s *batchSuite) TestEmptyMessage(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewJSONEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) + c.Check(err, check.IsNil) + + emptyEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(emptyEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + c.Assert(messages, check.HasLen, 0) +} + func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) { defer testleak.AfterTest(c)() s.testBatchCodec(c, func() EventBatchEncoder {