Skip to content

Commit

Permalink
feat: Different deployment update strategy for different event sources (
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Jul 18, 2020
1 parent a8afcb7 commit 2363cc1
Show file tree
Hide file tree
Showing 19 changed files with 945 additions and 58 deletions.
2 changes: 1 addition & 1 deletion controllers/eventsource/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func main() {
}
// A controller with DefaultControllerRateLimiter
c, err := controller.New(eventsource.ControllerName, mgr, controller.Options{
Reconciler: eventsource.NewReconciler(mgr.GetClient(), mgr.GetScheme(), namespace, eventSourceImage, log.WithName("reconciler")),
Reconciler: eventsource.NewReconciler(mgr.GetClient(), mgr.GetScheme(), eventSourceImage, log.WithName("reconciler")),
})
if err != nil {
mainLog.Error(err, "unable to set up individual controller")
Expand Down
5 changes: 1 addition & 4 deletions controllers/eventsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ type reconciler struct {
client client.Client
scheme *runtime.Scheme

// controller namespace
namespace string
eventSourceImage string
logger logr.Logger
}

// NewReconciler returns a new reconciler
func NewReconciler(client client.Client, scheme *runtime.Scheme, namespace, eventSourceImage string, logger logr.Logger) reconcile.Reconciler {
func NewReconciler(client client.Client, scheme *runtime.Scheme, eventSourceImage string, logger logr.Logger) reconcile.Reconciler {
return &reconciler{client: client, scheme: scheme, eventSourceImage: eventSourceImage, logger: logger}
}

Expand Down Expand Up @@ -82,7 +80,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
return err
}
args := &AdaptorArgs{
Namespace: r.namespace,
Image: r.eventSourceImage,
EventSource: eventSource,
Labels: map[string]string{
Expand Down
181 changes: 181 additions & 0 deletions controllers/eventsource/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package eventsource

import (
"context"
"testing"

appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
"github.com/stretchr/testify/assert"
)

const (
testEventSourceName = "test-name"
testNamespace = "test-ns"
testSecretName = "test-secret"
testSecretKey = "test-secret-key"
testConfigMapName = "test-cm"
testConfigMapKey = "test-cm-key"
)

var (
testSecretSelector = &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: testSecretName,
},
Key: testSecretKey,
}

testConfigMapSelector = &corev1.ConfigMapKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: testConfigMapName,
},
Key: testConfigMapKey,
}

fakeEventBus = &eventbusv1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: eventbusv1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "default",
},
Spec: eventbusv1alpha1.EventBusSpec{
NATS: &eventbusv1alpha1.NATSBus{
Native: &eventbusv1alpha1.NativeStrategy{
Auth: &eventbusv1alpha1.AuthStrategyToken,
},
},
},
Status: eventbusv1alpha1.EventBusStatus{
Config: eventbusv1alpha1.BusConfig{
NATS: &eventbusv1alpha1.NATSConfig{
URL: "nats://xxxx",
Auth: &eventbusv1alpha1.AuthStrategyToken,
AccessSecret: &corev1.SecretKeySelector{
Key: "test-key",
LocalObjectReference: corev1.LocalObjectReference{
Name: "test-name",
},
},
},
},
},
}
)

func fakeEmptyEventSource() *v1alpha1.EventSource {
return &v1alpha1.EventSource{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testEventSourceName,
},
Spec: v1alpha1.EventSourceSpec{},
}
}

func fakeCalendarEventSourceMap(name string) map[string]v1alpha1.CalendarEventSource {
return map[string]v1alpha1.CalendarEventSource{name: {Schedule: "*/5 * * * *"}}
}

func fakeWebhookEventSourceMap(name string) map[string]v1alpha1.WebhookContext {
return map[string]v1alpha1.WebhookContext{
name: {
URL: "http://a.b",
Endpoint: "/abc",
Port: "1234",
},
}
}

func fakeKafkaEventSourceMap(name string) map[string]v1alpha1.KafkaEventSource {
return map[string]v1alpha1.KafkaEventSource{
name: {
URL: "a.b",
Partition: "abc",
Topic: "topic",
},
}
}

func fakeMQTTEventSourceMap(name string) map[string]v1alpha1.MQTTEventSource {
return map[string]v1alpha1.MQTTEventSource{
name: {
URL: "a.b",
ClientID: "cid",
Topic: "topic",
},
}
}

func fakeHDFSEventSourceMap(name string) map[string]v1alpha1.HDFSEventSource {
return map[string]v1alpha1.HDFSEventSource{
name: {
Type: "t",
CheckInterval: "aa",
Addresses: []string{"ad1"},
HDFSUser: "user",
KrbCCacheSecret: testSecretSelector,
KrbKeytabSecret: testSecretSelector,
KrbUsername: "user",
KrbRealm: "llss",
KrbConfigConfigMap: testConfigMapSelector,
KrbServicePrincipalName: "name",
},
}
}

func init() {
_ = v1alpha1.AddToScheme(scheme.Scheme)
_ = eventbusv1alpha1.AddToScheme(scheme.Scheme)
_ = appv1.AddToScheme(scheme.Scheme)
_ = corev1.AddToScheme(scheme.Scheme)
}

func TestReconcile(t *testing.T) {
t.Run("test reconcile without eventbus", func(t *testing.T) {
testEventSource := fakeEmptyEventSource()
testEventSource.Spec.Calendar = fakeCalendarEventSourceMap("test")
ctx := context.TODO()
cl := fake.NewFakeClient(testEventSource)
r := &reconciler{
client: cl,
scheme: scheme.Scheme,
eventSourceImage: "test-image",
logger: ctrl.Log.WithName("test"),
}
err := r.reconcile(ctx, testEventSource)
assert.Error(t, err)
assert.False(t, testEventSource.Status.IsReady())
})

t.Run("test reconcile with eventbus", func(t *testing.T) {
testEventSource := fakeEmptyEventSource()
testEventSource.Spec.Calendar = fakeCalendarEventSourceMap("test")
ctx := context.TODO()
cl := fake.NewFakeClient(testEventSource)
testBus := fakeEventBus.DeepCopy()
testBus.Status.MarkDeployed("test", "test")
testBus.Status.MarkConfigured()
err := cl.Create(ctx, testBus)
assert.Nil(t, err)
r := &reconciler{
client: cl,
scheme: scheme.Scheme,
eventSourceImage: "test-image",
logger: ctrl.Log.WithName("test"),
}
err = r.reconcile(ctx, testEventSource)
assert.NoError(t, err)
assert.True(t, testEventSource.Status.IsReady())
})
}
45 changes: 39 additions & 6 deletions controllers/eventsource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/argoproj/argo-events/common"
controllerscommon "github.com/argoproj/argo-events/controllers/common"
"github.com/argoproj/argo-events/eventsources"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
)
Expand All @@ -32,8 +34,6 @@ var (

// AdaptorArgs are the args needed to create a sensor deployment
type AdaptorArgs struct {
// controller namespace
Namespace string
Image string
EventSource *v1alpha1.EventSource
Labels map[string]string
Expand Down Expand Up @@ -268,7 +268,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
}

func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
replicas := int32(1)
singleReplica := int32(1)
replicas := singleReplica
if args.EventSource.Spec.Replica != nil {
replicas = *args.EventSource.Spec.Replica
}
if replicas < singleReplica {
replicas = singleReplica
}
eventSourceContainer := corev1.Container{
Image: args.Image,
ImagePullPolicy: corev1.PullAlways,
Expand All @@ -279,7 +286,7 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
}
}
eventSourceContainer.Name = "main"
return &appv1.DeploymentSpec{
spec := &appv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: args.Labels,
},
Expand All @@ -297,7 +304,26 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
SecurityContext: args.EventSource.Spec.Template.SecurityContext,
},
},
}, nil
}
allEventTypes := eventsources.GetEventingServers(args.EventSource)
recreateTypes := make(map[apicommon.EventSourceType]bool)
for _, esType := range apicommon.RecreateStrategyEventSources {
recreateTypes[esType] = true
}
recreates := 0
for eventType := range allEventTypes {
if _, ok := recreateTypes[eventType]; ok {
recreates++
break
}
}
if recreates > 0 {
spec.Replicas = &singleReplica
spec.Strategy = appv1.DeploymentStrategy{
Type: appv1.RecreateDeploymentStrategyType,
}
}
return spec, nil
}

func getService(ctx context.Context, cl client.Client, args *AdaptorArgs) (*corev1.Service, error) {
Expand Down Expand Up @@ -370,7 +396,14 @@ func envFromSources(eventSource *v1alpha1.EventSource, t reflect.Type) []corev1.
r := []corev1.EnvFromSource{}
keys := make(map[string]bool)
for _, e := range result {
entry := e.SecretRef.Name
var entry string
switch t {
case secretKeySelectorType:
entry = e.SecretRef.Name
case configMapKeySelectorType:
entry = e.ConfigMapRef.Name
default:
}
if _, value := keys[entry]; !value {
keys[entry] = true
r = append(r, e)
Expand Down
Loading

0 comments on commit 2363cc1

Please sign in to comment.