mq (ticdc): mq sink manage checkpoint ts per table (pingcap#3746)
asddongmen authored and sdojjy committed Dec 27, 2021
1 parent dc74082 commit a4f78ac
Showing 2 changed files with 116 additions and 8 deletions.
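The change in the diffs below replaces the sink-wide checkpointTs field with a tableCheckpointTs map keyed by model.TableID, so FlushRowChangedEvents tracks and reports flush progress per table instead of sharing one watermark across all tables. A minimal, self-contained Go sketch of that pattern (hypothetical names; the real mqSink also manages partitions, encoders, and flush workers):

package main

import "fmt"

// TableID stands in for model.TableID (an int64 table identifier).
type TableID = int64

// sink is a stripped-down stand-in for mqSink that keeps only the
// per-table checkpoint bookkeeping introduced by this commit.
type sink struct {
	tableCheckpointTs map[TableID]uint64
}

// flush mirrors the fast path of FlushRowChangedEvents: if resolvedTs
// does not advance past the table's cached checkpoint, return the
// cached value and skip the (elided) flush work; otherwise flush,
// record the new checkpoint for this table only, and return it.
func (s *sink) flush(tableID TableID, resolvedTs uint64) uint64 {
	if ckpt, ok := s.tableCheckpointTs[tableID]; ok && resolvedTs <= ckpt {
		return ckpt
	}
	// ... real code would flush buffered rows for the table here ...
	s.tableCheckpointTs[tableID] = resolvedTs
	return resolvedTs
}

func main() {
	s := &sink{tableCheckpointTs: make(map[TableID]uint64)}
	fmt.Println(s.flush(1, 120)) // 120: first flush for table 1
	fmt.Println(s.flush(2, 110)) // 110: table 2 no longer inherits table 1's watermark
	fmt.Println(s.flush(1, 110)) // 120: an older resolvedTs returns the cached checkpoint
}

With the old single checkpointTs, the second call would have returned 120, overstating table 2's progress; the per-table map keeps each table's watermark independent.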
16 changes: 8 additions & 8 deletions cdc/sink/mq.go
@@ -55,10 +55,9 @@ type mqSink struct {
 	partitionNum        int32
 	partitionInput      []chan mqEvent
 	partitionResolvedTs []uint64
-
-	checkpointTs     uint64
-	resolvedNotifier *notify.Notifier
-	resolvedReceiver *notify.Receiver
+	tableCheckpointTs map[model.TableID]uint64
+	resolvedNotifier  *notify.Notifier
+	resolvedReceiver  *notify.Receiver
 
 	statistics *Statistics
 }
@@ -111,6 +110,7 @@ func newMqSink(
 		partitionNum:        partitionNum,
 		partitionInput:      partitionInput,
 		partitionResolvedTs: make([]uint64, partitionNum),
+		tableCheckpointTs:   make(map[model.TableID]uint64),
 		resolvedNotifier:    notifier,
 		resolvedReceiver:    resolvedReceiver,
 
@@ -154,8 +154,8 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
 }
 
 func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
-	if resolvedTs <= k.checkpointTs {
-		return k.checkpointTs, nil
+	if checkpointTs, ok := k.tableCheckpointTs[tableID]; ok && resolvedTs <= checkpointTs {
+		return checkpointTs, nil
 	}
 
 	for i := 0; i < int(k.partitionNum); i++ {
@@ -188,9 +188,9 @@ flushLoop:
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
-	k.checkpointTs = resolvedTs
+	k.tableCheckpointTs[tableID] = resolvedTs
 	k.statistics.PrintStatus(ctx)
-	return k.checkpointTs, nil
+	return resolvedTs, nil
 }
 
 func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
108 changes: 108 additions & 0 deletions cdc/sink/mq_test.go
@@ -223,3 +223,111 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
 	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
 	c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
 }
+
+func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) {
+	defer testleak.AfterTest(c)()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topic := kafka.DefaultMockTopicName
+	leader := sarama.NewMockBroker(c, 1)
+	defer leader.Close()
+
+	metadataResponse := new(sarama.MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
+	leader.Returns(metadataResponse)
+	leader.Returns(metadataResponse)
+
+	prodSuccess := new(sarama.ProduceResponse)
+	prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError)
+
+	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
+		"&max-message-bytes=1048576&partition-num=1" +
+		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
+	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
+	sinkURI, err := url.Parse(uri)
+	c.Assert(err, check.IsNil)
+	replicaConfig := config.GetDefaultReplicaConfig()
+	fr, err := filter.NewFilter(replicaConfig)
+	c.Assert(err, check.IsNil)
+	opts := map[string]string{}
+	errCh := make(chan error, 1)
+
+	kafkap.NewSaramaAdminClientImpl = kafka.NewMockAdminClient
+	defer func() {
+		kafkap.NewSaramaAdminClientImpl = kafka.NewSaramaAdminClient
+	}()
+
+	sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
+	c.Assert(err, check.IsNil)
+
+	// mock kafka broker processes 1 row changed event
+	leader.Returns(prodSuccess)
+	tableID1 := model.TableID(1)
+	row1 := &model.RowChangedEvent{
+		Table: &model.TableName{
+			Schema:  "test",
+			Table:   "t1",
+			TableID: tableID1,
+		},
+		StartTs:  100,
+		CommitTs: 120,
+		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
+	}
+	err = sink.EmitRowChangedEvents(ctx, row1)
+	c.Assert(err, check.IsNil)
+
+	tableID2 := model.TableID(2)
+	row2 := &model.RowChangedEvent{
+		Table: &model.TableName{
+			Schema:  "test",
+			Table:   "t2",
+			TableID: tableID2,
+		},
+		StartTs:  90,
+		CommitTs: 110,
+		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
+	}
+	err = sink.EmitRowChangedEvents(ctx, row2)
+	c.Assert(err, check.IsNil)
+
+	tableID3 := model.TableID(3)
+	row3 := &model.RowChangedEvent{
+		Table: &model.TableName{
+			Schema:  "test",
+			Table:   "t3",
+			TableID: tableID3,
+		},
+		StartTs:  110,
+		CommitTs: 130,
+		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
+	}
+
+	err = sink.EmitRowChangedEvents(ctx, row3)
+	c.Assert(err, check.IsNil)
+
+	// mock kafka broker processes 1 row resolvedTs event
+	leader.Returns(prodSuccess)
+	checkpointTs1, err := sink.FlushRowChangedEvents(ctx, tableID1, row1.CommitTs)
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTs1, check.Equals, row1.CommitTs)
+
+	checkpointTs2, err := sink.FlushRowChangedEvents(ctx, tableID2, row2.CommitTs)
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTs2, check.Equals, row2.CommitTs)
+
+	checkpointTs3, err := sink.FlushRowChangedEvents(ctx, tableID3, row3.CommitTs)
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTs3, check.Equals, row3.CommitTs)
+
+	// flush older resolved ts
+	checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, uint64(110))
+	c.Assert(err, check.IsNil)
+	c.Assert(checkpointTsOld, check.Equals, row1.CommitTs)
+
+	err = sink.Close(ctx)
+	if err != nil {
+		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	}
+}
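To run just this new case locally, gocheck's name filter should work, assuming this package wires up the usual check.v1 harness as its existing suites do:

go test ./cdc/sink/ -check.f TestFlushRowChangedEvents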
