Skip to content

Commit

Permalink
feat: extend resource eventsource field filter. Closes #913 (#915)
Browse files Browse the repository at this point in the history
* feat: extend resource eventsource field filter. Closes #913

* re-run codegen with lastest panddoc

* infof
  • Loading branch information
whynowy authored Oct 20, 2020
1 parent 9bbb43a commit c1a7453
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 51 deletions.
7 changes: 5 additions & 2 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 41 additions & 27 deletions eventsources/sources/resource/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ package resource
import (
"context"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"time"

"github.com/pkg/errors"
"github.com/tidwall/gjson"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
Expand Down Expand Up @@ -111,14 +111,6 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
options.LabelSelector = sel.String()
}

if resourceEventSource.Filter != nil && resourceEventSource.Filter.Fields != nil {
sel, err := FieldSelector(resourceEventSource.Filter.Fields)
if err != nil {
return errors.Wrapf(err, "failed to create the field selector for the event source %s", el.GetEventName())
}
options.FieldSelector = sel.String()
}

tweakListOptions := func(op *metav1.ListOptions) {
*op = *options
}
Expand Down Expand Up @@ -247,23 +239,6 @@ func LabelSelector(selectors []v1alpha1.Selector) (labels.Selector, error) {
return labels.NewSelector().Add(labelRequirements...), nil
}

// FieldSelector returns field selector for resource filtering
func FieldSelector(selectors []v1alpha1.Selector) (fields.Selector, error) {
var result []fields.Selector
for _, sel := range selectors {
op := selection.Equals
if sel.Operation != "" {
op = selection.Operator(sel.Operation)
}
selector, err := fields.ParseSelector(fmt.Sprintf("%s%s%s", sel.Key, op, sel.Value))
if err != nil {
return nil, err
}
result = append(result, selector)
}
return fields.AndSelectors(result...), nil
}

// helper method to check if the object passed the user defined filters
func passFilters(event *InformerEvent, filter *v1alpha1.ResourceFilter, startTime time.Time, log *zap.SugaredLogger) bool {
// no filters are applied.
Expand All @@ -284,5 +259,44 @@ func passFilters(event *InformerEvent, filter *v1alpha1.ResourceFilter, startTim
log.Infof("resource is created before service start time. creation-timestamp: %s, start-timestamp: %s\n", created.UTC().String(), startTime.UTC().String())
return false
}
if len(filter.Fields) > 0 {
jsData, err := uObj.MarshalJSON()
if err != nil {
log.Errorw("failed to marshal informer event", zap.Error(err))
return false
}

return filterFields(jsData, filter.Fields, log)
}
return true
}

func filterFields(jsonData []byte, selectors []v1alpha1.Selector, log *zap.SugaredLogger) bool {
for _, selector := range selectors {
res := gjson.GetBytes(jsonData, selector.Key)
if !res.Exists() {
return false
}
exp, err := regexp.Compile(selector.Value)
if err != nil {
log.Errorw("invalid regex", zap.Error(err))
return false
}
match := exp.Match([]byte(res.Str))

switch selection.Operator(selector.Operation) {
case selection.Equals, selection.DoubleEquals:
if !match {
return false
}
case selection.NotEquals:
if match {
return false
}
default:
log.Errorf("invalid operator, only %v, %v and %v are supported", selection.Equals, selection.DoubleEquals, selection.NotEquals)
return false
}
}
return true
}
29 changes: 27 additions & 2 deletions eventsources/sources/resource/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package resource

import (
"encoding/json"
"testing"
"time"

"github.com/mitchellh/mapstructure"
"github.com/smartystreets/goconvey/convey"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -53,6 +53,23 @@ func TestFilter(t *testing.T) {
Value: "my-workflow",
},
},
Fields: []v1alpha1.Selector{
{
Key: "metadata.name",
Operation: "==",
Value: "fak*",
},
{
Key: "status.phase",
Operation: "!=",
Value: "Error",
},
{
Key: "spec.serviceAccountName",
Operation: "=",
Value: "test*",
},
},
},
}
pod := &corev1.Pod{
Expand All @@ -64,12 +81,20 @@ func TestFilter(t *testing.T) {
"name": "my-workflow",
},
},
Spec: corev1.PodSpec{
ServiceAccountName: "test-sa",
},
Status: corev1.PodStatus{
Phase: "Running",
},
}
pod, err := fake.NewSimpleClientset().CoreV1().Pods("fake").Create(pod)
convey.So(err, convey.ShouldBeNil)

outmap := make(map[string]interface{})
err = mapstructure.Decode(pod, &outmap)
jsonData, err := json.Marshal(pod)
convey.So(err, convey.ShouldBeNil)
err = json.Unmarshal(jsonData, &outmap)
convey.So(err, convey.ShouldBeNil)

pass := passFilters(&InformerEvent{
Expand Down
6 changes: 4 additions & 2 deletions examples/event-sources/resource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ spec:

# # fields provide listing options to K8s API to watch objects
# fields:
# # It's an extention of https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/.
# # Unlike k8s field selector, any arbitrary field like "spec.serviceAccountName" is supported.
# - key: metadata.name
# # Supported operations like ==, !=, <=, >= etc.
# # Supported operations like =, ==, !=.
# # Defaults to ==.
# # Refer https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/ for more info.
# # optional.
# operation: ==
# # Value could a string or a regex like "my*"
# value: my-workflow


Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/minio/minio-go v1.0.1-0.20190523192347-c6c2912aa552
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.3.0
github.com/mitchellh/mapstructure v1.3.0 // indirect
github.com/mitchellh/reflectwalk v1.0.1 // indirect
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.7.2
Expand Down
8 changes: 4 additions & 4 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ spec:
spec:
containers:
- env:
- name: EVENTSOURCE_IMAGE
value: argoproj/eventsource:latest
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: EVENTSOURCE_IMAGE
value: argoproj/eventsource:latest
image: argoproj/eventsource-controller:latest
imagePullPolicy: Always
livenessProbe:
Expand Down Expand Up @@ -344,12 +344,12 @@ spec:
spec:
containers:
- env:
- name: SENSOR_IMAGE
value: argoproj/sensor:latest
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SENSOR_IMAGE
value: argoproj/sensor:latest
image: argoproj/sensor-controller:latest
livenessProbe:
httpGet:
Expand Down
8 changes: 4 additions & 4 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ spec:
- args:
- --namespaced
env:
- name: EVENTSOURCE_IMAGE
value: argoproj/eventsource:latest
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: EVENTSOURCE_IMAGE
value: argoproj/eventsource:latest
image: argoproj/eventsource-controller:latest
imagePullPolicy: Always
livenessProbe:
Expand Down Expand Up @@ -265,12 +265,12 @@ spec:
- args:
- --namespaced
env:
- name: SENSOR_IMAGE
value: argoproj/sensor:latest
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SENSOR_IMAGE
value: argoproj/sensor:latest
image: argoproj/sensor-controller:latest
livenessProbe:
httpGet:
Expand Down
7 changes: 5 additions & 2 deletions pkg/apis/eventsource/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/eventsource/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions pkg/apis/eventsource/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,11 @@ type ResourceFilter struct {
// Refer https://kubernetes.io/docs/concepts/overview/working-with-objects/label-selectors/ for more info.
// +optional
Labels []Selector `json:"labels,omitempty" protobuf:"bytes,2,rep,name=labels"`
// Fields provide listing options to K8s API to watch resource/s.
// Refer https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/ for more info.
// Fields provide field filters similar to K8s field selector
// (see https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/).
// Unlike K8s field selector, it supports arbitrary fileds like "spec.serviceAccountName",
// and the value could be a string or a regex.
// Same as K8s field selector, operator "=", "==" and "!=" are supported.
// +optional
Fields []Selector `json:"fields,omitempty" protobuf:"bytes,3,rep,name=fields"`
// If resource is created before the specified time then the event is treated as valid.
Expand Down

0 comments on commit c1a7453

Please sign in to comment.