Skip to content

Commit

Permalink
kafka: optimize getLagForPartition (#1464)
Browse files Browse the repository at this point in the history
Signed-off-by: messense <messense@icloud.com>

Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
  • Loading branch information
messense and zroubalik authored Jan 4, 2021
1 parent 5819a1f commit 2db20b0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381))
- Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323))
- Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413))
- Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464))
- Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453))

### Breaking Changes
Expand Down
13 changes: 6 additions & 7 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,20 +298,19 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition)
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
}
latestOffset, err := s.client.GetOffset(s.metadata.topic, partition, sarama.OffsetNewest)
if err != nil {
kafkaLog.Error(err, fmt.Sprintf("error finding latest offset for topic %s and partition %d\n", s.metadata.topic, partition))
return 0, fmt.Errorf("error finding latest offset for topic %s and partition %d", s.metadata.topic, partition)
}

if consumerOffset == invalidOffset {
if s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
}
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
return latestOffset, nil
}
return (latestOffset - consumerOffset), nil
return latestOffset - consumerOffset, nil
}

// Close closes the kafka admin and client
Expand Down

0 comments on commit 2db20b0

Please sign in to comment.