Replace session context with main context
Had to work around this problem by using mainContext, as sessions get closed quite often when the total number of topics is huge.
The context is stored in the struct instead of following the Go-recommended approach of passing a context around, since sarama does not support passing a context through its interfaces.

IBM/sarama#1776
golang/go#22602
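
The change keeps the long-lived context from main() on the consumer structs and selects on it for cancellation, instead of relying on sarama's per-session context, which gets cancelled on every rebalance. Below is a minimal, hypothetical sketch of that pattern; it is not the project's actual code, and names such as batcher and cancelled are made up for illustration.

// Minimal sketch of the pattern, assuming hypothetical names; not the
// project's actual code.
package main

import (
    "context"
    "fmt"
    "os/signal"
    "syscall"
    "time"
)

// batcher stands in for the consumer/batchProcessor structs: the context is
// kept on the struct because the library callbacks do not accept one.
type batcher struct {
    mainContext context.Context
}

// cancelled reports whether the main context is done (e.g. SIGTERM received).
// A consumer-group rebalance does not affect this context, unlike the
// per-session context.
func (b *batcher) cancelled() bool {
    select {
    case <-b.mainContext.Done():
        fmt.Println("shutting down:", b.mainContext.Err())
        return true
    default:
        return false
    }
}

func main() {
    // main() owns the context; only a shutdown signal cancels it.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    b := &batcher{mainContext: ctx}
    for !b.cancelled() {
        // stand-in for processing one batch of messages
        time.Sleep(100 * time.Millisecond)
    }
}

The trade-off is the one the commit message notes: keeping a context in a struct goes against the usual Go guidance, but it is the practical option when a library's callbacks do not accept a caller-supplied context.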
alok87 committed Feb 18, 2021
1 parent 75bb54c commit ddbe9b2
Showing 3 changed files with 22 additions and 9 deletions.
cmd/redshiftbatcher/main.go (1 change: 1 addition & 0 deletions)

@@ -68,6 +68,7 @@ func run(cmd *cobra.Command, args []string) {
 		groupConfig,
 		redshiftbatcher.NewConsumer(
 			ready,
+			ctx,
 			groupConfig.Kafka,
 			groupConfig.Sarama,
 			groupConfig.LoaderTopicPrefix,

pkg/redshiftbatcher/batch_processor.go (17 changes: 12 additions & 5 deletions)

@@ -33,6 +33,9 @@ type batchProcessor struct {
 	// session is required to commit the offsets on successful processing
 	session sarama.ConsumerGroupSession
 
+	// mainContext for cancellations
+	mainContext context.Context
+
 	// s3Sink
 	s3sink *s3sink.S3Sink
 
@@ -100,6 +103,7 @@ func newBatchProcessor(
 	topic string,
 	partition int32,
 	session sarama.ConsumerGroupSession,
+	mainContext context.Context,
 	kafkaConfig kafka.KafkaConfig,
 	saramaConfig kafka.SaramaConfig,
 	kafkaLoaderTopicPrefix string,
@@ -146,6 +150,7 @@
 		partition: partition,
 		autoCommit: saramaConfig.AutoCommit,
 		session: session,
+		mainContext: mainContext,
 		s3sink: sink,
 		s3BucketDir: viper.GetString("s3sink.bucketDir"),
 		bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
@@ -179,8 +184,10 @@ func removeEmptyNullValues(value map[string]*string) map[string]*string {
 // TODO: get rid of this https://github.com/herryg91/gobatch/issues/2
 func (b *batchProcessor) ctxCancelled() bool {
 	select {
-	case <-b.session.Context().Done():
-		klog.Infof(
+	case <-b.mainContext.Done():
+		err := b.mainContext.Err()
+		klog.Warningf("Batch processing stopped, mainContext done, ctxErr: %v", err)
+		klog.V(2).Infof(
 			"topic:%s, batchId:%d, lastCommittedOffset:%d: Cancelled.\n",
 			b.topic, b.batchId, b.lastCommittedOffset,
 		)
@@ -393,12 +400,12 @@ func (b *batchProcessor) processMessage(message *serializer.Message, id int) {
 // otherwise return false in case of graceful shutdown signals being captured,
 // this helps in cleanly shutting down the batch processing.
 func (b *batchProcessor) processBatch(
-	ctx context.Context, datas []interface{}) bool {
+	mainContext context.Context, datas []interface{}) bool {
 
 	b.s3Key = ""
 	for id, data := range datas {
 		select {
-		case <-ctx.Done():
+		case <-mainContext.Done():
 			return false
 		default:
 			b.processMessage(data.(*serializer.Message), id)
@@ -423,7 +430,7 @@ func (b *batchProcessor) process(workerID int, datas []interface{}) {
 		b.topic, b.batchId, len(datas),
 	)
 
-	done := b.processBatch(b.session.Context(), datas)
+	done := b.processBatch(b.mainContext, datas)
 	if !done {
 		b.handleShutdown()
 		return

pkg/redshiftbatcher/consumer.go (13 changes: 9 additions & 4 deletions)

@@ -1,14 +1,16 @@
 package redshiftbatcher
 
 import (
+	"context"
 	"github.com/Shopify/sarama"
 	"github.com/practo/klog/v2"
 	"github.com/practo/tipoca-stream/redshiftsink/pkg/kafka"
 )
 
-func NewConsumer(ready chan bool, kafkaConfig kafka.KafkaConfig, saramaConfig kafka.SaramaConfig, loaderPrefix string) consumer {
+func NewConsumer(ready chan bool, mainContext context.Context, kafkaConfig kafka.KafkaConfig, saramaConfig kafka.SaramaConfig, loaderPrefix string) consumer {
 	return consumer{
 		ready: ready,
+		mainContext: mainContext,
 		kafkaConfig: kafkaConfig,
 		saramaConfig: saramaConfig,
 		kafkaLoaderTopicPrefix: loaderPrefix,
@@ -23,6 +25,7 @@
 type consumer struct {
 	// Ready is used to signal the main thread about the readiness
 	ready chan bool
+	mainContext context.Context
 	kafkaConfig kafka.KafkaConfig
 	saramaConfig kafka.SaramaConfig
 	kafkaLoaderTopicPrefix string
@@ -60,6 +63,7 @@ func (c consumer) processMessage(
 		message.Topic,
 		message.Partition,
 		session,
+		c.mainContext,
 		c.kafkaConfig,
 		c.saramaConfig,
 		c.kafkaLoaderTopicPrefix,
@@ -69,8 +73,9 @@
 	c.batcher.processor.session = session
 
 	select {
-	case <-c.batcher.processor.session.Context().Done():
-		klog.Info("Graceful shutdown requested, not inserting in batch")
+	case <-c.mainContext.Done():
+		err := c.mainContext.Err()
+		klog.Warningf("Batch insert stopped, mainContext done, ctxErr: %v", err)
 		return nil
 	default:
 		c.batcher.Insert(message)
@@ -98,7 +103,7 @@ func (c consumer) ConsumeClaim(session sarama.ConsumerGroupSession,
 	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
 	for message := range claim.Messages() {
 		select {
-		case <-session.Context().Done():
+		case <-c.mainContext.Done():
 			klog.Infof(
 				"%s: Gracefully shutdown. Stopped taking new messages.",
 				claim.Topic(),
