Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Apache Kafka scaler to scale using sum of lag for all topics within a consumer group #2409

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
- **PostgreSQL Scaler:** Assign PostgreSQL `userName` to correct attribute ([#2432](https://github.com/kedacore/keda/pull/2432))|([#2433](https://github.com/kedacore/keda/pull/2433))
- **Kafka Scaler:** concurrently query brokers for consumer and producer offsets ([#2405](https://github.com/kedacore/keda/pull/2405))
- **External Scaler:** fix wrong calculation of retry backoff duration ([#2416](https://github.com/kedacore/keda/pull/2416))
- **Kafka Scaler:** allow flag `topic` to be optional, where lag of all topics within the consumer group will be used for scaling ([#2409](https://github.com/kedacore/keda/pull/2409))

### Breaking Changes

Expand Down
173 changes: 103 additions & 70 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
case config.TriggerMetadata["topic"] != "":
meta.topic = config.TriggerMetadata["topic"]
default:
return meta, errors.New("no topic given")
meta.topic = ""
kafkaLog.V(1).Info(fmt.Sprintf("cosumer group %s has no topic specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

meta.offsetResetPolicy = defaultOffsetResetPolicy
Expand Down Expand Up @@ -210,29 +212,30 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {

// IsActive determines if we need to scale from zero
func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
partitions, err := s.getPartitions()
topicPartitions, err := s.getTopicPartitions()
if err != nil {
return false, err
}

consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions)
consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
if err != nil {
return false, err
}

for _, partition := range partitions {
lag, err := s.getLagForPartition(partition, consumerOffsets, producerOffsets)
if err != nil && lag == invalidOffset {
return true, nil
}
kafkaLog.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, s.metadata.topic, partition))
for topic, partitionsOffsets := range producerOffsets {
for partitionID := range partitionsOffsets {
lag, err := s.getLagForPartition(topic, partitionID, consumerOffsets, producerOffsets)
if err != nil && lag == invalidOffset {
return true, nil
}
kafkaLog.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, topic, partitionID))

// Return as soon as a lag was detected for any partition
if lag > 0 {
return true, nil
// Return as soon as a lag was detected for any partitionID
if lag > 0 {
return true, nil
}
}
}

return false, nil
}

Expand Down Expand Up @@ -285,49 +288,67 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
return client, admin, nil
}

func (s *kafkaScaler) getPartitions() ([]int32, error) {
topicsMetadata, err := s.admin.DescribeTopics([]string{s.metadata.topic})
func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) {
var topicsToDescribe = make([]string, 0)

// when no topic is specified, query to cg group to fetch all subscribed topics
if s.metadata.topic == "" {
listCGOffsetResponse, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, nil)
if err != nil {
return nil, fmt.Errorf("error listing cg offset: %s", err)
}
for topicName := range listCGOffsetResponse.Blocks {
topicsToDescribe = append(topicsToDescribe, topicName)
}
} else {
topicsToDescribe = []string{s.metadata.topic}
}

topicsMetadata, err := s.admin.DescribeTopics(topicsToDescribe)
if err != nil {
return nil, fmt.Errorf("error describing topics: %s", err)
}
if len(topicsMetadata) != 1 {

if s.metadata.topic != "" && len(topicsMetadata) != 1 {
return nil, fmt.Errorf("expected only 1 topic metadata, got %d", len(topicsMetadata))
}

partitionMetadata := topicsMetadata[0].Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
partitions[i] = p.ID
topicPartitions := make(map[string][]int32, len(topicsMetadata))
for _, topicMetadata := range topicsMetadata {
partitionMetadata := topicMetadata.Partitions
partitions := make([]int32, len(partitionMetadata))
for i, p := range partitionMetadata {
partitions[i] = p.ID
}
topicPartitions[topicMetadata.Name] = partitions
}

return partitions, nil
return topicPartitions, nil
}

func (s *kafkaScaler) getConsumerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{
s.metadata.topic: partitions,
})

func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, topicPartitions)
if err != nil {
return nil, fmt.Errorf("error listing consumer group offsets: %s", err)
}

return offsets, nil
}

func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.OffsetFetchResponse, topicOffsets map[int32]int64) (int64, error) {
block := offsets.GetBlock(s.metadata.topic, partition)
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition)
kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID), "")
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partitionID)
}

latestOffset := topicOffsets[partition]
if _, found := topicPartitionOffsets[topic]; !found {
return 0, fmt.Errorf("error finding parition offset for topic %s", topic)
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
return latestOffset, nil
}
Expand All @@ -347,9 +368,17 @@ func (s *kafkaScaler) Close(context.Context) error {

func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI)

var metricName string
if s.metadata.topic != "" {
metricName = fmt.Sprintf("kafka-%s", s.metadata.topic)
} else {
metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group)
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("kafka-%s", s.metadata.topic))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(metricName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand All @@ -366,20 +395,20 @@ type consumerOffsetResult struct {
}

type producerOffsetResult struct {
producerOffsets map[int32]int64
producerOffsets map[string]map[int32]int64
err error
}

func (s *kafkaScaler) getConsumerAndProducerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, map[int32]int64, error) {
func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, map[string]map[int32]int64, error) {
consumerChan := make(chan consumerOffsetResult, 1)
go func() {
consumerOffsets, err := s.getConsumerOffsets(partitions)
consumerOffsets, err := s.getConsumerOffsets(topicPartitions)
consumerChan <- consumerOffsetResult{consumerOffsets, err}
}()

producerChan := make(chan producerOffsetResult, 1)
go func() {
producerOffsets, err := s.getProducerOffsets(partitions)
producerOffsets, err := s.getProducerOffsets(topicPartitions)
producerChan <- producerOffsetResult{producerOffsets, err}
}()

Expand All @@ -398,29 +427,32 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(partitions []int32) (*sarama

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
partitions, err := s.getPartitions()
topicPartitions, err := s.getTopicPartitions()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions)
consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

totalLag := int64(0)
for _, partition := range partitions {
lag, _ := s.getLagForPartition(partition, consumerOffsets, producerOffsets)
totalTopicPartitions := int64(0)

totalLag += lag
for topic, partitionsOffsets := range producerOffsets {
for partition := range partitionsOffsets {
lag, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
totalLag += lag
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}

kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, partitions %v, threshold %v", totalLag, len(partitions), s.metadata.lagThreshold))
kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, len(topicPartitions), s.metadata.lagThreshold))

if !s.metadata.allowIdleConsumers {
// don't scale out beyond the number of partitions
if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) {
totalLag = int64(len(partitions)) * s.metadata.lagThreshold
// don't scale out beyond the number of topicPartitions
if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions {
totalLag = totalTopicPartitions * s.metadata.lagThreshold
}
}

Expand All @@ -438,7 +470,7 @@ type brokerOffsetResult struct {
err error
}

func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, error) {
func (s *kafkaScaler) getProducerOffsets(topicPartitions map[string][]int32) (map[string]map[int32]int64, error) {
version := int16(0)
if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) {
version = 1
Expand All @@ -447,22 +479,22 @@ func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, e
// Step 1: build one OffsetRequest instance per broker.
requests := make(map[*sarama.Broker]*sarama.OffsetRequest)

for _, partitionID := range partitions {
broker, err := s.client.Leader(s.metadata.topic, partitionID)
if err != nil {
return nil, err
}

request, ok := requests[broker]
if !ok {
request = &sarama.OffsetRequest{Version: version}
requests[broker] = request
for topic, partitions := range topicPartitions {
for _, partitionID := range partitions {
broker, err := s.client.Leader(topic, partitionID)
if err != nil {
return nil, err
}
request, ok := requests[broker]
if !ok {
request = &sarama.OffsetRequest{Version: version}
requests[broker] = request
}
request.AddBlock(topic, partitionID, sarama.OffsetNewest, 1)
}

request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1)
}

// Step 2: send requests, one per broker, and collect offsets
// Step 2: send requests, one per broker, and collect topicPartitionsOffsets
resultCh := make(chan brokerOffsetResult, len(requests))
var wg sync.WaitGroup
wg.Add(len(requests))
Expand All @@ -477,23 +509,24 @@ func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, e
wg.Wait()
close(resultCh)

offsets := make(map[int32]int64)

topicPartitionsOffsets := make(map[string]map[int32]int64)
for brokerOffsetRes := range resultCh {
if brokerOffsetRes.err != nil {
return nil, brokerOffsetRes.err
}

for _, blocks := range brokerOffsetRes.offsetResp.Blocks {
for topic, blocks := range brokerOffsetRes.offsetResp.Blocks {
if _, found := topicPartitionsOffsets[topic]; !found {
topicPartitionsOffsets[topic] = make(map[int32]int64)
}
for partitionID, block := range blocks {
if block.Err != sarama.ErrNoError {
return nil, block.Err
}

offsets[partitionID] = block.Offset
topicPartitionsOffsets[topic][partitionID] = block.Offset
}
}
}

return offsets, nil
return topicPartitionsOffsets, nil
}
6 changes: 4 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{}, true, 0, nil, "", "", "", false},
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false},
// failure, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// success, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// failure, version not supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success
Expand Down Expand Up @@ -118,6 +118,7 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[4], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[4], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}

func TestGetBrokers(t *testing.T) {
Expand Down Expand Up @@ -190,6 +191,7 @@ func TestKafkaAuthParams(t *testing.T) {
}
}
}

func TestKafkaGetMetricSpecForScaling(t *testing.T) {
for _, testData := range kafkaMetricIdentifiers {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams, ScalerIndex: testData.scalerIndex})
Expand Down
Loading