Skip to content

Commit

Permalink
adds consumer offset reset policy option to keda kafka scaler
Browse files Browse the repository at this point in the history
Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
  • Loading branch information
grassiale committed Jul 9, 2020
1 parent 9fdc76b commit 472a3b7
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type kafkaScaler struct {
}

type kafkaMetadata struct {
bootstrapServers []string
group string
topic string
lagThreshold int64
bootstrapServers []string
group string
topic string
lagThreshold int64
consumerOffsetReset offsetResetPolicy

// auth
authMode kafkaAuthMode
Expand All @@ -42,6 +43,13 @@ type kafkaMetadata struct {
ca string
}

type offsetResetPolicy string

const (
latest offsetResetPolicy = "latest"
earliest offsetResetPolicy = "earliest"
)

type kafkaAuthMode string

const (
Expand All @@ -57,6 +65,7 @@ const (
lagThresholdMetricName = "lagThreshold"
kafkaMetricType = "External"
defaultKafkaLagThreshold = 10
defaultOffsetReset = latest
)

var kafkaLog = logf.Log.WithName("kafka_scaler")
Expand Down Expand Up @@ -108,6 +117,16 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka
}
meta.topic = metadata["topic"]

meta.consumerOffsetReset = defaultOffsetReset

if metadata["consumerOffsetReset"] != "" {
policy := offsetResetPolicy(metadata["offsetReset"])
if policy != earliest && policy != latest {
return meta, fmt.Errorf("err offsetReset policy %s given", policy)
}
meta.consumerOffsetReset = policy
}

meta.lagThreshold = defaultKafkaLagThreshold

if val, ok := metadata[lagThresholdMetricName]; ok {
Expand Down Expand Up @@ -300,11 +319,13 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset
}

var lag int64
// For now, assume a consumer group that has no committed
// offset will read all messages from the topic. This may be
// something we want to allow users to configure.

if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest {
lag = latestOffset
if s.metadata.consumerOffsetReset == latest {
lag = 0
} else {
lag = latestOffset
}
} else {
lag = latestOffset - consumerOffset
}
Expand Down

0 comments on commit 472a3b7

Please sign in to comment.