From 2aa7c041886389fc04c2d2cfa3a4a7a28df4e1a9 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 27 Oct 2021 14:55:21 +0800 Subject: [PATCH 1/3] sink: ignore the empty row event --- cdc/processor/pipeline/sink.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 30570519dcc..15b28cb94ed 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -157,6 +157,13 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE colLen := len(event.Row.Columns) preColLen := len(event.Row.PreColumns) + // Some transactions could generate empty row change event, such as + // begin; insert into t (id) values (1); delete from t where id=1; commit; + // Just ignore these row changed events + if colLen == 0 && preColLen == 0 { + return nil + } + config := ctx.ChangefeedVars().Info.Config // This indicates that it is an update event, From 7582fac57e34d90a13ac6af844282cd6c82a1151 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 27 Oct 2021 15:42:58 +0800 Subject: [PATCH 2/3] sink: fix test --- cdc/processor/pipeline/sink_test.go | 72 +++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 4 deletions(-) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index d2abf134a98..74a42849acd 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -263,11 +263,43 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{CommitTs: 1}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{ + CommitTs: 1, + Columns: []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + }, + }, + }), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{CommitTs: 2}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{ + CommitTs: 2, + Columns: []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + }, + }, + }), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -283,8 +315,40 @@ func (s *outputSuite) TestManyTs(c *check.C) { resolvedTs model.Ts row *model.RowChangedEvent }{ - {row: &model.RowChangedEvent{CommitTs: 1}}, - {row: &model.RowChangedEvent{CommitTs: 2}}, + { + row: &model.RowChangedEvent{ + CommitTs: 1, + Columns: []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + }, + }, + }, + { + row: &model.RowChangedEvent{ + CommitTs: 2, + Columns: []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + }, + }, + }, {resolvedTs: 1}, }) sink.Reset() From 34e64b9106bf164b15bca4809af24c5c9cfc7b32 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 27 Oct 2021 16:25:48 +0800 Subject: [PATCH 3/3] test: add TestIgnoreEmptyRowChangeEvent test --- cdc/processor/pipeline/sink_test.go | 20 ++++++++++++++++++++ cdc/sink/codec/json.go | 6 ------ cdc/sink/codec/json_test.go | 22 ---------------------- 3 files changed, 20 insertions(+), 28 deletions(-) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 74a42849acd..12ba3bb1fa5 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -368,6 +368,26 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) } +func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-ignore-empty-row-change-event", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) + sink := &mockSink{} + node := newSinkNode(sink, 0, 10, &mockFlowController{}) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) + + // empty row, no Columns and PreColumns. + c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{CommitTs: 1}}), nil)), check.IsNil) + c.Assert(node.eventBuffer, check.HasLen, 0) +} + func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index baec90b37af..a16b39f0ecf 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -403,12 +403,6 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er // AppendRowChangedEvent implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) { - // Some transactions could generate empty row change event, such as - // begin; insert into t (id) values (1); delete from t where id=1; commit; - // Just ignore these row changed events - if len(e.Columns) == 0 && len(e.PreColumns) == 0 { - return EncoderNoOperation, nil - } keyMsg, valueMsg := rowEventToMqMessage(e) key, err := keyMsg.Encode() if err != nil { diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 6561496e540..497434fd662 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -322,28 +322,6 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) { c.Check(sum, check.Equals, 10000) } -func (s *batchSuite) TestEmptyMessage(c *check.C) { - defer testleak.AfterTest(c)() - encoder := NewJSONEventBatchEncoder() - err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) - c.Check(err, check.IsNil) - - emptyEvent := &model.RowChangedEvent{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{}, - } - - for i := 0; i < 10000; i++ { - r, err := encoder.AppendRowChangedEvent(emptyEvent) - c.Check(r, check.Equals, EncoderNoOperation) - c.Check(err, check.IsNil) - } - - messages := encoder.Build() - c.Assert(messages, check.HasLen, 0) -} - func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) { defer testleak.AfterTest(c)() s.testBatchCodec(c, func() EventBatchEncoder {