Skip to content

Commit

Permalink
Revert package renaming for now and add backwards compatible methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Rieb, Elias committed Dec 6, 2023
1 parent e13949a commit 8fa14c6
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
36 changes: 29 additions & 7 deletions pkg/kafka/config.go → pkg/aukafka/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka
package aukafka

import (
"encoding/json"
Expand Down Expand Up @@ -71,7 +71,7 @@ func DefaultConfigItems() []auconfigapi.ConfigItem {
Default: "{}",
Description: "configuration consisting of topic keys (not necessarily the topic name, rather the key used by the application to produce events for or consume of specific topics) and their respective authentication",
Validate: func(value string) error {
_, err := parseTopicConfigs(value)
_, err := ParseTopicConfigs(value)
return err
},
},
Expand All @@ -88,13 +88,13 @@ func ObtainDefaultConfig(provider ValuesProvider) (*DefaultConfigImpl, error) {
return nil, fmt.Errorf("failed to obtain configuration values: %s", err.Error())
}

vTopicConfigs, _ := parseTopicConfigs(values[DefaultKeyKafkaTopicsConfig])
vTopicConfigs, _ := ParseTopicConfigs(values[DefaultKeyKafkaTopicsConfig])
return &DefaultConfigImpl{
vTopicConfigs: vTopicConfigs,
}, nil
}

func parseTopicConfigs(jsonString string) (map[string]TopicConfig, error) {
func ParseTopicConfigs(jsonString string) (map[string]TopicConfig, error) {
rawConfigs := make(map[string]rawTopicConfig)
if err := json.Unmarshal([]byte(jsonString), &rawConfigs); err != nil {
return nil, err
Expand All @@ -107,15 +107,15 @@ func parseTopicConfigs(jsonString string) (map[string]TopicConfig, error) {
if rawConfig.PasswordEnvVar != nil {
password = auconfigenv.Get(*rawConfig.PasswordEnvVar)
if password == "" {
return nil, fmt.Errorf("kafka-topic %s password environment variable %s is empty", rawConfig.Topic, rawConfig.PasswordEnvVar)
return nil, fmt.Errorf("aukafka-topic %s password environment variable %s is empty", rawConfig.Topic, *rawConfig.PasswordEnvVar)
}
} else if rawConfig.Password != nil {
password = auconfigenv.Get(*rawConfig.PasswordEnvVar)
if password == "" {
return nil, fmt.Errorf("kafka-topic %s password value is empty", rawConfig.Topic)
return nil, fmt.Errorf("aukafka-topic %s password value is empty", rawConfig.Topic)
}
} else {
return nil, fmt.Errorf("kafka-topic %s neither password environment variable or password value is set", rawConfig.Topic)
return nil, fmt.Errorf("aukafka-topic %s neither password environment variable or password value is set", rawConfig.Topic)
}

topicConfigs[key] = TopicConfig{
Expand All @@ -129,3 +129,25 @@ func parseTopicConfigs(jsonString string) (map[string]TopicConfig, error) {
}
return topicConfigs, nil
}

// DEPRECATED

func (c *DefaultConfigImpl) ConfigItems() []auconfigapi.ConfigItem {
return []auconfigapi.ConfigItem{
{
Key: DefaultKeyKafkaTopicsConfig,
EnvName: DefaultKeyKafkaTopicsConfig,
Default: "{}",
Description: "configuration consisting of topic keys (not necessarily the topic name, rather the key used by the application to produce events for or consume of specific topics) and their respective authentication",
Validate: func(key string) error {
value := auconfigenv.Get(key)
_, err := ParseTopicConfigs(value)
return err
},
},
}
}

func (c *DefaultConfigImpl) Obtain(getter func(key string) string) {
c.vTopicConfigs, _ = ParseTopicConfigs(getter(DefaultKeyKafkaTopicsConfig))
}
12 changes: 6 additions & 6 deletions pkg/kafka/consumer.go → pkg/aukafka/consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka
package aukafka

import (
"context"
Expand All @@ -17,7 +17,7 @@ type Consumer[E any] struct {
receiveCallback func(ctx context.Context, key *string, event *E, timestamp time.Time) error
}

func NewConsumer[E any](
func CreateConsumer[E any](
ctx context.Context,
topicConfig TopicConfig,
receiveCallback func(context.Context, *string, *E, time.Time) error,
Expand Down Expand Up @@ -50,15 +50,15 @@ func NewConsumer[E any](
func (c *Consumer[E]) Close(ctx context.Context) {
err := c.client.Close()
if err != nil {
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close kafka consumer")
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close aukafka consumer")
}
}

func (c *Consumer[E]) run(ctx context.Context) {
defer func() {
r := recover()
if err, ok := r.(error); ok {
aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("caught panic in kafka consumer")
aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("caught panic in aukafka consumer")
}
}()

Expand All @@ -70,7 +70,7 @@ func (c *Consumer[E]) run(ctx context.Context) {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("kafka consumer returned with error")
aulogging.Logger.Ctx(ctx).Error().WithErr(err).Print("aukafka consumer returned with error")
}
}
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *Consumer[E]) ConsumeClaim(
session.MarkMessage(message, "")
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when aukafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/kafka/producer_sync.go → pkg/aukafka/producer_sync.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka
package aukafka

import (
"context"
Expand All @@ -14,7 +14,7 @@ type SyncProducer[V any] struct {
topic string
}

func NewSyncProducer[V any](
func CreateSyncProducer[V any](
_ context.Context,
topicConfig TopicConfig,
configPreset *sarama.Config,
Expand Down Expand Up @@ -68,6 +68,6 @@ func (p *SyncProducer[V]) Produce(
func (p *SyncProducer[E]) Close(ctx context.Context) {
err := p.client.Close()
if err != nil {
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close kafka producer")
aulogging.Logger.Ctx(ctx).Warn().WithErr(err).Print("failed to close aukafka producer")
}
}
2 changes: 1 addition & 1 deletion pkg/kafka/scramclient.go → pkg/aukafka/scramclient.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka
package aukafka

import (
"crypto/sha256"
Expand Down

0 comments on commit 8fa14c6

Please sign in to comment.