diff --git a/common/leaderelection/leaderelection.go b/common/leaderelection/leaderelection.go
new file mode 100644
index 0000000000..eb2d8d6180
--- /dev/null
+++ b/common/leaderelection/leaderelection.go
@@ -0,0 +1,159 @@
+package leaderelection
+
+import (
+	"context"
+
+	"github.com/fsnotify/fsnotify"
+	"github.com/nats-io/graft"
+	nats "github.com/nats-io/nats.go"
+	"github.com/pkg/errors"
+	"github.com/spf13/viper"
+	"go.uber.org/zap"
+
+	"github.com/argoproj/argo-events/common"
+	"github.com/argoproj/argo-events/common/logging"
+	eventbusdriver "github.com/argoproj/argo-events/eventbus/driver"
+	apicommon "github.com/argoproj/argo-events/pkg/apis/common"
+	eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
+)
+
+type Elector interface {
+	RunOrDie(context.Context, LeaderCallbacks)
+}
+
+type LeaderCallbacks struct {
+	OnStartedLeading func(context.Context)
+	OnStoppedLeading func()
+}
+
+func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int) (Elector, error) {
+	logger := logging.FromContext(ctx)
+	var eventBusType apicommon.EventBusType
+	var eventBusAuth *eventbusv1alpha1.AuthStrategy
+	if eventBusConfig.NATS != nil {
+		eventBusType = apicommon.EventBusNATS
+		eventBusAuth = eventBusConfig.NATS.Auth
+	} else {
+		return nil, errors.New("invalid event bus")
+	}
+	var auth *eventbusdriver.Auth
+	cred := &eventbusdriver.AuthCredential{}
+	if eventBusAuth == nil || *eventBusAuth == eventbusv1alpha1.AuthStrategyNone {
+		auth = &eventbusdriver.Auth{
+			Strategy: eventbusv1alpha1.AuthStrategyNone,
+		}
+	} else {
+		v := viper.New()
+		v.SetConfigName("auth")
+		v.SetConfigType("yaml")
+		v.AddConfigPath(common.EventBusAuthFileMountPath)
+		err := v.ReadInConfig()
+		if err != nil {
+			return nil, errors.Errorf("failed to load auth.yaml. err: %+v", err)
+		}
+		err = v.Unmarshal(cred)
+		if err != nil {
+			logger.Errorw("failed to unmarshal auth.yaml", zap.Error(err))
+			return nil, err
+		}
+		v.WatchConfig()
+		v.OnConfigChange(func(e fsnotify.Event) {
+			logger.Info("eventbus auth config file changed.")
+			err = v.Unmarshal(cred)
+			if err != nil {
+				logger.Errorw("failed to unmarshal auth.yaml after reloading", zap.Error(err))
+			}
+		})
+		auth = &eventbusdriver.Auth{
+			Strategy:    *eventBusAuth,
+			Crendential: cred,
+		}
+	}
+	var elector Elector
+	switch eventBusType {
+	case apicommon.EventBusNATS:
+		elector = &natsEventBusElector{
+			clusterName: clusterName,
+			size:        clusterSize,
+			url:         eventBusConfig.NATS.URL,
+			auth:        auth,
+		}
+	default:
+		return nil, errors.New("invalid eventbus type")
+	}
+	return elector, nil
+}
+
+type natsEventBusElector struct {
+	clusterName string
+	size        int
+	url         string
+	auth        *eventbusdriver.Auth
+}
+
+func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCallbacks) {
+	log := logging.FromContext(ctx)
+	ci := graft.ClusterInfo{Name: e.clusterName, Size: e.size}
+	opts := &nats.DefaultOptions
+	opts.Url = e.url
+	if e.auth.Strategy == eventbusv1alpha1.AuthStrategyToken {
+		opts.Token = e.auth.Crendential.Token
+	}
+	rpc, err := graft.NewNatsRpc(opts)
+	if err != nil {
+		log.Fatalw("failed to new Nats Rpc", zap.Error(err))
+	}
+	errChan := make(chan error)
+	stateChangeChan := make(chan graft.StateChange)
+	handler := graft.NewChanHandler(stateChangeChan, errChan)
+	node, err := graft.New(ci, handler, rpc, "/tmp/graft.log")
+	if err != nil {
+		log.Fatalw("failed to new a node", zap.Error(err))
+	}
+	defer node.Close()
+
+	cctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if node.State() == graft.LEADER {
+		log.Info("I'm the LEADER, starting ...")
+		go callbacks.OnStartedLeading(cctx)
+	} else {
+		log.Info("Not the LEADER, stand by ...")
+	}
+
+	handleStateChange := func(sc graft.StateChange) {
+		switch sc.To {
+		case graft.LEADER:
+			log.Info("I'm the LEADER, starting ...")
+			go callbacks.OnStartedLeading(cctx)
+		case graft.FOLLOWER, graft.CANDIDATE:
+			log.Infof("Becoming a %v, stand by ...", sc.To)
+			if sc.From == graft.LEADER {
+				cancel()
+				callbacks.OnStoppedLeading()
+				cctx, cancel = context.WithCancel(ctx)
+			}
+		case graft.CLOSED:
+			if sc.From == graft.LEADER {
+				cancel()
+				callbacks.OnStoppedLeading()
+			}
+			log.Fatal("Leader elector connection was CLOSED")
+		default:
+			log.Fatalf("Unknown state: %s", sc.To)
+		}
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("exiting...")
+			return
+		case sc := <-stateChangeChan:
+			handleStateChange(sc)
+		case err := <-errChan:
+			log.Errorw("Error happened", zap.Error(err))
+		}
+	}
+}
diff --git a/controllers/eventsource/controller.go b/controllers/eventsource/controller.go
index 4d85b73263..767c177046 100644
--- a/controllers/eventsource/controller.go
+++ b/controllers/eventsource/controller.go
@@ -4,7 +4,6 @@ import (
 	"context"
 
 	"go.uber.org/zap"
-	coordinationv1 "k8s.io/api/coordination/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -71,9 +70,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS
 		log.Info("deleting eventsource")
 		if controllerutil.ContainsFinalizer(eventSource, finalizerName) {
 			// Finalizer logic should be added here.
- if err := r.finalize(ctx, eventSource); err != nil { - return err - } controllerutil.RemoveFinalizer(eventSource, finalizerName) } return nil @@ -97,16 +93,6 @@ func (r *reconciler) reconcile(ctx context.Context, eventSource *v1alpha1.EventS return Reconcile(r.client, args, log) } -func (r *reconciler) finalize(ctx context.Context, eventSource *v1alpha1.EventSource) error { - // Clean up Lease objects if there's any - if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{}, - client.InNamespace(eventSource.Namespace), - client.MatchingFields{"metadata.name": "eventsource-" + eventSource.Name}); err != nil { - return err - } - return nil -} - func (r *reconciler) needsUpdate(old, new *v1alpha1.EventSource) bool { if old == nil { return true diff --git a/controllers/eventsource/resource.go b/controllers/eventsource/resource.go index 0533a2a961..b65ea0ceae 100644 --- a/controllers/eventsource/resource.go +++ b/controllers/eventsource/resource.go @@ -20,8 +20,6 @@ import ( "github.com/argoproj/argo-events/common" controllerscommon "github.com/argoproj/argo-events/controllers/common" - "github.com/argoproj/argo-events/eventsources" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1" ) @@ -221,9 +219,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a }, }, }) + emptyDirVolName := "tmp" + volumes = append(volumes, corev1.Volume{ + Name: emptyDirVolName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, + }) deploymentSpec.Template.Spec.Volumes = volumes volumeMounts := deploymentSpec.Template.Spec.Containers[0].VolumeMounts volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: "auth-volume", MountPath: common.EventBusAuthFileMountPath}) + volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"}) deploymentSpec.Template.Spec.Containers[0].VolumeMounts = volumeMounts } } else { @@ -332,26 +335,6 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) { spec.Template.Spec.PriorityClassName = args.EventSource.Spec.Template.PriorityClassName spec.Template.Spec.Priority = args.EventSource.Spec.Template.Priority } - allEventTypes := eventsources.GetEventingServers(args.EventSource, nil) - recreateTypes := make(map[apicommon.EventSourceType]bool) - for _, esType := range apicommon.RecreateStrategyEventSources { - recreateTypes[esType] = true - } - recreates := 0 - for eventType := range allEventTypes { - if _, ok := recreateTypes[eventType]; ok { - recreates++ - break - } - } - if recreates > 0 && replicas == 1 { - // For those event types, if there's only 1 replica, use recreate strategy. - // If replicas > 1, which means HA is available for them, rolling update strategy - // is better. 
- spec.Strategy = appv1.DeploymentStrategy{ - Type: appv1.RecreateDeploymentStrategyType, - } - } return spec, nil } diff --git a/controllers/sensor/controller.go b/controllers/sensor/controller.go index f563deaf81..93c735c7f2 100644 --- a/controllers/sensor/controller.go +++ b/controllers/sensor/controller.go @@ -20,7 +20,6 @@ import ( "context" "go.uber.org/zap" - coordinationv1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -87,9 +86,6 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err log.Info("deleting sensor") if controllerutil.ContainsFinalizer(sensor, finalizerName) { // Finalizer logic should be added here. - if err := r.finalize(ctx, sensor); err != nil { - return err - } controllerutil.RemoveFinalizer(sensor, finalizerName) } return nil @@ -113,16 +109,6 @@ func (r *reconciler) reconcile(ctx context.Context, sensor *v1alpha1.Sensor) err return Reconcile(r.client, args, log) } -func (r *reconciler) finalize(ctx context.Context, sensor *v1alpha1.Sensor) error { - // Clean up Lease objects if there's any - if err := r.client.DeleteAllOf(ctx, &coordinationv1.Lease{}, - client.InNamespace(sensor.Namespace), - client.MatchingFields{"metadata.name": "sensor-" + sensor.Name}); err != nil { - return err - } - return nil -} - func (r *reconciler) needsUpdate(old, new *v1alpha1.Sensor) bool { if old == nil { return true diff --git a/controllers/sensor/resource.go b/controllers/sensor/resource.go index cd733186f1..78e5102efc 100644 --- a/controllers/sensor/resource.go +++ b/controllers/sensor/resource.go @@ -185,9 +185,14 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a }, }, }) + emptyDirVolName := "tmp" + volumes = append(volumes, corev1.Volume{ + Name: emptyDirVolName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, + }) deploymentSpec.Template.Spec.Volumes = volumes volumeMounts := deploymentSpec.Template.Spec.Containers[0].VolumeMounts volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: "auth-volume", MountPath: common.EventBusAuthFileMountPath}) + volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: emptyDirVolName, MountPath: "/tmp"}) deploymentSpec.Template.Spec.Containers[0].VolumeMounts = volumeMounts } } else { @@ -270,10 +275,6 @@ func buildDeploymentSpec(args *AdaptorArgs) (*appv1.DeploymentSpec, error) { MatchLabels: args.Labels, }, Replicas: &replicas, - Strategy: appv1.DeploymentStrategy{ - // Event bus does not allow multiple clients with same clientID to connect to the server at the same time. - Type: appv1.RecreateDeploymentStrategyType, - }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: podTemplateLabels, diff --git a/docs/eventsources/ha.md b/docs/eventsources/ha.md index a761a14de5..dde7cc56da 100644 --- a/docs/eventsources/ha.md +++ b/docs/eventsources/ha.md @@ -51,31 +51,6 @@ old one is gone. - Calendar - Generic -### RBAC - -To achieve `Active-Passive` strategy for these EventSources, a Service Account -with extra RBAC settings is needed. The Service Account needs to be bound to a -Role like following, and specified in the spec through -`spec.template.serviceAccountName`. 
- -```yaml -apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - name: lease-role -rules: - - apiGroups: - - coordination.k8s.io - resources: - - leases - resourceNames: - - eventsource-{event-source-name} - verbs: - - "*" -``` - -**NOTE: This is not requried if `spec.replicas = 1`.** - ## More Check [this](../dr_ha_recommendations.md) out to learn more information about diff --git a/docs/sensors/ha.md b/docs/sensors/ha.md index 9779517024..3507344de6 100644 --- a/docs/sensors/ha.md +++ b/docs/sensors/ha.md @@ -9,30 +9,6 @@ elected to be active if the old one is gone. **Please DO NOT manually scale up the replicas, that might cause unexpected behaviors!** -## RBAC - -To achieve HA for Sensor Pods, a Service Account with extra RBAC settings is -needed. The Service Account needs to be bound to a Role like following, and -specified in the spec through `spec.template.serviceAccountName`. - -```yaml -apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - name: lease-role -rules: - - apiGroups: - - coordination.k8s.io - resources: - - leases - resourceNames: - - sensor-{sensor-name) - verbs: - - "*" -``` - -**NOTE: This is not requried if `spec.replicas = 1`.** - ## More Check [this](../dr_ha_recommendations.md) out to learn more information about diff --git a/docs/service-accounts.md b/docs/service-accounts.md index 40a79fb9ed..49d13653ff 100644 --- a/docs/service-accounts.md +++ b/docs/service-accounts.md @@ -4,10 +4,9 @@ A `Service Account` can be specified in the EventSource object with `spec.template.serviceAccountName`, however it is not needed for all the -EventSource types except `resource`, unless you want to achieve -[HA](eventsources/ha.md) for some of them. For a `resource` EventSource, you -need to specify a Service Accout and give it `list` and `watch` permissions for -the resource being watched. +EventSource types except `resource`. For a `resource` EventSource, you need to +specify a Service Accout and give it `list` and `watch` permissions for the +resource being watched. For example, if you want to watch actions on `Deployment` objects, you need to: @@ -31,8 +30,7 @@ For example, if you want to watch actions on `Deployment` objects, you need to: A `Service Account` also can be specified in a Sensor object via `spec.template.serviceAccountName`, this is only needed when `k8s` trigger or -`argoWorkflow` trigger is defined in the Sensor object, or you want to run the -Sensor with [HA](sensors/ha.md). +`argoWorkflow` trigger is defined in the Sensor object. 
The sensor examples provided by us use `argo-events-sa` service account to execute the triggers, but it has more permissions than needed, and you may want diff --git a/eventsources/eventing.go b/eventsources/eventing.go index e2f76c9303..eb5c7b52e9 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "math/rand" - "os" "strings" "sync" "time" @@ -14,12 +13,9 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "go.uber.org/zap" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" "github.com/argoproj/argo-events/common" + "github.com/argoproj/argo-events/common/leaderelection" "github.com/argoproj/argo-events/common/logging" "github.com/argoproj/argo-events/eventbus" eventbusdriver "github.com/argoproj/argo-events/eventbus/driver" @@ -284,54 +280,24 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error { // EventSource object use the same type of deployment strategy break } - if !isRecreatType || e.eventSource.Spec.GetReplicas() == 1 { + if !isRecreatType { return e.run(ctx, servers) } - kubeConfig, _ := os.LookupEnv(common.EnvVarKubeConfig) - restConfig, err := common.GetClientConfig(kubeConfig) + custerName := fmt.Sprintf("%s-eventsource-%s", e.eventSource.Namespace, e.eventSource.Name) + elector, err := leaderelection.NewEventBusElector(ctx, *e.eventBusConfig, custerName, int(e.eventSource.Spec.GetReplicas())) if err != nil { - return errors.Wrapf(err, "failed to get a K8s rest config for the event source %s", e.eventSource.Name) + log.Errorw("failed to get an elector", zap.Error(err)) + return err } - kubeClient := kubernetes.NewForConfigOrDie(restConfig) - - leaseName := "eventsource-" + e.eventSource.Name - lock := &resourcelock.LeaseLock{ - LeaseMeta: metav1.ObjectMeta{ - Name: leaseName, - Namespace: e.eventSource.Namespace, - }, - Client: kubeClient.CoordinationV1(), - LockConfig: resourcelock.ResourceLockConfig{ - Identity: e.hostname, + elector.RunOrDie(ctx, leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + if err := e.run(ctx, servers); err != nil { + log.Errorw("failed to start", zap.Error(err)) + } }, - } - - ctx, cancel := context.WithCancel(ctx) - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - ReleaseOnCancel: true, - LeaseDuration: 60 * time.Second, - RenewDeadline: 15 * time.Second, - RetryPeriod: 5 * time.Second, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - if err := e.run(ctx, servers); err != nil { - log.Errorw("failed to start", zap.Error(err)) - cancel() - } - }, - OnStoppedLeading: func() { - log.Infof("leader lost: %s", e.hostname) - cancel() - }, - OnNewLeader: func(identity string) { - if identity == e.hostname { - log.Infof("I am the new leader: %s", identity) - return - } - log.Infof("stand by, new leader elected: %s", identity) - }, + OnStoppedLeading: func() { + log.Infof("leader lost: %s", e.hostname) }, }) diff --git a/go.mod b/go.mod index 0cee9cdbb0..1dc4f855c8 100644 --- a/go.mod +++ b/go.mod @@ -52,9 +52,9 @@ require ( github.com/mitchellh/reflectwalk v1.0.1 // indirect github.com/nats-io/gnatsd v1.4.1 // indirect github.com/nats-io/go-nats v1.7.2 + github.com/nats-io/graft v0.0.0-20200605173148-348798afea05 github.com/nats-io/nats-streaming-server v0.17.0 // indirect - github.com/nats-io/nats.go v1.9.1 - github.com/nats-io/nkeys v0.1.4 
// indirect + github.com/nats-io/nats.go v1.10.0 github.com/nats-io/stan.go v0.6.0 github.com/nicksnyder/go-i18n v1.10.1-0.20190510212457-b280125b035a // indirect github.com/nsqio/go-nsq v1.0.8 diff --git a/go.sum b/go.sum index 89c7f8928a..74442e470f 100644 --- a/go.sum +++ b/go.sum @@ -739,16 +739,20 @@ github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44= github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= github.com/nats-io/go-nats v1.7.2 h1:cJujlwCYR8iMz5ofZSD/p2WLW8FabhkQ2lIEVbSvNSA= github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= +github.com/nats-io/graft v0.0.0-20200605173148-348798afea05 h1:wF/dApMICOCM+/c/1dpFxooYGwmSUvclQMT9CRjnEbM= +github.com/nats-io/graft v0.0.0-20200605173148-348798afea05/go.mod h1:idnzXeCwCx69FMg+R0DyD4/OhrF1A+v3BqF5xSz+tS4= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= -github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g= github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg= +github.com/nats-io/nats-server/v2 v2.1.7 h1:jCoQwDvRYJy3OpOTHeYfvIPLP46BMeDmH7XEJg/r42I= +github.com/nats-io/nats-server/v2 v2.1.7/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE= github.com/nats-io/nats-streaming-server v0.17.0 h1:eYhSmjRmRsCYNsoUshmZ+RgKbhq6B+7FvMHXo3M5yMs= github.com/nats-io/nats-streaming-server v0.17.0/go.mod h1:ewPBEsmp62Znl3dcRsYtlcfwudxHEdYMtYqUQSt4fE0= -github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= +github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= +github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= diff --git a/manifests/cluster-install/rbac/argo-events-cluster-role.yaml b/manifests/cluster-install/rbac/argo-events-cluster-role.yaml index 284abfd3a7..5cc567b6bb 100644 --- a/manifests/cluster-install/rbac/argo-events-cluster-role.yaml +++ b/manifests/cluster-install/rbac/argo-events-cluster-role.yaml @@ -89,9 +89,3 @@ rules: - update - patch - delete - - apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - "*" \ No newline at end of file diff --git a/manifests/install.yaml b/manifests/install.yaml index fc43fed834..2a264618b4 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -297,12 +297,6 @@ rules: - update - patch - delete -- apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - '*' --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index 1eff3ee1ec..1b732bb02f 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -202,12 +202,6 @@ rules: - update - patch - delete -- apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - '*' --- 
apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/manifests/namespace-install/rbac/argo-events-role.yaml b/manifests/namespace-install/rbac/argo-events-role.yaml index 11542abe65..f893e36435 100644 --- a/manifests/namespace-install/rbac/argo-events-role.yaml +++ b/manifests/namespace-install/rbac/argo-events-role.yaml @@ -75,9 +75,3 @@ rules: - update - patch - delete - - apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - "*" \ No newline at end of file diff --git a/sensors/listener.go b/sensors/listener.go index d1ccdb4a99..a039d46f5f 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -31,10 +31,9 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" "github.com/argoproj/argo-events/common" + "github.com/argoproj/argo-events/common/leaderelection" "github.com/argoproj/argo-events/common/logging" "github.com/argoproj/argo-events/eventbus" eventbusdriver "github.com/argoproj/argo-events/eventbus/driver" @@ -65,51 +64,23 @@ func (sensorCtx *SensorContext) getGroupAndClientID(depExpression string) (strin } func (sensorCtx *SensorContext) Start(ctx context.Context) error { - if sensorCtx.sensor.Spec.GetReplicas() == 1 { - return sensorCtx.listenEvents(ctx) + log := logging.FromContext(ctx) + custerName := fmt.Sprintf("%s-sensor-%s", sensorCtx.sensor.Namespace, sensorCtx.sensor.Name) + elector, err := leaderelection.NewEventBusElector(ctx, *sensorCtx.eventBusConfig, custerName, int(sensorCtx.sensor.Spec.GetReplicas())) + if err != nil { + log.Errorw("failed to get an elector", zap.Error(err)) + return err } - - leaseName := "sensor-" + sensorCtx.sensor.Name - lock := &resourcelock.LeaseLock{ - LeaseMeta: metav1.ObjectMeta{ - Name: leaseName, - Namespace: sensorCtx.sensor.Namespace, + elector.RunOrDie(ctx, leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + if err := sensorCtx.listenEvents(ctx); err != nil { + log.Errorw("failed to start", zap.Error(err)) + } }, - Client: sensorCtx.kubeClient.CoordinationV1(), - LockConfig: resourcelock.ResourceLockConfig{ - Identity: sensorCtx.hostname, - }, - } - - log := logging.FromContext(ctx) - ctx, cancel := context.WithCancel(ctx) - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - ReleaseOnCancel: true, - LeaseDuration: 60 * time.Second, - RenewDeadline: 15 * time.Second, - RetryPeriod: 5 * time.Second, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - if err := sensorCtx.listenEvents(ctx); err != nil { - log.Errorw("failed to start", zap.Error(err)) - cancel() - } - }, - OnStoppedLeading: func() { - log.Infof("leader lost: %s", sensorCtx.hostname) - cancel() - }, - OnNewLeader: func(identity string) { - if identity == sensorCtx.hostname { - log.Infof("I am the new leader: %s", identity) - return - } - log.Infof("stand by, new leader elected: %s", identity) - }, + OnStoppedLeading: func() { + log.Infof("leader lost: %s", sensorCtx.hostname) }, }) - return nil } diff --git a/test/e2e/testdata/sensor-log-ha.yaml b/test/e2e/testdata/sensor-log-ha.yaml index 4a53a5cfc0..d52ee3d832 100644 --- a/test/e2e/testdata/sensor-log-ha.yaml +++ b/test/e2e/testdata/sensor-log-ha.yaml @@ -13,4 +13,4 @@ spec: triggers: - template: name: log-trigger - log: {} \ No newline at end of file + log: {} diff --git a/test/util/util.go b/test/util/util.go index 
4d287ddbce..32e3dc0a07 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -8,9 +8,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" - coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -253,28 +251,8 @@ func EventSourcePodLogContains(ctx context.Context, kubeClient kubernetes.Interf if err != nil { return false, fmt.Errorf("error getting event source pod name: %w", err) } - podName := podList.Items[0].GetName() - if len(podList.Items) > 1 { - // HA - var l *coordinationv1.Lease - for { - l, err = kubeClient.CoordinationV1().Leases(namespace).Get(ctx, "eventsource-"+eventSourceName, metav1.GetOptions{}) - if err != nil { - if !apierrors.IsNotFound(err) { - return false, fmt.Errorf("error getting event source Lease information: %w", err) - } else if err != nil && apierrors.IsNotFound(err) { - time.Sleep(2 * time.Second) - continue - } - } - break - } - podName = *l.Spec.HolderIdentity - } - fmt.Printf("EventSource POD name: %s\n", podName) - cctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - return podLogContains(cctx, kubeClient, namespace, podName, regex) + + return PodsLogContains(ctx, kubeClient, namespace, regex, podList, timeout), nil } func SensorPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, sensorName, regex string, timeout time.Duration) (bool, error) { @@ -283,28 +261,41 @@ func SensorPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, if err != nil { return false, fmt.Errorf("error getting sensor pod name: %w", err) } - podName := podList.Items[0].GetName() - if len(podList.Items) > 1 { - // HA - var l *coordinationv1.Lease - for { - l, err = kubeClient.CoordinationV1().Leases(namespace).Get(ctx, "sensor-"+sensorName, metav1.GetOptions{}) + + return PodsLogContains(ctx, kubeClient, namespace, regex, podList, timeout), nil +} + +func PodsLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, regex string, podList *corev1.PodList, timeout time.Duration) bool { + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + errChan := make(chan error) + resultChan := make(chan bool) + for _, p := range podList.Items { + go func(podName string) { + fmt.Printf("Watching POD: %s\n", podName) + contains, err := podLogContains(cctx, kubeClient, namespace, podName, regex) if err != nil { - if !apierrors.IsNotFound(err) { - return false, fmt.Errorf("error getting sensor Lease information: %w", err) - } else if err != nil && apierrors.IsNotFound(err) { - time.Sleep(2 * time.Second) - continue - } + errChan <- err + return } - break + if contains { + resultChan <- true + } + }(p.Name) + } + + for { + select { + case <-cctx.Done(): + return false + case result := <-resultChan: + if result { + return true + } + case err := <-errChan: + fmt.Printf("error: %v", err) } - podName = *l.Spec.HolderIdentity } - fmt.Printf("Sensor POD name: %s\n", podName) - cctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - return podLogContains(cctx, kubeClient, namespace, podName, regex) } func podLogContains(ctx context.Context, client kubernetes.Interface, namespace, podName, regex string) (bool, error) {
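
A minimal usage sketch of the new `common/leaderelection` package, mirroring the wiring this patch adds to `sensors/listener.go` and `eventsources/eventing.go`. The `runWithLeaderElection` helper and its parameters are illustrative only and are not part of the patch:

```go
package example

import (
	"context"

	"go.uber.org/zap"

	"github.com/argoproj/argo-events/common/leaderelection"
	"github.com/argoproj/argo-events/common/logging"
	eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

// runWithLeaderElection is a hypothetical helper showing how a caller drives the
// event-bus based elector: every replica joins the same graft cluster (named after
// the Sensor/EventSource, e.g. "<namespace>-sensor-<name>"), and only the elected
// leader runs the workload. The context handed to OnStartedLeading is cancelled
// when leadership is lost, so the workload must honor ctx.Done().
func runWithLeaderElection(ctx context.Context, busConfig eventbusv1alpha1.BusConfig,
	clusterName string, replicas int, run func(context.Context) error) error {
	log := logging.FromContext(ctx)
	elector, err := leaderelection.NewEventBusElector(ctx, busConfig, clusterName, replicas)
	if err != nil {
		log.Errorw("failed to get an elector", zap.Error(err))
		return err
	}
	elector.RunOrDie(ctx, leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			// Start the actual event listening loop as the leader.
			if err := run(ctx); err != nil {
				log.Errorw("failed to start", zap.Error(err))
			}
		},
		OnStoppedLeading: func() {
			// Leadership lost; the per-leadership context has been cancelled.
			log.Info("leader lost, standing by ...")
		},
	})
	return nil
}
```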