Skip to content

Commit

Permalink
fix(resource-eventsource): Use event time instead of obj create time …
Browse files Browse the repository at this point in the history
…to filter UPDATE/DELETE events (#1157)

* fix(resource-eventsource): Use event time instead of obj create time to watch UPDATE/DELETE events

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Apr 5, 2021
1 parent eb94f71 commit f945fb5
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 4 deletions.
32 changes: 28 additions & 4 deletions eventsources/sources/resource/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,16 @@ func passFilters(event *InformerEvent, filter *v1alpha1.ResourceFilter, startTim
log.Infof("resource name does not match prefix. resource-name: %s, prefix: %s\n", uObj.GetName(), filter.Prefix)
return false
}
eventTime := getEventTime(uObj, event.Type)
if filter.AfterStart && eventTime.UTC().Before(startTime.UTC()) {
log.Infof("Event happened before service start time. event-timestamp: %s, start-timestamp: %s\n", eventTime.UTC().String(), startTime.UTC().String())
return false
}
created := uObj.GetCreationTimestamp()
if !filter.CreatedBy.IsZero() && created.UTC().After(filter.CreatedBy.UTC()) {
log.Infof("resource is created after filter time. creation-timestamp: %s, filter-creation-timestamp: %s\n", created.UTC().String(), filter.CreatedBy.UTC().String())
return false
}
if filter.AfterStart && created.UTC().Before(startTime.UTC()) {
log.Infof("resource is created before service start time. creation-timestamp: %s, start-timestamp: %s\n", created.UTC().String(), startTime.UTC().String())
return false
}
if len(filter.Fields) > 0 {
jsData, err := uObj.MarshalJSON()
if err != nil {
Expand Down Expand Up @@ -324,3 +325,26 @@ func filterFields(jsonData []byte, selectors []v1alpha1.Selector, log *zap.Sugar
}
return true
}

func getEventTime(obj *unstructured.Unstructured, eventType v1alpha1.ResourceEventType) metav1.Time {
switch eventType {
case v1alpha1.ADD:
return obj.GetCreationTimestamp()
case v1alpha1.DELETE:
if obj.GetDeletionTimestamp() != nil {
return *obj.GetDeletionTimestamp()
} else {
return metav1.Now()
}
case v1alpha1.UPDATE:
t := obj.GetCreationTimestamp()
for _, f := range obj.GetManagedFields() {
if f.Operation == metav1.ManagedFieldsOperationUpdate && f.Time.UTC().After(t.UTC()) {
t = *f.Time
}
}
return t
default:
return obj.GetCreationTimestamp()
}
}
1 change: 1 addition & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (s *E2ESuite) DeleteResources() {
resources := []schema.GroupVersionResource{
{Group: eventsource.Group, Version: "v1alpha1", Resource: eventsource.Plural},
{Group: sensor.Group, Version: "v1alpha1", Resource: sensor.Plural},
{Group: "", Version: "v1", Resource: "pods"},
}
s.deleteResources(resources)
}
Expand Down
10 changes: 10 additions & 0 deletions test/e2e/fixtures/given.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -128,3 +129,12 @@ func (g *Given) When() *When {
kubeClient: g.kubeClient,
}
}

var OutputRegexp = func(rx string) func(t *testing.T, output string, err error) {
return func(t *testing.T, output string, err error) {
t.Helper()
if assert.NoError(t, err, output) {
assert.Regexp(t, rx, output)
}
}
}
24 changes: 24 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,30 @@ func (s *FunctionalSuite) TestMetricsWithWebhook() {
Contains("argo_events_action_failed_total")
}

func (s *FunctionalSuite) TestResourceEventSource() {
w1 := s.Given().EventSource("@testdata/es-resource.yaml").
When().
CreateEventSource().
WaitForEventSourceReady().
Exec("kubectl", []string{"-n", fixtures.Namespace, "run", "test-pod", "--image", "hello-world", "-l", fixtures.Label + "=" + fixtures.LabelValue}, fixtures.OutputRegexp(`pod/.* created`))

t1 := w1.Then().
ExpectEventSourcePodLogContains(LogEventSourceStarted)

t2 := s.Given().Sensor("@testdata/sensor-resource.yaml").
When().
CreateSensor().
WaitForSensorReady().
Then().
ExpectSensorPodLogContains(LogSensorStarted)

w1.Exec("kubectl", []string{"-n", fixtures.Namespace, "delete", "pod", "test-pod"}, fixtures.OutputRegexp(`pod "test-pod" deleted`))

t1.ExpectEventSourcePodLogContains(LogPublishEventSuccessful)

t2.ExpectSensorPodLogContains(LogTriggerActionSuccessful)
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
21 changes: 21 additions & 0 deletions test/e2e/testdata/es-resource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: test-resource
spec:
template:
serviceAccountName: argo-events-sa
resource:
example:
namespace: argo-events
group: ""
version: v1
resource: pods
eventTypes:
- DELETE
filter:
afterStart: true
fields:
- key: metadata.name
operation: ==
value: test-pod
13 changes: 13 additions & 0 deletions test/e2e/testdata/sensor-resource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: e2e-resource-log
spec:
dependencies:
- name: test-dep
eventSourceName: test-resource
eventName: example
triggers:
- template:
name: log-trigger
log: {}

0 comments on commit f945fb5

Please sign in to comment.