From 7bc21d811ff237071a719d0c1865fe253354193e Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Mon, 30 Dec 2019 16:54:17 -0500 Subject: [PATCH] feat: Simplify event dispatch (#448) * feat(simplify-event-dispatch): added subscriber clients * feat(simplify-event-dispatch): fix tests * fix(simplify-event-dispatch): persist updates --- api/event-source.html | 2 +- api/event-source.md | 2 +- api/gateway.html | 176 +-------- api/gateway.md | 349 +----------------- api/sensor.html | 2 +- api/sensor.md | 2 +- common/events.go | 73 ---- common/events_test.go | 55 --- controllers/gateway/controller.go | 17 +- controllers/gateway/operator.go | 51 +-- controllers/gateway/resource_test.go | 9 +- controllers/gateway/validate.go | 27 -- controllers/sensor/controller.go | 19 +- controllers/sensor/operator.go | 46 +-- gateways/client/cloud-events.go | 57 +-- gateways/client/context.go | 35 +- gateways/client/event-source_test.go | 10 +- gateways/client/event-sources.go | 96 ++--- gateways/client/state.go | 122 +++--- gateways/client/state_test.go | 78 ++-- gateways/client/watcher.go | 6 +- .../gateway/v1alpha1/openapi_generated.go | 159 ++------ pkg/apis/gateway/v1alpha1/types.go | 53 +-- .../gateway/v1alpha1/zz_generated.deepcopy.go | 66 +--- sensors/triggers/params.go | 2 +- sensors/triggers/params_test.go | 14 +- 26 files changed, 288 insertions(+), 1240 deletions(-) delete mode 100644 common/events.go delete mode 100644 common/events_test.go diff --git a/api/event-source.html b/api/event-source.html index 7f4c945168..03f9db3e20 100644 --- a/api/event-source.html +++ b/api/event-source.html @@ -1751,5 +1751,5 @@

StorageGridFilter

Generated with gen-crd-api-reference-docs -on git commit c580a25. +on git commit 78e79e0.

diff --git a/api/event-source.md b/api/event-source.md index 7ef59a2c58..fe1c83f763 100644 --- a/api/event-source.md +++ b/api/event-source.md @@ -3484,6 +3484,6 @@ Description

Generated with gen-crd-api-reference-docs on git -commit c580a25. +commit 78e79e0.

diff --git a/api/gateway.html b/api/gateway.html index 76d7e6241b..138fa5957d 100644 --- a/api/gateway.html +++ b/api/gateway.html @@ -159,18 +159,14 @@

Gateway -watchers
+subscribers
- -NotificationWatchers - +[]string -

Watchers are components which are interested listening to notifications from this gateway -These only need to be specified when gateway dispatch mechanism is through HTTP POST notifications. -In future, support for NATS, KAFKA will be added as a means to dispatch notifications in which case -specifying watchers would be unnecessary.

+(Optional) +

Subscribers are HTTP endpoints to send events to.

@@ -211,70 +207,6 @@

Gateway -

GatewayNotificationWatcher -

-

-(Appears on: -NotificationWatchers) -

-

-

GatewayNotificationWatcher is the gateway interested in listening to notifications from this gateway

-

- - - - - - - - - - - - - - - - - - - - - - - - - -
FieldDescription
-name
- -string - -
-

Name is the gateway name

-
-port
- -string - -
-

Port is http server port on which gateway is running

-
-endpoint
- -string - -
-

Endpoint is REST API endpoint to post event to. -Events are sent using HTTP POST method to this endpoint.

-
-namespace
- -string - -
-

Namespace of the gateway

-

GatewayResource

@@ -392,18 +324,14 @@

GatewaySpec -watchers
+subscribers
- -NotificationWatchers - +[]string -

Watchers are components which are interested listening to notifications from this gateway -These only need to be specified when gateway dispatch mechanism is through HTTP POST notifications. -In future, support for NATS, KAFKA will be added as a means to dispatch notifications in which case -specifying watchers would be unnecessary.

+(Optional) +

Subscribers are HTTP endpoints to send events to.

@@ -638,94 +566,8 @@

NodeStatus -

NotificationWatchers -

-

-(Appears on: -GatewaySpec) -

-

-

NotificationWatchers are components which are interested listening to notifications from this gateway

-

- - - - - - - - - - - - - - - - - -
FieldDescription
-gateways
- - -[]GatewayNotificationWatcher - - -
-

Gateways is the list of gateways interested in listening to notifications from this gateway

-
-sensors
- - -[]SensorNotificationWatcher - - -
-

Sensors is the list of sensors interested in listening to notifications from this gateway

-
-

SensorNotificationWatcher -

-

-(Appears on: -NotificationWatchers) -

-

-

SensorNotificationWatcher is the sensor interested in listening to notifications from this gateway

-

- - - - - - - - - - - - - - - - - -
FieldDescription
-name
- -string - -
-

Name is the name of the sensor

-
-namespace
- -string - -
-

Namespace of the sensor

-

Generated with gen-crd-api-reference-docs -on git commit c580a25. +on git commit 78e79e0.

diff --git a/api/gateway.md b/api/gateway.md index 9a2a403e56..66f0c8d685 100644 --- a/api/gateway.md +++ b/api/gateway.md @@ -313,21 +313,17 @@ Service is the specifications of the service to expose the gateway Refer -watchers
- -NotificationWatchers +subscribers
\[\]string +(Optional) +

-Watchers are components which are interested listening to notifications -from this gateway These only need to be specified when gateway dispatch -mechanism is through HTTP POST notifications. In future, support for -NATS, KAFKA will be added as a means to dispatch notifications in which -case specifying watchers would be unnecessary. +Subscribers are HTTP endpoints to send events to.

@@ -408,139 +404,6 @@ Replica is the gateway deployment replicas -

- -GatewayNotificationWatcher - -

- -

- -(Appears on: -NotificationWatchers) - -

- -

- -

- -GatewayNotificationWatcher is the gateway interested in listening to -notifications from this gateway - -

- -

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- -Field - - - -Description - -
- -name
string - -
- -

- -Name is the gateway name - -

- -
- -port
string - -
- -

- -Port is http server port on which gateway is running - -

- -
- -endpoint
string - -
- -

- -Endpoint is REST API endpoint to post event to. Events are sent using -HTTP POST method to this endpoint. - -

- -
- -namespace
string - -
- -

- -Namespace of the gateway - -

- -
-

GatewayResource @@ -778,21 +641,17 @@ Service is the specifications of the service to expose the gateway Refer -watchers
- -NotificationWatchers +subscribers
\[\]string +(Optional) +

-Watchers are components which are interested listening to notifications -from this gateway These only need to be specified when gateway dispatch -mechanism is through HTTP POST notifications. In future, support for -NATS, KAFKA will be added as a means to dispatch notifications in which -case specifying watchers would be unnecessary. +Subscribers are HTTP endpoints to send events to.

@@ -1251,201 +1110,11 @@ UpdateTime is the time when node(gateway configuration) was updated -

- -NotificationWatchers - -

- -

- -(Appears on: -GatewaySpec) - -

- -

- -

- -NotificationWatchers are components which are interested listening to -notifications from this gateway - -

- -

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- -Field - - - -Description - -
- -gateways
- -\[\]GatewayNotificationWatcher - -
- -

- -Gateways is the list of gateways interested in listening to -notifications from this gateway - -

- -
- -sensors
- -\[\]SensorNotificationWatcher - -
- -

- -Sensors is the list of sensors interested in listening to notifications -from this gateway - -

- -
- -

- -SensorNotificationWatcher - -

- -

- -(Appears on: -NotificationWatchers) - -

- -

- -

- -SensorNotificationWatcher is the sensor interested in listening to -notifications from this gateway - -

- -

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- -Field - - - -Description - -
- -name
string - -
- -

- -Name is the name of the sensor - -

- -
- -namespace
string - -
- -

- -Namespace of the sensor - -

- -
-

Generated with gen-crd-api-reference-docs on git -commit c580a25. +commit 78e79e0.

diff --git a/api/sensor.html b/api/sensor.html index 2c5b677400..765eb62cd5 100644 --- a/api/sensor.html +++ b/api/sensor.html @@ -1841,5 +1841,5 @@

URLArtifact

Generated with gen-crd-api-reference-docs -on git commit c580a25. +on git commit 78e79e0.

diff --git a/api/sensor.md b/api/sensor.md index c650742109..d0d92b180d 100644 --- a/api/sensor.md +++ b/api/sensor.md @@ -3706,6 +3706,6 @@ VerifyCert decides whether the connection is secure or not

Generated with gen-crd-api-reference-docs on git -commit c580a25. +commit 78e79e0.

diff --git a/common/events.go b/common/events.go deleted file mode 100644 index e1b338730d..0000000000 --- a/common/events.go +++ /dev/null @@ -1,73 +0,0 @@ -/* -Copyright 2018 BlackRock, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package common - -import ( - "time" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" -) - -// K8sEventType is the type of event generated to indicate change in state of resource -type K8sEventType string - -// Possible values for K8sEventType -var ( - EscalationEventType K8sEventType = "Escalation" - StateChangeEventType K8sEventType = "StateChange" - OperationFailureEventType K8sEventType = "OperationFailed" - OperationSuccessEventType K8sEventType = "OperationSuccessful" -) - -const ( - // LabelEventType is label for k8 event type - LabelEventType = "event-type" -) - -// GenerateK8sEvent generates a kubernetes event -func GenerateK8sEvent(clientset kubernetes.Interface, reason string, eventType K8sEventType, action string, name, namespace, instanceId, kind string, labels map[string]string) error { - event := &corev1.Event{ - Reason: reason, - Type: string(eventType), - Action: action, - EventTime: metav1.MicroTime{ - Time: time.Now(), - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - GenerateName: name + "-", - Labels: labels, - }, - InvolvedObject: corev1.ObjectReference{ - Namespace: namespace, - Name: name, - Kind: kind, - }, - Source: corev1.EventSource{ - Component: name, - }, - ReportingInstance: instanceId, - ReportingController: name, - } - - if _, err := clientset.CoreV1().Events(namespace).Create(event); err != nil { - return err - } - return nil -} diff --git a/common/events_test.go b/common/events_test.go deleted file mode 100644 index de1f2c8c12..0000000000 --- a/common/events_test.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2018 BlackRock, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package common - -import ( - "testing" - - "github.com/smartystreets/goconvey/convey" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" -) - -func TestGenerateK8sEvent(t *testing.T) { - convey.Convey("Given an event", t, func() { - fakeclientset := fake.NewSimpleClientset() - - convey.Convey("Generate a K8s event", func() { - - convey.So(GenerateK8sEvent(fakeclientset, "fake event", StateChangeEventType, - "state change", "fake-component", "fake-namespace", "1", "fake-kind", - map[string]string{"fake": "fake"}), convey.ShouldBeEmpty) - - convey.Convey("List events", func() { - eventList, err := fakeclientset.CoreV1().Events("fake-namespace").List(metav1.ListOptions{}) - - convey.Convey("No error should be generated when listing the events", func() { - convey.So(err, convey.ShouldBeEmpty) - }) - - convey.Convey("Only one event is generated", func() { - - convey.So(len(eventList.Items), convey.ShouldEqual, 1) - - convey.Convey("Event namespace must be fake-namespace", func() { - convey.So(eventList.Items[0].Namespace, convey.ShouldEqual, "fake-namespace") - }) - }) - }) - }) - }) -} diff --git a/controllers/gateway/controller.go b/controllers/gateway/controller.go index c5f88662a4..2da1b13db9 100644 --- a/controllers/gateway/controller.go +++ b/controllers/gateway/controller.go @@ -19,7 +19,6 @@ package gateway import ( "context" "errors" - "fmt" "time" base "github.com/argoproj/argo-events" @@ -116,21 +115,7 @@ func (c *Controller) processNextItem() bool { err = ctx.operate() if err != nil { - if err := common.GenerateK8sEvent(c.k8sClient, - fmt.Sprintf("controller failed to operate on gateway %s", gw.Name), - common.StateChangeEventType, - "controller operation failed", - gw.Name, - gw.Namespace, - c.Config.InstanceID, - gw.Kind, - map[string]string{ - common.LabelResourceName: gw.Name, - common.LabelEventType: string(common.EscalationEventType), - }, - ); err != nil { - ctx.logger.WithError(err).Errorln("failed to create K8s event to escalate controller operation failure") - } + ctx.logger.WithField("gateway", gw.Name).WithError(err).Errorln("failed to operate on the gateway object") } err = c.handleErr(err, key) diff --git a/controllers/gateway/operator.go b/controllers/gateway/operator.go index e4de8f960c..3653b7a63f 100644 --- a/controllers/gateway/operator.go +++ b/controllers/gateway/operator.go @@ -20,7 +20,6 @@ import ( "time" "github.com/argoproj/argo-events/common" - "github.com/argoproj/argo-events/pkg/apis/gateway" "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" gwclient "github.com/argoproj/argo-events/pkg/client/gateway/clientset/versioned" "github.com/sirupsen/logrus" @@ -29,8 +28,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -// the context of an operation in the controller. -// the controller creates this context each time it picks a Gateway off its queue. +// the gatewayContext of an operation in the controller. +// the controller creates this gatewayContext each time it picks a Gateway off its queue. type gatewayContext struct { // gateway is the controller object gateway *v1alpha1.Gateway @@ -108,39 +107,18 @@ func (ctx *gatewayContext) operate() error { // updateGatewayState updates the gateway state func (ctx *gatewayContext) updateGatewayState() { + defer func() { + ctx.updated = false + }() if ctx.updated { - var err error - eventType := common.StateChangeEventType - labels := map[string]string{ - common.LabelResourceName: ctx.gateway.Name, - LabelPhase: string(ctx.gateway.Status.Phase), - LabelControllerInstanceID: ctx.controller.Config.InstanceID, - common.LabelOperation: "persist_gateway_state", - } - - ctx.gateway, err = PersistUpdates(ctx.controller.gatewayClient, ctx.gateway, ctx.logger) + updatedGateway, err := PersistUpdates(ctx.controller.gatewayClient, ctx.gateway, ctx.logger) if err != nil { - ctx.logger.WithError(err).Errorln("failed to persist gateway update, escalating...") - eventType = common.EscalationEventType - } - - labels[common.LabelEventType] = string(eventType) - if err := common.GenerateK8sEvent(ctx.controller.k8sClient, - "persist update", - eventType, - "gateway state update", - ctx.gateway.Name, - ctx.gateway.Namespace, - ctx.controller.Config.InstanceID, - gateway.Kind, - labels, - ); err != nil { - ctx.logger.WithError(err).Errorln("failed to create K8s event to logger gateway state persist operation") + ctx.logger.WithError(err).Errorln("failed to persist gateway update") return } + ctx.gateway = updatedGateway ctx.logger.Infoln("successfully persisted gateway resource update and created K8s event") } - ctx.updated = false } // mark the gateway phase @@ -186,24 +164,21 @@ func (ctx *gatewayContext) markGatewayPhase(phase v1alpha1.NodePhase, message st func PersistUpdates(client gwclient.Interface, gw *v1alpha1.Gateway, log *logrus.Logger) (*v1alpha1.Gateway, error) { gatewayClient := client.ArgoprojV1alpha1().Gateways(gw.ObjectMeta.Namespace) - // in case persist update fails - oldgw := gw.DeepCopy() - - gw, err := gatewayClient.Update(gw) + updatedGateway, err := gatewayClient.Update(gw) if err != nil { log.WithError(err).Warn("error updating gateway") if errors.IsConflict(err) { - return oldgw, err + return nil, err } log.Info("re-applying updates on latest version and retrying update") err = ReapplyUpdates(client, gw) if err != nil { log.WithError(err).Error("failed to re-apply update") - return oldgw, err + return nil, err } } - log.WithField(common.LabelPhase, string(gw.Status.Phase)).Info("gateway state updated successfully") - return gw, nil + log.WithField(common.LabelPhase, string(updatedGateway.Status.Phase)).Info("gateway state updated successfully") + return updatedGateway, nil } // ReapplyUpdates to gateway resource diff --git a/controllers/gateway/resource_test.go b/controllers/gateway/resource_test.go index 553d6af09a..ebf6b52633 100644 --- a/controllers/gateway/resource_test.go +++ b/controllers/gateway/resource_test.go @@ -84,14 +84,7 @@ var gatewayObj = &v1alpha1.Gateway{ Port: "9330", }, }, - Watchers: &v1alpha1.NotificationWatchers{ - Sensors: []v1alpha1.SensorNotificationWatcher{ - { - Name: "fake-sensor", - Namespace: common.DefaultControllerNamespace, - }, - }, - }, + Subscribers: []string{"http://fake-sensor.fake.svc.cluser.local:8080/"}, }, } diff --git a/controllers/gateway/validate.go b/controllers/gateway/validate.go index b1dbb51a4c..2d19a2a3a6 100644 --- a/controllers/gateway/validate.go +++ b/controllers/gateway/validate.go @@ -17,7 +17,6 @@ limitations under the License. package gateway import ( - apicommon "github.com/argoproj/argo-events/pkg/apis/common" "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" "github.com/pkg/errors" ) @@ -33,34 +32,8 @@ func Validate(gatewayObj *v1alpha1.Gateway) error { if gatewayObj.Spec.EventSourceRef == nil { return errors.New("event source for the gateway is not specified") } - if gatewayObj.Spec.ProcessorPort == "" { return errors.New("gateway processor port is not specified") } - - switch gatewayObj.Spec.EventProtocol.Type { - case apicommon.HTTP: - if gatewayObj.Spec.Watchers == nil || (gatewayObj.Spec.Watchers.Gateways == nil && gatewayObj.Spec.Watchers.Sensors == nil) { - return errors.New("no associated watchers with gateway") - } - if gatewayObj.Spec.EventProtocol.Http.Port == "" { - return errors.New("http server port is not defined") - } - case apicommon.NATS: - if gatewayObj.Spec.EventProtocol.Nats.URL == "" { - return errors.New("nats url is not defined") - } - if gatewayObj.Spec.EventProtocol.Nats.Type == "" { - return errors.New("nats service type is not defined") - } - if gatewayObj.Spec.EventProtocol.Nats.Type == apicommon.Streaming && gatewayObj.Spec.EventProtocol.Nats.ClientId == "" { - return errors.New("client id must be specified when using nats streaming") - } - if gatewayObj.Spec.EventProtocol.Nats.Type == apicommon.Streaming && gatewayObj.Spec.EventProtocol.Nats.ClusterId == "" { - return errors.New("cluster id must be specified when using nats streaming") - } - default: - return errors.New("unknown gateway type") - } return nil } diff --git a/controllers/sensor/controller.go b/controllers/sensor/controller.go index b2923d670e..3c49eddddc 100644 --- a/controllers/sensor/controller.go +++ b/controllers/sensor/controller.go @@ -19,12 +19,10 @@ package sensor import ( "context" "errors" - "fmt" "time" base "github.com/argoproj/argo-events" "github.com/argoproj/argo-events/common" - "github.com/argoproj/argo-events/pkg/apis/sensor" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" clientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned" "github.com/sirupsen/logrus" @@ -117,22 +115,7 @@ func (controller *Controller) processNextItem() bool { err = ctx.operate() if err != nil { - if err := common.GenerateK8sEvent(controller.k8sClient, - fmt.Sprintf("failed to operate on sensor %s", s.Name), - common.EscalationEventType, - "sensor operation failed", - s.Name, - s.Namespace, - controller.Config.InstanceID, - sensor.Kind, - map[string]string{ - common.LabelSensorName: s.Name, - common.LabelEventType: string(common.EscalationEventType), - common.LabelOperation: "controller_operation", - }, - ); err != nil { - ctx.logger.WithError(err).Errorln("failed to create K8s event to escalate sensor operation failure") - } + ctx.logger.WithError(err).WithField("sensor", s.Name).Errorln("failed to operate on the sensor object") } err = controller.handleErr(err, key) diff --git a/controllers/sensor/operator.go b/controllers/sensor/operator.go index 95227ac995..ebcaaa79e6 100644 --- a/controllers/sensor/operator.go +++ b/controllers/sensor/operator.go @@ -21,7 +21,6 @@ import ( "github.com/argoproj/argo-events/common" apicommon "github.com/argoproj/argo-events/pkg/apis/common" - "github.com/argoproj/argo-events/pkg/apis/sensor" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" sensorclientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned" "github.com/sirupsen/logrus" @@ -167,43 +166,18 @@ func (ctx *sensorContext) updateSensorResources() error { // updateSensorState updates the sensor resource state func (ctx *sensorContext) updateSensorState() { + defer func() { + ctx.updated = false + }() if ctx.updated { - // persist updates to sensor resource - labels := map[string]string{ - common.LabelSensorName: ctx.sensor.Name, - LabelPhase: string(ctx.sensor.Status.Phase), - LabelControllerInstanceID: ctx.controller.Config.InstanceID, - common.LabelOperation: "persist_state_update", - } - eventType := common.StateChangeEventType - updatedSensor, err := PersistUpdates(ctx.controller.sensorClient, ctx.sensor, ctx.logger) if err != nil { ctx.logger.WithError(err).Errorln("failed to persist sensor update") - - // escalate failure - eventType = common.EscalationEventType - } - - // update sensor ref. in case of failure to persist updates, this is a deep copy of old sensor resource - ctx.sensor = updatedSensor - - labels[common.LabelEventType] = string(eventType) - if err := common.GenerateK8sEvent(ctx.controller.k8sClient, - "persist update", - eventType, - "sensor state update", - ctx.sensor.Name, - ctx.sensor.Namespace, - ctx.controller.Config.InstanceID, - sensor.Kind, - labels); err != nil { - ctx.logger.WithError(err).Error("failed to create K8s event to logger sensor state persist operation") return } - ctx.logger.Info("successfully persisted sensor resource update and created K8s event") + ctx.sensor = updatedSensor + ctx.logger.Info("successfully persisted sensor resource update") } - ctx.updated = false } // mark the overall sensor phase @@ -304,14 +278,12 @@ func (ctx *sensorContext) markDependencyNodesActive() { // PersistUpdates persists the updates to the Sensor resource func PersistUpdates(client sensorclientset.Interface, sensorObj *v1alpha1.Sensor, log *logrus.Logger) (*v1alpha1.Sensor, error) { sensorClient := client.ArgoprojV1alpha1().Sensors(sensorObj.ObjectMeta.Namespace) - // in case persist update fails - oldsensor := sensorObj.DeepCopy() - sensorObj, err := sensorClient.Update(sensorObj) + updatedSensor, err := sensorClient.Update(sensorObj) if err != nil { if errors.IsConflict(err) { log.WithError(err).Error("error updating sensor") - return oldsensor, err + return nil, err } log.Infoln(err) @@ -319,11 +291,11 @@ func PersistUpdates(client sensorclientset.Interface, sensorObj *v1alpha1.Sensor err = ReapplyUpdate(client, sensorObj) if err != nil { log.WithError(err).Error("failed to re-apply update") - return oldsensor, err + return nil, err } } log.WithField(common.LabelPhase, string(sensorObj.Status.Phase)).Info("sensor state updated successfully") - return sensorObj, nil + return updatedSensor, nil } // Reapply the update to sensor diff --git a/gateways/client/cloud-events.go b/gateways/client/cloud-events.go index e439a3b66d..62c7f086e0 100644 --- a/gateways/client/cloud-events.go +++ b/gateways/client/cloud-events.go @@ -27,6 +27,36 @@ import ( "github.com/google/uuid" ) +// updateSubscriberClients updates the active clients for event subscribers +func (gatewayContext *GatewayContext) updateSubscriberClients() { + if gatewayContext.subscriberClients == nil { + gatewayContext.subscriberClients = make(map[string]cloudevents.Client) + } + if len(gatewayContext.gateway.Spec.Subscribers) == 0 { + return + } + for _, subscriber := range gatewayContext.gateway.Spec.Subscribers { + if _, ok := gatewayContext.subscriberClients[subscriber]; !ok { + t, err := cloudevents.NewHTTPTransport( + cloudevents.WithTarget(subscriber), + cloudevents.WithEncoding(cloudevents.HTTPBinaryV02), + ) + if err != nil { + gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a transport") + continue + } + + client, err := cloudevents.NewClient(t) + if err != nil { + gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a client") + continue + } + gatewayContext.logger.WithField("subscriber", subscriber).Infoln("added a client for the subscriber") + gatewayContext.subscriberClients[subscriber] = client + } + } +} + // dispatchEvent dispatches event to gateway transformer for further processing func (gatewayContext *GatewayContext) dispatchEvent(gatewayEvent *gateways.Event) error { logger := gatewayContext.logger.WithField(common.LabelEventSource, gatewayEvent.Name) @@ -39,33 +69,16 @@ func (gatewayContext *GatewayContext) dispatchEvent(gatewayEvent *gateways.Event completeSuccess := true - for _, sensor := range gatewayContext.gateway.Spec.Watchers.Sensors { - namespace := gatewayContext.namespace - if sensor.Namespace != "" { - namespace = sensor.Namespace - } - - target := fmt.Sprintf("http://%s:%s%s", common.ServiceDNSName(sensor.Name, namespace), gatewayContext.gateway.Spec.EventProtocol.Http.Port, common.SensorServiceEndpoint) - - t, err := cloudevents.NewHTTPTransport( - cloudevents.WithTarget(target), - cloudevents.WithEncoding(cloudevents.HTTPBinaryV02), - ) - if err != nil { - logger.WithError(err).WithField("target", target).Warnln("failed to create a transport") - completeSuccess = false - continue - } - - client, err := cloudevents.NewClient(t) - if err != nil { - logger.WithError(err).WithField("target", target).Warnln("failed to create a client") + for _, subscriber := range gatewayContext.gateway.Spec.Subscribers { + client, ok := gatewayContext.subscriberClients[subscriber] + if !ok { + gatewayContext.logger.WithField("subscriber", subscriber).Warnln("unable to send event. no client found for the subscriber") completeSuccess = false continue } if _, _, err := client.Send(context.Background(), *cloudEvent); err != nil { - logger.WithError(err).WithField("target", target).Warnln("failed to send the event") + logger.WithError(err).WithField("target", subscriber).Warnln("failed to send the event") completeSuccess = false continue } diff --git a/gateways/client/context.go b/gateways/client/context.go index 91417d18a5..420a665cdf 100644 --- a/gateways/client/context.go +++ b/gateways/client/context.go @@ -18,17 +18,14 @@ package main import ( "context" - "fmt" + cloudevents "github.com/cloudevents/sdk-go" "os" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" - pc "github.com/argoproj/argo-events/pkg/apis/common" "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" eventsourceClientset "github.com/argoproj/argo-events/pkg/client/eventsources/clientset/versioned" gwclientset "github.com/argoproj/argo-events/pkg/client/gateway/clientset/versioned" - "github.com/nats-io/go-nats" - snats "github.com/nats-io/go-nats-streaming" "github.com/sirupsen/logrus" "google.golang.org/grpc" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,13 +59,9 @@ type GatewayContext struct { // controllerInstanceId is instance ID of the gateway controller controllerInstanceID string // statusCh is used to communicate the status of an event source - statusCh chan EventSourceStatus - // natsConn is the standard nats connection used to publish events to cluster. Only used if dispatch protocol is NATS - natsConn *nats.Conn - // natsStreamingConn is the nats connection used for streaming. - natsStreamingConn snats.Conn - // sensorHttpPort is the http server running in sensor that listens to event. Only used if dispatch protocol is HTTP - sensorHttpPort string + statusCh chan notification + // subscriberClients holds the active clients for subscribers + subscriberClients map[string]cloudevents.Client } // EventSourceContext contains information of a event source for gateway to run. @@ -134,25 +127,9 @@ func NewGatewayContext() *GatewayContext { gateway: gateway, controllerInstanceID: controllerInstanceID, serverPort: serverPort, - statusCh: make(chan EventSourceStatus), + statusCh: make(chan notification), + subscriberClients: make(map[string]cloudevents.Client), } - switch gateway.Spec.EventProtocol.Type { - case pc.HTTP: - gatewayConfig.sensorHttpPort = gateway.Spec.EventProtocol.Http.Port - case pc.NATS: - if gatewayConfig.natsConn, err = nats.Connect(gateway.Spec.EventProtocol.Nats.URL); err != nil { - panic(fmt.Errorf("failed to obtain NATS standard connection. err: %+v", err)) - } - gatewayConfig.logger.WithField(common.LabelURL, gateway.Spec.EventProtocol.Nats.URL).Infoln("connected to nats service") - - if gatewayConfig.gateway.Spec.EventProtocol.Nats.Type == pc.Streaming { - gatewayConfig.natsStreamingConn, err = snats.Connect(gatewayConfig.gateway.Spec.EventProtocol.Nats.ClusterId, gatewayConfig.gateway.Spec.EventProtocol.Nats.ClientId, snats.NatsConn(gatewayConfig.natsConn)) - if err != nil { - panic(fmt.Errorf("failed to obtain NATS streaming connection. err: %+v", err)) - } - gatewayConfig.logger.WithField(common.LabelURL, gateway.Spec.EventProtocol.Nats.URL).Infoln("nats streaming connection successful") - } - } return gatewayConfig } diff --git a/gateways/client/event-source_test.go b/gateways/client/event-source_test.go index a825c13ac3..991fa2d4d0 100644 --- a/gateways/client/event-source_test.go +++ b/gateways/client/event-source_test.go @@ -41,16 +41,14 @@ func getGatewayContext() *GatewayContext { return &GatewayContext{ logger: common.NewArgoEventsLogger(), serverPort: "20000", - statusCh: make(chan EventSourceStatus), + statusCh: make(chan notification), gateway: &v1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: "fake-gateway", Namespace: "fake-namespace", }, Spec: v1alpha1.GatewaySpec{ - Watchers: &v1alpha1.NotificationWatchers{ - Sensors: []v1alpha1.SensorNotificationWatcher{}, - }, + Subscribers: []string{}, EventProtocol: &apicommon.EventProtocol{ Type: apicommon.HTTP, Http: apicommon.Http{ @@ -175,10 +173,8 @@ func TestSyncEventSources(t *testing.T) { go func() { for { select { - case status := <-gatewayContext.statusCh: - fmt.Println(status.Message) + case <-gatewayContext.statusCh: case <-stopStatus: - fmt.Println("returning from status") return } } diff --git a/gateways/client/event-sources.go b/gateways/client/event-sources.go index 7a97ea7bd9..fce46a68b6 100644 --- a/gateways/client/event-sources.go +++ b/gateways/client/event-sources.go @@ -23,7 +23,6 @@ import ( "github.com/argoproj/argo-events/gateways" apicommon "github.com/argoproj/argo-events/pkg/apis/common" eventSourceV1Alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" - "github.com/argoproj/argo-events/pkg/apis/gateway" "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" "github.com/ghodss/yaml" "github.com/sirupsen/logrus" @@ -140,11 +139,13 @@ func (gatewayContext *GatewayContext) activateEventSources(eventSources map[stri // conn should be in READY state if eventSource.conn.GetState() != connectivity.Ready { logger.Errorln("connection is not in ready state.") - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseError, - Id: eventSource.source.Id, - Message: "connection_is_not_in_ready_state", - Name: eventSource.source.Name, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseError, + id: eventSource.source.Id, + message: "connection is not in the ready state", + name: eventSource.source.Name, + }, } return } @@ -159,11 +160,13 @@ func (gatewayContext *GatewayContext) activateEventSources(eventSources map[stri if err := eventSource.conn.Close(); err != nil { logger.WithError(err).Errorln("failed to close client connection") } - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseError, - Id: eventSource.source.Id, - Message: "event_source_is_not_valid", - Name: eventSource.source.Name, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseError, + id: eventSource.source.Id, + message: "event_source_is_not_valid", + name: eventSource.source.Name, + }, } return } @@ -171,22 +174,26 @@ func (gatewayContext *GatewayContext) activateEventSources(eventSources map[stri logger.Infoln("event source is valid") // mark event source as running - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseRunning, - Message: "event_source_is_running", - Id: eventSource.source.Id, - Name: eventSource.source.Name, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseRunning, + message: "event source is running", + id: eventSource.source.Id, + name: eventSource.source.Name, + }, } // listen to events from gateway server eventStream, err := eventSource.client.StartEventSource(eventSource.ctx, eventSource.source) if err != nil { logger.WithError(err).Errorln("error occurred while starting event source") - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseError, - Message: "failed_to_receive_event_stream", - Name: eventSource.source.Name, - Id: eventSource.source.Id, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseError, + message: "failed to receive event stream", + name: eventSource.source.Name, + id: eventSource.source.Id, + }, } return } @@ -197,37 +204,30 @@ func (gatewayContext *GatewayContext) activateEventSources(eventSources map[stri if err != nil { if err == io.EOF { logger.Infoln("event source has stopped") - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseCompleted, - Message: "event_source_has_been_stopped", - Name: eventSource.source.Name, - Id: eventSource.source.Id, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseCompleted, + message: "event source has been stopped", + name: eventSource.source.Name, + id: eventSource.source.Id, + }, } return } logger.WithError(err).Errorln("failed to receive event from stream") - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseError, - Message: "failed_to_receive_event_from_event_source_stream", - Name: eventSource.source.Name, - Id: eventSource.source.Id, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseError, + message: "failed to receive event from the event source stream", + name: eventSource.source.Name, + id: eventSource.source.Id, + }, } return } err = gatewayContext.dispatchEvent(event) if err != nil { - // escalate error through a K8s event - labels := map[string]string{ - common.LabelEventType: string(common.EscalationEventType), - common.LabelEventSourceName: eventSource.source.Name, - common.LabelResourceName: gatewayContext.name, - common.LabelEventSourceID: eventSource.source.Id, - common.LabelOperation: "dispatch_event_to_watchers", - } - if err := common.GenerateK8sEvent(gatewayContext.k8sClient, fmt.Sprintf("failed to dispatch event to watchers"), common.EscalationEventType, "event dispatch failed", gatewayContext.name, gatewayContext.namespace, gatewayContext.controllerInstanceID, gateway.Kind, labels); err != nil { - logger.WithError(err).Errorln("failed to create K8s event to escalate event dispatch failure") - } logger.WithError(err).Errorln("failed to dispatch event to watchers") } } @@ -247,11 +247,13 @@ func (gatewayContext *GatewayContext) deactivateEventSources(eventSourceNames [] logger.WithField(common.LabelEventSource, eventSource.source.Name).Infoln("stopping the event source") delete(gatewayContext.eventSourceContexts, eventSourceName) - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseRemove, - Id: eventSource.source.Id, - Message: "event_source_is_removed", - Name: eventSource.source.Name, + gatewayContext.statusCh <- notification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseRemove, + id: eventSource.source.Id, + message: "event source is removed", + name: eventSource.source.Name, + }, } eventSource.cancel() if err := eventSource.conn.Close(); err != nil { diff --git a/gateways/client/state.go b/gateways/client/state.go index 007bf8eb93..307e77c37a 100644 --- a/gateways/client/state.go +++ b/gateways/client/state.go @@ -21,52 +21,63 @@ import ( "github.com/argoproj/argo-events/common" gtw "github.com/argoproj/argo-events/controllers/gateway" - "github.com/argoproj/argo-events/pkg/apis/gateway" "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// EventSourceStatus encapsulates state of an event source -type EventSourceStatus struct { - // Id of the event source - Id string - // Name of the event source - Name string - // Message - Message string - // Phase of the event source - Phase v1alpha1.NodePhase - // Gateway reference - Gateway *v1alpha1.Gateway +// notification encapsulates state of an event source +type notification struct { + // event source update notification + eventSourceNotification *eventSourceUpdate + // gateway resource update notification + gatewayNotification *resourceUpdate } -// markGatewayNodePhase marks the node with a phase, returns the node -func (gatewayContext *GatewayContext) markGatewayNodePhase(nodeStatus *EventSourceStatus) *v1alpha1.NodeStatus { +// eventSourceUpdate refers to update related to a event source +type eventSourceUpdate struct { + // id of the event source + id string + // name of the event source + name string + // phase of the event source node within gateway resource + phase v1alpha1.NodePhase + // message refers to cause of phase change + message string +} + +// resourceUpdate refers to update related to gateway resource +type resourceUpdate struct { + // gateway refers to updated gateway resource + gateway *v1alpha1.Gateway +} + +// markNodePhase marks the gateway node with a phase +func (gatewayContext *GatewayContext) markNodePhase(notification *eventSourceUpdate) *v1alpha1.NodeStatus { logger := gatewayContext.logger.WithFields( map[string]interface{}{ - common.LabelNodeName: nodeStatus.Name, - common.LabelPhase: string(nodeStatus.Phase), + common.LabelNodeName: notification.name, + common.LabelPhase: string(notification.phase), }, ) logger.Infoln("marking node phase") - node := gatewayContext.getNodeByID(nodeStatus.Id) + node := gatewayContext.getNodeByID(notification.id) if node == nil { logger.Warnln("node is not initialized") return nil } - if node.Phase != nodeStatus.Phase { - logger.WithField("new-phase", string(nodeStatus.Phase)).Infoln("phase updated") - node.Phase = nodeStatus.Phase + if node.Phase != notification.phase { + logger.WithField("new-phase", string(notification.phase)).Infoln("phase updated") + node.Phase = notification.phase } - node.Message = nodeStatus.Message + node.Message = notification.message gatewayContext.gateway.Status.Nodes[node.ID] = *node gatewayContext.updated = true return node } -// getNodeByName returns the node from this gateway for the nodeName +// getNodeByName returns a gateway node by the id func (gatewayContext *GatewayContext) getNodeByID(nodeID string) *v1alpha1.NodeStatus { node, ok := gatewayContext.gateway.Status.Nodes[nodeID] if !ok { @@ -81,7 +92,7 @@ func (gatewayContext *GatewayContext) initializeNode(nodeID string, nodeName str gatewayContext.gateway.Status.Nodes = make(map[string]v1alpha1.NodeStatus) } - gatewayContext.logger.WithField(common.LabelNodeName, nodeName).Infoln("node") + gatewayContext.logger.WithField(common.LabelNodeName, nodeName).Infoln("initializing the node") node, ok := gatewayContext.gateway.Status.Nodes[nodeID] if !ok { @@ -107,55 +118,46 @@ func (gatewayContext *GatewayContext) initializeNode(nodeID string, nodeName str return node } -// UpdateGatewayState updates gateway resource nodes state -func (gatewayContext *GatewayContext) UpdateGatewayState(status *EventSourceStatus) { +// UpdateGatewayState updates the gateway resource or the event source node status +func (gatewayContext *GatewayContext) UpdateGatewayState(notification *notification) { + defer func() { + gatewayContext.updated = false + }() + logger := gatewayContext.logger - if status.Phase != v1alpha1.NodePhaseResourceUpdate { - logger = logger.WithField(common.LabelEventSource, status.Name).Logger - } - logger.Infoln("received a gateway state update notification") + if notification.gatewayNotification != nil { + logger.Infoln("received a gateway resource update notification") + gatewayContext.gateway = notification.gatewayNotification.gateway + logger.Infoln("checking if any new subscribers are added") + gatewayContext.updateSubscriberClients() + } - switch status.Phase { - case v1alpha1.NodePhaseRunning: - // init the node and mark it as running - gatewayContext.initializeNode(status.Id, status.Name, status.Message) + if notification.eventSourceNotification != nil { + logger = logger.WithField(common.LabelEventSource, notification.eventSourceNotification).Logger + logger.Infoln("received a event source state update notification") - case v1alpha1.NodePhaseCompleted, v1alpha1.NodePhaseError: - gatewayContext.markGatewayNodePhase(status) + switch notification.eventSourceNotification.phase { + case v1alpha1.NodePhaseRunning: + // init the node and mark it as running + gatewayContext.initializeNode(notification.eventSourceNotification.id, notification.eventSourceNotification.name, notification.eventSourceNotification.message) - case v1alpha1.NodePhaseResourceUpdate: - gatewayContext.gateway = status.Gateway + case v1alpha1.NodePhaseCompleted, v1alpha1.NodePhaseError: + gatewayContext.markNodePhase(notification.eventSourceNotification) - case v1alpha1.NodePhaseRemove: - delete(gatewayContext.gateway.Status.Nodes, status.Id) - logger.Infoln("event source is removed") - gatewayContext.updated = true + case v1alpha1.NodePhaseRemove: + delete(gatewayContext.gateway.Status.Nodes, notification.eventSourceNotification.id) + logger.Infoln("event source is removed") + gatewayContext.updated = true + } } if gatewayContext.updated { - // persist changes and create K8s event logging the change - eventType := common.StateChangeEventType - labels := map[string]string{ - common.LabelEventSourceName: status.Name, - common.LabelResourceName: gatewayContext.name, - common.LabelEventSourceID: status.Id, - common.LabelOperation: "persist_event_source_state", - } updatedGw, err := gtw.PersistUpdates(gatewayContext.gatewayClient, gatewayContext.gateway, gatewayContext.logger) if err != nil { logger.WithError(err).Errorln("failed to persist gateway resource updates, reverting to old state") - eventType = common.EscalationEventType + return } - - // update gateway ref. in case of failure to persist updates, this is a deep copy of old gateway resource gatewayContext.gateway = updatedGw - labels[common.LabelEventType] = string(eventType) - - // generate a K8s event for persist event source state change - if err := common.GenerateK8sEvent(gatewayContext.k8sClient, status.Message, eventType, "event source state update", gatewayContext.name, gatewayContext.namespace, gatewayContext.controllerInstanceID, gateway.Kind, labels); err != nil { - logger.WithError(err).Errorln("failed to create K8s event to log event source state change") - } } - gatewayContext.updated = false } diff --git a/gateways/client/state_test.go b/gateways/client/state_test.go index d389582a8d..fa3b901102 100644 --- a/gateways/client/state_test.go +++ b/gateways/client/state_test.go @@ -17,9 +17,10 @@ limitations under the License. package main import ( + "testing" + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1" "github.com/smartystreets/goconvey/convey" - "testing" ) func TestGatewayState(t *testing.T) { @@ -32,52 +33,50 @@ func TestGatewayState(t *testing.T) { }) convey.Convey("Update gateway resource test-node node state to running", func() { - gc.UpdateGatewayState(&EventSourceStatus{ - Phase: v1alpha1.NodePhaseRunning, - Name: "test-node", - Message: "node is marked as running", - Id: "test-node", + gc.UpdateGatewayState(¬ification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseRunning, + name: "test-node", + message: "node is marked as running", + id: "test-node", + }, }) convey.So(len(gc.gateway.Status.Nodes), convey.ShouldEqual, 1) convey.So(gc.gateway.Status.Nodes["test-node"].Phase, convey.ShouldEqual, v1alpha1.NodePhaseRunning) }) updatedGw := gc.gateway - updatedGw.Spec.Watchers = &v1alpha1.NotificationWatchers{ - Sensors: []v1alpha1.SensorNotificationWatcher{ - { - Name: "sensor-1", - }, - }, - } + updatedGw.Spec.Subscribers = []string{"sensor-1"} convey.Convey("Update gateway watchers", func() { - gc.UpdateGatewayState(&EventSourceStatus{ - Phase: v1alpha1.NodePhaseResourceUpdate, - Name: "test-node", - Message: "gateway resource is updated", - Id: "test-node", - Gateway: updatedGw, + gc.UpdateGatewayState(¬ification{ + gatewayNotification: &resourceUpdate{ + gateway: updatedGw, + }, }) - convey.So(len(gc.gateway.Spec.Watchers.Sensors), convey.ShouldEqual, 1) + convey.So(len(gc.gateway.Spec.Subscribers), convey.ShouldEqual, 1) }) convey.Convey("Update gateway resource test-node node state to completed", func() { - gc.UpdateGatewayState(&EventSourceStatus{ - Phase: v1alpha1.NodePhaseCompleted, - Name: "test-node", - Message: "node is marked completed", - Id: "test-node", + gc.UpdateGatewayState(¬ification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseCompleted, + name: "test-node", + message: "node is marked completed", + id: "test-node", + }, }) convey.So(gc.gateway.Status.Nodes["test-node"].Phase, convey.ShouldEqual, v1alpha1.NodePhaseCompleted) }) convey.Convey("Remove gateway resource test-node node", func() { - gc.UpdateGatewayState(&EventSourceStatus{ - Phase: v1alpha1.NodePhaseRemove, - Name: "test-node", - Message: "node is removed", - Id: "test-node", + gc.UpdateGatewayState(¬ification{ + eventSourceNotification: &eventSourceUpdate{ + phase: v1alpha1.NodePhaseRemove, + name: "test-node", + message: "node is removed", + id: "test-node", + }, }) convey.So(len(gc.gateway.Status.Nodes), convey.ShouldEqual, 0) }) @@ -87,12 +86,13 @@ func TestGatewayState(t *testing.T) { func TestMarkGatewayNodePhase(t *testing.T) { convey.Convey("Given a node status, mark node state", t, func() { gc := getGatewayContext() - nodeStatus := &EventSourceStatus{ - Name: "fake", - Id: "1234", - Message: "running", - Phase: v1alpha1.NodePhaseRunning, - Gateway: gc.gateway, + n := ¬ification{ + eventSourceNotification: &eventSourceUpdate{ + name: "fake", + id: "1234", + message: "running", + phase: v1alpha1.NodePhaseRunning, + }, } gc.gateway.Status.Nodes = map[string]v1alpha1.NodeStatus{ "1234": v1alpha1.NodeStatus{ @@ -103,9 +103,9 @@ func TestMarkGatewayNodePhase(t *testing.T) { }, } - resultStatus := gc.markGatewayNodePhase(nodeStatus) + resultStatus := gc.markNodePhase(n.eventSourceNotification) convey.So(resultStatus, convey.ShouldNotBeNil) - convey.So(resultStatus.Name, convey.ShouldEqual, nodeStatus.Name) + convey.So(resultStatus.Name, convey.ShouldEqual, n.eventSourceNotification.name) gc.gateway.Status.Nodes = map[string]v1alpha1.NodeStatus{ "4567": v1alpha1.NodeStatus{ @@ -116,7 +116,7 @@ func TestMarkGatewayNodePhase(t *testing.T) { }, } - resultStatus = gc.markGatewayNodePhase(nodeStatus) + resultStatus = gc.markNodePhase(n.eventSourceNotification) convey.So(resultStatus, convey.ShouldBeNil) }) } diff --git a/gateways/client/watcher.go b/gateways/client/watcher.go index 0b3bfba35a..88c47dfb9e 100644 --- a/gateways/client/watcher.go +++ b/gateways/client/watcher.go @@ -104,10 +104,8 @@ func (gatewayContext *GatewayContext) WatchGatewayUpdates(ctx context.Context) ( UpdateFunc: func(old, new interface{}) { if g, ok := new.(*v1alpha1.Gateway); ok { gatewayContext.logger.Info("detected gateway update. updating gateway watchers") - gatewayContext.statusCh <- EventSourceStatus{ - Phase: v1alpha1.NodePhaseResourceUpdate, - Gateway: g, - Message: "gateway_resource_update", + gatewayContext.statusCh <- notification{ + gatewayNotification: &resourceUpdate{gateway: g}, } } }, diff --git a/pkg/apis/gateway/v1alpha1/openapi_generated.go b/pkg/apis/gateway/v1alpha1/openapi_generated.go index 33be14ca54..f784365837 100644 --- a/pkg/apis/gateway/v1alpha1/openapi_generated.go +++ b/pkg/apis/gateway/v1alpha1/openapi_generated.go @@ -28,16 +28,13 @@ import ( func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { return map[string]common.OpenAPIDefinition{ - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.EventSourceRef": schema_pkg_apis_gateway_v1alpha1_EventSourceRef(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.Gateway": schema_pkg_apis_gateway_v1alpha1_Gateway(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayList": schema_pkg_apis_gateway_v1alpha1_GatewayList(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayNotificationWatcher": schema_pkg_apis_gateway_v1alpha1_GatewayNotificationWatcher(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayResource": schema_pkg_apis_gateway_v1alpha1_GatewayResource(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewaySpec": schema_pkg_apis_gateway_v1alpha1_GatewaySpec(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayStatus": schema_pkg_apis_gateway_v1alpha1_GatewayStatus(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.NodeStatus": schema_pkg_apis_gateway_v1alpha1_NodeStatus(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.NotificationWatchers": schema_pkg_apis_gateway_v1alpha1_NotificationWatchers(ref), - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.SensorNotificationWatcher": schema_pkg_apis_gateway_v1alpha1_SensorNotificationWatcher(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.EventSourceRef": schema_pkg_apis_gateway_v1alpha1_EventSourceRef(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.Gateway": schema_pkg_apis_gateway_v1alpha1_Gateway(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayList": schema_pkg_apis_gateway_v1alpha1_GatewayList(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayResource": schema_pkg_apis_gateway_v1alpha1_GatewayResource(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewaySpec": schema_pkg_apis_gateway_v1alpha1_GatewaySpec(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayStatus": schema_pkg_apis_gateway_v1alpha1_GatewayStatus(ref), + "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.NodeStatus": schema_pkg_apis_gateway_v1alpha1_NodeStatus(ref), } } @@ -166,48 +163,6 @@ func schema_pkg_apis_gateway_v1alpha1_GatewayList(ref common.ReferenceCallback) } } -func schema_pkg_apis_gateway_v1alpha1_GatewayNotificationWatcher(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "GatewayNotificationWatcher is the gateway interested in listening to notifications from this gateway", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "name": { - SchemaProps: spec.SchemaProps{ - Description: "Name is the gateway name", - Type: []string{"string"}, - Format: "", - }, - }, - "port": { - SchemaProps: spec.SchemaProps{ - Description: "Port is http server port on which gateway is running", - Type: []string{"string"}, - Format: "", - }, - }, - "endpoint": { - SchemaProps: spec.SchemaProps{ - Description: "Endpoint is REST API endpoint to post event to. Events are sent using HTTP POST method to this endpoint.", - Type: []string{"string"}, - Format: "", - }, - }, - "namespace": { - SchemaProps: spec.SchemaProps{ - Description: "Namespace of the gateway", - Type: []string{"string"}, - Format: "", - }, - }, - }, - Required: []string{"name", "port", "endpoint"}, - }, - }, - } -} - func schema_pkg_apis_gateway_v1alpha1_GatewayResource(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -268,10 +223,23 @@ func schema_pkg_apis_gateway_v1alpha1_GatewaySpec(ref common.ReferenceCallback) Ref: ref("k8s.io/api/core/v1.Service"), }, }, - "watchers": { + "subscribers": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-type": "subscribers", + }, + }, SchemaProps: spec.SchemaProps{ - Description: "Watchers are components which are interested listening to notifications from this gateway These only need to be specified when gateway dispatch mechanism is through HTTP POST notifications. In future, support for NATS, KAFKA will be added as a means to dispatch notifications in which case specifying watchers would be unnecessary.", - Ref: ref("github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.NotificationWatchers"), + Description: "Subscribers are HTTP endpoints to send events to.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, }, }, "processorPort": { @@ -299,7 +267,7 @@ func schema_pkg_apis_gateway_v1alpha1_GatewaySpec(ref common.ReferenceCallback) }, }, Dependencies: []string{ - "github.com/argoproj/argo-events/pkg/apis/common.EventProtocol", "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.EventSourceRef", "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.NotificationWatchers", "k8s.io/api/core/v1.PodTemplateSpec", "k8s.io/api/core/v1.Service"}, + "github.com/argoproj/argo-events/pkg/apis/common.EventProtocol", "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.EventSourceRef", "k8s.io/api/core/v1.PodTemplateSpec", "k8s.io/api/core/v1.Service"}, } } @@ -415,82 +383,3 @@ func schema_pkg_apis_gateway_v1alpha1_NodeStatus(ref common.ReferenceCallback) c "k8s.io/apimachinery/pkg/apis/meta/v1.MicroTime"}, } } - -func schema_pkg_apis_gateway_v1alpha1_NotificationWatchers(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "NotificationWatchers are components which are interested listening to notifications from this gateway", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "gateways": { - VendorExtensible: spec.VendorExtensible{ - Extensions: spec.Extensions{ - "x-kubernetes-list-type": "gateways", - }, - }, - SchemaProps: spec.SchemaProps{ - Description: "Gateways is the list of gateways interested in listening to notifications from this gateway", - Type: []string{"array"}, - Items: &spec.SchemaOrArray{ - Schema: &spec.Schema{ - SchemaProps: spec.SchemaProps{ - Ref: ref("github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayNotificationWatcher"), - }, - }, - }, - }, - }, - "sensors": { - VendorExtensible: spec.VendorExtensible{ - Extensions: spec.Extensions{ - "x-kubernetes-list-type": "sensors", - }, - }, - SchemaProps: spec.SchemaProps{ - Description: "Sensors is the list of sensors interested in listening to notifications from this gateway", - Type: []string{"array"}, - Items: &spec.SchemaOrArray{ - Schema: &spec.Schema{ - SchemaProps: spec.SchemaProps{ - Ref: ref("github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.SensorNotificationWatcher"), - }, - }, - }, - }, - }, - }, - }, - }, - Dependencies: []string{ - "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.GatewayNotificationWatcher", "github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1.SensorNotificationWatcher"}, - } -} - -func schema_pkg_apis_gateway_v1alpha1_SensorNotificationWatcher(ref common.ReferenceCallback) common.OpenAPIDefinition { - return common.OpenAPIDefinition{ - Schema: spec.Schema{ - SchemaProps: spec.SchemaProps{ - Description: "SensorNotificationWatcher is the sensor interested in listening to notifications from this gateway", - Type: []string{"object"}, - Properties: map[string]spec.Schema{ - "name": { - SchemaProps: spec.SchemaProps{ - Description: "Name is the name of the sensor", - Type: []string{"string"}, - Format: "", - }, - }, - "namespace": { - SchemaProps: spec.SchemaProps{ - Description: "Namespace of the sensor", - Type: []string{"string"}, - Format: "", - }, - }, - }, - Required: []string{"name"}, - }, - }, - } -} diff --git a/pkg/apis/gateway/v1alpha1/types.go b/pkg/apis/gateway/v1alpha1/types.go index 7f4a69befe..bcf927e484 100644 --- a/pkg/apis/gateway/v1alpha1/types.go +++ b/pkg/apis/gateway/v1alpha1/types.go @@ -27,12 +27,11 @@ type NodePhase string // possible types of node phases const ( - NodePhaseRunning NodePhase = "Running" // the node is running - NodePhaseError NodePhase = "Error" // the node has encountered an error in processing - NodePhaseNew NodePhase = "" // the node is new - NodePhaseCompleted NodePhase = "Completed" // node has completed running - NodePhaseRemove NodePhase = "Remove" // stale node - NodePhaseResourceUpdate NodePhase = "ResourceUpdate" // resource is updated + NodePhaseRunning NodePhase = "Running" // the node is running + NodePhaseError NodePhase = "Error" // the node has encountered an error in processing + NodePhaseNew NodePhase = "" // the node is new + NodePhaseCompleted NodePhase = "Completed" // node has completed running + NodePhaseRemove NodePhase = "Remove" // stale node ) // Gateway is the definition of a gateway resource @@ -67,11 +66,10 @@ type GatewaySpec struct { // Service is the specifications of the service to expose the gateway // Refer https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.11/#service-v1-core Service *corev1.Service `json:"service,omitempty" protobuf:"bytes,4,opt,name=service"` - // Watchers are components which are interested listening to notifications from this gateway - // These only need to be specified when gateway dispatch mechanism is through HTTP POST notifications. - // In future, support for NATS, KAFKA will be added as a means to dispatch notifications in which case - // specifying watchers would be unnecessary. - Watchers *NotificationWatchers `json:"watchers,omitempty" protobuf:"bytes,5,opt,name=watchers"` + // Subscribers are HTTP endpoints to send events to. + // +listType=subscribers + // +optional + Subscribers []string `json:"subscribers,omitempty" protobuf:"bytes,5,opt,name=subscribers"` // Port on which the gateway event source processor is running on. ProcessorPort string `json:"processorPort" protobuf:"bytes,6,opt,name=processorPort"` // EventProtocol is the underlying protocol used to send events from gateway to watchers(components interested in listening to event from this gateway) @@ -134,36 +132,3 @@ type NodeStatus struct { // UpdateTime is the time when node(gateway configuration) was updated UpdateTime metav1.MicroTime `json:"updateTime,omitempty" protobuf:"bytes,9,opt,name=updateTime"` } - -// NotificationWatchers are components which are interested listening to notifications from this gateway -type NotificationWatchers struct { - // +listType=gateways - // Gateways is the list of gateways interested in listening to notifications from this gateway - Gateways []GatewayNotificationWatcher `json:"gateways,omitempty" protobuf:"bytes,1,opt,name=gateways"` - // +listType=sensors - // Sensors is the list of sensors interested in listening to notifications from this gateway - Sensors []SensorNotificationWatcher `json:"sensors,omitempty" protobuf:"bytes,2,rep,name=sensors"` -} - -// GatewayNotificationWatcher is the gateway interested in listening to notifications from this gateway -type GatewayNotificationWatcher struct { - // Name is the gateway name - Name string `json:"name" protobuf:"bytes,1,name=name"` - // Port is http server port on which gateway is running - Port string `json:"port" protobuf:"bytes,2,name=port"` - // Endpoint is REST API endpoint to post event to. - // Events are sent using HTTP POST method to this endpoint. - Endpoint string `json:"endpoint" protobuf:"bytes,3,name=endpoint"` - // Namespace of the gateway - // +Optional - Namespace string `json:"namespace,omitempty" protobuf:"bytes,4,opt,name=namespace"` -} - -// SensorNotificationWatcher is the sensor interested in listening to notifications from this gateway -type SensorNotificationWatcher struct { - // Name is the name of the sensor - Name string `json:"name" protobuf:"bytes,1,name=name"` - // Namespace of the sensor - // +Optional - Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"` -} diff --git a/pkg/apis/gateway/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/gateway/v1alpha1/zz_generated.deepcopy.go index 06e3df7089..783481c869 100644 --- a/pkg/apis/gateway/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/gateway/v1alpha1/zz_generated.deepcopy.go @@ -103,22 +103,6 @@ func (in *GatewayList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *GatewayNotificationWatcher) DeepCopyInto(out *GatewayNotificationWatcher) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GatewayNotificationWatcher. -func (in *GatewayNotificationWatcher) DeepCopy() *GatewayNotificationWatcher { - if in == nil { - return nil - } - out := new(GatewayNotificationWatcher) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GatewayResource) DeepCopyInto(out *GatewayResource) { *out = *in @@ -163,10 +147,10 @@ func (in *GatewaySpec) DeepCopyInto(out *GatewaySpec) { *out = new(corev1.Service) (*in).DeepCopyInto(*out) } - if in.Watchers != nil { - in, out := &in.Watchers, &out.Watchers - *out = new(NotificationWatchers) - (*in).DeepCopyInto(*out) + if in.Subscribers != nil { + in, out := &in.Subscribers, &out.Subscribers + *out = make([]string, len(*in)) + copy(*out, *in) } if in.EventProtocol != nil { in, out := &in.EventProtocol, &out.EventProtocol @@ -232,45 +216,3 @@ func (in *NodeStatus) DeepCopy() *NodeStatus { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *NotificationWatchers) DeepCopyInto(out *NotificationWatchers) { - *out = *in - if in.Gateways != nil { - in, out := &in.Gateways, &out.Gateways - *out = make([]GatewayNotificationWatcher, len(*in)) - copy(*out, *in) - } - if in.Sensors != nil { - in, out := &in.Sensors, &out.Sensors - *out = make([]SensorNotificationWatcher, len(*in)) - copy(*out, *in) - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NotificationWatchers. -func (in *NotificationWatchers) DeepCopy() *NotificationWatchers { - if in == nil { - return nil - } - out := new(NotificationWatchers) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SensorNotificationWatcher) DeepCopyInto(out *SensorNotificationWatcher) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SensorNotificationWatcher. -func (in *SensorNotificationWatcher) DeepCopy() *SensorNotificationWatcher { - if in == nil { - return nil - } - out := new(SensorNotificationWatcher) - in.DeepCopyInto(out) - return out -} diff --git a/sensors/triggers/params.go b/sensors/triggers/params.go index 218823e5b2..a123775bc6 100644 --- a/sensors/triggers/params.go +++ b/sensors/triggers/params.go @@ -179,7 +179,7 @@ func resolveParamValue(src *v1alpha1.TriggerParameterSource, events map[string]a } // helper method to extract the events from the event dependencies nodes associated with the resource params -// returns a map of the events keyed by the event dependency Name +// returns a map of the events keyed by the event dependency name func extractEvents(sensor *v1alpha1.Sensor, params []v1alpha1.TriggerParameter) map[string]apicommon.Event { events := make(map[string]apicommon.Event) for _, param := range params { diff --git a/sensors/triggers/params_test.go b/sensors/triggers/params_test.go index 8458af46a7..5e08f52a19 100644 --- a/sensors/triggers/params_test.go +++ b/sensors/triggers/params_test.go @@ -47,7 +47,7 @@ func TestExtractEvents(t *testing.T) { SpecVersion: "0.3", Subject: "example-1", }, - Data: []byte("{\"Name\": \"fake\"}"), + Data: []byte("{\"name\": \"fake\"}"), }, }, }, @@ -56,7 +56,7 @@ func TestExtractEvents(t *testing.T) { { Src: &v1alpha1.TriggerParameterSource{ Event: "fake-dependency", - DataKey: "Name", + DataKey: "name", }, }, }) @@ -68,7 +68,7 @@ func TestExtractEvents(t *testing.T) { { Src: &v1alpha1.TriggerParameterSource{ Event: "fake-dependency", - DataKey: "Name", + DataKey: "name", }, }, }) @@ -86,7 +86,7 @@ func TestResolveParamValue(t *testing.T) { ID: "1", Time: metav1.MicroTime{Time: time.Now()}, }, - Data: []byte("{\"Name\": {\"first\": \"fake\", \"last\": \"user\"} }"), + Data: []byte("{\"name\": {\"first\": \"fake\", \"last\": \"user\"} }"), } eventBody, err := json.Marshal(event) assert.Nil(t, err) @@ -103,10 +103,10 @@ func TestResolveParamValue(t *testing.T) { result string }{ { - name: "get first Name", + name: "get first name", source: &v1alpha1.TriggerParameterSource{ Event: "fake-dependency", - DataKey: "Name.first", + DataKey: "name.first", }, result: "fake", }, @@ -138,7 +138,7 @@ func TestResolveParamValue(t *testing.T) { source: &v1alpha1.TriggerParameterSource{ Event: "fake-dependency", ContextKey: "subject", - DataKey: "Name.first", + DataKey: "name.first", }, result: "fake", },