Skip to content

Commit

Permalink
Provide type-safe consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Rieb, Elias committed Oct 30, 2023
1 parent b46b04c commit 3470440
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/aukafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/twmb/franz-go/pkg/sasl/scram"
)

func defaultTopicOptions(config TopicConfig) []kgo.Opt {
func defaultTopicOptions(logKey string, config TopicConfig) []kgo.Opt {
tlsDialer := &tls.Dialer{
NetDialer: &net.Dialer{Timeout: 10 * time.Second},
Config: &tls.Config{InsecureSkipVerify: true},
Expand Down Expand Up @@ -40,7 +40,7 @@ func defaultTopicOptions(config TopicConfig) []kgo.Opt {
kgo.SessionTimeout(30 * time.Second),
kgo.RequestRetries(2),
kgo.RetryTimeout(5 * time.Second),
kgo.WithLogger(Logger{}),
kgo.WithLogger(Logger{Key: logKey}),
}

return opts
Expand Down
49 changes: 26 additions & 23 deletions pkg/aukafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ import (
"context"
"encoding/json"
"fmt"
"time"

aulogging "github.com/StephanHCB/go-autumn-logging"
"github.com/twmb/franz-go/pkg/kgo"
)

type Consumer[E any] struct {
client *kgo.Client

receiveCallback func(ctx context.Context, event E) error
client *kgo.Client
receiveCallback func(ctx context.Context, key *string, event *E, timestamp time.Time) error
}

func CreateConsumer[E any](
ctx context.Context,
config TopicConfig,
receiveCallback func(context.Context, E) error,
receiveCallback func(context.Context, *string, *E, time.Time) error,
customOpts ...kgo.Opt,
) (*Consumer[E], error) {
opts := defaultTopicOptions(config)
opts := defaultTopicOptions(fmt.Sprintf("%s consumer", config.Topic), config)
opts = append(opts, customOpts...)
opts = append(opts, kgo.ConsumerGroup(*config.ConsumerGroup), kgo.ConsumeTopics(config.Topic))

Expand All @@ -44,32 +44,35 @@ func (c *Consumer[E]) Stop() {
c.client.Close()
}

func (c *Consumer[E]) run(ctx context.Context) error {
func (c *Consumer[E]) run(
ctx context.Context,
) {
for {
fetches := c.client.PollFetches(context.Background())
fetches := c.client.PollFetches(ctx)
if fetches.IsClientClosed() {
aulogging.Logger.NoCtx().Info().Print("receive loop ending, kafka client was closed")
return nil
aulogging.Logger.NoCtx().Info().Print("kafka client closed, stopping consumer")
return
}
aulogging.Logger.NoCtx().Debug().Printf("receive loop found %d fetches", len(fetches))
aulogging.Logger.NoCtx().Debug().Printf("received %s fetches", len(fetches))

var firstError error = nil
fetches.EachError(func(t string, p int32, err error) {
if firstError == nil {
firstError = fmt.Errorf("receive loop fetch error topic %s partition %d: %v", t, p, err)
}
aulogging.Logger.NoCtx().Error().WithErr(err).Printf("fetch error occurred for partition %d of topic %s", p, t)
})
if firstError != nil {
aulogging.Logger.NoCtx().Error().WithErr(firstError).Print("receive loop terminated abnormally: %v", firstError)
return firstError
}

fetches.EachRecord(func(record *kgo.Record) {
aulogging.Logger.NoCtx().Info().Printf("received kafka message: %s", string(record.Value))
var event E
if err := json.Unmarshal(record.Value, &event); err != nil {
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to parse kafka message: %s", string(record.Value))
c.receiveCallback(context.TODO(), event)
key := new(string)
if record.Key != nil {
*key = string(record.Key)
}
event := new(E)
if record.Value != nil {
if err := json.Unmarshal(record.Value, &event); err != nil {
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to unmarshal event")
return
}
}
if err := c.receiveCallback(ctx, key, event, record.Timestamp); err != nil {
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Printf("failed to perform event callback")
}
})
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/aukafka/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,27 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

type Logger struct{}
type Logger struct {
Key string
}

func (l Logger) Level() kgo.LogLevel {
return kgo.LogLevelInfo // set to Debug to see all output
return kgo.LogLevelDebug // set to Debug to see all output
}

func (l Logger) Log(level kgo.LogLevel, msg string, keyvals ...any) {
func (l Logger) Log(level kgo.LogLevel, msg string, _ ...any) {
switch level {
case kgo.LogLevelError:
aulogging.Logger.NoCtx().Error().Print("kgo error: " + msg)
aulogging.Logger.NoCtx().Error().Printf("kgo %s error: %s", l.Key, msg)
return
case kgo.LogLevelWarn:
aulogging.Logger.NoCtx().Warn().Print("kgo warning: " + msg)
aulogging.Logger.NoCtx().Warn().Printf("kgo %s warning: %s", l.Key, msg)
return
case kgo.LogLevelInfo:
aulogging.Logger.NoCtx().Info().Print("kgo info: " + msg)
aulogging.Logger.NoCtx().Info().Printf("kgo %s info: %s", l.Key, msg)
return
case kgo.LogLevelDebug:
aulogging.Logger.NoCtx().Debug().Print("kgo debug: " + msg)
aulogging.Logger.NoCtx().Debug().Printf("kgo %s debug: %s", l.Key, msg)
return
default:
return
Expand Down
3 changes: 2 additions & 1 deletion pkg/aukafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aukafka
import (
"context"
"encoding/json"
"fmt"

aulogging "github.com/StephanHCB/go-autumn-logging"
"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -17,7 +18,7 @@ func CreateProducer[V any](
config TopicConfig,
customOpts ...kgo.Opt,
) (*Producer[V], error) {
opts := defaultTopicOptions(config)
opts := defaultTopicOptions(fmt.Sprintf("%s producer", config.Topic), config)
opts = append(opts, customOpts...)
opts = append(opts, kgo.DefaultProduceTopic(config.Topic), kgo.ProducerBatchCompression(kgo.NoCompression()))

Expand Down

0 comments on commit 3470440

Please sign in to comment.