diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index 268b625724f..92a1df7627c 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -58,10 +58,9 @@ type mqSink struct {
 	partitionNum        int32
 	partitionInput      []chan mqEvent
 	partitionResolvedTs []uint64
-
-	checkpointTs     uint64
-	resolvedNotifier *notify.Notifier
-	resolvedReceiver *notify.Receiver
+	tableCheckpointTs   map[model.TableID]uint64
+	resolvedNotifier    *notify.Notifier
+	resolvedReceiver    *notify.Receiver
 
 	statistics *Statistics
 }
@@ -108,6 +107,7 @@ func newMqSink(
 		partitionNum:        partitionNum,
 		partitionInput:      partitionInput,
 		partitionResolvedTs: make([]uint64, partitionNum),
+		tableCheckpointTs:   make(map[model.TableID]uint64),
 		resolvedNotifier:    notifier,
 		resolvedReceiver:    resolvedReceiver,
 
@@ -151,8 +151,8 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
 }
 
 func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
-	if resolvedTs <= k.checkpointTs {
-		return k.checkpointTs, nil
+	if checkpointTs, ok := k.tableCheckpointTs[tableID]; ok && resolvedTs <= checkpointTs {
+		return checkpointTs, nil
 	}
 
 	for i := 0; i < int(k.partitionNum); i++ {
@@ -185,9 +185,9 @@ flushLoop:
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
-	k.checkpointTs = resolvedTs
+	k.tableCheckpointTs[tableID] = resolvedTs
 	k.statistics.PrintStatus(ctx)
-	return k.checkpointTs, nil
+	return resolvedTs, nil
 }
 
 func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go
index d1628ed1d94..beb1c09f40a 100644
--- a/cdc/sink/mq_test.go
+++ b/cdc/sink/mq_test.go
@@ -236,3 +236,111 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
 	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
 	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
 }
+
+func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) {
+	defer testleak.AfterTest(c)()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topic := kafka.DefaultMockTopicName
+	leader := sarama.NewMockBroker(c, 1)
+	defer leader.Close()
+
+	metadataResponse := new(sarama.MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
+	leader.Returns(metadataResponse)
+	leader.Returns(metadataResponse)
+
+	prodSuccess := new(sarama.ProduceResponse)
+	prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError)
+
+	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
+		"&max-message-bytes=1048576&partition-num=1" +
+		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
+	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
+	sinkURI, err := url.Parse(uri)
+	c.Assert(err, check.IsNil)
+	replicaConfig := config.GetDefaultReplicaConfig()
+	fr, err := filter.NewFilter(replicaConfig)
+	c.Assert(err, check.IsNil)
+	opts := map[string]string{}
+	errCh := make(chan error, 1)
+
+	kafkap.NewSaramaAdminClientImpl = kafka.NewMockAdminClient
+	defer func() {
+		kafkap.NewSaramaAdminClientImpl = kafka.NewSaramaAdminClient
+	}()
+
+	sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
+	c.Assert(err, check.IsNil)
+
+	// mock kafka broker processes 1 row changed event
+	leader.Returns(prodSuccess)
+	tableID1 := model.TableID(1)
+	row1 := &model.RowChangedEvent{
+		Table: &model.TableName{
+			Schema:  "test",
+			Table:   "t1",
+			TableID: tableID1,
+		},
+		StartTs:  100,
+		CommitTs: 120,
+		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
+	}
+	err = sink.EmitRowChangedEvents(ctx, row1)
+	c.Assert(err, check.IsNil)
+
+	tableID2 := model.TableID(2)
+	row2 := &model.RowChangedEvent{
+		Table: &model.TableName{
+			Schema:  "test",
+			Table:   "t2",
+			TableID: tableID2,
+		},
+		StartTs:  90,
+		CommitTs: 110,
+		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
+	}
+	err = sink.EmitRowChangedEvents(ctx, row2)
+	c.Assert(err, check.IsNil)
+
+	tableID3 := model.TableID(3)
+	row3 := &model.RowChangedEvent{
+		Table: &model.TableName{
+			Schema:  "test",
+			Table:   "t3",
+			TableID: tableID3,
+		},
+		StartTs:  110,
+		CommitTs: 130,
+		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
+	}
+
+	err = sink.EmitRowChangedEvents(ctx, row3)
+	c.Assert(err, check.IsNil)
+
+	// mock kafka broker processes 1 row resolvedTs event
+	leader.Returns(prodSuccess)
+	checkpointTs1, err := sink.FlushRowChangedEvents(ctx, tableID1, row1.CommitTs)
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTs1, check.Equals, row1.CommitTs)
+
+	checkpointTs2, err := sink.FlushRowChangedEvents(ctx, tableID2, row2.CommitTs)
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTs2, check.Equals, row2.CommitTs)
+
+	checkpointTs3, err := sink.FlushRowChangedEvents(ctx, tableID3, row3.CommitTs)
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTs3, check.Equals, row3.CommitTs)
+
+	// flush older resolved ts
+	checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, uint64(110))
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTsOld, check.Equals, row1.CommitTs)
+
+	err = sink.Close(ctx)
+	if err != nil {
+		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	}
+}