From 8fa14c653ddb71772cbacf24f1f5505a762b5a46 Mon Sep 17 00:00:00 2001 From: "Rieb, Elias" Date: Wed, 6 Dec 2023 09:20:41 +0100 Subject: [PATCH] Revert package renaming for now and add backwards compatible methods --- pkg/{kafka => aukafka}/config.go | 36 ++++++++++++++++++++----- pkg/{kafka => aukafka}/consumer.go | 12 ++++----- pkg/{kafka => aukafka}/producer_sync.go | 6 ++--- pkg/{kafka => aukafka}/scramclient.go | 2 +- 4 files changed, 39 insertions(+), 17 deletions(-) rename pkg/{kafka => aukafka}/config.go (74%) rename pkg/{kafka => aukafka}/consumer.go (94%) rename pkg/{kafka => aukafka}/producer_sync.go (95%) rename pkg/{kafka => aukafka}/scramclient.go (98%) diff --git a/pkg/kafka/config.go b/pkg/aukafka/config.go similarity index 74% rename from pkg/kafka/config.go rename to pkg/aukafka/config.go index c0074db..cc14069 100644 --- a/pkg/kafka/config.go +++ b/pkg/aukafka/config.go @@ -1,4 +1,4 @@ -package kafka +package aukafka import ( "encoding/json" @@ -71,7 +71,7 @@ func DefaultConfigItems() []auconfigapi.ConfigItem { Default: "{}", Description: "configuration consisting of topic keys (not necessarily the topic name, rather the key used by the application to produce events for or consume of specific topics) and their respective authentication", Validate: func(value string) error { - _, err := parseTopicConfigs(value) + _, err := ParseTopicConfigs(value) return err }, }, @@ -88,13 +88,13 @@ func ObtainDefaultConfig(provider ValuesProvider) (*DefaultConfigImpl, error) { return nil, fmt.Errorf("failed to obtain configuration values: %s", err.Error()) } - vTopicConfigs, _ := parseTopicConfigs(values[DefaultKeyKafkaTopicsConfig]) + vTopicConfigs, _ := ParseTopicConfigs(values[DefaultKeyKafkaTopicsConfig]) return &DefaultConfigImpl{ vTopicConfigs: vTopicConfigs, }, nil } -func parseTopicConfigs(jsonString string) (map[string]TopicConfig, error) { +func ParseTopicConfigs(jsonString string) (map[string]TopicConfig, error) { rawConfigs := make(map[string]rawTopicConfig) if err := json.Unmarshal([]byte(jsonString), &rawConfigs); err != nil { return nil, err @@ -107,15 +107,15 @@ func parseTopicConfigs(jsonString string) (map[string]TopicConfig, error) { if rawConfig.PasswordEnvVar != nil { password = auconfigenv.Get(*rawConfig.PasswordEnvVar) if password == "" { - return nil, fmt.Errorf("kafka-topic %s password environment variable %s is empty", rawConfig.Topic, rawConfig.PasswordEnvVar) + return nil, fmt.Errorf("aukafka-topic %s password environment variable %s is empty", rawConfig.Topic, *rawConfig.PasswordEnvVar) } } else if rawConfig.Password != nil { password = auconfigenv.Get(*rawConfig.PasswordEnvVar) if password == "" { - return nil, fmt.Errorf("kafka-topic %s password value is empty", rawConfig.Topic) + return nil, fmt.Errorf("aukafka-topic %s password value is empty", rawConfig.Topic) } } else { - return nil, fmt.Errorf("kafka-topic %s neither password environment variable or password value is set", rawConfig.Topic) + return nil, fmt.Errorf("aukafka-topic %s neither password environment variable or password value is set", rawConfig.Topic) } topicConfigs[key] = TopicConfig{ @@ -129,3 +129,25 @@ func parseTopicConfigs(jsonString string) (map[string]TopicConfig, error) { } return topicConfigs, nil } + +// DEPRECATED + +func (c *DefaultConfigImpl) ConfigItems() []auconfigapi.ConfigItem { + return []auconfigapi.ConfigItem{ + { + Key: DefaultKeyKafkaTopicsConfig, + EnvName: DefaultKeyKafkaTopicsConfig, + Default: "{}", + Description: "configuration consisting of topic keys (not necessarily the topic name, rather the key used by the application to produce events for or consume of specific topics) and their respective authentication", + Validate: func(key string) error { + value := auconfigenv.Get(key) + _, err := ParseTopicConfigs(value) + return err + }, + }, + } +} + +func (c *DefaultConfigImpl) Obtain(getter func(key string) string) { + c.vTopicConfigs, _ = ParseTopicConfigs(getter(DefaultKeyKafkaTopicsConfig)) +} diff --git a/pkg/kafka/consumer.go b/pkg/aukafka/consumer.go similarity index 94% rename from pkg/kafka/consumer.go rename to pkg/aukafka/consumer.go index 4e52d6f..1eaadf6 100644 --- a/pkg/kafka/consumer.go +++ b/pkg/aukafka/consumer.go @@ -1,4 +1,4 @@ -package kafka +package aukafka import ( "context" @@ -17,7 +17,7 @@ type Consumer[E any] struct { receiveCallback func(ctx context.Context, key *string, event *E, timestamp time.Time) error } -func NewConsumer[E any]( +func CreateConsumer[E any]( ctx context.Context, topicConfig TopicConfig, receiveCallback func(context.Context, *string, *E, time.Time) error, @@ -50,7 +50,7 @@ func NewConsumer[E any]( func (c *Consumer[E]) Close(ctx context.Context) { err := c.client.Close() if err != nil { - aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close kafka consumer") + aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close aukafka consumer") } } @@ -58,7 +58,7 @@ func (c *Consumer[E]) run(ctx context.Context) { defer func() { r := recover() if err, ok := r.(error); ok { - aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("caught panic in kafka consumer") + aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("caught panic in aukafka consumer") } }() @@ -70,7 +70,7 @@ func (c *Consumer[E]) run(ctx context.Context) { if errors.Is(err, sarama.ErrClosedConsumerGroup) { return } - aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("kafka consumer returned with error") + aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("aukafka consumer returned with error") } } } @@ -124,7 +124,7 @@ func (c *Consumer[E]) ConsumeClaim( session.MarkMessage(message, "") } // Should return when `session.Context()` is done. - // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when aukafka rebalance. see: // https://github.com/IBM/sarama/issues/1192 case <-session.Context().Done(): return nil diff --git a/pkg/kafka/producer_sync.go b/pkg/aukafka/producer_sync.go similarity index 95% rename from pkg/kafka/producer_sync.go rename to pkg/aukafka/producer_sync.go index e16f6be..409dc43 100644 --- a/pkg/kafka/producer_sync.go +++ b/pkg/aukafka/producer_sync.go @@ -1,4 +1,4 @@ -package kafka +package aukafka import ( "context" @@ -14,7 +14,7 @@ type SyncProducer[V any] struct { topic string } -func NewSyncProducer[V any]( +func CreateSyncProducer[V any]( _ context.Context, topicConfig TopicConfig, configPreset *sarama.Config, @@ -68,6 +68,6 @@ func (p *SyncProducer[V]) Produce( func (p *SyncProducer[E]) Close(ctx context.Context) { err := p.client.Close() if err != nil { - aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close kafka producer") + aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close aukafka producer") } } diff --git a/pkg/kafka/scramclient.go b/pkg/aukafka/scramclient.go similarity index 98% rename from pkg/kafka/scramclient.go rename to pkg/aukafka/scramclient.go index 08b98a8..db9b4df 100644 --- a/pkg/kafka/scramclient.go +++ b/pkg/aukafka/scramclient.go @@ -1,4 +1,4 @@ -package kafka +package aukafka import ( "crypto/sha256"