Merge branch 'release-5.2' into cherry-pick-4474-to-release-5.2
CharlesCheung96 authored Feb 24, 2022
2 parents 0b2c184 + 19b9177 commit 479b7fd
Showing 13 changed files with 490 additions and 226 deletions.
6 changes: 3 additions & 3 deletions cdc/sink/black_hole.go
@@ -22,10 +22,10 @@ import (
"go.uber.org/zap"
)

// newBlackHoleSink creates a block hole sink
func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSink {
// newBlackHoleSink creates a black hole sink
func newBlackHoleSink(ctx context.Context) *blackHoleSink {
return &blackHoleSink{
statistics: NewStatistics(ctx, "blackhole", opts),
statistics: NewStatistics(ctx, "blackhole"),
}
}

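The first five files are one mechanical change: `NewStatistics` drops its `opts map[string]string` parameter, so every sink constructor call loses the trailing map. The map itself survives where it still carries information: in the Kafka producer below, which publishes the negotiated `max-message-bytes` into it. A fragment sketching the resulting split (caller plumbing assumed, not part of the diff):

    opts := make(map[string]string)
    // The producer still records the clamped "max-message-bytes" in opts
    // for downstream consumers (see cdc/sink/producer/kafka/kafka.go below).
    producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
    if err != nil {
        return err
    }
    // The statistics collector now needs only a context and a name.
    statistics := NewStatistics(ctx, "MQ")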
4 changes: 2 additions & 2 deletions cdc/sink/buffer_sink_test.go
@@ -34,7 +34,7 @@ func TestFlushTable(t *testing.T) {
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
@@ -74,7 +74,7 @@ func TestFlushTable(t *testing.T) {

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
@@ -147,7 +147,7 @@ func newMqSink(
resolvedNotifier: notifier,
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ", opts),
statistics: NewStatistics(ctx, "MQ"),
}

go func() {
2 changes: 1 addition & 1 deletion cdc/sink/mysql.go
@@ -577,7 +577,7 @@ func newMySQLSink(
params: params,
filter: filter,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
statistics: NewStatistics(ctx, "mysql"),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
errCh: make(chan error, 1),
2 changes: 1 addition & 1 deletion cdc/sink/mysql_test.go
@@ -50,7 +50,7 @@ func newMySQLSink4Test(ctx context.Context, t *testing.T) *mysqlSink {
return &mysqlSink{
txnCache: common.NewUnresolvedTxnCache(),
filter: f,
statistics: NewStatistics(ctx, "test", make(map[string]string)),
statistics: NewStatistics(ctx, "test"),
params: params,
}
}
243 changes: 122 additions & 121 deletions cdc/sink/producer/kafka/kafka.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
@@ -397,37 +398,122 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}
}

func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
// FIXME: find a way to remove this failpoint for workload the unit test
failpoint.Inject("SkipTopicAutoCreate", func() {
failpoint.Return(nil)
})
admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig)
var (
newSaramaConfigImpl = newSaramaConfig
// NewAdminClientImpl specifies the build method for the admin client.
NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient
)
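These package-level variables are seams for testing: production code keeps the sarama-backed defaults, while unit tests can substitute fakes. A sketch of how a test might override the admin-client constructor; `fakeClusterAdmin` is a hypothetical type satisfying `kafka.ClusterAdminClient`, not something this diff provides:

    func TestTopicValidationWithFakeAdmin(t *testing.T) {
        original := NewAdminClientImpl
        defer func() { NewAdminClientImpl = original }()
        // Producer construction below now talks to the fake, so topic
        // validation runs without a live Kafka broker.
        NewAdminClientImpl = func(addrs []string, conf *sarama.Config) (kafka.ClusterAdminClient, error) {
            return &fakeClusterAdmin{}, nil
        }
        // ... call NewKafkaSaramaProducer and assert on the outcome.
    }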

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return nil, err
}

admin, err := NewAdminClientImpl(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

if err := validateAndCreateTopic(admin, topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
closing: kafkaProducerRunning,
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
}
}
}()
return k, nil
}
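Compared with the old flow, validation now happens up front: the admin client is created and closed before any producer client exists, and the clamped message size is handed back through `opts`. A hypothetical caller:

    errCh := make(chan error, 1)
    opts := make(map[string]string)
    producer, err := NewKafkaSaramaProducer(ctx, "cdc-topic", config, opts, errCh)
    if err != nil {
        return errors.Trace(err)
    }
    // opts["max-message-bytes"] now carries the value actually applied to
    // the sarama producer, after clamping against topic and broker limits.
    log.Info("kafka producer ready",
        zap.String("max-message-bytes", opts["max-message-bytes"]))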

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
}

var (
validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
commonInvalidChar = regexp.MustCompile(`[\?:,"]`)
)

func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) {
if configuredClientID != "" {
clientID = configuredClientID
} else {
clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID)
clientID = commonInvalidChar.ReplaceAllString(clientID, "_")
}
if !validClientID.MatchString(clientID) {
return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID)
}
return
}
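A worked example of the sanitization, with illustrative inputs: the `:` in the capture address matches `commonInvalidChar` and is rewritten to `_`, and the result passes the `validClientID` check.

    id, err := kafkaClientID("processor", "127.0.0.1:8300", "feed-1", "")
    // err == nil
    // id == "TiCDC_sarama_producer_processor_127.0.0.1_8300_feed-1"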

func validateAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config) error {
topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

info, created := topics[topic]
info, exists := topics[topic]
// once the topic is found, regardless of `auto-create-topic`, make sure the user's input parameters are valid.
if created {
if exists {
// make sure that the producer's `MaxMessageBytes` is not larger than the topic's `max.message.bytes`
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName,
kafka.BrokerMessageMaxBytesConfigName)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
topicMaxMessageBytes, err := strconv.Atoi(topicMaxMessageBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if topicMaxMessageBytes < config.MaxMessageBytes {
log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+
log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+
"use topic's `max.message.bytes` to initialize the Kafka producer",
zap.Int("max.message.bytes", topicMaxMessageBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
Expand All @@ -440,7 +526,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
zap.String("topic", topic), zap.Any("detail", info))
}

if err := config.adjustPartitionNum(info.NumPartitions); err != nil {
if err := config.setPartitionNum(info.NumPartitions); err != nil {
return errors.Trace(err)
}

Expand All @@ -451,25 +537,29 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found")
}

brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
brokerMessageMaxBytesStr, err := getBrokerConfig(admin, kafka.BrokerMessageMaxBytesConfigName)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}
brokerMessageMaxBytes, err := strconv.Atoi(brokerMessageMaxBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

// when creating the topic, `max.message.bytes` is decided by the broker,
// which uses its own `message.max.bytes` to set the topic's `max.message.bytes`.
// TiCDC needs to make sure that the producer's `MaxMessageBytes` is not
// larger than the broker's `message.max.bytes`.
if brokerMessageMaxBytes < config.MaxMessageBytes {
log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+
log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+
"use broker's `message.max.bytes` to initialize the Kafka producer",
zap.Int("message.max.bytes", brokerMessageMaxBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
}

// topic not created yet, and user does not specify the `partition-num` in the sink uri.
// topic not exists yet, and user does not specify the `partition-num` in the sink uri.
if config.PartitionNum == 0 {
config.PartitionNum = defaultPartitionNum
log.Warn("partition-num is not set, use the default partition count",
Expand All @@ -492,87 +582,6 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
return nil
}
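Condensing the two warning branches above: the effective producer limit is the smaller of the user's `max-message-bytes` and the ceiling the cluster imposes, where the ceiling comes from the topic when it exists and from the broker otherwise (a freshly created topic inherits the broker's `message.max.bytes`). A standalone restatement of that decision; the function name is illustrative, not part of the diff:

    func effectiveMaxMessageBytes(topicExists bool, topicMax, brokerMax, configured int) int {
        ceiling := brokerMax // new topics inherit the broker's message.max.bytes
        if topicExists {
            ceiling = topicMax
        }
        if configured > ceiling {
            return ceiling // clamp: larger batches would be rejected by Kafka
        }
        return configured
    }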

var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}

if err := topicPreProcess(topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
closing: kafkaProducerRunning,
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
}
}
}()
return k, nil
}

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
}

var (
validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
commonInvalidChar = regexp.MustCompile(`[\?:,"]`)
)

func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) {
if configuredClientID != "" {
clientID = configuredClientID
} else {
clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID)
clientID = commonInvalidChar.ReplaceAllString(clientID, "_")
}
if !validClientID.MatchString(clientID) {
return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID)
}
return
}

// newSaramaConfig returns the default config and sets the corresponding version and metrics
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()

@@ -666,50 +675,42 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
return config, err
}

func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
target := "message.max.bytes"
// getBrokerConfig gets broker config by name.
func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (string, error) {
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return "", err
}

configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controllerID)),
ConfigNames: []string{target},
ConfigNames: []string{brokerConfigName},
})
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return "", err
}

if len(configEntries) == 0 || configEntries[0].Name != target {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since cannot find the `message.max.bytes` from the broker's configuration, " +
"ticdc decline to create the topic and changefeed to prevent potential error")
if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
return "", errors.New(fmt.Sprintf(
"cannot find the `%s` from the broker's configuration", brokerConfigName))
}

result, err := strconv.Atoi(configEntries[0].Value)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return result, nil
return configEntries[0].Value, nil
}
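Returning the raw string, where the old `getBrokerMessageMaxBytes` returned a parsed int, lets one helper serve any broker config key; numeric call sites parse on their own, as the validation code above already does:

    valueStr, err := getBrokerConfig(admin, kafka.BrokerMessageMaxBytesConfigName)
    if err != nil {
        return errors.Trace(err)
    }
    brokerMax, err := strconv.Atoi(valueStr) // parse where the meaning is known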

func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
result, err := strconv.Atoi(*a)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
return result, nil
// getTopicConfig gets topic config by name.
// If the topic does not have this configuration, we will try to get it from the broker's configuration.
// NOTICE: The configuration names of topic and broker may be different for the same configuration.
func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, topicConfigName string, brokerConfigName string) (string, error) {
if a, ok := detail.ConfigEntries[topicConfigName]; ok {
return *a, nil
}

return getBrokerMessageMaxBytes(admin)
return getBrokerConfig(admin, brokerConfigName)
}
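The two config names matter because Kafka spells the same knob differently per level. Assuming the constants resolve to the conventional names (as the surrounding code suggests), the message-size lookup reads:

    // Topic-level "max.message.bytes" wins when the topic overrides it;
    // otherwise fall back to the broker-level "message.max.bytes".
    value, err := getTopicConfig(admin, detail,
        kafka.TopicMaxMessageBytesConfigName,  // assumed "max.message.bytes"
        kafka.BrokerMessageMaxBytesConfigName, // assumed "message.max.bytes"
    )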

// adjust the partition-num by the topic's partition count
func (c *Config) adjustPartitionNum(realPartitionCount int32) error {
func (c *Config) setPartitionNum(realPartitionCount int32) error {
// user does not specify the `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
(The remaining 7 changed files in this commit are not expanded on this page.)
