Skip to content

Commit

Permalink
feat: HA support for event sources and sensors (#1158)
Browse files Browse the repository at this point in the history
* feat: HA support for event sources and sensors

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Apr 5, 2021
1 parent f945fb5 commit 85c4157
Show file tree
Hide file tree
Showing 61 changed files with 1,380 additions and 792 deletions.
28 changes: 26 additions & 2 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 24 additions & 2 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions controllers/eventsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -70,6 +71,9 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
log.Info("deleting eventsource")
if controllerutil.ContainsFinalizer(eventSource, finalizerName) {
// Finalizer logic should be added here.
if err := r.finalize(ctx, eventSource); err != nil {
return err
}
controllerutil.RemoveFinalizer(eventSource, finalizerName)
}
return nil
Expand All @@ -93,6 +97,16 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
return Reconcile(r.client, args, log)
}

func (r *reconciler) finalize(ctx context.Context, eventSource *v1alpha1.EventSource) error {
// Clean up Lease objects if there's any
if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{},
client.InNamespace(eventSource.Namespace),
client.MatchingFields{"metadata.name": "eventsource-" + eventSource.Name}); err != nil {
return err
}
return nil
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.EventSource) bool {
if old == nil {
return true
Expand Down
15 changes: 5 additions & 10 deletions controllers/eventsource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,6 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
}

func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
singleReplica := int32(1)
replicas := singleReplica
if args.EventSource.Spec.Replica != nil {
replicas = *args.EventSource.Spec.Replica
}
if replicas < singleReplica {
replicas = singleReplica
}
eventSourceContainer := corev1.Container{
Image: args.Image,
ImagePullPolicy: corev1.PullAlways,
Expand All @@ -309,6 +301,7 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
podTemplateLabels[k] = v
}

replicas := args.EventSource.Spec.GetReplicas()
spec := &appv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: args.Labels,
Expand Down Expand Up @@ -351,8 +344,10 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
break
}
}
if recreates > 0 {
spec.Replicas = &singleReplica
if recreates > 0 && replicas == 1 {
// For those event types, if there's only 1 replica, use recreate strategy.
// If replicas > 1, which means HA is available for them, rolling update strategy
// is better.
spec.Strategy = appv1.DeploymentStrategy{
Type: appv1.RecreateDeploymentStrategyType,
}
Expand Down
14 changes: 14 additions & 0 deletions controllers/sensor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -86,6 +87,9 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
log.Info("deleting sensor")
if controllerutil.ContainsFinalizer(sensor, finalizerName) {
// Finalizer logic should be added here.
if err := r.finalize(ctx, sensor); err != nil {
return err
}
controllerutil.RemoveFinalizer(sensor, finalizerName)
}
return nil
Expand All @@ -109,6 +113,16 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err
return Reconcile(r.client, args, log)
}

func (r *reconciler) finalize(ctx context.Context, sensor *v1alpha1.Sensor) error {
// Clean up Lease objects if there's any
if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{},
client.InNamespace(sensor.Namespace),
client.MatchingFields{"metadata.name": "sensor-" + sensor.Name}); err != nil {
return err
}
return nil
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.Sensor) bool {
if old == nil {
return true
Expand Down
6 changes: 5 additions & 1 deletion controllers/sensor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
Name: common.EnvVarEventBusSubject,
Value: fmt.Sprintf("eventbus-%s", args.Sensor.Namespace),
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
},
}

busConfigBytes, err := json.Marshal(eventBus.Status.Config)
Expand Down Expand Up @@ -237,7 +241,7 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
}

func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) {
replicas := int32(1)
replicas := args.Sensor.Spec.GetReplicas()
sensorContainer := corev1.Container{
Image: args.Image,
ImagePullPolicy: corev1.PullAlways,
Expand Down
7 changes: 4 additions & 3 deletions docs/concepts/trigger.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
# Trigger

A Trigger is the resource/workload executed by the sensor once the event dependencies are resolved.
A Trigger is the resource/workload executed by the sensor once the event
dependencies are resolved.

## Trigger Types

1. AWS Lambda
1. Apache OpenWhisk
1. Argo Rollouts
1. Argo Workflows
1. Custom - Build Your Own
1. Custom - Build Your Own
1. HTTP Requests - Serverless Workloads (OpenFaas, Kubeless, KNative etc.)
1. Kafka Messages
1. Log Message - for debugging
1. NATS Messages
1. Slack Notifications
1. Azure Event Hubs Messages
1. Create any Kubernetes Objects
1. Log (for debugging event bus messages)
18 changes: 7 additions & 11 deletions docs/dr_ha_recommendations.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,8 @@ Priority could be set through `spec.nats.native.priorityClassName` or

### Replicas

For below types of EventSources, `spec.replica` could be set to a number `>1` to
make them HA, see more detail [here](eventsources/deployment-strategies.md).

- AWS SNS
- AWS SQS
- Github
- Gitlab
- NetApp Storage GRID
- Slack
- Stripe
- Webhook
EventSources can run with HA by setting `spec.replicas` to a number `>1`, see
more detail [here](eventsources/ha.md).

### EventSource POD Node Selection

Expand All @@ -127,6 +118,11 @@ Priority could be set through `spec.template.priorityClassName` or

## Sensors

### Replicas

Sensors can run with HA by setting `spec.replicas` to a number `>1`, see more
detail [here](sensors/ha.md).

### Sensor POD Node Selection

Sensor POD `affinity`, `nodeSelector` and `tolerations` could also be set
Expand Down
Loading

0 comments on commit 85c4157

Please sign in to comment.