From 317e98d1861627093c0f8563df09b318d7edbd98 Mon Sep 17 00:00:00 2001 From: Bilal Bakht Ahmad Date: Tue, 24 Jan 2023 17:48:06 -0800 Subject: [PATCH 1/7] webhook validations Signed-off-by: Bilal Bakht Ahmad --- controllers/eventbus/validate.go | 13 +++++++++++-- controllers/eventbus/validate_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/controllers/eventbus/validate.go b/controllers/eventbus/validate.go index 9af22c4520..83136616fd 100644 --- a/controllers/eventbus/validate.go +++ b/controllers/eventbus/validate.go @@ -8,8 +8,8 @@ import ( // ValidateEventBus accepts an EventBus and performs validation against it func ValidateEventBus(eb *v1alpha1.EventBus) error { - if eb.Spec.NATS == nil && eb.Spec.JetStream == nil { - return fmt.Errorf("invalid spec: either \"nats\" or \"jetstream\" needs to be specified") + if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil { + return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\" or \"kafka\" needs to be specified") } if x := eb.Spec.NATS; x != nil { if x.Native != nil && x.Exotic != nil { @@ -36,5 +36,14 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error { return fmt.Errorf("invalid spec: a jetstream eventbus requires at least 3 replicas") } } + if x := eb.Spec.Kafka; x != nil { + if x.Exotic == nil { + return fmt.Errorf("\"exotic\" must be defined") + } + e := x.Exotic + if e.URL == "" { + return fmt.Errorf("\"spec.kafka.exotic.url\" is missing") + } + } return nil } diff --git a/controllers/eventbus/validate_test.go b/controllers/eventbus/validate_test.go index e6ce1bd763..3a16855e29 100644 --- a/controllers/eventbus/validate_test.go +++ b/controllers/eventbus/validate_test.go @@ -38,6 +38,20 @@ var ( }, }, } + + testKafkaEventBus = &v1alpha1.EventBus{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + Name: common.DefaultEventBusName, + }, + Spec: v1alpha1.EventBusSpec{ + Kafka: &v1alpha1.KafkaBus{ + Exotic: &v1alpha1.KafkaConfig{ + URL: "127.0.0.1:9092", + }, + }, + }, + } ) func TestValidate(t *testing.T) { @@ -51,6 +65,11 @@ func TestValidate(t *testing.T) { assert.NoError(t, err) }) + t.Run("test good kafka eventbus", func(t *testing.T) { + err := ValidateEventBus(testKafkaEventBus) + assert.NoError(t, err) + }) + t.Run("test bad eventbus", func(t *testing.T) { eb := testNatsEventBus.DeepCopy() eb.Spec.NATS = nil @@ -109,4 +128,12 @@ func TestValidate(t *testing.T) { err = ValidateEventBus(eb) assert.NoError(t, err) }) + + t.Run("test kafka eventbus no URL", func(t *testing.T) { + eb := testKafkaEventBus.DeepCopy() + eb.Spec.Kafka.Exotic.URL = "" + err := ValidateEventBus(eb) + assert.Error(t, err) + assert.Contains(t, err.Error(), "\"spec.kafka.exotic.url\" is missing") + }) } From 3a088818724b27c6c2b3a06603213f80d5990537 Mon Sep 17 00:00:00 2001 From: Bilal Bakht Ahmad Date: Tue, 24 Jan 2023 18:59:43 -0800 Subject: [PATCH 2/7] add Exotic Kafka installer --- .../eventbus/installer/exotic_kafka.go | 43 +++++++++++++++ .../eventbus/installer/exotic_kafka_test.go | 55 +++++++++++++++++++ controllers/eventbus/installer/installer.go | 4 ++ 3 files changed, 102 insertions(+) create mode 100644 controllers/eventbus/installer/exotic_kafka.go create mode 100644 controllers/eventbus/installer/exotic_kafka_test.go diff --git a/controllers/eventbus/installer/exotic_kafka.go b/controllers/eventbus/installer/exotic_kafka.go new file mode 100644 index 0000000000..c3032b740e --- /dev/null +++ b/controllers/eventbus/installer/exotic_kafka.go @@ -0,0 +1,43 @@ +package installer + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" +) + +// exoticKafkaInstaller is an inalleration implementation of exotic kafka config. +type exoticKafkaInstaller struct { + eventBus *v1alpha1.EventBus + + logger *zap.SugaredLogger +} + +// NewExoticKafkaInstaller return a new exoticKafkaInstaller +func NewExoticKafkaInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogger) Installer { + return &exoticKafkaInstaller{ + eventBus: eventBus, + logger: logger.Named("exotic-kafka"), + } +} + +func (i *exoticKafkaInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) { + kafkaObj := i.eventBus.Spec.Kafka + if kafkaObj == nil || kafkaObj.Exotic == nil { + return nil, fmt.Errorf("invalid request") + } + i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.") + i.logger.Info("use exotic config") + busConfig := &v1alpha1.BusConfig{ + Kafka: kafkaObj.Exotic, + } + return busConfig, nil +} + +func (i *exoticKafkaInstaller) Uninstall(ctx context.Context) error { + i.logger.Info("nothing to uninstall") + return nil +} diff --git a/controllers/eventbus/installer/exotic_kafka_test.go b/controllers/eventbus/installer/exotic_kafka_test.go new file mode 100644 index 0000000000..94887cc4bf --- /dev/null +++ b/controllers/eventbus/installer/exotic_kafka_test.go @@ -0,0 +1,55 @@ +package installer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/argoproj/argo-events/common/logging" + "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" +) + +const ( + testExoticKafkaName = "test-kafka" + testExoticKafkaURL = "kafka:9092" +) + +var ( + testKafkaExoticBus = &v1alpha1.EventBus{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "EventBus", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: testExoticKafkaName, + }, + Spec: v1alpha1.EventBusSpec{ + Kafka: &v1alpha1.KafkaBus{ + Exotic: &v1alpha1.KafkaConfig{ + URL: testExoticKafkaURL, + }, + }, + }, + } +) + +func TestInstallationKafkaExotic(t *testing.T) { + t.Run("installation with exotic kafka config", func(t *testing.T) { + installer := NewExoticKafkaInstaller(testKafkaExoticBus, logging.NewArgoEventsLogger()) + conf, err := installer.Install(context.TODO()) + assert.NoError(t, err) + assert.NotNil(t, conf.Kafka) + assert.Equal(t, conf.Kafka.URL, testExoticKafkaURL) + }) +} + +func TestUninstallationKafkaExotic(t *testing.T) { + t.Run("uninstallation with exotic kafka config", func(t *testing.T) { + installer := NewExoticKafkaInstaller(testKafkaExoticBus, logging.NewArgoEventsLogger()) + err := installer.Uninstall(context.TODO()) + assert.NoError(t, err) + }) +} diff --git a/controllers/eventbus/installer/installer.go b/controllers/eventbus/installer/installer.go index 29b11b898a..2eaa3f5bc2 100644 --- a/controllers/eventbus/installer/installer.go +++ b/controllers/eventbus/installer/installer.go @@ -48,6 +48,10 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, config *con } } else if js := eventBus.Spec.JetStream; js != nil { return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), logger), nil + } else if kafka := eventBus.Spec.Kafka; kafka != nil { + if kafka.Exotic != nil { + return NewExoticKafkaInstaller(eventBus, logger), nil + } } return nil, fmt.Errorf("invalid eventbus spec") } From b3232b92da9958e2cdc205341c3768e05a433a6a Mon Sep 17 00:00:00 2001 From: David Farr Date: Wed, 25 Jan 2023 18:53:13 -0800 Subject: [PATCH 3/7] Change order of EventBus reconcilation Calling client.Status().Update() before calling client.Update() persists the status for exotic evnetbuses, otherwise the status is not perisisted. Signed-off-by: David Farr --- controllers/eventbus/controller.go | 6 +++--- test/e2e/fixtures/when.go | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/controllers/eventbus/controller.go b/controllers/eventbus/controller.go index 2903a313f1..f47500620f 100644 --- a/controllers/eventbus/controller.go +++ b/controllers/eventbus/controller.go @@ -55,14 +55,14 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if reconcileErr != nil { log.Errorw("reconcile error", zap.Error(reconcileErr)) } + if err := r.client.Status().Update(ctx, busCopy); err != nil { + return reconcile.Result{}, err + } if r.needsUpdate(eventBus, busCopy) { if err := r.client.Update(ctx, busCopy); err != nil { return reconcile.Result{}, err } } - if err := r.client.Status().Update(ctx, busCopy); err != nil { - return reconcile.Result{}, err - } return ctrl.Result{}, reconcileErr } diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index c77fe02b9d..9be842f216 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -153,8 +153,10 @@ func (w *When) WaitForEventBusReady() *When { if err := testutil.WaitForEventBusReady(ctx, w.eventBusClient, w.eventBus.Name, defaultTimeout); err != nil { w.t.Fatal(err) } - if err := testutil.WaitForEventBusStatefulSetReady(ctx, w.kubeClient, Namespace, w.eventBus.Name, 2*time.Minute); err != nil { - w.t.Fatal(err) + if w.eventBus.Spec.Kafka == nil { // not needed for kafka (exotic only) + if err := testutil.WaitForEventBusStatefulSetReady(ctx, w.kubeClient, Namespace, w.eventBus.Name, 2*time.Minute); err != nil { + w.t.Fatal(err) + } } return w } From c9b28040183c982f46324f7140e632b11a9f740a Mon Sep 17 00:00:00 2001 From: Bilal Bakht Ahmad Date: Thu, 26 Jan 2023 00:36:22 -0800 Subject: [PATCH 4/7] requeue reconciler Signed-off-by: Bilal Bakht Ahmad --- controllers/eventbus/controller.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/controllers/eventbus/controller.go b/controllers/eventbus/controller.go index f47500620f..7019661379 100644 --- a/controllers/eventbus/controller.go +++ b/controllers/eventbus/controller.go @@ -55,13 +55,14 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if reconcileErr != nil { log.Errorw("reconcile error", zap.Error(reconcileErr)) } - if err := r.client.Status().Update(ctx, busCopy); err != nil { - return reconcile.Result{}, err - } if r.needsUpdate(eventBus, busCopy) { if err := r.client.Update(ctx, busCopy); err != nil { return reconcile.Result{}, err } + return ctrl.Result{Requeue: true}, nil + } + if err := r.client.Status().Update(ctx, busCopy); err != nil { + return reconcile.Result{}, err } return ctrl.Result{}, reconcileErr } From dbfb59d98cb32f498a5295b593bb1ac396d74e31 Mon Sep 17 00:00:00 2001 From: David Farr Date: Thu, 26 Jan 2023 09:47:31 -0800 Subject: [PATCH 5/7] Make additional copy of obj to update eb status Signed-off-by: David Farr --- controllers/eventbus/controller.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/controllers/eventbus/controller.go b/controllers/eventbus/controller.go index 7019661379..024cdc927f 100644 --- a/controllers/eventbus/controller.go +++ b/controllers/eventbus/controller.go @@ -55,15 +55,20 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if reconcileErr != nil { log.Errorw("reconcile error", zap.Error(reconcileErr)) } + + // client.Update() mutates busCopy so we need to make an + // additional copy in order to update the status. + statusCopy := busCopy.DeepCopy() + if r.needsUpdate(eventBus, busCopy) { if err := r.client.Update(ctx, busCopy); err != nil { return reconcile.Result{}, err } - return ctrl.Result{Requeue: true}, nil } - if err := r.client.Status().Update(ctx, busCopy); err != nil { + if err := r.client.Status().Update(ctx, statusCopy); err != nil { return reconcile.Result{}, err } + return ctrl.Result{}, reconcileErr } From 3a4bad0f020114d1c2a9bbf2bd11e9886f5e6221 Mon Sep 17 00:00:00 2001 From: Bilal Bakht Ahmad Date: Thu, 26 Jan 2023 11:55:08 -0800 Subject: [PATCH 6/7] oxford comma Co-authored-by: David Farr --- controllers/eventbus/validate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/eventbus/validate.go b/controllers/eventbus/validate.go index 83136616fd..33697cb80d 100644 --- a/controllers/eventbus/validate.go +++ b/controllers/eventbus/validate.go @@ -9,7 +9,7 @@ import ( // ValidateEventBus accepts an EventBus and performs validation against it func ValidateEventBus(eb *v1alpha1.EventBus) error { if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil { - return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\" or \"kafka\" needs to be specified") + return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", or \"kafka\" needs to be specified") } if x := eb.Spec.NATS; x != nil { if x.Native != nil && x.Exotic != nil { From 685d04e03d2ff387a08205f7ecebbc55fd8dd19a Mon Sep 17 00:00:00 2001 From: Bilal Bakht Ahmad Date: Thu, 26 Jan 2023 11:58:57 -0800 Subject: [PATCH 7/7] stylistic change Signed-off-by: Bilal Bakht Ahmad --- controllers/eventbus/validate.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/controllers/eventbus/validate.go b/controllers/eventbus/validate.go index 33697cb80d..73a45983e8 100644 --- a/controllers/eventbus/validate.go +++ b/controllers/eventbus/validate.go @@ -40,8 +40,7 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error { if x.Exotic == nil { return fmt.Errorf("\"exotic\" must be defined") } - e := x.Exotic - if e.URL == "" { + if x.Exotic.URL == "" { return fmt.Errorf("\"spec.kafka.exotic.url\" is missing") } }