Skip to content

Commit

Permalink
Merge pull request #26 from intuit-data-os/feature/kafka-eventbus-upd…
Browse files Browse the repository at this point in the history
…ates

Signed-off-by: David Farr <david_farr@intuit.com>
  • Loading branch information
dfarr authored Mar 21, 2023
2 parents 7cee7eb + f1aad5a commit 1cdc903
Show file tree
Hide file tree
Showing 31 changed files with 506 additions and 561 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ test:
go test $(shell go list ./... | grep -v /vendor/ | grep -v /test/e2e/) -race -short -v

test-functional:
ifeq ($(EventBusDriver),kafka)
kubectl -n argo-events apply -k test/manifests/kafka
kubectl -n argo-events wait -l statefulset.kubernetes.io/pod-name=kafka-0 --for=condition=ready pod --timeout=60s
endif
go test -v -timeout 20m -count 1 --tags functional -p 1 ./test/e2e
ifeq ($(EventBusDriver),kafka)
kubectl -n argo-events delete -k test/manifests/kafka
endif

# to run just one of the functional e2e tests by name (i.e. 'make TestMetricsWithWebhook'):
Test%:
Expand Down Expand Up @@ -141,8 +148,6 @@ docs/assets/diagram.png: go-diagrams/diagram.dot
start: image
kubectl apply -f test/manifests/argo-events-ns.yaml
kubectl kustomize test/manifests | sed 's@quay.io/argoproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:$(BASE_VERSION)/:$(VERSION)/' | kubectl -n argo-events apply -l app.kubernetes.io/part-of=argo-events --prune=false --force -f -

sleep 10
kubectl -n argo-events wait --for=condition=Ready --timeout 60s pod --all

$(GOPATH)/bin/golangci-lint:
Expand Down
39 changes: 4 additions & 35 deletions api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 4 additions & 38 deletions api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 1 addition & 10 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamConfig"
},
"kafka": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaConfig"
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaBus"
},
"nats": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.NATSConfig"
Expand Down Expand Up @@ -525,15 +525,6 @@
},
"io.argoproj.eventbus.v1alpha1.KafkaBus": {
"description": "KafkaBus holds the KafkaBus EventBus information",
"properties": {
"exotic": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaConfig",
"description": "Exotic holds an exotic Kafka config"
}
},
"type": "object"
},
"io.argoproj.eventbus.v1alpha1.KafkaConfig": {
"properties": {
"consumerGroup": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaConsumerGroup",
Expand Down
11 changes: 1 addition & 10 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, kubeClient
} else if js := eventBus.Spec.JetStream; js != nil {
return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), kubeClient, logger), nil
} else if kafka := eventBus.Spec.Kafka; kafka != nil {
if kafka.Exotic != nil {
return NewExoticKafkaInstaller(eventBus, logger), nil
}
return NewExoticKafkaInstaller(eventBus, logger), nil
}
return nil, fmt.Errorf("invalid eventbus spec")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ func NewExoticKafkaInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLog

func (i *exoticKafkaInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
kafkaObj := i.eventBus.Spec.Kafka
if kafkaObj == nil || kafkaObj.Exotic == nil {
if kafkaObj == nil {
return nil, fmt.Errorf("invalid request")
}
if kafkaObj.Exotic.Topic == "" {
kafkaObj.Exotic.Topic = fmt.Sprintf("%s-%s", i.eventBus.Namespace, i.eventBus.Name)
if kafkaObj.Topic == "" {
kafkaObj.Topic = fmt.Sprintf("%s-%s", i.eventBus.Namespace, i.eventBus.Name)
}

i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.")
i.logger.Info("use exotic config")
busConfig := &v1alpha1.BusConfig{
Kafka: kafkaObj.Exotic,
Kafka: kafkaObj,
}
return busConfig, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

const (
testExoticKafkaName = "test-kafka"
testExoticKafkaURL = "kafka:9092"
testKafkaName = "test-kafka"
testKafkaURL = "kafka:9092"
)

var (
Expand All @@ -24,13 +24,11 @@ var (
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testExoticKafkaName,
Name: testKafkaName,
},
Spec: v1alpha1.EventBusSpec{
Kafka: &v1alpha1.KafkaBus{
Exotic: &v1alpha1.KafkaConfig{
URL: testExoticKafkaURL,
},
URL: testKafkaURL,
},
},
}
Expand All @@ -42,7 +40,7 @@ func TestInstallationKafkaExotic(t *testing.T) {
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.Kafka)
assert.Equal(t, conf.Kafka.URL, testExoticKafkaURL)
assert.Equal(t, conf.Kafka.URL, testKafkaURL)
})
}

Expand Down
7 changes: 2 additions & 5 deletions controllers/eventbus/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error {
}
}
if x := eb.Spec.Kafka; x != nil {
if x.Exotic == nil {
return fmt.Errorf("\"exotic\" must be defined")
}
if x.Exotic.URL == "" {
return fmt.Errorf("\"spec.kafka.exotic.url\" is missing")
if x.URL == "" {
return fmt.Errorf("\"spec.kafka.url\" is missing")
}
}
return nil
Expand Down
8 changes: 3 additions & 5 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ var (
},
Spec: v1alpha1.EventBusSpec{
Kafka: &v1alpha1.KafkaBus{
Exotic: &v1alpha1.KafkaConfig{
URL: "127.0.0.1:9092",
},
URL: "127.0.0.1:9092",
},
},
}
Expand Down Expand Up @@ -131,9 +129,9 @@ func TestValidate(t *testing.T) {

t.Run("test kafka eventbus no URL", func(t *testing.T) {
eb := testKafkaEventBus.DeepCopy()
eb.Spec.Kafka.Exotic.URL = ""
eb.Spec.Kafka.URL = ""
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"spec.kafka.exotic.url\" is missing")
assert.Contains(t, err.Error(), "\"spec.kafka.url\" is missing")
})
}
46 changes: 46 additions & 0 deletions controllers/eventsource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,52 @@ var (
},
},
}

fakeEventBusJetstream = &eventbusv1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: eventbusv1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: common.DefaultEventBusName,
},
Spec: eventbusv1alpha1.EventBusSpec{
JetStream: &eventbusv1alpha1.JetStreamBus{
Version: "x.x.x",
},
},
Status: eventbusv1alpha1.EventBusStatus{
Config: eventbusv1alpha1.BusConfig{
JetStream: &eventbusv1alpha1.JetStreamConfig{
URL: "nats://xxxx",
},
},
},
}

fakeEventBusKafka = &eventbusv1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: eventbusv1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: common.DefaultEventBusName,
},
Spec: eventbusv1alpha1.EventBusSpec{
Kafka: &eventbusv1alpha1.KafkaBus{
URL: "localhost:9092",
},
},
Status: eventbusv1alpha1.EventBusStatus{
Config: eventbusv1alpha1.BusConfig{
Kafka: &eventbusv1alpha1.KafkaBus{
URL: "localhost:9092",
},
},
},
}
)

func fakeEmptyEventSource() *v1alpha1.EventSource {
Expand Down
Loading

0 comments on commit 1cdc903

Please sign in to comment.