Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed Mar 30, 2020
1 parent 9ae4a56 commit a45b47c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 40 deletions.
40 changes: 0 additions & 40 deletions cdc/sink/mqProducer/binlog_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,46 +143,6 @@ func NewKafkaBinlogSaramaProducer(ctx context.Context, address string, topic str

log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))

// get partition number or create topic automatically
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
if err != nil {
return nil, errors.Trace(err)
}
topics, err := admin.ListTopics()
if err != nil {
return nil, errors.Trace(err)
}
partitionNum := config.PartitionNum
topicDetail, exist := topics[topic]
if exist {
log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions))
if partitionNum == 0 {
partitionNum = topicDetail.NumPartitions
} else if partitionNum < topicDetail.NumPartitions {
log.Warn("partition number assigned in sink-uri is less than that of topic")
} else if partitionNum > topicDetail.NumPartitions {
return nil, errors.Errorf("partition number(%d) assigned in sink-uri is more than that of topic(%d)", partitionNum, topicDetail.NumPartitions)
}
} else {
if partitionNum == 0 {
partitionNum = 4
log.Warn("topic not found and partition number is not specified, using default partition number", zap.String("topic", topic), zap.Int32("partition_num", partitionNum))
}
log.Info("create a topic", zap.String("topic", topic), zap.Int32("partition_num", partitionNum), zap.Int16("replication_factor", config.ReplicationFactor))
err := admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: partitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
if err != nil {
return nil, errors.Trace(err)
}
}

err = admin.Close()
if err != nil {
return nil, errors.Trace(err)
}

executor := &KafkaSyncer{
addr: strings.Split(address, ","),
topic: topic,
Expand Down
3 changes: 3 additions & 0 deletions cdc/sink/mqProducer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
}

syncer, err := NewKafkaBinlogSaramaProducer(ctx, address, topic, config)
if err != nil {
return nil, errors.Trace(err)
}

return &kafkaSaramaProducer{
//asyncClient: asyncClient,
Expand Down

0 comments on commit a45b47c

Please sign in to comment.