From 3470440cd755832af05e93c23ab37698b7a44d11 Mon Sep 17 00:00:00 2001 From: "Rieb, Elias" Date: Mon, 30 Oct 2023 13:39:48 +0100 Subject: [PATCH] Provide type-safe consumer --- pkg/aukafka/common.go | 4 ++-- pkg/aukafka/consumer.go | 49 ++++++++++++++++++++++------------------- pkg/aukafka/logger.go | 16 ++++++++------ pkg/aukafka/producer.go | 3 ++- 4 files changed, 39 insertions(+), 33 deletions(-) diff --git a/pkg/aukafka/common.go b/pkg/aukafka/common.go index d3e1a9d..0d18ae7 100644 --- a/pkg/aukafka/common.go +++ b/pkg/aukafka/common.go @@ -11,7 +11,7 @@ import ( "github.com/twmb/franz-go/pkg/sasl/scram" ) -func defaultTopicOptions(config TopicConfig) []kgo.Opt { +func defaultTopicOptions(logKey string, config TopicConfig) []kgo.Opt { tlsDialer := &tls.Dialer{ NetDialer: &net.Dialer{Timeout: 10 * time.Second}, Config: &tls.Config{InsecureSkipVerify: true}, @@ -40,7 +40,7 @@ func defaultTopicOptions(config TopicConfig) []kgo.Opt { kgo.SessionTimeout(30 * time.Second), kgo.RequestRetries(2), kgo.RetryTimeout(5 * time.Second), - kgo.WithLogger(Logger{}), + kgo.WithLogger(Logger{Key: logKey}), } return opts diff --git a/pkg/aukafka/consumer.go b/pkg/aukafka/consumer.go index 14cfdd4..bb15348 100644 --- a/pkg/aukafka/consumer.go +++ b/pkg/aukafka/consumer.go @@ -4,24 +4,24 @@ import ( "context" "encoding/json" "fmt" + "time" aulogging "github.com/StephanHCB/go-autumn-logging" "github.com/twmb/franz-go/pkg/kgo" ) type Consumer[E any] struct { - client *kgo.Client - - receiveCallback func(ctx context.Context, event E) error + client *kgo.Client + receiveCallback func(ctx context.Context, key *string, event *E, timestamp time.Time) error } func CreateConsumer[E any]( ctx context.Context, config TopicConfig, - receiveCallback func(context.Context, E) error, + receiveCallback func(context.Context, *string, *E, time.Time) error, customOpts ...kgo.Opt, ) (*Consumer[E], error) { - opts := defaultTopicOptions(config) + opts := defaultTopicOptions(fmt.Sprintf("%s consumer", config.Topic), config) opts = append(opts, customOpts...) opts = append(opts, kgo.ConsumerGroup(*config.ConsumerGroup), kgo.ConsumeTopics(config.Topic)) @@ -44,32 +44,35 @@ func (c *Consumer[E]) Stop() { c.client.Close() } -func (c *Consumer[E]) run(ctx context.Context) error { +func (c *Consumer[E]) run( + ctx context.Context, +) { for { - fetches := c.client.PollFetches(context.Background()) + fetches := c.client.PollFetches(ctx) if fetches.IsClientClosed() { - aulogging.Logger.NoCtx().Info().Print("receive loop ending, kafka client was closed") - return nil + aulogging.Logger.NoCtx().Info().Print("kafka client closed, stopping consumer") + return } - aulogging.Logger.NoCtx().Debug().Printf("receive loop found %d fetches", len(fetches)) + aulogging.Logger.NoCtx().Debug().Printf("received %s fetches", len(fetches)) - var firstError error = nil fetches.EachError(func(t string, p int32, err error) { - if firstError == nil { - firstError = fmt.Errorf("receive loop fetch error topic %s partition %d: %v", t, p, err) - } + aulogging.Logger.NoCtx().Error().WithErr(err).Printf("fetch error occurred for partition %d of topic %s", p, t) }) - if firstError != nil { - aulogging.Logger.NoCtx().Error().WithErr(firstError).Print("receive loop terminated abnormally: %v", firstError) - return firstError - } fetches.EachRecord(func(record *kgo.Record) { - aulogging.Logger.NoCtx().Info().Printf("received kafka message: %s", string(record.Value)) - var event E - if err := json.Unmarshal(record.Value, &event); err != nil { - aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to parse kafka message: %s", string(record.Value)) - c.receiveCallback(context.TODO(), event) + key := new(string) + if record.Key != nil { + *key = string(record.Key) + } + event := new(E) + if record.Value != nil { + if err := json.Unmarshal(record.Value, &event); err != nil { + aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to unmarshal event") + return + } + } + if err := c.receiveCallback(ctx, key, event, record.Timestamp); err != nil { + aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to perform event callback") } }) } diff --git a/pkg/aukafka/logger.go b/pkg/aukafka/logger.go index 117dbd6..8a8c070 100644 --- a/pkg/aukafka/logger.go +++ b/pkg/aukafka/logger.go @@ -5,25 +5,27 @@ import ( "github.com/twmb/franz-go/pkg/kgo" ) -type Logger struct{} +type Logger struct { + Key string +} func (l Logger) Level() kgo.LogLevel { - return kgo.LogLevelInfo // set to Debug to see all output + return kgo.LogLevelDebug // set to Debug to see all output } -func (l Logger) Log(level kgo.LogLevel, msg string, keyvals ...any) { +func (l Logger) Log(level kgo.LogLevel, msg string, _ ...any) { switch level { case kgo.LogLevelError: - aulogging.Logger.NoCtx().Error().Print("kgo error: " + msg) + aulogging.Logger.NoCtx().Error().Printf("kgo %s error: %s", l.Key, msg) return case kgo.LogLevelWarn: - aulogging.Logger.NoCtx().Warn().Print("kgo warning: " + msg) + aulogging.Logger.NoCtx().Warn().Printf("kgo %s warning: %s", l.Key, msg) return case kgo.LogLevelInfo: - aulogging.Logger.NoCtx().Info().Print("kgo info: " + msg) + aulogging.Logger.NoCtx().Info().Printf("kgo %s info: %s", l.Key, msg) return case kgo.LogLevelDebug: - aulogging.Logger.NoCtx().Debug().Print("kgo debug: " + msg) + aulogging.Logger.NoCtx().Debug().Printf("kgo %s debug: %s", l.Key, msg) return default: return diff --git a/pkg/aukafka/producer.go b/pkg/aukafka/producer.go index a3af65a..b23be6f 100644 --- a/pkg/aukafka/producer.go +++ b/pkg/aukafka/producer.go @@ -3,6 +3,7 @@ package aukafka import ( "context" "encoding/json" + "fmt" aulogging "github.com/StephanHCB/go-autumn-logging" "github.com/twmb/franz-go/pkg/kgo" @@ -17,7 +18,7 @@ func CreateProducer[V any]( config TopicConfig, customOpts ...kgo.Opt, ) (*Producer[V], error) { - opts := defaultTopicOptions(config) + opts := defaultTopicOptions(fmt.Sprintf("%s producer", config.Topic), config) opts = append(opts, customOpts...) opts = append(opts, kgo.DefaultProduceTopic(config.Topic), kgo.ProducerBatchCompression(kgo.NoCompression()))