Skip to content

Commit

Permalink
sink: ignore the empty row event (#3144)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Oct 27, 2021
1 parent 0a50f8b commit 7b49a53
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 32 deletions.
7 changes: 7 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicE

colLen := len(event.Row.Columns)
preColLen := len(event.Row.PreColumns)
// Some transactions could generate empty row change event, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events
if colLen == 0 && preColLen == 0 {
return nil
}

config := ctx.ChangefeedVars().Info.Config

// This indicates that it is an update event,
Expand Down
92 changes: 88 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,43 @@ func (s *outputSuite) TestManyTs(c *check.C) {
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{CommitTs: 1}}), nil)), check.IsNil)
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
CommitTs: 1,
Columns: []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: model.HandleKeyFlag,
Value: "col2-value",
},
},
},
}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{CommitTs: 2}}), nil)), check.IsNil)
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
CommitTs: 2,
Columns: []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: model.HandleKeyFlag,
Value: "col2-value",
},
},
},
}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand All @@ -283,8 +315,40 @@ func (s *outputSuite) TestManyTs(c *check.C) {
resolvedTs model.Ts
row *model.RowChangedEvent
}{
{row: &model.RowChangedEvent{CommitTs: 1}},
{row: &model.RowChangedEvent{CommitTs: 2}},
{
row: &model.RowChangedEvent{
CommitTs: 1,
Columns: []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: model.HandleKeyFlag,
Value: "col2-value",
},
},
},
},
{
row: &model.RowChangedEvent{
CommitTs: 2,
Columns: []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col2",
Flag: model.HandleKeyFlag,
Value: "col2-value",
},
},
},
},
{resolvedTs: 1},
})
sink.Reset()
Expand All @@ -304,6 +368,26 @@ func (s *outputSuite) TestManyTs(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))
}

func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test-ignore-empty-row-change-event",
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// empty row, no Columns and PreColumns.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{CommitTs: 1}}), nil)), check.IsNil)
c.Assert(node.eventBuffer, check.HasLen, 0)
}

func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,6 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
// Some transactions could generate empty row change event, such as
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events
if len(e.Columns) == 0 && len(e.PreColumns) == 0 {
return EncoderNoOperation, nil
}
keyMsg, valueMsg := rowEventToMqMessage(e)
key, err := keyMsg.Encode()
if err != nil {
Expand Down
22 changes: 0 additions & 22 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,28 +322,6 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) {
c.Check(sum, check.Equals, 10000)
}

func (s *batchSuite) TestEmptyMessage(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)

emptyEvent := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{},
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(emptyEvent)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}

messages := encoder.Build()
c.Assert(messages, check.HasLen, 0)
}

func (s *batchSuite) TestDefaultEventBatchCodec(c *check.C) {
defer testleak.AfterTest(c)()
s.testBatchCodec(c, func() EventBatchEncoder {
Expand Down

0 comments on commit 7b49a53

Please sign in to comment.