[v2]adds consumer offset reset policy option to keda kafka scaler #925
Conversation
@grassiale thanks, this is great. Could you please open this PR against …
Force-pushed from ce2b760 to 472a3b7
Force-pushed from 472a3b7 to b0432a9
if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest {
	lag = latestOffset
	if s.metadata.consumerOffsetReset == latest {
		lag = 0
shouldn't this be lag = latestOffset?
When you have no committed offset and you create a new consumer with the latest reset policy, the lag should be 0, since the consumer is aligned with the latest offset on the topic.
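To make this concrete, here is a minimal Go sketch of the lag computation under discussion; computeLag is a hypothetical helper, not the scaler's actual code, and it assumes the sarama import plus the PR's offsetResetPolicy type and its latest constant.

// computeLag illustrates the reasoning above. When no offset has been
// committed yet, sarama reports a sentinel value (OffsetNewest or
// OffsetOldest) instead of a real position.
func computeLag(consumerOffset, latestOffset int64, policy offsetResetPolicy) int64 {
	if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest {
		if policy == latest {
			// A "latest" consumer starts at the end of the topic, so it is already caught up.
			return 0
		}
		// An "earliest" consumer replays the whole topic, so its lag is the latest offset.
		return latestOffset
	}
	// Normal case: lag is the distance between the last produced and the committed offset.
	return latestOffset - consumerOffset
}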
OK, I am not a Kafka expert, but the property naming is pretty confusing.
@ppatierno PTAL if you have time :)
@grassiale could you please add a test that covers this property?
Converting to draft because I'm implementing tests.
pkg/scalers/kafka_scaler.go (outdated)
@@ -57,6 +65,7 @@ const (
	lagThresholdMetricName   = "lagThreshold"
	kafkaMetricType          = "External"
	defaultKafkaLagThreshold = 10
	defaultOffsetReset       = earliest
In Kafka the default is "latest"; out of curiosity, why are you setting "earliest"?
Yes, you are right about the default in Kafka. I set earliest as the default because, right now, if you create a Kafka scaler and no offset is committed, it behaves as if earliest were set. So I did not want to break the current behavior for anyone upgrading from a previous version without noticing the new option.
But maybe, since this is a new major version of KEDA, it would be safer to be coherent with the Kafka defaults.
Let me know what you think, I can change it according to the decision.
yes I would agree, let's see what @zroubalik thinks.
I agree, it makes sense to set it to the Kafka default; a new major release is a great fit for a change like this. We could add a small note to the docs about it.
I've made the change and also updated the docs; let me know if it needs further clarification there.
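For illustration, parsing with the agreed Kafka-style default could look roughly like the sketch below; the metadata key "offsetResetPolicy", the function name, and the error text are assumptions rather than the PR's final code, and the fmt import plus the offsetResetPolicy type and its constants are taken as given.

// parseOffsetResetPolicy is an illustrative sketch: read the trigger metadata,
// validate the value, and fall back to "latest" to match the Kafka default.
func parseOffsetResetPolicy(metadata map[string]string) (offsetResetPolicy, error) {
	value, ok := metadata["offsetResetPolicy"]
	if !ok || value == "" {
		return latest, nil // no explicit setting: use the Kafka default
	}
	policy := offsetResetPolicy(value)
	if policy != earliest && policy != latest {
		return "", fmt.Errorf("offsetResetPolicy %q is not supported", value)
	}
	return policy, nil
}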
pkg/scalers/kafka_scaler_test.go (outdated)
	brokers             []string
	group               string
	topic               string
	consumerOffsetReset string
I would use the custom type offsetResetPolicy instead of string here.
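As a sketch of that suggestion: the constant values mirror Kafka's auto.offset.reset settings, the fixture fields are copied from the hunk above, and the test struct name is illustrative.

// A small custom string type keeps the accepted values in one place.
type offsetResetPolicy string

const (
	earliest offsetResetPolicy = "earliest"
	latest   offsetResetPolicy = "latest"
)

// The test fixture can then carry the typed value instead of a raw string.
type parseKafkaMetadataTestData struct {
	brokers             []string
	group               string
	topic               string
	consumerOffsetReset offsetResetPolicy
}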
pkg/scalers/kafka_scaler.go (outdated)
	group               string
	topic               string
	lagThreshold        int64
	consumerOffsetReset offsetResetPolicy
Can't we find a better name than consumerOffsetReset? Actually, the type name would be a great name for the struct field as well, so offsetResetPolicy. It would not be the first time that a field name is the same as the type name, right?
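For illustration, the rename the reviewer suggests would look roughly like this in the scaler metadata (fields copied from the hunk above; the struct name is assumed, and reusing the type name for the field is perfectly legal Go).

type kafkaMetadata struct {
	group             string
	topic             string
	lagThreshold      int64
	offsetResetPolicy offsetResetPolicy // field deliberately named after its type
}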
Absolutely ;)
LGTM other than the comments I left :-)
@ppatierno thanks for the review!
Force-pushed from 83f9c9b to f8a4648
I also started to add a small E2E test for this particular case; let me know if you don't think this is necessary.
@grassiale e2e tests are highly appreciated, so it would be really great if you could add a scenario for this, but it is not mandatory. That said, if the test would cover functionality that is not really relevant to (or not directly affected by) KEDA and is more about testing Kafka capabilities, you don't have to add it.
Hi @zroubalik, I've been able to implement some e2e tests for this case, the thing is: …
Agree, makes sense.
This PR aims to fix how the KEDA Kafka scaler handles the absence of a committed offset, in relation to the offset reset policy the consumer is following.
At the moment, if you create a ScaledObject for a new consumer group with no committed offset, the KEDA scaler returns a lag value equal to the latest offset in the topic it consumes from. This often causes the scaler to scale the related deployment's replicas to the maximum, because the latest offset in the topic is usually higher than the lagThreshold. This is fine for consumers with auto.offset.reset=earliest, since they will immediately start consuming all the messages in the topic, but it is wrong for auto.offset.reset=latest consumers, which will only consume new messages on the topic. The lag for this kind of consumer is now initially set to 0.
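A back-of-the-envelope sketch of why the old behavior over-scales a "latest" consumer; the HPA math is simplified to desired ≈ ceil(totalLag / lagThreshold), and the message count in the comment is made up (only the default lagThreshold of 10 comes from the scaler).

// With no committed offset and the old behavior (lag reported as latestOffset),
// a topic holding 100000 messages and the default lagThreshold of 10 pushes the
// target to 100000 / 10 = 10000 replicas, i.e. straight to maxReplicaCount,
// even though a "latest" consumer has nothing to catch up on.
func desiredReplicas(totalLag, lagThreshold, maxReplicaCount int64) int64 {
	desired := (totalLag + lagThreshold - 1) / lagThreshold // integer ceiling
	if desired > maxReplicaCount {
		return maxReplicaCount
	}
	return desired
}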
Checklist