Get rid of sarama-cluster, use sarama consumer group instead. #2009
Conversation
Force-pushed 3675972 to 8864b34, then 8864b34 to 5688027 (Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>)
```go
// Builder builds a new kafka consumer
type Builder interface {
	NewConsumer() (Consumer, error)
}

type Consumer interface {
	Consume(ctx context.Context, handler sarama.ConsumerGroupHandler) error
}
```
consume what? please document new interfaces and methods
This starts the consumer loop, which consumes the Kafka partitions. I'll document it, but essentially this is an interface that wraps sarama.ConsumerGroup, so I borrowed the names from the ConsumerGroup interface. What I did here is define an interface that omits the topic, because I configure the consumer topic here and not in other parts of the code.
```go
		partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
	}, nil
}

// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
	c.deadlockDetector.start()
```
I know this was added after the first implementation because of a specific problem that Uber experienced, although I don't remember what the problem was exactly. Are you aware of that? Is it safe to remove this?
I'm aware of that, and I'm still not sure if this should be removed; it's something I'm investigating. It seems like the problem was solved in this implementation, but I'm still not 100% sure.
Maybe deprecate as Juca suggests, but only start the deadlock detector if interval > 0 (I think 0 is the default and means no deadlock detection is used)?
It seems like this is not necessary anymore; the issue was fixed in sarama: IBM/sarama#1156
Anyway, I agree with the idea of preserving this deadlockDetector, just in case, and marking it as deprecated. I'll reintroduce it.
```go
	for {
		err := c.internalConsumer.Consume(ctx, &handler)
		if err != nil {
			panic(err)
```
That's rude. Do you really want a panic here?
Ha, not really, I'll remove it. I copied it from an example of how to use the consumer group, but it shouldn't be here.
```diff
@@ -81,7 +80,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
 	ProcessorFactory: *processorFactory,
 	MetricsFactory:   metricsFactory,
 	Logger:           logger,
-	DeadlockCheckInterval: options.DeadlockInterval,
```
IIRC, this can be configured via CLI. You might need to mark it as "deprecated" in the CLI help text and add an entry to the changelog.
I'm quite wary of this change because I feel that the new consumer group functionality in Sarama hasn't seen as much adoption and use as bsm/sarama-cluster. From a quick look at the project issues, things like IBM/sarama#1554 and IBM/sarama#1516 are concerning.

Re: #1819 - rebalances are affected by load, session timeouts, heartbeat config, etc. If we feel that this is a widespread user-facing issue, we could circle back and focus effort on determining the root cause.
Hi, I already determined the root cause of this (or what I think is the cause). It seems like this happens because of a Kafka issue: https://issues.apache.org/jira/browse/KAFKA-8653

This is the situation: the sarama implementation sends the rebalance timeout, and sarama-cluster doesn't. There is no way to fix this without modifying sarama-cluster, which is a deprecated library. So users that use Kafka 2.3.0 will face #1819. So in my opinion these are our options:
@rubenvp8510 Thanks for the details! It seems that the bug in Kafka was fixed in 2.3.1. I prefer that we go with option (3) for now; (2) might be difficult because bsm/sarama-cluster is marked as read-only.

I think the decision to change the consumer library is a big one, and we'd need to evaluate the current contenders (confluentinc/confluent-kafka-go, segmentio/kafka-go, Shopify/sarama) more deeply before committing.
I'll close this PR (see comments above), but I still think that we need to migrate to another library at some point.
Which problem is this PR solving?
Short description of the changes