Skip to content

Commit

Permalink
Kafka Validations (#16)
Browse files Browse the repository at this point in the history
* webhook validations

Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com>

* add Exotic Kafka installer

* Change order of EventBus reconcilation

Calling client.Status().Update() before calling client.Update()
persists the status for exotic evnetbuses, otherwise the status
is not perisisted.

Signed-off-by: David Farr <david_farr@intuit.com>

* requeue reconciler

Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com>

* Make additional copy of obj to update eb status

Signed-off-by: David Farr <david_farr@intuit.com>

* oxford comma

Co-authored-by: David Farr <david_farr@intuit.com>

* stylistic change

Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com>

Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Co-authored-by: David Farr <david_farr@intuit.com>
  • Loading branch information
bilalba and dfarr authored Jan 26, 2023
1 parent 5dcd9e1 commit 07fb87b
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 5 deletions.
8 changes: 7 additions & 1 deletion controllers/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,20 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if reconcileErr != nil {
log.Errorw("reconcile error", zap.Error(reconcileErr))
}

// client.Update() mutates busCopy so we need to make an
// additional copy in order to update the status.
statusCopy := busCopy.DeepCopy()

if r.needsUpdate(eventBus, busCopy) {
if err := r.client.Update(ctx, busCopy); err != nil {
return reconcile.Result{}, err
}
}
if err := r.client.Status().Update(ctx, busCopy); err != nil {
if err := r.client.Status().Update(ctx, statusCopy); err != nil {
return reconcile.Result{}, err
}

return ctrl.Result{}, reconcileErr
}

Expand Down
43 changes: 43 additions & 0 deletions controllers/eventbus/installer/exotic_kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package installer

import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

// exoticKafkaInstaller is an inalleration implementation of exotic kafka config.
type exoticKafkaInstaller struct {
eventBus *v1alpha1.EventBus

logger *zap.SugaredLogger
}

// NewExoticKafkaInstaller return a new exoticKafkaInstaller
func NewExoticKafkaInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogger) Installer {
return &exoticKafkaInstaller{
eventBus: eventBus,
logger: logger.Named("exotic-kafka"),
}
}

func (i *exoticKafkaInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
kafkaObj := i.eventBus.Spec.Kafka
if kafkaObj == nil || kafkaObj.Exotic == nil {
return nil, fmt.Errorf("invalid request")
}
i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.")
i.logger.Info("use exotic config")
busConfig := &v1alpha1.BusConfig{
Kafka: kafkaObj.Exotic,
}
return busConfig, nil
}

func (i *exoticKafkaInstaller) Uninstall(ctx context.Context) error {
i.logger.Info("nothing to uninstall")
return nil
}
55 changes: 55 additions & 0 deletions controllers/eventbus/installer/exotic_kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

const (
testExoticKafkaName = "test-kafka"
testExoticKafkaURL = "kafka:9092"
)

var (
testKafkaExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testExoticKafkaName,
},
Spec: v1alpha1.EventBusSpec{
Kafka: &v1alpha1.KafkaBus{
Exotic: &v1alpha1.KafkaConfig{
URL: testExoticKafkaURL,
},
},
},
}
)

func TestInstallationKafkaExotic(t *testing.T) {
t.Run("installation with exotic kafka config", func(t *testing.T) {
installer := NewExoticKafkaInstaller(testKafkaExoticBus, logging.NewArgoEventsLogger())
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.Kafka)
assert.Equal(t, conf.Kafka.URL, testExoticKafkaURL)
})
}

func TestUninstallationKafkaExotic(t *testing.T) {
t.Run("uninstallation with exotic kafka config", func(t *testing.T) {
installer := NewExoticKafkaInstaller(testKafkaExoticBus, logging.NewArgoEventsLogger())
err := installer.Uninstall(context.TODO())
assert.NoError(t, err)
})
}
4 changes: 4 additions & 0 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, config *con
}
} else if js := eventBus.Spec.JetStream; js != nil {
return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), logger), nil
} else if kafka := eventBus.Spec.Kafka; kafka != nil {
if kafka.Exotic != nil {
return NewExoticKafkaInstaller(eventBus, logger), nil
}
}
return nil, fmt.Errorf("invalid eventbus spec")
}
Expand Down
12 changes: 10 additions & 2 deletions controllers/eventbus/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// ValidateEventBus accepts an EventBus and performs validation against it
func ValidateEventBus(eb *v1alpha1.EventBus) error {
if eb.Spec.NATS == nil && eb.Spec.JetStream == nil {
return fmt.Errorf("invalid spec: either \"nats\" or \"jetstream\" needs to be specified")
if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil {
return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", or \"kafka\" needs to be specified")
}
if x := eb.Spec.NATS; x != nil {
if x.Native != nil && x.Exotic != nil {
Expand All @@ -36,5 +36,13 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error {
return fmt.Errorf("invalid spec: a jetstream eventbus requires at least 3 replicas")
}
}
if x := eb.Spec.Kafka; x != nil {
if x.Exotic == nil {
return fmt.Errorf("\"exotic\" must be defined")
}
if x.Exotic.URL == "" {
return fmt.Errorf("\"spec.kafka.exotic.url\" is missing")
}
}
return nil
}
27 changes: 27 additions & 0 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ var (
},
},
}

testKafkaEventBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: common.DefaultEventBusName,
},
Spec: v1alpha1.EventBusSpec{
Kafka: &v1alpha1.KafkaBus{
Exotic: &v1alpha1.KafkaConfig{
URL: "127.0.0.1:9092",
},
},
},
}
)

func TestValidate(t *testing.T) {
Expand All @@ -51,6 +65,11 @@ func TestValidate(t *testing.T) {
assert.NoError(t, err)
})

t.Run("test good kafka eventbus", func(t *testing.T) {
err := ValidateEventBus(testKafkaEventBus)
assert.NoError(t, err)
})

t.Run("test bad eventbus", func(t *testing.T) {
eb := testNatsEventBus.DeepCopy()
eb.Spec.NATS = nil
Expand Down Expand Up @@ -109,4 +128,12 @@ func TestValidate(t *testing.T) {
err = ValidateEventBus(eb)
assert.NoError(t, err)
})

t.Run("test kafka eventbus no URL", func(t *testing.T) {
eb := testKafkaEventBus.DeepCopy()
eb.Spec.Kafka.Exotic.URL = ""
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"spec.kafka.exotic.url\" is missing")
})
}
6 changes: 4 additions & 2 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ func (w *When) WaitForEventBusReady() *When {
if err := testutil.WaitForEventBusReady(ctx, w.eventBusClient, w.eventBus.Name, defaultTimeout); err != nil {
w.t.Fatal(err)
}
if err := testutil.WaitForEventBusStatefulSetReady(ctx, w.kubeClient, Namespace, w.eventBus.Name, 2*time.Minute); err != nil {
w.t.Fatal(err)
if w.eventBus.Spec.Kafka == nil { // not needed for kafka (exotic only)
if err := testutil.WaitForEventBusStatefulSetReady(ctx, w.kubeClient, Namespace, w.eventBus.Name, 2*time.Minute); err != nil {
w.t.Fatal(err)
}
}
return w
}
Expand Down

0 comments on commit 07fb87b

Please sign in to comment.