[Question] How to Gracefully shutdown Kafka consumer process #1776
Comments
I don't think you need to check for OS signals while processing the messages. I think you can simply defer the consumer group's Close function and have the OS signal handler end the execution of the main goroutine. Something like:
func yourMainMethod() {
    ...
    group, err := sarama.NewConsumerGroup...
    if err != nil {
        logrus.WithError(err).Error("Couldn't create a Kafka consumer")
        return
    }
    // Close the group when the main method returns.
    defer func() {
        if err := group.Close(); err != nil {
            logrus.WithError(err).Error("Error closing consumer group")
        }
    }()
    // signal handling here
}
Signal handling can be similar to https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go#L116-L126 having |
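For illustration, here is a minimal sketch of that idea using only the standard library. The `fakeGroup` type and the `run` and `demo` functions are hypothetical stand-ins (not part of sarama); `fakeGroup` only models the `Close` call of a consumer group. The short timeout in `main` is there purely so the sketch exits on its own without a signal.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// fakeGroup is a hypothetical stand-in for a consumer group; only Close matters here.
type fakeGroup struct{ closed bool }

func (g *fakeGroup) Close() error {
	g.closed = true
	return nil
}

// run blocks until ctx is cancelled. The deferred Close runs when run returns.
func run(ctx context.Context, group io.Closer) (err error) {
	defer func() {
		if cerr := group.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing consumer group: %w", cerr)
		}
	}()
	// A real program would start its consume loop in a goroutine here.
	<-ctx.Done()
	return nil
}

// demo cancels the context programmatically instead of waiting for a signal
// and reports whether Close was called.
func demo() bool {
	ctx, cancel := context.WithCancel(context.Background())
	g := &fakeGroup{}
	cancel()
	_ = run(ctx, g)
	return g.closed
}

func main() {
	// ctx is cancelled when SIGINT or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	// Demo only: a short timeout so the sketch terminates without a signal.
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	g := &fakeGroup{}
	if err := run(ctx, g); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("group closed:", g.closed)
}
```

With a real consumer group, the same context could presumably be handed to the consume call so that a signal stops consumption before the deferred close runs.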
you could also add a channel to your |
Thanks for the suggestions, @d1egoaz (I will explore them). Something like:
func (c consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        if err := c.processMessage(session, message); err != nil {
            return err
        }
    }
    return nil
}
func (c consumer) processMessage(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) error {
    // buffer messages in memoryMessages until the batch threshold is reached
    for id, data := range memoryMessages {
        select {
        case <-session.Context().Done():
            // the session is ending: stop handling the batch
            return nil
        default:
            message := data.(*sarama.ConsumerMessage)
            c.handleMessage(message, id)
        }
    }
    return nil
}
Thoughts? Does it look good? |
We should definitely have an example for this in examples. I had to add context handling here (and in many other places too), so I am thinking there could be a better way to do this.
for message := range claim.Messages() {
    select {
    case <-session.Context().Done():
        klog.Infof("Gracefully shutdown. Stopped taking new messages.")
        return nil
    default:
        err := c.processMessage(session, message)
        if err != nil {
            klog.Errorf("Error processing: value:%s, timestamp:%v, topic:%s", string(message.Value), message.Timestamp, message.Topic)
            continue
        }
    }
}
|
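One possible variation, sketched below with the standard library only, is to select on the context and the message channel together, so the loop can also stop while it is waiting for the next message. The `consume` and `demo` functions and the plain `chan string` are assumptions made for illustration; in a sarama handler the channel would be `claim.Messages()` and the context `session.Context()`.

```go
package main

import (
	"context"
	"fmt"
)

// consume reads from msgs until the channel is closed or ctx is cancelled,
// and returns how many messages were handled successfully.
func consume(ctx context.Context, msgs <-chan string, handle func(string) error) int {
	processed := 0
	for {
		select {
		case <-ctx.Done():
			// Shutdown requested: stop taking new messages.
			return processed
		case m, ok := <-msgs:
			if !ok {
				// Channel closed by the producer side.
				return processed
			}
			if err := handle(m); err != nil {
				fmt.Println("error processing:", err)
				continue
			}
			processed++
		}
	}
}

// demo sends three messages on an unbuffered channel and then cancels.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	msgs := make(chan string)
	go func() {
		for _, m := range []string{"a", "b", "c"} {
			msgs <- m
		}
		cancel() // nothing more is sent, so only ctx.Done() can fire next
	}()
	return consume(ctx, msgs, func(string) error { return nil })
}

func main() {
	fmt.Println("processed:", demo())
}
```

A message that is already being handled still runs to completion here; only the wait for the next message is interruptible.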
@alok87 PRs welcome :D It would be a great addition to our examples 🚀 |
Sure, I will add that. Maybe we can continue the discussion there about what would be a better way to handle shutdown on signals... |
Hey, recently I observed an issue in which the context I passed to sarama is active. But sarama session context When can the sarama session context get closed? I think the session gets closed and its context closed when the Please suggest an alternate way. |
Had to work around this problem using mainContext, as sessions get closed quite often when the total number of topics is huge. Using context in a struct instead of the Go-suggested approach of passing context around, since sarama does not support passing context around. IBM/sarama#1776 golang/go#22602
Please help out here. Passing context as a parameter is the right way according to the Go documentation (golang/go#22602). I have to work around this by putting the context in the struct. |
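A rough sketch of that workaround, using only the standard library, might look like the following. The `handler` type, its `mainCtx` field, and the `waitForStop` and `demo` functions are hypothetical names chosen for illustration. It assumes the session context is derived from the application context, so an ended session alone (for example after a rebalance) can be told apart from a real shutdown.

```go
package main

import (
	"context"
	"fmt"
)

// handler is a hypothetical consumer type that keeps the application-level
// context in a field, because the callback signature has no ctx parameter.
type handler struct {
	mainCtx context.Context
}

// waitForStop blocks until either context ends and reports why.
func (h handler) waitForStop(sessionCtx context.Context) string {
	select {
	case <-h.mainCtx.Done():
		return "shutdown"
	case <-sessionCtx.Done():
		// If the application context is also done, treat it as a shutdown.
		if h.mainCtx.Err() != nil {
			return "shutdown"
		}
		return "session ended"
	}
}

// demo exercises both cases with a session context derived from the main one.
func demo() (string, string) {
	mainCtx, cancelMain := context.WithCancel(context.Background())
	defer cancelMain()

	// Case 1: only the session ends; the application keeps running.
	sessionCtx, cancelSession := context.WithCancel(mainCtx)
	cancelSession()
	first := handler{mainCtx: mainCtx}.waitForStop(sessionCtx)

	// Case 2: the application shuts down, which also ends the derived session.
	sessionCtx2, cancelSession2 := context.WithCancel(mainCtx)
	defer cancelSession2()
	cancelMain()
	second := handler{mainCtx: mainCtx}.waitForStop(sessionCtx2)

	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first)
	fmt.Println(second)
}
```

Keeping a context in a struct goes against the usual Go guidance, as the comment notes; the sketch only shows what the workaround amounts to.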
Closed in favour of #1892 |
How to gracefully shut down processMessage() on OS signals.
Requirement
processMessage() batches messages in memory and, on reaching 100, does some processing, uploads to S3, and then commits.
Problem
I want to shut down processMessage() gracefully by listening for OS signals in every iteration of the message-processing loop. But since the context signals are not passed to ConsumeClaim(), I am not able to forward them to the processMessage() routine.
Is it not possible to have a context ctx in the ConsumeClaim() definition?
consumerGroupSession has ctx. But the idiomatic way in Go is to pass ctx (context) to functions and not use it from a struct{}.
Please let me know your thoughts on this.
Thank you for open sourcing the library 👍
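The batching requirement described above could be sketched roughly as follows, with the standard library only. The `batchConsume` and `demo` functions, the `flush` callback (standing in for "process, upload, commit"), and the plain `chan string` are all assumptions for illustration. Flushing the partial batch on shutdown is one design choice; dropping it and letting the uncommitted messages be redelivered would be another.

```go
package main

import (
	"context"
	"fmt"
)

// batchConsume buffers messages and calls flush once the batch reaches size.
// On cancellation or channel close it flushes whatever is still buffered.
// The slice passed to flush is reused, so flush must not retain it.
func batchConsume(ctx context.Context, msgs <-chan string, size int, flush func([]string) error) error {
	batch := make([]string, 0, size)
	flushBatch := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := flush(batch); err != nil {
			return err
		}
		batch = batch[:0]
		return nil
	}
	for {
		select {
		case <-ctx.Done():
			return flushBatch()
		case m, ok := <-msgs:
			if !ok {
				return flushBatch()
			}
			batch = append(batch, m)
			if len(batch) >= size {
				if err := flushBatch(); err != nil {
					return err
				}
			}
		}
	}
}

// demo sends five messages with a batch size of two, then cancels,
// and returns the size of each flushed batch.
func demo() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	msgs := make(chan string)
	go func() {
		for _, m := range []string{"a", "b", "c", "d", "e"} {
			msgs <- m
		}
		cancel() // simulate the shutdown signal after the last message
	}()
	var sizes []int
	_ = batchConsume(ctx, msgs, 2, func(b []string) error {
		sizes = append(sizes, len(b))
		return nil
	})
	return sizes
}

func main() {
	fmt.Println("flushed batch sizes:", demo())
}
```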