diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go
index 569d1015974..9e658ba3412 100644
--- a/cdc/sink/codec/json.go
+++ b/cdc/sink/codec/json.go
@@ -35,8 +35,6 @@ import (
 const (
     // BatchVersion1 represents the version of batch format
     BatchVersion1 uint64 = 1
-    // DefaultMaxMessageBytes sets the default value for max-message-bytes
-    DefaultMaxMessageBytes int = 1 * 1024 * 1024 // 1M
     // DefaultMaxBatchSize sets the default value for max-batch-size
     DefaultMaxBatchSize int = 16
 )
@@ -317,13 +315,13 @@ type JSONEventBatchEncoder struct {
     messageBuf   []*MQMessage
     curBatchSize int
     // configs
-    maxKafkaMessageSize int
-    maxBatchSize        int
+    maxMessageBytes int
+    maxBatchSize    int
 }

-// GetMaxKafkaMessageSize is only for unit testing.
-func (d *JSONEventBatchEncoder) GetMaxKafkaMessageSize() int {
-    return d.maxKafkaMessageSize
+// GetMaxMessageBytes is only for unit testing.
+func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int {
+    return d.maxMessageBytes
 }

 // GetMaxBatchSize is only for unit testing.
@@ -402,15 +400,15 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
     // for single message that longer than max-message-size, do not send it.
     // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
     length := len(key) + len(value) + maximumRecordOverhead + 16 + 8
-    if length > d.maxKafkaMessageSize {
+    if length > d.maxMessageBytes {
         log.Warn("Single message too large",
-            zap.Int("max-message-size", d.maxKafkaMessageSize), zap.Int("length", length), zap.Any("table", e.Table))
+            zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
         return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
     }

     if len(d.messageBuf) == 0 ||
         d.curBatchSize >= d.maxBatchSize ||
-        d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxKafkaMessageSize {
+        d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {

         versionHead := make([]byte, 8)
         binary.BigEndian.PutUint64(versionHead, BatchVersion1)
@@ -428,10 +426,10 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
         message.Schema = &e.Table.Schema
         message.Table = &e.Table.Table

-        if message.Length() > d.maxKafkaMessageSize {
+        if message.Length() > d.maxMessageBytes {
             // `len(d.messageBuf) == 1` is implied
             log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
-                zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
+                zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes))
         }
         d.curBatchSize++
     }
@@ -549,17 +547,19 @@ func (d *JSONEventBatchEncoder) Reset() {
 // SetParams reads relevant parameters for Open Protocol
 func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
     var err error
-    if maxMessageBytes, ok := params["max-message-bytes"]; ok {
-        d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes)
-        if err != nil {
-            return cerror.ErrKafkaInvalidConfig.Wrap(err)
-        }
-    } else {
-        d.maxKafkaMessageSize = DefaultMaxMessageBytes
+
+    maxMessageBytes, ok := params["max-message-bytes"]
+    if !ok {
+        return cerror.ErrKafkaInvalidConfig.GenWithStack("max-message-bytes not found")
+    }
+
+    d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
+    if err != nil {
+        return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
     }
-    if d.maxKafkaMessageSize <= 0 {
-        return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize))
+
+    if d.maxMessageBytes <= 0 {
+        return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes))
     }

     if maxBatchSize, ok := params["max-batch-size"]; ok {
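Note for reviewers: with this patch, `max-message-bytes` becomes a required parameter for the Open Protocol encoder instead of silently falling back to a default. A minimal caller-side sketch (the wrapper function and the literal values are ours, not part of the patch):

```go
func newConfiguredEncoder() (EventBatchEncoder, error) {
	encoder := NewJSONEventBatchEncoder()
	params := map[string]string{
		"max-message-bytes": "10485760", // 10 MiB; any positive integer string is accepted
		"max-batch-size":    "16",
	}
	// A missing or non-positive max-message-bytes now surfaces as ErrKafkaInvalidConfig.
	if err := encoder.SetParams(params); err != nil {
		return nil, err
	}
	return encoder, nil
}
```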
diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go
index cdd3c128d56..495783ac731 100644
--- a/cdc/sink/codec/json_test.go
+++ b/cdc/sink/codec/json_test.go
@@ -19,8 +19,10 @@ import (
     "testing"

     "github.com/pingcap/check"
+    "github.com/pingcap/errors"
     "github.com/pingcap/parser/mysql"
     "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/pkg/config"
     "github.com/pingcap/tiflow/pkg/util/testleak"
 )

@@ -224,10 +226,10 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
 func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
     defer testleak.AfterTest(c)()
     encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
-    err := encoder.SetParams(map[string]string{})
+    err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+    c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

     err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")

@@ -238,28 +240,50 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
     err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)
+    c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxInt32)

     err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)
+    c.Assert(encoder.maxMessageBytes, check.Equals, math.MaxUint32)

-    err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
+    err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "0"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")

-    err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
+    err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "-1"})
     c.Assert(err, check.ErrorMatches, ".*invalid.*")

-    err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
+    err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+    c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)

-    err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
+    err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
     c.Assert(err, check.IsNil)
     c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
-    c.Assert(encoder.maxKafkaMessageSize, check.Equals, DefaultMaxMessageBytes)
+    c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes)
+}
+
+func (s *batchSuite) TestSetParams(c *check.C) {
+    defer testleak.AfterTest(c)()
+
+    opts := make(map[string]string)
+    encoder := NewJSONEventBatchEncoder()
+    err := encoder.SetParams(opts)
+    c.Assert(
+        errors.Cause(err),
+        check.ErrorMatches,
+        ".*max-message-bytes not found.*",
+    )
+
+    opts["max-message-bytes"] = "1"
+    encoder = NewJSONEventBatchEncoder()
+    err = encoder.SetParams(opts)
+    c.Assert(err, check.IsNil)
+    c.Assert(encoder, check.NotNil)
+
+    jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
+    c.Assert(ok, check.IsTrue)
+    c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1)
 }

 func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
@@ -307,8 +331,13 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) {
     defer testleak.AfterTest(c)()
     encoder := NewJSONEventBatchEncoder()
-    err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
-    c.Check(err, check.IsNil)
+    err := encoder.SetParams(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"})
+    c.Assert(encoder, check.NotNil)
+    c.Assert(err, check.IsNil)
+
+    jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
+    c.Assert(ok, check.IsTrue)
+    c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1048576)

     testEvent := &model.RowChangedEvent{
         CommitTs: 1,
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index 6edc098c829..8581b7b9d84 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -413,7 +413,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
     topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
         return r == '/'
     })
-    producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
+    if topic == "" {
+        return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
+    }
+
+    producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -443,8 +447,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
     if s != "" {
         opts["max-batch-size"] = s
     }
-    // For now, it's a place holder. Avro format have to make connection to Schema Registery,
-    // and it may needs credential.
+    // For now, it's a placeholder. The Avro format has to connect to the Schema Registry,
+    // and it may need credentials.
     credential := &security.Credential{}
     sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
     if err != nil {
diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go
index 31c43fd6f86..fafa6558a06 100644
--- a/cdc/sink/mq_test.go
+++ b/cdc/sink/mq_test.go
@@ -62,13 +62,19 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
     c.Assert(err, check.IsNil)
     opts := map[string]string{}
     errCh := make(chan error, 1)
+
+    c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
+    defer func() {
+        _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
+    }()
+
     sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
     c.Assert(err, check.IsNil)

     encoder := sink.newEncoder()
     c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
     c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
-    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
+    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)

     // mock kafka broker processes 1 row changed event
     leader.Returns(prodSuccess)
@@ -159,6 +165,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
     c.Assert(err, check.IsNil)
     opts := map[string]string{}
     errCh := make(chan error, 1)
+
+    c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
+    defer func() {
+        _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
+    }()
+
     sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
     c.Assert(err, check.IsNil)

@@ -216,5 +228,5 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
     encoder := sink.newEncoder()
     c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
     c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
-    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxKafkaMessageSize(), check.Equals, 4194304)
+    c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
 }
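Note: `newKafkaSaramaSink` now rejects a sink-uri without a topic before any producer is constructed. A condensed sketch of that validation, assuming the standard `net/url` parser (the wrapper function name is ours):

```go
func topicFromSinkURI(raw string) (string, error) {
	// e.g. raw = "kafka://127.0.0.1:9092/my-topic?partition-num=3"
	sinkURI, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	// The topic is the URI path with the surrounding '/' trimmed.
	topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool { return r == '/' })
	if topic == "" {
		return "", cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
	}
	return topic, nil
}
```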
diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index 9b9fc650992..6a5b9465de9 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -37,11 +37,14 @@ import (
     "go.uber.org/zap"
 )

-const defaultPartitionNum = 4
+const defaultPartitionNum = 3

-// Config stores the Kafka configuration
+// Config stores the user-specified Kafka producer configuration
 type Config struct {
-    PartitionNum int32
+    BrokerEndpoints []string
+    PartitionNum    int32
+
+    // The user should make sure that `replication-factor` is not greater than the number of Kafka brokers.
     ReplicationFactor int16

     Version         string
@@ -50,8 +53,8 @@ type Config struct {
     ClientID        string
     Credential      *security.Credential
     SaslScram       *security.SaslScram
-    // control whether to create topic and verify partition number
-    TopicPreProcess bool
+    // control whether to create the topic automatically
+    AutoCreate bool
 }

 // NewConfig returns a default Kafka configuration
@@ -59,23 +62,28 @@ func NewConfig() *Config {
     return &Config{
         Version: "2.4.0",
-        // MaxMessageBytes will be used to initialize producer, we set the default value (1M) identical to kafka broker.
-        MaxMessageBytes:   1 * 1024 * 1024,
+        // MaxMessageBytes will be used to initialize the producer; the default is `config.DefaultMaxMessageBytes` (10M).
+        MaxMessageBytes:   config.DefaultMaxMessageBytes,
         ReplicationFactor: 1,
         Compression:       "none",
         Credential:        &security.Credential{},
         SaslScram:         &security.SaslScram{},
-        TopicPreProcess:   true,
+        AutoCreate:        true,
     }
 }

 // Initialize the kafka configuration
 func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
-    s := sinkURI.Query().Get("partition-num")
+    c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
+    params := sinkURI.Query()
+    s := params.Get("partition-num")
     if s != "" {
         a, err := strconv.Atoi(s)
         if err != nil {
             return err
         }
+        if a <= 0 {
+            return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(a)
+        }
         c.PartitionNum = int32(a)
     }

@@ -156,7 +164,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi
         if err != nil {
             return err
         }
-        c.TopicPreProcess = autoCreate
+        c.AutoCreate = autoCreate
     }

     return nil
@@ -379,85 +387,125 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
     }
 }

-// kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't
-// exit, creates it automatically.
-func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
-    admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
+func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
+    // FIXME: find a way to remove this failpoint, which exists only to skip topic creation in unit tests
+    failpoint.Inject("SkipTopicAutoCreate", func() {
+        failpoint.Return(nil)
+    })
+    admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig)
     if err != nil {
-        return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+        return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
     }
     defer func() {
-        err := admin.Close()
-        if err != nil {
-            log.Warn("close admin client failed", zap.Error(err))
+        if err := admin.Close(); err != nil {
+            log.Warn("close kafka cluster admin failed", zap.Error(err))
         }
     }()
+
     topics, err := admin.ListTopics()
     if err != nil {
-        return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+        return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
     }
-    partitionNum := config.PartitionNum
-    topicDetail, exist := topics[topic]
-    if exist {
-        log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions))
-        if partitionNum == 0 {
-            partitionNum = topicDetail.NumPartitions
-        } else if partitionNum < topicDetail.NumPartitions {
-            log.Warn("partition number assigned in sink-uri is less than that of topic", zap.Int32("topic partition num", topicDetail.NumPartitions))
-        } else if partitionNum > topicDetail.NumPartitions {
-            return 0, cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
-                "partition number(%d) assigned in sink-uri is more than that of topic(%d)", partitionNum, topicDetail.NumPartitions)
+
+    info, created := topics[topic]
+    // once the topic is found, make sure the user-specified parameters are valid, no matter what `auto-create-topic` is.
+    if created {
+        // make sure that the producer's `MaxMessageBytes` is not larger than the topic's `max.message.bytes`
+        topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
+        if err != nil {
+            return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
         }
-    } else {
-        if partitionNum == 0 {
-            partitionNum = defaultPartitionNum
-            log.Warn("topic not found and partition number is not specified, using default partition number", zap.String("topic", topic), zap.Int32("partition_num", partitionNum))
+
+        if topicMaxMessageBytes < config.MaxMessageBytes {
+            log.Warn("the topic's `max.message.bytes` is less than the user-specified `max-message-bytes`, "+
+                "use the topic's `max.message.bytes` to initialize the Kafka producer",
+                zap.Int("max.message.bytes", topicMaxMessageBytes),
+                zap.Int("max-message-bytes", config.MaxMessageBytes))
+            saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
         }
-        log.Info("create a topic", zap.String("topic", topic),
-            zap.Int32("partition_num", partitionNum),
-            zap.Int16("replication_factor", config.ReplicationFactor))
-        err := admin.CreateTopic(topic, &sarama.TopicDetail{
-            NumPartitions:     partitionNum,
-            ReplicationFactor: config.ReplicationFactor,
-        }, false)
-        // TODO idenfity the cause of "Topic with this name already exists"
-        if err != nil && !strings.Contains(err.Error(), "already exists") {
-            return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+
+        // no need to create the topic; log a warning so the user can spot a mistyped topic name
+        if config.AutoCreate {
+            log.Warn("topic already exists, TiCDC will not create it",
+                zap.String("topic", topic), zap.Any("detail", info))
+        }
+
+        if err := config.adjustPartitionNum(info.NumPartitions); err != nil {
+            return errors.Trace(err)
         }
+
+        return nil
     }
-    return partitionNum, nil
+
+    if !config.AutoCreate {
+        return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and the topic is not found")
+    }
+
+    brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
+    if err != nil {
+        log.Warn("TiCDC cannot find `message.max.bytes` in the broker's configuration")
+        return errors.Trace(err)
+    }
+
+    // when the broker creates the topic, the topic's `max.message.bytes` is inherited
+    // from the broker's `message.max.bytes`. TiCDC needs to make sure that the
+    // producer's `MaxMessageBytes` is not larger than the broker's `message.max.bytes`.
+    if brokerMessageMaxBytes < config.MaxMessageBytes {
+        log.Warn("the broker's `message.max.bytes` is less than the user-specified `max-message-bytes`, "+
+            "use the broker's `message.max.bytes` to initialize the Kafka producer",
+            zap.Int("message.max.bytes", brokerMessageMaxBytes),
+            zap.Int("max-message-bytes", config.MaxMessageBytes))
+        saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
+    }
+
+    // the topic does not exist yet, and the user has not specified `partition-num` in the sink-uri.
+    if config.PartitionNum == 0 {
+        config.PartitionNum = defaultPartitionNum
+        log.Warn("partition-num is not set, use the default partition count",
+            zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum))
+    }
+
+    err = admin.CreateTopic(topic, &sarama.TopicDetail{
+        NumPartitions:     config.PartitionNum,
+        ReplicationFactor: config.ReplicationFactor,
+    }, false)
+    // TODO: identify the cause of "Topic with this name already exists"
+    if err != nil && !strings.Contains(err.Error(), "already exists") {
+        return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+    }
+
+    log.Info("TiCDC created the topic",
+        zap.Int32("partition-num", config.PartitionNum),
+        zap.Int16("replication-factor", config.ReplicationFactor))
+
+    return nil
 }
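Note: the net effect of the pre-processing above is that the producer's `MaxMessageBytes` is clamped to whatever the cluster allows. A hedged distillation (the helper function is ours; it is not in the patch):

```go
// effectiveMaxMessageBytes returns the value used to initialize the producer.
// clusterMax is the topic's `max.message.bytes` when the topic already exists,
// or the broker's `message.max.bytes` when TiCDC is about to create the topic.
func effectiveMaxMessageBytes(userMax, clusterMax int) int {
	if clusterMax < userMax {
		return clusterMax // the cluster limit wins, and a warning is logged
	}
	return userMax
}
```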

 var newSaramaConfigImpl = newSaramaConfig

 // NewKafkaSaramaProducer creates a kafka sarama producer
-func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
+func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
     log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
     cfg, err := newSaramaConfigImpl(ctx, config)
     if err != nil {
         return nil, err
     }
-    if config.PartitionNum < 0 {
-        return nil, cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(config.PartitionNum)
+
+    if err := topicPreProcess(topic, config, cfg); err != nil {
+        return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
     }
-    asyncClient, err := sarama.NewAsyncProducer(strings.Split(address, ","), cfg)
+
+    opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)
+
+    asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
     if err != nil {
         return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
     }
-    syncClient, err := sarama.NewSyncProducer(strings.Split(address, ","), cfg)
+    syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
     if err != nil {
         return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
     }
-    partitionNum := config.PartitionNum
-    if config.TopicPreProcess {
-        partitionNum, err = kafkaTopicPreProcess(topic, address, config, cfg)
-        if err != nil {
-            return nil, err
-        }
-    }
+
     notifier := new(notify.Notifier)
     flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
     if err != nil {
@@ -467,11 +515,11 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
         asyncClient:  asyncClient,
         syncClient:   syncClient,
         topic:        topic,
-        partitionNum: partitionNum,
+        partitionNum: config.PartitionNum,
         partitionOffset: make([]struct {
             flushed uint64
             sent    uint64
-        }, partitionNum),
+        }, config.PartitionNum),
         flushedNotifier: notifier,
         flushedReceiver: flushedReceiver,
         closeCh:         make(chan struct{}),
@@ -558,6 +606,11 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
     config.Producer.Return.Successes = true
     config.Producer.Return.Errors = true
     config.Producer.RequiredAcks = sarama.WaitForAll
+
+    // Time out in five minutes (600 * 500ms).
+    config.Producer.Retry.Max = 600
+    config.Producer.Retry.Backoff = 500 * time.Millisecond
+
     switch strings.ToLower(strings.TrimSpace(c.Compression)) {
     case "none":
         config.Producer.Compression = sarama.CompressionNone
@@ -574,10 +627,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
         config.Producer.Compression = sarama.CompressionNone
     }

-    // Time out in five minutes (600 * 500ms).
-    config.Producer.Retry.Max = 600
-    config.Producer.Retry.Backoff = 500 * time.Millisecond
-
     // Time out in one minute (120 * 500ms).
     config.Admin.Retry.Max = 120
     config.Admin.Retry.Backoff = 500 * time.Millisecond
@@ -606,3 +655,72 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {

     return config, err
 }
+
+func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
+    target := "message.max.bytes"
+    _, controllerID, err := admin.DescribeCluster()
+    if err != nil {
+        return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+    }
+
+    configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
+        Type:        sarama.BrokerResource,
+        Name:        strconv.Itoa(int(controllerID)),
+        ConfigNames: []string{target},
+    })
+    if err != nil {
+        return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+    }
+
+    if len(configEntries) == 0 || configEntries[0].Name != target {
+        return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
+            "cannot find `message.max.bytes` in the broker's configuration; " +
+                "TiCDC declines to create the topic and the changefeed to prevent potential errors")
+    }
+
+    result, err := strconv.Atoi(configEntries[0].Value)
+    if err != nil {
+        return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+    }
+
+    return result, nil
+}
+
+func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
+    if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
+        result, err := strconv.Atoi(*a)
+        if err != nil {
+            return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
+        }
+        return result, nil
+    }
+
+    return getBrokerMessageMaxBytes(admin)
+}
+
+// adjustPartitionNum reconciles the user-specified `partition-num` with the topic's actual partition count
+func (c *Config) adjustPartitionNum(realPartitionCount int32) error {
+    // the user did not specify `partition-num` in the sink-uri
+    if c.PartitionNum == 0 {
+        c.PartitionNum = realPartitionCount
+        return nil
+    }
+
+    if c.PartitionNum < realPartitionCount {
+        log.Warn("the number of partitions specified in sink-uri is less than that of the actual topic; "+
+            "some partitions will not have messages dispatched to them",
+            zap.Int32("sink-uri partitions", c.PartitionNum),
+            zap.Int32("topic partitions", realPartitionCount))
+        return nil
+    }
+
+    // Reject a user-specified `partition-num` greater than the actual partition
+    // count; otherwise messages would be dispatched to partitions that do not
+    // exist, which could cause correctness problems.
+    if c.PartitionNum > realPartitionCount {
+        return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
+            "the number of partitions (%d) specified in sink-uri is more than that of the actual topic (%d)",
+            c.PartitionNum, realPartitionCount)
+    }
+    return nil
+}
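Note: `adjustPartitionNum` has three outcomes, mirrored by the new unit test below (the example function wrapper is ours):

```go
func ExampleAdjustPartitionNum() {
	cfg := NewConfig()

	// partition-num unspecified: inherit the topic's actual partition count.
	cfg.PartitionNum = 0
	_ = cfg.adjustPartitionNum(2) // cfg.PartitionNum is now 2

	// fewer partitions than the topic has: keep the value, warn only.
	cfg.PartitionNum = 1
	_ = cfg.adjustPartitionNum(2) // cfg.PartitionNum stays 1

	// more partitions than the topic has: rejected.
	cfg.PartitionNum = 3
	err := cfg.adjustPartitionNum(2) // ErrKafkaInvalidPartitionNum
	_ = err
}
```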
diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go
index 2206b96d582..ebcd6093068 100644
--- a/cdc/sink/producer/kafka/kafka_test.go
+++ b/cdc/sink/producer/kafka/kafka_test.go
@@ -17,6 +17,7 @@ import (
     "context"
     "fmt"
     "net/url"
+    "strings"
     "sync"
     "testing"
     "time"
@@ -24,6 +25,7 @@ import (
     "github.com/Shopify/sarama"
     "github.com/pingcap/check"
     "github.com/pingcap/errors"
+    "github.com/pingcap/failpoint"
     "github.com/pingcap/tiflow/cdc/sink/codec"
     "github.com/pingcap/tiflow/pkg/config"
     cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -96,6 +98,12 @@ func (s *kafkaSuite) TestInitializeConfig(c *check.C) {
     for k, v := range opts {
         c.Assert(v, check.Equals, expectedOpts[k])
     }
+
+    uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0"
+    sinkURI, err = url.Parse(uri)
+    c.Assert(err, check.IsNil)
+    err = cfg.Initialize(sinkURI, replicaConfig, opts)
+    c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*")
 }

 func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
@@ -116,7 +124,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
     prodSuccess.AddTopicPartition(topic, 0, sarama.ErrNoError)
     prodSuccess.AddTopicPartition(topic, 1, sarama.ErrNoError)
     // 200 async messages and 2 sync message, Kafka flush could be in batch,
-    // we can set flush.maxmessages to 1 to control message count exactly.
+    // we can set flush.max.messages to 1 to control message count exactly.
     for i := 0; i < 202; i++ {
         leader.Returns(prodSuccess)
     }
@@ -128,7 +136,8 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
     // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
     config.Version = "0.9.0.0"
     config.PartitionNum = int32(2)
-    config.TopicPreProcess = false
+    config.AutoCreate = false
+    config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

     newSaramaConfigImplBak := newSaramaConfigImpl
     newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
@@ -137,13 +146,16 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
         cfg.Producer.Flush.MaxMessages = 1
         return cfg, err
     }
+    c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
     defer func() {
         newSaramaConfigImpl = newSaramaConfigImplBak
+        _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
     }()
-
-    producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
+    opts := make(map[string]string)
+    producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
     c.Assert(err, check.IsNil)
     c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))
+    c.Assert(opts, check.HasKey, "max-message-bytes")
     for i := 0; i < 100; i++ {
         err = producer.SendMessage(ctx, &codec.MQMessage{
             Key:   []byte("test-key-1"),
@@ -223,6 +235,23 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
     }
 }

+func (s *kafkaSuite) TestAdjustPartitionNum(c *check.C) {
+    defer testleak.AfterTest(c)()
+    config := NewConfig()
+    err := config.adjustPartitionNum(2)
+    c.Assert(err, check.IsNil)
+    c.Assert(config.PartitionNum, check.Equals, int32(2))
+
+    config.PartitionNum = 1
+    err = config.adjustPartitionNum(2)
+    c.Assert(err, check.IsNil)
+    c.Assert(config.PartitionNum, check.Equals, int32(1))
+
+    config.PartitionNum = 3
+    err = config.adjustPartitionNum(2)
+    c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
+}
+
 func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
     defer testleak.AfterTest(c)
     topic := "unit_test_2"
@@ -240,47 +269,20 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
         "MetadataRequest":        metaResponse,
         "DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c),
     })
-
     config := NewConfig()
     config.PartitionNum = int32(0)
+    config.BrokerEndpoints = strings.Split(broker.Addr(), ",")
+    config.AutoCreate = false
+
     cfg, err := newSaramaConfigImpl(ctx, config)
     c.Assert(err, check.IsNil)
-    num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg)
-    c.Assert(err, check.IsNil)
-    c.Assert(num, check.Equals, int32(2))
+    c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)

+    config.BrokerEndpoints = []string{""}
     cfg.Metadata.Retry.Max = 1
-    _, err = kafkaTopicPreProcess(topic, "", config, cfg)
-    c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers)
-
-    config.PartitionNum = int32(4)
-    _, err = kafkaTopicPreProcess(topic, broker.Addr(), config, cfg)
-    c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
-}
-
-func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) {
-    defer testleak.AfterTest(c)()
-    topic := "unit_test_3"
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-
-    broker := sarama.NewMockBroker(c, 1)
-    broker.SetHandlerByMap(map[string]sarama.MockResponse{
-        "MetadataRequest": sarama.NewMockMetadataResponse(c).
-            SetBroker(broker.Addr(), broker.BrokerID()).
-            SetController(broker.BrokerID()),
-        "DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c),
-        "CreateTopicsRequest":    sarama.NewMockCreateTopicsResponse(c),
-    })
-    defer broker.Close()
-    config := NewConfig()
-    config.PartitionNum = int32(0)
-    cfg, err := newSaramaConfigImpl(ctx, config)
-    c.Assert(err, check.IsNil)
-    num, err := kafkaTopicPreProcess(topic, broker.Addr(), config, cfg)
-    c.Assert(err, check.IsNil)
-    c.Assert(num, check.Equals, int32(4))
+    err = topicPreProcess(topic, config, cfg)
+    c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers)
 }

 func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
@@ -345,13 +347,13 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
     errCh := make(chan error, 1)
     config := NewConfig()
     config.Version = "invalid"
-    _, err := NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh)
+    config.BrokerEndpoints = []string{"127.0.0.1:1111"}
+    topic := "topic"
+
+    c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
+    _, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh)
     c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
-    config.Version = "0.8.2.0"
-    config.PartitionNum = int32(-1)
-    _, err = NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh)
-    c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
+
+    _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
 }

 func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
@@ -375,7 +377,10 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
     // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
     config.Version = "0.9.0.0"
     config.PartitionNum = int32(2)
-    config.TopicPreProcess = false
+    config.AutoCreate = false
+    config.BrokerEndpoints = strings.Split(leader.Addr(), ",")
+
+    c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)

     newSaramaConfigImplBak := newSaramaConfigImpl
     newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
@@ -391,8 +396,9 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
     }()

     errCh := make(chan error, 1)
-    producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
+    producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh)
     defer func() {
+        _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
         err := producer.Close()
         c.Assert(err, check.IsNil)
     }()
@@ -449,13 +455,17 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
     // Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
     config.Version = "0.9.0.0"
     config.PartitionNum = int32(2)
-    config.TopicPreProcess = false
+    config.AutoCreate = false
+    config.BrokerEndpoints = strings.Split(leader.Addr(), ",")
+
+    c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)

     errCh := make(chan error, 1)
-    producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
+    producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh)
     defer func() {
         err := producer.Close()
         c.Assert(err, check.IsNil)
+        _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
     }()
     c.Assert(err, check.IsNil)
diff --git a/pkg/config/sink.go b/pkg/config/sink.go
index 90fff12451d..7bb67a27567 100644
--- a/pkg/config/sink.go
+++ b/pkg/config/sink.go
@@ -13,6 +13,9 @@

 package config

+// DefaultMaxMessageBytes sets the default value for max-message-bytes
+const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M
+
 // SinkConfig represents sink config for a changefeed
 type SinkConfig struct {
     DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"`
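Note: taken together, the patch wires `max-message-bytes` end to end: the producer resolves the effective value (possibly clamped by the topic or broker), publishes it through `opts`, and the encoder consumes exactly that value. A condensed sketch of the assumed wiring (the function name and plumbing are ours, not part of the patch):

```go
func buildEncoderForSink(ctx context.Context, topic string, kafkaConfig *kafka.Config, errCh chan error) (codec.EventBatchEncoder, error) {
	opts := make(map[string]string)
	// NewKafkaSaramaProducer fills opts["max-message-bytes"] with the final
	// producer limit, i.e. strconv.Itoa(cfg.Producer.MaxMessageBytes).
	producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, kafkaConfig, opts, errCh)
	if err != nil {
		return nil, err
	}
	_ = producer // handed to the MQ sink in the real code

	// The encoder reads the same value back, so producer and encoder agree.
	encoder := codec.NewJSONEventBatchEncoder()
	if err := encoder.SetParams(opts); err != nil {
		return nil, err
	}
	return encoder, nil
}
```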