From 07fb87b1961be29fcf05126b28be2f8623fa9115 Mon Sep 17 00:00:00 2001 From: Bilal Bakht Ahmad Date: Thu, 26 Jan 2023 13:31:20 -0800 Subject: [PATCH] Kafka Validations (#16) * webhook validations Signed-off-by: Bilal Bakht Ahmad * add Exotic Kafka installer * Change order of EventBus reconcilation Calling client.Status().Update() before calling client.Update() persists the status for exotic evnetbuses, otherwise the status is not perisisted. Signed-off-by: David Farr * requeue reconciler Signed-off-by: Bilal Bakht Ahmad * Make additional copy of obj to update eb status Signed-off-by: David Farr * oxford comma Co-authored-by: David Farr * stylistic change Signed-off-by: Bilal Bakht Ahmad Signed-off-by: Bilal Bakht Ahmad Signed-off-by: David Farr Co-authored-by: David Farr --- controllers/eventbus/controller.go | 8 ++- .../eventbus/installer/exotic_kafka.go | 43 +++++++++++++++ .../eventbus/installer/exotic_kafka_test.go | 55 +++++++++++++++++++ controllers/eventbus/installer/installer.go | 4 ++ controllers/eventbus/validate.go | 12 +++- controllers/eventbus/validate_test.go | 27 +++++++++ test/e2e/fixtures/when.go | 6 +- 7 files changed, 150 insertions(+), 5 deletions(-) create mode 100644 controllers/eventbus/installer/exotic_kafka.go create mode 100644 controllers/eventbus/installer/exotic_kafka_test.go diff --git a/controllers/eventbus/controller.go b/controllers/eventbus/controller.go index 2903a313f1..024cdc927f 100644 --- a/controllers/eventbus/controller.go +++ b/controllers/eventbus/controller.go @@ -55,14 +55,20 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if reconcileErr != nil { log.Errorw("reconcile error", zap.Error(reconcileErr)) } + + // client.Update() mutates busCopy so we need to make an + // additional copy in order to update the status. + statusCopy := busCopy.DeepCopy() + if r.needsUpdate(eventBus, busCopy) { if err := r.client.Update(ctx, busCopy); err != nil { return reconcile.Result{}, err } } - if err := r.client.Status().Update(ctx, busCopy); err != nil { + if err := r.client.Status().Update(ctx, statusCopy); err != nil { return reconcile.Result{}, err } + return ctrl.Result{}, reconcileErr } diff --git a/controllers/eventbus/installer/exotic_kafka.go b/controllers/eventbus/installer/exotic_kafka.go new file mode 100644 index 0000000000..c3032b740e --- /dev/null +++ b/controllers/eventbus/installer/exotic_kafka.go @@ -0,0 +1,43 @@ +package installer + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" +) + +// exoticKafkaInstaller is an inalleration implementation of exotic kafka config. +type exoticKafkaInstaller struct { + eventBus *v1alpha1.EventBus + + logger *zap.SugaredLogger +} + +// NewExoticKafkaInstaller return a new exoticKafkaInstaller +func NewExoticKafkaInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogger) Installer { + return &exoticKafkaInstaller{ + eventBus: eventBus, + logger: logger.Named("exotic-kafka"), + } +} + +func (i *exoticKafkaInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) { + kafkaObj := i.eventBus.Spec.Kafka + if kafkaObj == nil || kafkaObj.Exotic == nil { + return nil, fmt.Errorf("invalid request") + } + i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.") + i.logger.Info("use exotic config") + busConfig := &v1alpha1.BusConfig{ + Kafka: kafkaObj.Exotic, + } + return busConfig, nil +} + +func (i *exoticKafkaInstaller) Uninstall(ctx context.Context) error { + i.logger.Info("nothing to uninstall") + return nil +} diff --git a/controllers/eventbus/installer/exotic_kafka_test.go b/controllers/eventbus/installer/exotic_kafka_test.go new file mode 100644 index 0000000000..94887cc4bf --- /dev/null +++ b/controllers/eventbus/installer/exotic_kafka_test.go @@ -0,0 +1,55 @@ +package installer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/argoproj/argo-events/common/logging" + "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" +) + +const ( + testExoticKafkaName = "test-kafka" + testExoticKafkaURL = "kafka:9092" +) + +var ( + testKafkaExoticBus = &v1alpha1.EventBus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "EventBus", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testExoticKafkaName, + }, + Spec: v1alpha1.EventBusSpec{ + Kafka: &v1alpha1.KafkaBus{ + Exotic: &v1alpha1.KafkaConfig{ + URL: testExoticKafkaURL, + }, + }, + }, + } +) + +func TestInstallationKafkaExotic(t *testing.T) { + t.Run("installation with exotic kafka config", func(t *testing.T) { + installer := NewExoticKafkaInstaller(testKafkaExoticBus, logging.NewArgoEventsLogger()) + conf, err := installer.Install(context.TODO()) + assert.NoError(t, err) + assert.NotNil(t, conf.Kafka) + assert.Equal(t, conf.Kafka.URL, testExoticKafkaURL) + }) +} + +func TestUninstallationKafkaExotic(t *testing.T) { + t.Run("uninstallation with exotic kafka config", func(t *testing.T) { + installer := NewExoticKafkaInstaller(testKafkaExoticBus, logging.NewArgoEventsLogger()) + err := installer.Uninstall(context.TODO()) + assert.NoError(t, err) + }) +} diff --git a/controllers/eventbus/installer/installer.go b/controllers/eventbus/installer/installer.go index 29b11b898a..2eaa3f5bc2 100644 --- a/controllers/eventbus/installer/installer.go +++ b/controllers/eventbus/installer/installer.go @@ -48,6 +48,10 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, config *con } } else if js := eventBus.Spec.JetStream; js != nil { return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), logger), nil + } else if kafka := eventBus.Spec.Kafka; kafka != nil { + if kafka.Exotic != nil { + return NewExoticKafkaInstaller(eventBus, logger), nil + } } return nil, fmt.Errorf("invalid eventbus spec") } diff --git a/controllers/eventbus/validate.go b/controllers/eventbus/validate.go index 9af22c4520..73a45983e8 100644 --- a/controllers/eventbus/validate.go +++ b/controllers/eventbus/validate.go @@ -8,8 +8,8 @@ import ( // ValidateEventBus accepts an EventBus and performs validation against it func ValidateEventBus(eb *v1alpha1.EventBus) error { - if eb.Spec.NATS == nil && eb.Spec.JetStream == nil { - return fmt.Errorf("invalid spec: either \"nats\" or \"jetstream\" needs to be specified") + if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil { + return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", or \"kafka\" needs to be specified") } if x := eb.Spec.NATS; x != nil { if x.Native != nil && x.Exotic != nil { @@ -36,5 +36,13 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error { return fmt.Errorf("invalid spec: a jetstream eventbus requires at least 3 replicas") } } + if x := eb.Spec.Kafka; x != nil { + if x.Exotic == nil { + return fmt.Errorf("\"exotic\" must be defined") + } + if x.Exotic.URL == "" { + return fmt.Errorf("\"spec.kafka.exotic.url\" is missing") + } + } return nil } diff --git a/controllers/eventbus/validate_test.go b/controllers/eventbus/validate_test.go index e6ce1bd763..3a16855e29 100644 --- a/controllers/eventbus/validate_test.go +++ b/controllers/eventbus/validate_test.go @@ -38,6 +38,20 @@ var ( }, }, } + + testKafkaEventBus = &v1alpha1.EventBus{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + Name: common.DefaultEventBusName, + }, + Spec: v1alpha1.EventBusSpec{ + Kafka: &v1alpha1.KafkaBus{ + Exotic: &v1alpha1.KafkaConfig{ + URL: "127.0.0.1:9092", + }, + }, + }, + } ) func TestValidate(t *testing.T) { @@ -51,6 +65,11 @@ func TestValidate(t *testing.T) { assert.NoError(t, err) }) + t.Run("test good kafka eventbus", func(t *testing.T) { + err := ValidateEventBus(testKafkaEventBus) + assert.NoError(t, err) + }) + t.Run("test bad eventbus", func(t *testing.T) { eb := testNatsEventBus.DeepCopy() eb.Spec.NATS = nil @@ -109,4 +128,12 @@ func TestValidate(t *testing.T) { err = ValidateEventBus(eb) assert.NoError(t, err) }) + + t.Run("test kafka eventbus no URL", func(t *testing.T) { + eb := testKafkaEventBus.DeepCopy() + eb.Spec.Kafka.Exotic.URL = "" + err := ValidateEventBus(eb) + assert.Error(t, err) + assert.Contains(t, err.Error(), "\"spec.kafka.exotic.url\" is missing") + }) } diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index c77fe02b9d..9be842f216 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -153,8 +153,10 @@ func (w *When) WaitForEventBusReady() *When { if err := testutil.WaitForEventBusReady(ctx, w.eventBusClient, w.eventBus.Name, defaultTimeout); err != nil { w.t.Fatal(err) } - if err := testutil.WaitForEventBusStatefulSetReady(ctx, w.kubeClient, Namespace, w.eventBus.Name, 2*time.Minute); err != nil { - w.t.Fatal(err) + if w.eventBus.Spec.Kafka == nil { // not needed for kafka (exotic only) + if err := testutil.WaitForEventBusStatefulSetReady(ctx, w.kubeClient, Namespace, w.eventBus.Name, 2*time.Minute); err != nil { + w.t.Fatal(err) + } } return w }