diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 7df2163c8b6..678efedd11d 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -26,10 +26,11 @@ type kafkaScaler struct { } type kafkaMetadata struct { - bootstrapServers []string - group string - topic string - lagThreshold int64 + bootstrapServers []string + group string + topic string + lagThreshold int64 + consumerOffsetReset offsetResetPolicy // auth authMode kafkaAuthMode @@ -42,6 +43,13 @@ type kafkaMetadata struct { ca string } +type offsetResetPolicy string + +const ( + latest offsetResetPolicy = "latest" + earliest offsetResetPolicy = "earliest" +) + type kafkaAuthMode string const ( @@ -57,6 +65,7 @@ const ( lagThresholdMetricName = "lagThreshold" kafkaMetricType = "External" defaultKafkaLagThreshold = 10 + defaultOffsetReset = latest ) var kafkaLog = logf.Log.WithName("kafka_scaler") @@ -108,6 +117,16 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka } meta.topic = metadata["topic"] + meta.consumerOffsetReset = defaultOffsetReset + + if metadata["consumerOffsetReset"] != "" { + policy := offsetResetPolicy(metadata["offsetReset"]) + if policy != earliest && policy != latest { + return meta, fmt.Errorf("err offsetReset policy %s given", policy) + } + meta.consumerOffsetReset = policy + } + meta.lagThreshold = defaultKafkaLagThreshold if val, ok := metadata[lagThresholdMetricName]; ok { @@ -300,11 +319,13 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset } var lag int64 - // For now, assume a consumer group that has no committed - // offset will read all messages from the topic. This may be - // something we want to allow users to configure. + if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest { - lag = latestOffset + if s.metadata.consumerOffsetReset == latest { + lag = 0 + } else { + lag = latestOffset + } } else { lag = latestOffset - consumerOffset }