From 1d4209c7b3084b3e8544ed3ee0c61affff7b9cfa Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 25 Aug 2021 20:00:04 +0800 Subject: [PATCH] sink/mq: fix empty value in open protocol (#2615) (#2622) --- cdc/sink/codec/json.go | 6 ++++++ cdc/sink/codec/json_test.go | 22 ++++++++++++++++++++++ cdc/sink/mq_test.go | 1 + 3 files changed, 29 insertions(+) diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 1eac1a015d7..d72e3ccf224 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -395,6 +395,12 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er // AppendRowChangedEvent implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) { + // Some transactions could generate empty row change event, such as + // begin; insert into t (id) values (1); delete from t where id=1; commit; + // Just ignore these row changed events + if len(e.Columns) == 0 && len(e.PreColumns) == 0 { + return EncoderNoOperation, nil + } keyMsg, valueMsg := rowEventToMqMessage(e) key, err := keyMsg.Encode() if err != nil { diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 5c898227a21..f84afd16e80 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -304,6 +304,28 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) { c.Check(sum, check.Equals, 10000) } +func (s *batchSuite) TestEmptyMessage(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewJSONEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) + c.Check(err, check.IsNil) + + emptyEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(emptyEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + c.Assert(messages, check.HasLen, 0) +} + func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) { defer testleak.AfterTest(c)() s.testBatchCodec(c, func() EventBatchEncoder { diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go index a87946d50d2..40421fb3c30 100644 --- a/cdc/sink/mq_test.go +++ b/cdc/sink/mq_test.go @@ -79,6 +79,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { }, StartTs: 100, CommitTs: 120, + Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, } err = sink.EmitRowChangedEvents(ctx, row) c.Assert(err, check.IsNil)