Skip to content

Commit

Permalink
Boolean circuit for event dependencies (#162)
Browse files Browse the repository at this point in the history
* Moving EventProtocol to api common

* Moving EventProtocol to api common

* Added support to declare dependency groups and circuit expression

* AAdded DependencyGroups as separate entity from Dependencies

* Validating every sensor file and other chores

* Added conditions for triggers

* Validating every sensor file and other chores
  • Loading branch information
VaibhavPage authored and magaldima committed Feb 4, 2019
1 parent 6d3d3ae commit 8989738
Show file tree
Hide file tree
Showing 26 changed files with 1,366 additions and 1,379 deletions.
863 changes: 0 additions & 863 deletions Gopkg.lock

This file was deleted.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ required = [
branch = "master"

[[constraint]]
name = "github.com/xanzy/go-gitlab"
name = "github.com/Knetic/govaluate"
branch = "master"

[[override]]
Expand Down
28 changes: 20 additions & 8 deletions controllers/gateway/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,33 @@ limitations under the License.
package gateway

import (
"fmt"
"github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1"
"github.com/ghodss/yaml"
"io/ioutil"
"strings"
"testing"

"github.com/smartystreets/goconvey/convey"
)

func TestValidate(t *testing.T) {
convey.Convey("Given a gateway", t, func() {
gateway, err := getGateway()

convey.Convey("Make sure gateway is a valid gateway", func() {
dir := "../../examples/gateways"
convey.Convey("Validate list of gateways", t, func() {
files, err := ioutil.ReadDir(dir)
convey.So(err, convey.ShouldBeNil)
for _, file := range files {
if strings.HasSuffix(file.Name(), "configmap.yaml") {
continue
}
fmt.Println("filename: ", file.Name())
content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", dir, file.Name()))
convey.So(err, convey.ShouldBeNil)
convey.So(gateway, convey.ShouldNotBeNil)

err := Validate(gateway)
var gateway *v1alpha1.Gateway
err = yaml.Unmarshal([]byte(content), &gateway)
convey.So(err, convey.ShouldBeNil)
err = Validate(gateway)
convey.So(err, convey.ShouldBeNil)
})
}
})
}
24 changes: 19 additions & 5 deletions controllers/sensor/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,15 @@ func (soc *sOperationCtx) operate() error {
}

// Initialize all event dependency nodes
for _, eventDependency := range soc.s.Spec.Dependencies {
InitializeNode(soc.s, eventDependency.Name, v1alpha1.NodeTypeEventDependency, &soc.log)
for _, dependency := range soc.s.Spec.Dependencies {
InitializeNode(soc.s, dependency.Name, v1alpha1.NodeTypeEventDependency, &soc.log)
}

// Initialize all dependency groups
if soc.s.Spec.DependencyGroups != nil {
for _, group := range soc.s.Spec.DependencyGroups {
InitializeNode(soc.s, group.Name, v1alpha1.NodeTypeDependencyGroup, &soc.log)
}
}

// Initialize all trigger nodes
Expand Down Expand Up @@ -182,9 +189,16 @@ func (soc *sOperationCtx) operate() error {
}
}

// Mark all eventDependency nodes as active
for _, eventDependency := range soc.s.Spec.Dependencies {
MarkNodePhase(soc.s, eventDependency.Name, v1alpha1.NodeTypeEventDependency, v1alpha1.NodePhaseActive, nil, &soc.log, "node is active")
// Mark all event dependency nodes as active
for _, dependency := range soc.s.Spec.Dependencies {
MarkNodePhase(soc.s, dependency.Name, v1alpha1.NodeTypeEventDependency, v1alpha1.NodePhaseActive, nil, &soc.log, "node is active")
}

// Mark all dependency groups as active
if soc.s.Spec.DependencyGroups != nil {
for _, group := range soc.s.Spec.DependencyGroups {
MarkNodePhase(soc.s, group.Name, v1alpha1.NodeTypeDependencyGroup, v1alpha1.NodePhaseActive, nil, &soc.log, "node is active")
}
}

// if we get here - we know the signals are running
Expand Down
2 changes: 2 additions & 0 deletions controllers/sensor/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func TestSensorOperations(t *testing.T) {
switch node.Type {
case v1alpha1.NodeTypeEventDependency:
convey.So(node.Phase, convey.ShouldEqual, v1alpha1.NodePhaseActive)
case v1alpha1.NodeTypeDependencyGroup:
convey.So(node.Phase, convey.ShouldEqual, v1alpha1.NodePhaseActive)
case v1alpha1.NodeTypeTrigger:
convey.So(node.Phase, convey.ShouldEqual, v1alpha1.NodePhaseNew)
}
Expand Down
23 changes: 23 additions & 0 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sensor

import (
"fmt"
"github.com/Knetic/govaluate"
"time"

"github.com/argoproj/argo-events/common"
Expand Down Expand Up @@ -61,6 +62,25 @@ func ValidateSensor(s *v1alpha1.Sensor) error {
default:
return fmt.Errorf("unknown gateway type")
}

if s.Spec.DependencyGroups != nil {
if s.Spec.Circuit == "" {
return fmt.Errorf("no circuit expression provided to resolve dependency groups")
}
expression, err := govaluate.NewEvaluableExpression(s.Spec.Circuit)
if err != nil {
return fmt.Errorf("circuit expression can't be created for dependency groups. err: %+v", err)
}

groups := make(map[string]interface{}, len(s.Spec.DependencyGroups))
for _, group := range s.Spec.DependencyGroups {
groups[group.Name] = false
}
if _, err = expression.Evaluate(groups); err != nil {
return fmt.Errorf("circuit expression can't be evaluated for dependency groups. err: %+v", err)
}
}

return nil
}

Expand All @@ -77,6 +97,9 @@ func validateTriggers(triggers []v1alpha1.Trigger) error {
if trigger.Resource == nil {
return fmt.Errorf("trigger '%s' does not contain an absolute action", trigger.Name)
}
if trigger.When != nil && trigger.When.All != nil && trigger.When.Any != nil {
return fmt.Errorf("trigger condition can't have both any and all condition")
}
}
return nil
}
Expand Down
21 changes: 16 additions & 5 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,29 @@ limitations under the License.
package sensor

import (
"fmt"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/ghodss/yaml"
"io/ioutil"
"testing"

"github.com/smartystreets/goconvey/convey"
)

func TestValidateSensor(t *testing.T) {
convey.Convey("Given a sensor", t, func() {
sensor, err := getSensor()
dir := "../../examples/sensors"
convey.Convey("Validate list of sensor", t, func() {
files, err := ioutil.ReadDir(dir)
convey.So(err, convey.ShouldBeNil)
convey.Convey("Validate", func() {
err := ValidateSensor(sensor)
for _, file := range files {
fmt.Println("filename: ", file.Name())
content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", dir, file.Name()))
convey.So(err, convey.ShouldBeNil)
})
var sensor *v1alpha1.Sensor
err = yaml.Unmarshal([]byte(content), &sensor)
convey.So(err, convey.ShouldBeNil)
err = ValidateSensor(sensor)
convey.So(err, convey.ShouldBeNil)
}
})
}
5 changes: 3 additions & 2 deletions examples/sensors/context-filter-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ spec:
kind: Workflow
source:
s3:
bucket: workflows
key: hello-world.yaml
bucket:
name: workflows
key: hello-world.yaml
endpoint: minio-service.argo-events:9000
insecure: true
accessKey:
Expand Down
13 changes: 8 additions & 5 deletions examples/sensors/data-filter-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ spec:
dependencies:
- name: "webhook-gateway:foo"
filters:
name: "data-filter"
data:
- path: bucket
type: string
value: argo-workflow-input
dataFilters:
- path: bucket
type: string
value: argo-workflow-input
eventProtocol:
type: "HTTP"
http:
Expand All @@ -32,8 +34,9 @@ spec:
kind: Workflow
source:
s3:
bucket: workflows
key: hello-world.yaml
bucket:
name: workflows
key: hello-world.yaml
endpoint: minio-service.argo-events:9000
insecure: true
accessKey:
Expand Down
138 changes: 138 additions & 0 deletions examples/sensors/webhook-http-dependency-groups.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# The dependency grouping and selective workflow trigger execution is not supported in latest release
# This feature will be released in next release v0.8.
# You can try this example with sensor and sensor controller image v0.7.1
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: webhook-sensor-http
labels:
sensors.argoproj.io/sensor-controller-instanceid: argo-events
spec:
deploySpec:
containers:
- name: "sensor"
image: "argoproj/sensor:v0.7.1"
imagePullPolicy: Always
serviceAccountName: argo-events-sa
dependencies:
- name: "webhook-gateway-http:endpoint1"
filters:
context:
source:
host: xyz.com
contentType: application/json
- name: "webhook-gateway-http:endpoint2"
- name: "webhook-gateway-http:endpoint3"
- name: "webhook-gateway-http:endpoint4"
- name: "webhook-gateway-http:endpoint5"
- name: "webhook-gateway-http:endpoint6"
- name: "webhook-gateway-http:endpoint7"
filters:
name: "data-filter"
data:
dataFilters:
- path: bucket
type: string
value: argo-workflow-input
- name: "webhook-gateway-http:endpoint8"
- name: "webhook-gateway-http:endpoint9"
dependencyGroups:
- name: "group_1"
dependencies:
- "webhook-gateway-http:endpoint1"
- "webhook-gateway-http:endpoint2"
- name: "group_2"
dependencies:
- "webhook-gateway-http:endpoint3"
- name: "group_3"
dependencies:
- "webhook-gateway-http:endpoint4"
- "webhook-gateway-http:endpoint5"
- name: "group_4"
dependencies:
- "webhook-gateway-http:endpoint6"
- "webhook-gateway-http:endpoint7"
- "webhook-gateway-http:endpoint8"
- name: "group_5"
dependencies:
- "webhook-gateway-http:endpoint9"
circuit: "group_1 || group_2 || ((group_3 || group_4) && group_5)"
eventProtocol:
type: "HTTP"
http:
port: "9300"
triggers:
- name: webhook-workflow-trigger
when:
any:
- "group_1"
- "group_2"
resource:
namespace: argo-events
group: argoproj.io
version: v1alpha1
kind: Workflow
source:
inline: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
args:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
- name: webhook-workflow-trigger-2
when:
all:
- "group_5"
- "group_4"
resource:
namespace: argo-events
group: argoproj.io
version: v1alpha1
kind: Workflow
source:
inline: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-2-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
args:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
- name: webhook-workflow-trigger-common
resource:
namespace: argo-events
group: argoproj.io
version: v1alpha1
kind: Workflow
source:
inline: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-common-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
args:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
2 changes: 1 addition & 1 deletion examples/sensors/webhook-http.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ spec:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
image: "docker/whalesay:latest"
2 changes: 1 addition & 1 deletion examples/sensors/webhook-with-complete-payload.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: webhook-with-resource-param-sensor
name: webhook-with-complete-payload-sensor
labels:
sensors.argoproj.io/sensor-controller-instanceid: argo-events
spec:
Expand Down
Loading

0 comments on commit 8989738

Please sign in to comment.