Skip to content

Commit

Permalink
Remove exotic from kafka eventbus spec
Browse files Browse the repository at this point in the history
Signed-off-by: David Farr <david_farr@intuit.com>
  • Loading branch information
dfarr committed Mar 20, 2023
1 parent 94d514c commit 7708c68
Show file tree
Hide file tree
Showing 18 changed files with 170 additions and 489 deletions.
39 changes: 4 additions & 35 deletions api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 4 additions & 38 deletions api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 1 addition & 10 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamConfig"
},
"kafka": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaConfig"
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaBus"
},
"nats": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.NATSConfig"
Expand Down Expand Up @@ -525,15 +525,6 @@
},
"io.argoproj.eventbus.v1alpha1.KafkaBus": {
"description": "KafkaBus holds the KafkaBus EventBus information",
"properties": {
"exotic": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaConfig",
"description": "Exotic holds an exotic Kafka config"
}
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.KafkaConfig": {
"properties": {
"consumerGroup": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaConsumerGroup",
Expand Down
11 changes: 1 addition & 10 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, kubeClient
} else if js := eventBus.Spec.JetStream; js != nil {
return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), kubeClient, logger), nil
} else if kafka := eventBus.Spec.Kafka; kafka != nil {
if kafka.Exotic != nil {
return NewExoticKafkaInstaller(eventBus, logger), nil
}
return NewExoticKafkaInstaller(eventBus, logger), nil
}
return nil, fmt.Errorf("invalid eventbus spec")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ func NewExoticKafkaInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLog

func (i *exoticKafkaInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
kafkaObj := i.eventBus.Spec.Kafka
if kafkaObj == nil || kafkaObj.Exotic == nil {
if kafkaObj == nil {
return nil, fmt.Errorf("invalid request")
}
if kafkaObj.Exotic.Topic == "" {
kafkaObj.Exotic.Topic = fmt.Sprintf("%s-%s", i.eventBus.Namespace, i.eventBus.Name)
if kafkaObj.Topic == "" {
kafkaObj.Topic = fmt.Sprintf("%s-%s", i.eventBus.Namespace, i.eventBus.Name)
}

i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.")
i.logger.Info("use exotic config")
busConfig := &v1alpha1.BusConfig{
Kafka: kafkaObj.Exotic,
Kafka: kafkaObj,
}
return busConfig, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

const (
testExoticKafkaName = "test-kafka"
testExoticKafkaURL = "kafka:9092"
testKafkaName = "test-kafka"
testKafkaURL = "kafka:9092"
)

var (
Expand All @@ -24,13 +24,11 @@ var (
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testExoticKafkaName,
Name: testKafkaName,
},
Spec: v1alpha1.EventBusSpec{
Kafka: &v1alpha1.KafkaBus{
Exotic: &v1alpha1.KafkaConfig{
URL: testExoticKafkaURL,
},
URL: testKafkaURL,
},
},
}
Expand All @@ -42,7 +40,7 @@ func TestInstallationKafkaExotic(t *testing.T) {
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.Kafka)
assert.Equal(t, conf.Kafka.URL, testExoticKafkaURL)
assert.Equal(t, conf.Kafka.URL, testKafkaURL)
})
}

Expand Down
7 changes: 2 additions & 5 deletions controllers/eventbus/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error {
}
}
if x := eb.Spec.Kafka; x != nil {
if x.Exotic == nil {
return fmt.Errorf("\"exotic\" must be defined")
}
if x.Exotic.URL == "" {
return fmt.Errorf("\"spec.kafka.exotic.url\" is missing")
if x.URL == "" {
return fmt.Errorf("\"spec.kafka.url\" is missing")
}
}
return nil
Expand Down
8 changes: 3 additions & 5 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ var (
},
Spec: v1alpha1.EventBusSpec{
Kafka: &v1alpha1.KafkaBus{
Exotic: &v1alpha1.KafkaConfig{
URL: "127.0.0.1:9092",
},
URL: "127.0.0.1:9092",
},
},
}
Expand Down Expand Up @@ -131,9 +129,9 @@ func TestValidate(t *testing.T) {

t.Run("test kafka eventbus no URL", func(t *testing.T) {
eb := testKafkaEventBus.DeepCopy()
eb.Spec.Kafka.Exotic.URL = ""
eb.Spec.Kafka.URL = ""
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"spec.kafka.exotic.url\" is missing")
assert.Contains(t, err.Error(), "\"spec.kafka.url\" is missing")
})
}
4 changes: 2 additions & 2 deletions eventbus/kafka/base/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

type Kafka struct {
Logger *zap.SugaredLogger
config *eventbusv1alpha1.KafkaConfig
config *eventbusv1alpha1.KafkaBus
}

func NewKafka(config *eventbusv1alpha1.KafkaConfig, logger *zap.SugaredLogger) *Kafka {
func NewKafka(config *eventbusv1alpha1.KafkaBus, logger *zap.SugaredLogger) *Kafka {
// set defaults
if config.ConsumerGroup == nil {
config.ConsumerGroup = &eventbusv1alpha1.KafkaConsumerGroup{}
Expand Down
2 changes: 1 addition & 1 deletion eventbus/kafka/eventsource/source_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type KafkaSource struct {
topic string
}

func NewKafkaSource(config *eventbusv1alpha1.KafkaConfig, logger *zap.SugaredLogger) *KafkaSource {
func NewKafkaSource(config *eventbusv1alpha1.KafkaBus, logger *zap.SugaredLogger) *KafkaSource {
return &KafkaSource{
Kafka: base.NewKafka(config, logger),
topic: config.Topic,
Expand Down
2 changes: 1 addition & 1 deletion eventbus/kafka/sensor/kafka_sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type KafkaSensor struct {
connected bool
}

func NewKafkaSensor(kafkaConfig *eventbusv1alpha1.KafkaConfig, sensor *sensorv1alpha1.Sensor, hostname string, logger *zap.SugaredLogger) *KafkaSensor {
func NewKafkaSensor(kafkaConfig *eventbusv1alpha1.KafkaBus, sensor *sensorv1alpha1.Sensor, hostname string, logger *zap.SugaredLogger) *KafkaSensor {
topics := &Topics{
event: kafkaConfig.Topic,
trigger: fmt.Sprintf("%s-%s-%s", kafkaConfig.Topic, sensor.Name, "trigger"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventbus/v1alpha1/eventbus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type BusConfig struct {
// +optional
JetStream *JetStreamConfig `json:"jetstream,omitempty" protobuf:"bytes,2,opt,name=jetstream"`
// +optional
Kafka *KafkaConfig `json:"kafka,omitempty" protobuf:"bytes,3,opt,name=kafka"`
Kafka *KafkaBus `json:"kafka,omitempty" protobuf:"bytes,3,opt,name=kafka"`
}

const (
Expand Down
Loading

0 comments on commit 7708c68

Please sign in to comment.