Skip to content

Commit

Permalink
cdc/sink: mysql sink manage checkpoint per table (pingcap#3645)
Browse files Browse the repository at this point in the history
# Conflicts:
#	cdc/sink/common/common_test.go
#	cdc/sink/mysql_params_test.go
#	cdc/sink/mysql_test.go
#	cdc/sink/mysql_worker.go
#	cdc/sink/mysql_worker_test.go
#	tests/integration_tests/sink_hang/run.sh
  • Loading branch information
sdojjy committed Dec 27, 2021
1 parent a4f78ac commit 8e34358
Showing 1 changed file with 0 additions and 108 deletions.
108 changes: 0 additions & 108 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,111 +223,3 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
}

func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) {
defer testleak.AfterTest(c)()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

topic := kafka.DefaultMockTopicName
leader := sarama.NewMockBroker(c, 1)
defer leader.Close()

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
leader.Returns(metadataResponse)
leader.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError)

uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=default"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
replicaConfig := config.GetDefaultReplicaConfig()
fr, err := filter.NewFilter(replicaConfig)
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

kafkap.NewSaramaAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewSaramaAdminClientImpl = kafka.NewSaramaAdminClient
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
tableID1 := model.TableID(1)
row1 := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t1",
TableID: tableID1,
},
StartTs: 100,
CommitTs: 120,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}
err = sink.EmitRowChangedEvents(ctx, row1)
c.Assert(err, check.IsNil)

tableID2 := model.TableID(2)
row2 := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t2",
TableID: tableID2,
},
StartTs: 90,
CommitTs: 110,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}
err = sink.EmitRowChangedEvents(ctx, row2)
c.Assert(err, check.IsNil)

tableID3 := model.TableID(3)
row3 := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t3",
TableID: tableID3,
},
StartTs: 110,
CommitTs: 130,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}

err = sink.EmitRowChangedEvents(ctx, row3)
c.Assert(err, check.IsNil)

// mock kafka broker processes 1 row resolvedTs event
leader.Returns(prodSuccess)
checkpointTs1, err := sink.FlushRowChangedEvents(ctx, tableID1, row1.CommitTs)
c.Assert(err, check.IsNil)
c.Assert(checkpointTs1, check.Equals, row1.CommitTs)

checkpointTs2, err := sink.FlushRowChangedEvents(ctx, tableID2, row2.CommitTs)
c.Assert(err, check.IsNil)
c.Assert(checkpointTs2, check.Equals, row2.CommitTs)

checkpointTs3, err := sink.FlushRowChangedEvents(ctx, tableID3, row3.CommitTs)
c.Assert(err, check.IsNil)
c.Assert(checkpointTs3, check.Equals, row3.CommitTs)

// flush older resolved ts
checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, uint64(110))
c.Assert(err, check.IsNil)
c.Assert(checkpointTsOld, check.Equals, row1.CommitTs)

err = sink.Close(ctx)
if err != nil {
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
}
}

0 comments on commit 8e34358

Please sign in to comment.