Skip to content

Commit

Permalink
Patch sarama_kafka rebalance fix (#818)
Browse files Browse the repository at this point in the history
* Patch sarama_kafka rebalance fix

Issue: #817
Issue Explanation: IBM/sarama#2118
Fix reference: https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177

Signed-off-by: nbajaj90 <nbajaj90@gmail.com>

* Adding comment as per code review comment

Signed-off-by: nbajaj90 <nbajaj90@gmail.com>

* Fixing typo

Signed-off-by: nbajaj90 <nbajaj90@gmail.com>

* Incorporated review comment to move comment

Signed-off-by: nbajaj90 <nbajaj90@gmail.com>

Signed-off-by: nbajaj90 <nbajaj90@gmail.com>
  • Loading branch information
nbajaj90 authored Nov 10, 2022
1 parent b888ba2 commit 9737f8e
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions protocol/kafka_sarama/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,40 @@ func (r *Receiver) Close(context.Context) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Also the method should return when `session.Context()` is done.
// Refer - https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
msg := message
m := NewMessageFromConsumerMessage(msg)

r.incoming <- msgErr{
msg: binding.WithFinish(m, func(err error) {
if protocol.IsACK(err) {
session.MarkMessage(msg, "")
}
}),
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case msg := <-claim.Messages():
m := NewMessageFromConsumerMessage(msg)

r.incoming <- msgErr{
msg: binding.WithFinish(m, func(err error) {
if protocol.IsACK(err) {
session.MarkMessage(msg, "")
}
}),
}

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
// https://github.com/Shopify/sarama/issues/2118
// Also checked Shopify/sarama code which calls this ConsumeClaim method, and don't see if there is any difference
// whether this method returns error or not. If it returns the error, as per current implementation, it could
// get printed in logs and later drained when the ConsumerGroup gets closed.
// For now, to be on safer side, returning nil instead of session.Context().Err() as suggested in
// https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go
case <-session.Context().Done():
return nil
}
}
return nil
}

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error) {
Expand Down

0 comments on commit 9737f8e

Please sign in to comment.