From 34e56607db00ac9552c4f6c98989a36dd065f320 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 24 Feb 2022 12:45:43 +0800 Subject: [PATCH 1/2] consumer(ticdc): update the logic of consumer. (#4129) (#4470) close pingcap/tiflow#4128 --- cdc/sink/black_hole.go | 6 +- cdc/sink/buffer_sink_test.go | 4 +- cdc/sink/mq.go | 2 +- cdc/sink/mysql.go | 2 +- cdc/sink/mysql_test.go | 2 +- cdc/sink/sink.go | 2 +- cdc/sink/statistics.go | 13 ++--- cmd/kafka-consumer/main.go | 106 ++++++++++++++++++++--------------- 8 files changed, 77 insertions(+), 60 deletions(-) diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 27f6b37df39..2e424fb3bb4 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -22,10 +22,10 @@ import ( "go.uber.org/zap" ) -// newBlackHoleSink creates a block hole sink -func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSink { +// newBlackHoleSink creates a black hole sink +func newBlackHoleSink(ctx context.Context) *blackHoleSink { return &blackHoleSink{ - statistics: NewStatistics(ctx, "blackhole", opts), + statistics: NewStatistics(ctx, "blackhole"), } } diff --git a/cdc/sink/buffer_sink_test.go b/cdc/sink/buffer_sink_test.go index e1fe467a0a5..96ca0ec6ebc 100644 --- a/cdc/sink/buffer_sink_test.go +++ b/cdc/sink/buffer_sink_test.go @@ -34,7 +34,7 @@ func TestFlushTable(t *testing.T) { defer func() { cancel() }() - b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg)) + b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg)) require.Equal(t, uint64(5), b.getTableCheckpointTs(2)) require.Nil(t, b.EmitRowChangedEvents(ctx)) tbl1 := &model.TableName{TableID: 1} @@ -74,7 +74,7 @@ func TestFlushTable(t *testing.T) { func TestFlushFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) - b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg)) + b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg)) checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8) require.True(t, checkpoint <= 8) require.Nil(t, err) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index cde4fb5a831..389c2b3338a 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -147,7 +147,7 @@ func newMqSink( resolvedNotifier: notifier, resolvedReceiver: resolvedReceiver, - statistics: NewStatistics(ctx, "MQ", opts), + statistics: NewStatistics(ctx, "MQ"), } go func() { diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 219f17b82e8..b4c99a9a4c8 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -577,7 +577,7 @@ func newMySQLSink( params: params, filter: filter, txnCache: common.NewUnresolvedTxnCache(), - statistics: NewStatistics(ctx, "mysql", opts), + statistics: NewStatistics(ctx, "mysql"), metricConflictDetectDurationHis: metricConflictDetectDurationHis, metricBucketSizeCounters: metricBucketSizeCounters, errCh: make(chan error, 1), diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 754c4e49832..b7a96a4e0ff 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -50,7 +50,7 @@ func newMySQLSink4Test(ctx context.Context, t *testing.T) *mysqlSink { return &mysqlSink{ txnCache: common.NewUnresolvedTxnCache(), filter: f, - statistics: NewStatistics(ctx, "test", make(map[string]string)), + statistics: NewStatistics(ctx, "test"), params: params, } } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 1dd36d4477b..67ddb9005cd 
100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -65,7 +65,7 @@ func init() { // register blackhole sink sinkIniterMap["blackhole"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { - return newBlackHoleSink(ctx, opts), nil + return newBlackHoleSink(ctx), nil } // register mysql sink diff --git a/cdc/sink/statistics.go b/cdc/sink/statistics.go index 26c1bad7fb4..c12931e714e 100644 --- a/cdc/sink/statistics.go +++ b/cdc/sink/statistics.go @@ -30,13 +30,12 @@ const ( ) // NewStatistics creates a statistics -func NewStatistics(ctx context.Context, name string, opts map[string]string) *Statistics { - statistics := &Statistics{name: name, lastPrintStatusTime: time.Now()} - if cid, ok := opts[OptChangefeedID]; ok { - statistics.changefeedID = cid - } - if cid, ok := opts[OptCaptureAddr]; ok { - statistics.captureAddr = cid +func NewStatistics(ctx context.Context, name string) *Statistics { + statistics := &Statistics{ + name: name, + captureAddr: util.CaptureAddrFromCtx(ctx), + changefeedID: util.ChangefeedIDFromCtx(ctx), + lastPrintStatusTime: time.Now(), } statistics.metricExecTxnHis = execTxnHistogram.WithLabelValues(statistics.captureAddr, statistics.changefeedID) statistics.metricExecDDLHis = execDDLHistogram.WithLabelValues(statistics.captureAddr, statistics.changefeedID) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index e1d954572a0..a7a9d7d2c02 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -80,16 +80,17 @@ func init() { File: logPath, }) if err != nil { - log.Fatal("init logger failed", zap.Error(err)) + log.Panic("init logger failed", zap.Error(err)) } upstreamURI, err := url.Parse(upstreamURIStr) if err != nil { - log.Fatal("invalid upstream-uri", zap.Error(err)) + log.Panic("invalid upstream-uri", zap.Error(err)) } scheme := strings.ToLower(upstreamURI.Scheme) if scheme != "kafka" { - log.Fatal("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", zap.String("upstream-uri", upstreamURIStr)) + log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", + zap.String("upstreamURI", upstreamURIStr)) } s := upstreamURI.Query().Get("version") if s != "" { @@ -106,20 +107,20 @@ func init() { config, err := newSaramaConfig() if err != nil { - log.Fatal("Error creating sarama config", zap.Error(err)) + log.Panic("Error creating sarama config", zap.Error(err)) } s = upstreamURI.Query().Get("partition-num") if s == "" { partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config) if err != nil { - log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err)) + log.Panic("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err)) } kafkaPartitionNum = partition } else { c, err := strconv.Atoi(s) if err != nil { - log.Fatal("invalid partition-num of upstream-uri") + log.Panic("invalid partition-num of upstream-uri") } kafkaPartitionNum = int32(c) } @@ -128,7 +129,7 @@ func init() { if s != "" { c, err := strconv.Atoi(s) if err != nil { - log.Fatal("invalid max-message-bytes of upstream-uri") + log.Panic("invalid max-message-bytes of upstream-uri") } log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c)) kafkaMaxMessageBytes = c @@ -138,7 +139,7 @@ func init() { if s != "" { c, err := strconv.Atoi(s) if err != nil { - log.Fatal("invalid max-batch-size of upstream-uri") + 
log.Panic("invalid max-batch-size of upstream-uri") } log.Info("Setting max-batch-size", zap.Int("max-batch-size", c)) kafkaMaxBatchSize = c @@ -227,24 +228,24 @@ func main() { */ config, err := newSaramaConfig() if err != nil { - log.Fatal("Error creating sarama config", zap.Error(err)) + log.Panic("Error creating sarama config", zap.Error(err)) } err = waitTopicCreated(kafkaAddrs, kafkaTopic, config) if err != nil { - log.Fatal("wait topic created failed", zap.Error(err)) + log.Panic("wait topic created failed", zap.Error(err)) } /** * Setup a new Sarama consumer group */ consumer, err := NewConsumer(context.TODO()) if err != nil { - log.Fatal("Error creating consumer", zap.Error(err)) + log.Panic("Error creating consumer", zap.Error(err)) } ctx, cancel := context.WithCancel(context.Background()) client, err := sarama.NewConsumerGroup(kafkaAddrs, kafkaGroupID, config) if err != nil { - log.Fatal("Error creating consumer group client", zap.Error(err)) + log.Panic("Error creating consumer group client", zap.Error(err)) } wg := &sync.WaitGroup{} @@ -256,7 +257,7 @@ func main() { // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), consumer); err != nil { - log.Fatal("Error from consumer: %v", zap.Error(err)) + log.Panic("Error from consumer: %v", zap.Error(err)) } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { @@ -268,7 +269,7 @@ func main() { go func() { if err := consumer.Run(ctx); err != nil { - log.Fatal("Error running consumer: %v", zap.Error(err)) + log.Panic("Error running consumer: %v", zap.Error(err)) } }() @@ -286,7 +287,7 @@ func main() { cancel() wg.Wait() if err = client.Close(); err != nil { - log.Fatal("Error closing client", zap.Error(err)) + log.Panic("Error closing client", zap.Error(err)) } } @@ -383,9 +384,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram if sink == nil { panic("sink should initialized") } -ClaimMessages: + for message := range claim.Messages() { - log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value)) + log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value)) batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value) if err != nil { return errors.Trace(err) @@ -395,7 +396,7 @@ ClaimMessages: for { tp, hasNext, err := batchDecoder.HasNext() if err != nil { - log.Fatal("decode message key failed", zap.Error(err)) + log.Panic("decode message key failed", zap.Error(err)) } if !hasNext { break @@ -404,29 +405,30 @@ ClaimMessages: counter++ // If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning. 
if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 { - log.Fatal("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes), - zap.Int("recevied-bytes", len(message.Key)+len(message.Value))) + log.Panic("kafka max-message-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes), + zap.Int("receivedBytes", len(message.Key)+len(message.Value))) } switch tp { case model.MqMessageTypeDDL: ddl, err := batchDecoder.NextDDLEvent() if err != nil { - log.Fatal("decode message value failed", zap.ByteString("value", message.Value)) + log.Panic("decode message value failed", zap.ByteString("value", message.Value)) } c.appendDDL(ddl) case model.MqMessageTypeRow: row, err := batchDecoder.NextRowChangedEvent() if err != nil { - log.Fatal("decode message value failed", zap.ByteString("value", message.Value)) + log.Panic("decode message value failed", zap.ByteString("value", message.Value)) } globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) if row.CommitTs <= globalResolvedTs || row.CommitTs <= sink.resolvedTs { - log.Debug("filter fallback row", zap.ByteString("row", message.Key), + log.Debug("RowChangedEvent fallback row, ignore it", + zap.Uint64("commitTs", row.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs), zap.Uint64("sinkResolvedTs", sink.resolvedTs), - zap.Int32("partition", partition)) - break ClaimMessages + zap.Int32("partition", partition), + zap.ByteString("row", message.Key)) } // FIXME: hack to set start-ts in row changed event, as start-ts // is not contained in TiCDC open protocol @@ -439,7 +441,7 @@ ClaimMessages: c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID) err = sink.EmitRowChangedEvents(ctx, row) if err != nil { - log.Fatal("emit row changed event failed", zap.Error(err)) + log.Panic("emit row changed event failed", zap.Error(err)) } lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID) if !ok || lastCommitTs.(uint64) < row.CommitTs { @@ -448,21 +450,29 @@ ClaimMessages: case model.MqMessageTypeResolved: ts, err := batchDecoder.NextResolvedEvent() if err != nil { - log.Fatal("decode message value failed", zap.ByteString("value", message.Value)) + log.Panic("decode message value failed", zap.ByteString("value", message.Value)) } resolvedTs := atomic.LoadUint64(&sink.resolvedTs) - if resolvedTs < ts { + // `resolvedTs` should be monotonically increasing, it's allowed to receive a redundant one.
+ if ts < resolvedTs { + log.Panic("partition resolved ts fallback", + zap.Uint64("ts", ts), + zap.Uint64("resolvedTs", resolvedTs), + zap.Int32("partition", partition)) + } else if ts > resolvedTs { log.Debug("update sink resolved ts", zap.Uint64("ts", ts), zap.Int32("partition", partition)) atomic.StoreUint64(&sink.resolvedTs, ts) + } else { + log.Info("redundant sink resolved ts", zap.Uint64("ts", ts), zap.Int32("partition", partition)) } } session.MarkMessage(message, "") } if counter > kafkaMaxBatchSize { - log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize), + log.Panic("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize), zap.Int("actual-batch-size", counter)) } } @@ -477,8 +487,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) { return } globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) - if ddl.CommitTs <= globalResolvedTs { - log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs)) + if ddl.CommitTs < globalResolvedTs { + log.Panic("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs)) + } + if ddl.CommitTs == globalResolvedTs { + log.Warn("receive redundant ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs)) return } c.ddlList = append(c.ddlList, ddl) @@ -519,14 +532,15 @@ func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error { // Run runs the Consumer func (c *Consumer) Run(ctx context.Context) error { var lastGlobalResolvedTs uint64 + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() for { select { case <-ctx.Done(): return ctx.Err() - default: + case <-ticker.C: } - time.Sleep(100 * time.Millisecond) - // handle ddl + // initialize the `globalResolvedTs` as min of all partition's `ResolvedTs` globalResolvedTs := uint64(math.MaxUint64) err := c.forEachSink(func(sink *partitionSink) error { resolvedTs := atomic.LoadUint64(&sink.resolvedTs) @@ -538,6 +552,7 @@ func (c *Consumer) Run(ctx context.Context) error { if err != nil { return errors.Trace(err) } + // handle ddl todoDDL := c.getFrontDDL() if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs { // flush DMLs @@ -560,18 +575,21 @@ func (c *Consumer) Run(ctx context.Context) error { if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs { globalResolvedTs = todoDDL.CommitTs } - if lastGlobalResolvedTs == globalResolvedTs { - continue + if lastGlobalResolvedTs > globalResolvedTs { + log.Panic("global ResolvedTs fallback") } - lastGlobalResolvedTs = globalResolvedTs - atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs) - log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs)) - err = c.forEachSink(func(sink *partitionSink) error { - return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs) - }) - if err != nil { - return errors.Trace(err) + if globalResolvedTs > lastGlobalResolvedTs { + lastGlobalResolvedTs = globalResolvedTs + atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs) + log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs)) + + err = c.forEachSink(func(sink *partitionSink) error { + return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs) + }) + if err != nil { + return errors.Trace(err) + } } } } From 19b9177c81c907e16194c5e7bc4d3bbc3b3857a8 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Thu, 24 Feb 2022 15:17:42 +0800 Subject: 
[PATCH 2/2] remove skipAutoCreate failpoint from kafka test. (#4673) close pingcap/tiflow#4675 --- cdc/sink/producer/kafka/kafka.go | 243 ++++++++++---------- cdc/sink/producer/kafka/kafka_test.go | 158 +++++++++---- pkg/kafka/cluster_admin_client.go | 46 ++++ pkg/kafka/cluster_admin_client_mock_impl.go | 107 +++++++++ pkg/kafka/config.go | 25 ++ 5 files changed, 413 insertions(+), 166 deletions(-) create mode 100644 pkg/kafka/cluster_admin_client.go create mode 100644 pkg/kafka/cluster_admin_client_mock_impl.go create mode 100644 pkg/kafka/config.go diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index a83bff541fa..3a1a10ea9aa 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/kafka" "github.com/pingcap/tiflow/pkg/notify" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/util" @@ -397,14 +398,23 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { } } -func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error { - // FIXME: find a way to remove this failpoint for workload the unit test - failpoint.Inject("SkipTopicAutoCreate", func() { - failpoint.Return(nil) - }) - admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig) +var ( + newSaramaConfigImpl = newSaramaConfig + // NewAdminClientImpl specifies the build method for the admin client. + NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient +) + +// NewKafkaSaramaProducer creates a kafka sarama producer +func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { + log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) + cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { - return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + return nil, err + } + + admin, err := NewAdminClientImpl(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } defer func() { if err := admin.Close(); err != nil { @@ -412,22 +422,98 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) } }() + if err := validateAndCreateTopic(admin, topic, config, cfg); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes) + + asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + notifier := new(notify.Notifier) + flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond) + if err != nil { + return nil, err + } + k := &kafkaSaramaProducer{ + asyncClient: asyncClient, + syncClient: syncClient, + topic: topic, + partitionNum: config.PartitionNum, + partitionOffset: make([]struct { + flushed uint64 + sent uint64 + }, config.PartitionNum), + flushedNotifier: notifier, + flushedReceiver: flushedReceiver, + closeCh: make(chan struct{}), + failpointCh: make(chan error, 1), + closing: kafkaProducerRunning, + } + go 
func() { + if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + return + case errCh <- err: + default: + log.Error("error channel is full", zap.Error(err)) + } + } + }() + return k, nil +} + +func init() { + sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB +} + +var ( + validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) + commonInvalidChar = regexp.MustCompile(`[\?:,"]`) +) + +func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) { + if configuredClientID != "" { + clientID = configuredClientID + } else { + clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID) + clientID = commonInvalidChar.ReplaceAllString(clientID, "_") + } + if !validClientID.MatchString(clientID) { + return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID) + } + return +} + +func validateAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config) error { topics, err := admin.ListTopics() if err != nil { return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - info, created := topics[topic] + info, exists := topics[topic] // once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid. - if created { + if exists { // make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes` - topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info) + topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName, + kafka.BrokerMessageMaxBytesConfigName) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + topicMaxMessageBytes, err := strconv.Atoi(topicMaxMessageBytesStr) if err != nil { return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } if topicMaxMessageBytes < config.MaxMessageBytes { - log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+ + log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+ "use topic's `max.message.bytes` to initialize the Kafka producer", zap.Int("max.message.bytes", topicMaxMessageBytes), zap.Int("max-message-bytes", config.MaxMessageBytes)) @@ -440,7 +526,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) zap.String("topic", topic), zap.Any("detail", info)) } - if err := config.adjustPartitionNum(info.NumPartitions); err != nil { + if err := config.setPartitionNum(info.NumPartitions); err != nil { return errors.Trace(err) } @@ -451,25 +537,29 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found") } - brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin) + brokerMessageMaxBytesStr, err := getBrokerConfig(admin, kafka.BrokerMessageMaxBytesConfigName) if err != nil { log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration") return errors.Trace(err) } + brokerMessageMaxBytes, err := strconv.Atoi(brokerMessageMaxBytesStr) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } // when create the topic, `max.message.bytes` is decided by the broker, // it would use broker's `message.max.bytes` to set topic's `max.message.bytes`. // TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than // broker's `message.max.bytes`. 
if brokerMessageMaxBytes < config.MaxMessageBytes { - log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+ + log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+ "use broker's `message.max.bytes` to initialize the Kafka producer", zap.Int("message.max.bytes", brokerMessageMaxBytes), zap.Int("max-message-bytes", config.MaxMessageBytes)) saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes } - // topic not created yet, and user does not specify the `partition-num` in the sink uri. + // topic not exists yet, and user does not specify the `partition-num` in the sink uri. if config.PartitionNum == 0 { config.PartitionNum = defaultPartitionNum log.Warn("partition-num is not set, use the default partition count", @@ -492,87 +582,6 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) return nil } -var newSaramaConfigImpl = newSaramaConfig - -// NewKafkaSaramaProducer creates a kafka sarama producer -func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { - log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) - cfg, err := newSaramaConfigImpl(ctx, config) - if err != nil { - return nil, err - } - - if err := topicPreProcess(topic, config, cfg); err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes) - - asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - - notifier := new(notify.Notifier) - flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond) - if err != nil { - return nil, err - } - k := &kafkaSaramaProducer{ - asyncClient: asyncClient, - syncClient: syncClient, - topic: topic, - partitionNum: config.PartitionNum, - partitionOffset: make([]struct { - flushed uint64 - sent uint64 - }, config.PartitionNum), - flushedNotifier: notifier, - flushedReceiver: flushedReceiver, - closeCh: make(chan struct{}), - failpointCh: make(chan error, 1), - closing: kafkaProducerRunning, - } - go func() { - if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled { - select { - case <-ctx.Done(): - return - case errCh <- err: - default: - log.Error("error channel is full", zap.Error(err)) - } - } - }() - return k, nil -} - -func init() { - sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB -} - -var ( - validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) - commonInvalidChar = regexp.MustCompile(`[\?:,"]`) -) - -func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) { - if configuredClientID != "" { - clientID = configuredClientID - } else { - clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID) - clientID = commonInvalidChar.ReplaceAllString(clientID, "_") - } - if !validClientID.MatchString(clientID) { - return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID) - } - return -} - -// NewSaramaConfig return the default config and set the according version and metrics func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config := sarama.NewConfig() @@ -666,50 +675,42 
@@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { return config, err } -func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) { - target := "message.max.bytes" +// getBrokerConfig gets broker config by name. +func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (string, error) { _, controllerID, err := admin.DescribeCluster() if err != nil { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + return "", err } configEntries, err := admin.DescribeConfig(sarama.ConfigResource{ Type: sarama.BrokerResource, Name: strconv.Itoa(int(controllerID)), - ConfigNames: []string{target}, + ConfigNames: []string{brokerConfigName}, }) if err != nil { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + return "", err } - if len(configEntries) == 0 || configEntries[0].Name != target { - return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack( - "since cannot find the `message.max.bytes` from the broker's configuration, " + - "ticdc decline to create the topic and changefeed to prevent potential error") + if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName { + return "", errors.New(fmt.Sprintf( + "cannot find the `%s` from the broker's configuration", brokerConfigName)) } - result, err := strconv.Atoi(configEntries[0].Value) - if err != nil { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - - return result, nil + return configEntries[0].Value, nil } -func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) { - if a, ok := info.ConfigEntries["max.message.bytes"]; ok { - result, err := strconv.Atoi(*a) - if err != nil { - return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - return result, nil +// getTopicConfig gets topic config by name. +// If the topic does not have this configuration, we will try to get it from the broker's configuration. +// NOTICE: The configuration names of topic and broker may be different for the same configuration. 
+func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, topicConfigName string, brokerConfigName string) (string, error) { + if a, ok := detail.ConfigEntries[topicConfigName]; ok { + return *a, nil } - return getBrokerMessageMaxBytes(admin) + return getBrokerConfig(admin, brokerConfigName) } -// adjust the partition-num by the topic's partition count -func (c *Config) adjustPartitionNum(realPartitionCount int32) error { +func (c *Config) setPartitionNum(realPartitionCount int32) error { // user does not specify the `partition-num` in the sink-uri if c.PartitionNum == 0 { c.PartitionNum = realPartitionCount diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index ebcd6093068..5bcd41118a0 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -25,10 +25,10 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/check" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/kafka" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/util/testleak" @@ -110,7 +110,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { defer testleak.AfterTest(c)() ctx, cancel := context.WithCancel(context.Background()) - topic := "unit_test_1" + topic := kafka.DefaultMockTopicName leader := sarama.NewMockBroker(c, 2) defer leader.Close() metadataResponse := new(sarama.MetadataResponse) @@ -146,11 +146,11 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { cfg.Producer.Flush.MaxMessages = 1 return cfg, err } - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + NewAdminClientImpl = kafka.NewMockAdminClient defer func() { - newSaramaConfigImpl = newSaramaConfigImplBak - _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") + NewAdminClientImpl = kafka.NewSaramaAdminClient }() + opts := make(map[string]string) producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(err, check.IsNil) @@ -235,54 +235,112 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { } } -func (s *kafkaSuite) TestAdjustPartitionNum(c *check.C) { +func (s *kafkaSuite) TestSetPartitionNum(c *check.C) { defer testleak.AfterTest(c)() config := NewConfig() - err := config.adjustPartitionNum(2) + err := config.setPartitionNum(2) c.Assert(err, check.IsNil) c.Assert(config.PartitionNum, check.Equals, int32(2)) config.PartitionNum = 1 - err = config.adjustPartitionNum(2) + err = config.setPartitionNum(2) c.Assert(err, check.IsNil) c.Assert(config.PartitionNum, check.Equals, int32(1)) config.PartitionNum = 3 - err = config.adjustPartitionNum(2) + err = config.setPartitionNum(2) c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) } -func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { +func (s *kafkaSuite) TestValidateAndCreateTopic(c *check.C) { defer testleak.AfterTest(c) - topic := "unit_test_2" - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - broker := sarama.NewMockBroker(c, 1) - defer broker.Close() - metaResponse := sarama.NewMockMetadataResponse(c). - SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader(topic, 0, broker.BrokerID()). - SetLeader(topic, 1, broker.BrokerID()). 
- SetController(broker.BrokerID()) - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": metaResponse, - "DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c), - }) config := NewConfig() - config.PartitionNum = int32(0) - config.BrokerEndpoints = strings.Split(broker.Addr(), ",") - config.AutoCreate = false + adminClient := kafka.NewClusterAdminClientMockImpl() + defer func() { + _ = adminClient.Close() + }() + + // When topic exists and max message bytes is set correctly. + config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() + cfg, err := newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg) + c.Assert(err, check.IsNil) - cfg, err := newSaramaConfigImpl(ctx, config) + // When topic exists and max message bytes is not set correctly. + // use the smaller one. + defaultMaxMessageBytes := adminClient.GetDefaultMaxMessageBytes() + config.MaxMessageBytes = defaultMaxMessageBytes + 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + + config.MaxMessageBytes = defaultMaxMessageBytes - 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) - config.BrokerEndpoints = []string{""} - cfg.Metadata.Retry.Max = 1 + // When topic does not exist and auto-create is not enabled. + config.AutoCreate = false + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, "non-exist", config, cfg) + c.Assert( + errors.Cause(err), + check.ErrorMatches, + ".*auto-create-topic` is false, and topic not found.*", + ) + + // When the topic does not exist, use the broker's configuration to create the topic. + // It is less than the value of broker. + config.AutoCreate = true + config.MaxMessageBytes = defaultMaxMessageBytes - 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, "create-random1", config, cfg) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + + // When the topic does not exist, use the broker's configuration to create the topic. + // It is larger than the value of broker. + config.MaxMessageBytes = defaultMaxMessageBytes + 1 + config.AutoCreate = true + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, "create-random2", config, cfg) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + + // When the topic exists, but the topic does not store max message bytes info, + // the check of parameter succeeds. + // It is less than the value of broker. + config.MaxMessageBytes = defaultMaxMessageBytes - 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + detail := &sarama.TopicDetail{ + NumPartitions: 3, + // Does not contain max message bytes information. 
+ ConfigEntries: make(map[string]*string), + } + err = adminClient.CreateTopic("test-topic", detail, false) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, "test-topic", config, cfg) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) - err = topicPreProcess(topic, config, cfg) - c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers) + // When the topic exists, but the topic does not store max message bytes info, + // the check of parameter fails. + // It is larger than the value of broker. + config.MaxMessageBytes = defaultMaxMessageBytes + 1 + cfg, err = newSaramaConfigImpl(context.Background(), config) + c.Assert(err, check.IsNil) + err = validateAndCreateTopic(adminClient, "test-topic", config, cfg) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) } func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { @@ -349,16 +407,18 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { config.Version = "invalid" config.BrokerEndpoints = []string{"127.0.0.1:1111"} topic := "topic" - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) - _, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + opts := make(map[string]string) + _, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") - - _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") } func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { defer testleak.AfterTest(c)() - topic := "unit_test_4" + topic := kafka.DefaultMockTopicName ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -380,7 +440,10 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { config.AutoCreate = false config.BrokerEndpoints = strings.Split(leader.Addr(), ",") - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() newSaramaConfigImplBak := newSaramaConfigImpl newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { @@ -396,9 +459,10 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") defer func() { - _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") err := producer.Close() c.Assert(err, check.IsNil) }() @@ -436,7 +500,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { defer testleak.AfterTest(c)() - topic := "unit_test_4" + topic := kafka.DefaultMockTopicName ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -458,14 +522,18 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { config.AutoCreate = false config.BrokerEndpoints = 
strings.Split(leader.Addr(), ",") - c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") defer func() { err := producer.Close() c.Assert(err, check.IsNil) - _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") }() c.Assert(err, check.IsNil) diff --git a/pkg/kafka/cluster_admin_client.go b/pkg/kafka/cluster_admin_client.go new file mode 100644 index 00000000000..e9a138a82c8 --- /dev/null +++ b/pkg/kafka/cluster_admin_client.go @@ -0,0 +1,46 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "github.com/Shopify/sarama" +) + +// ClusterAdminClient is the administrative client for Kafka, which supports managing and inspecting topics, +// brokers, configurations and ACLs. +type ClusterAdminClient interface { + // ListTopics lists the topics available in the cluster with the default options. + ListTopics() (map[string]sarama.TopicDetail, error) + // DescribeCluster gets information about the nodes in the cluster. + DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) + // DescribeConfig gets the configuration for the specified resources. + DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) + // CreateTopic creates a new topic. + CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error + // Close shuts down the admin and closes the underlying client. + Close() error +} + +// ClusterAdminClientCreator defines the type of cluster admin client creator. +type ClusterAdminClientCreator func([]string, *sarama.Config) (ClusterAdminClient, error) + +// NewSaramaAdminClient constructs a ClusterAdminClient with sarama. +func NewSaramaAdminClient(addrs []string, conf *sarama.Config) (ClusterAdminClient, error) { + return sarama.NewClusterAdmin(addrs, conf) +} + +// NewMockAdminClient constructs a ClusterAdminClient with a mock implementation. +func NewMockAdminClient(_ []string, _ *sarama.Config) (ClusterAdminClient, error) { + return NewClusterAdminClientMockImpl(), nil +} diff --git a/pkg/kafka/cluster_admin_client_mock_impl.go b/pkg/kafka/cluster_admin_client_mock_impl.go new file mode 100644 index 00000000000..c2e03395c90 --- /dev/null +++ b/pkg/kafka/cluster_admin_client_mock_impl.go @@ -0,0 +1,107 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "strconv" + + "github.com/Shopify/sarama" +) + +const ( + // DefaultMockTopicName specifies the default mock topic name. + DefaultMockTopicName = "mock_topic" + // defaultMockControllerID specifies the default mock controller ID. + defaultMockControllerID = 1 +) + +// defaultMaxMessageBytes specifies the default max message bytes. +var defaultMaxMessageBytes = "10485760" + +// ClusterAdminClientMockImpl mock implements the admin client interface. +type ClusterAdminClientMockImpl struct { + topics map[string]sarama.TopicDetail + // Cluster controller ID. + controllerID int32 + brokerConfigs []sarama.ConfigEntry +} + +// NewClusterAdminClientMockImpl creates a ClusterAdminClientMockImpl struct with default configurations. +func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl { + topics := make(map[string]sarama.TopicDetail) + configEntries := make(map[string]*string) + configEntries[TopicMaxMessageBytesConfigName] = &defaultMaxMessageBytes + topics[DefaultMockTopicName] = sarama.TopicDetail{ + NumPartitions: 3, + ConfigEntries: configEntries, + } + + brokerConfigs := []sarama.ConfigEntry{ + { + Name: BrokerMessageMaxBytesConfigName, + Value: defaultMaxMessageBytes, + }, + } + + return &ClusterAdminClientMockImpl{ + topics: topics, + controllerID: defaultMockControllerID, + brokerConfigs: brokerConfigs, + } +} + +// ListTopics returns all topics directly. +func (c *ClusterAdminClientMockImpl) ListTopics() (map[string]sarama.TopicDetail, error) { + return c.topics, nil +} + +// DescribeCluster returns the controller ID. +func (c *ClusterAdminClientMockImpl) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) { + return nil, c.controllerID, nil +} + +// DescribeConfig returns brokerConfigs directly. +func (c *ClusterAdminClientMockImpl) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) { + var result []sarama.ConfigEntry + for _, name := range resource.ConfigNames { + for _, config := range c.brokerConfigs { + if name == config.Name { + result = append(result, config) + } + } + } + return result, nil +} + +// CreateTopic adds the topic into the map. +func (c *ClusterAdminClientMockImpl) CreateTopic(topic string, detail *sarama.TopicDetail, _ bool) error { + c.topics[topic] = *detail + return nil +} + +// Close does nothing. +func (c *ClusterAdminClientMockImpl) Close() error { + return nil +} + +// GetDefaultMockTopicName returns the default topic name. +func (c *ClusterAdminClientMockImpl) GetDefaultMockTopicName() string { + return DefaultMockTopicName +} + +// GetDefaultMaxMessageBytes returns defaultMaxMessageBytes as a number. +func (c *ClusterAdminClientMockImpl) GetDefaultMaxMessageBytes() int { + topicMaxMessage, _ := strconv.Atoi(defaultMaxMessageBytes) + return topicMaxMessage +} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go new file mode 100644 index 00000000000..ea7133215a6 --- /dev/null +++ b/pkg/kafka/config.go @@ -0,0 +1,25 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +const ( + // BrokerMessageMaxBytesConfigName specifies the largest record batch size allowed by + // Kafka brokers. + // See: https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes + BrokerMessageMaxBytesConfigName = "message.max.bytes" + // TopicMaxMessageBytesConfigName specifies the largest record batch size allowed by + // Kafka topics. + // See: https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes + TopicMaxMessageBytesConfigName = "max.message.bytes" +)
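
A note on the consumer change in patch 1/2: the resolved-ts handling now treats a regressing partition resolved ts as fatal, tolerates redundant (equal) values, and only flushes rows up to the global resolved ts, which is the minimum across all partition sinks and never moves backwards. The following is a minimal, self-contained sketch of just those rules, assuming nothing beyond the Go standard library; partitionState, updateResolvedTs, and the free function globalResolvedTs are illustrative names, not the TiCDC implementation.

package main

import (
	"fmt"
	"math"
)

// partitionState tracks the sink-side resolved ts of one Kafka partition.
type partitionState struct {
	resolvedTs uint64
}

// updateResolvedTs applies the consumer's three-way rule: a smaller ts is a
// fallback (fatal), an equal ts is redundant (ignored), and a larger ts
// advances the partition watermark.
func (p *partitionState) updateResolvedTs(ts uint64) error {
	switch {
	case ts < p.resolvedTs:
		return fmt.Errorf("partition resolved ts fallback: %d < %d", ts, p.resolvedTs)
	case ts == p.resolvedTs:
		return nil // redundant resolved ts, allowed and ignored
	default:
		p.resolvedTs = ts
		return nil
	}
}

// globalResolvedTs is the minimum resolved ts across all partitions,
// i.e. the highest ts that is safe to flush downstream.
func globalResolvedTs(parts []*partitionState) uint64 {
	minTs := uint64(math.MaxUint64)
	for _, p := range parts {
		if p.resolvedTs < minTs {
			minTs = p.resolvedTs
		}
	}
	return minTs
}

func main() {
	parts := []*partitionState{{resolvedTs: 100}, {resolvedTs: 120}}
	_ = parts[0].updateResolvedTs(110)
	// Flushing is only safe up to the slowest partition: 110, not 120.
	fmt.Println("global resolved ts:", globalResolvedTs(parts))
	// A regressing ts is rejected, mirroring the consumer's log.Panic.
	if err := parts[1].updateResolvedTs(90); err != nil {
		fmt.Println("rejected:", err)
	}
}

Patch 2/2 centralizes a similar clamping rule in validateAndCreateTopic: when the topic already exists, its max.message.bytes caps the producer's MaxMessageBytes; when the topic has to be created, the broker's message.max.bytes is the cap. The separate sketch below condenses that decision; clampMaxMessageBytes is a hypothetical helper written only to illustrate the rule, not a function in the patch.

package main

import "fmt"

// clampMaxMessageBytes keeps the configured max-message-bytes only when it
// does not exceed the server-side ceiling: the topic's max.message.bytes if
// the topic exists, otherwise the broker's message.max.bytes.
func clampMaxMessageBytes(configured int, topicLimit *int, brokerLimit int) int {
	limit := brokerLimit
	if topicLimit != nil {
		// Topic-level max.message.bytes overrides the broker default.
		limit = *topicLimit
	}
	if configured > limit {
		// Fall back to the smaller server-side limit, as the patch warns.
		return limit
	}
	return configured
}

func main() {
	topicLimit := 1048576 // a 1 MiB topic-level limit
	fmt.Println(clampMaxMessageBytes(10485760, &topicLimit, 10485760)) // 1048576
	fmt.Println(clampMaxMessageBytes(524288, nil, 10485760))           // 524288
}

In tests, the seam introduced by patch 2/2 is exercised by assigning NewAdminClientImpl = kafka.NewMockAdminClient and restoring kafka.NewSaramaAdminClient in a defer, as kafka_test.go does above.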