Skip to content

Commit

Permalink
feat(controller): Add audit logs to workflows. Fixes #1769 (#1930)
Browse files Browse the repository at this point in the history
  • Loading branch information
amarrella authored and alexec committed Jan 22, 2020
1 parent 2982c1a commit 5c98a14
Show file tree
Hide file tree
Showing 16 changed files with 445 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,10 @@ rules:
- watch
- update
- patch
- delete
- delete
- apiGroups:
- ""
resources:
- events
verbs:
- create
6 changes: 6 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ rules:
- update
- patch
- delete
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ rules:
- update
- patch
- delete
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ rules:
- watch
- update
- patch
- delete
- delete
- apiGroups:
- ""
resources:
- events
verbs:
- create
3 changes: 2 additions & 1 deletion pkg/apis/workflow/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (

// SchemeGroupVersion is the group version used to register these objects
var (
SchemeGroupVersion = schema.GroupVersion{Group: workflow.Group, Version: "v1alpha1"}
SchemeGroupVersion = schema.GroupVersion{Group: workflow.Group, Version: "v1alpha1"}
// WorkflowSchemaGroupVersionKind is the group, version ("v1alpha1") and kind of the Workflow resource.
WorkflowSchemaGroupVersionKind = schema.GroupVersionKind{Group: workflow.Group, Version: "v1alpha1", Kind: workflow.WorkflowKind}
)

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
Expand Down
14 changes: 14 additions & 0 deletions test/e2e/expectedfailures/failed-step-event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# e2e fixture: checks that the controller publishes
# an audit event marking the failure
# when a workflow node fails.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: failed-step-event-
spec:
entrypoint: exit
templates:
- name: exit
container:
image: docker/whalesay:latest
# The shell exits with status 1, so the workflow's only step fails.
command: [sh, -c, exit 1]
42 changes: 42 additions & 0 deletions test/e2e/expectedfailures/volumes-pvc-fail-event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# e2e fixture: demonstrates that argo emits
# a WorkflowFailed event when creation of the workflow's PVC fails.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: volumes-pvc-fail-event-
spec:
entrypoint: volumes-pvc-example
volumeClaimTemplates:
- metadata:
name: workdir
spec:
# Not a valid access mode; presumably this is what makes the PVC creation fail - confirm.
accessModes: [ "InvalidAccessMode" ]
resources:
requests:
storage: 1Gi

templates:
# Entrypoint: two sequential steps, one that writes into the volume and one that reads from it.
- name: volumes-pvc-example
steps:
- - name: generate
template: whalesay
- - name: print
template: print-message

# Writes a message to a file on the "workdir" volume.
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol

# Lists the "workdir" volume and prints the file written by the whalesay template.
- name: print-message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol
1 change: 1 addition & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,5 +277,6 @@ func (s *E2ESuite) Given() *Given {
wfTemplateClient: s.wfTemplateClient,
cronClient: s.cronClient,
offloadNodeStatusRepo: s.offloadNodeStatusRepo,
kubeClient: s.KubeClient,
}
}
4 changes: 4 additions & 0 deletions test/e2e/fixtures/given.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"strings"
"testing"

"k8s.io/client-go/kubernetes"

"sigs.k8s.io/yaml"

"github.com/argoproj/argo/persist/sqldb"
Expand All @@ -23,6 +25,7 @@ type Given struct {
wfTemplates []*wfv1.WorkflowTemplate
cronWf *wfv1.CronWorkflow
workflowName string
kubeClient kubernetes.Interface
}

// creates a workflow based on the parameter, this may be:
Expand Down Expand Up @@ -163,5 +166,6 @@ func (g *Given) When() *When {
cronClient: g.cronClient,
offloadNodeStatusRepo: g.offloadNodeStatusRepo,
workflowName: g.workflowName,
kubeClient: g.kubeClient,
}
}
20 changes: 20 additions & 0 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"testing"

log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand All @@ -20,6 +22,7 @@ type Then struct {
client v1alpha1.WorkflowInterface
cronClient v1alpha1.CronWorkflowInterface
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
kubeClient kubernetes.Interface
}

func (t *Then) Expect(block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
Expand Down Expand Up @@ -67,6 +70,23 @@ func (t *Then) ExpectWorkflowList(listOptions metav1.ListOptions, block func(t *
return t
}

// ExpectAuditEvents fetches the Kubernetes events recorded against the workflow
// under test and passes them to block so the caller can assert on them.
//
// The workflow is fetched first to learn its namespace. The event list is then
// restricted server-side, via a field selector, to events whose involved object
// carries the workflow's name. Without that restriction every event in the
// namespace is returned, so events left behind by other workflows (for example
// earlier runs sharing the same generateName prefix) could satisfy the caller's
// assertions even when the workflow under test emitted nothing.
//
// The test is aborted when no workflow has been submitted, or when either the
// workflow lookup or the event listing fails. Returns t to allow chaining.
func (t *Then) ExpectAuditEvents(block func(*testing.T, *apiv1.EventList)) *Then {
	if t.workflowName == "" {
		t.t.Fatal("No workflow to test")
	}
	log.WithFields(log.Fields{"test": t.t.Name(), "workflow": t.workflowName}).Info("Checking expectation")
	wf, err := t.client.Get(t.workflowName, metav1.GetOptions{})
	if err != nil {
		t.t.Fatal(err)
	}
	// Only events about this exact workflow are relevant; let the API server filter them.
	listOptions := metav1.ListOptions{
		FieldSelector: "involvedObject.name=" + wf.ObjectMeta.Name,
	}
	eventList, err := t.kubeClient.CoreV1().Events(wf.ObjectMeta.Namespace).List(listOptions)
	if err != nil {
		t.t.Fatal(err)
	}
	block(t.t, eventList)
	return t
}

func (t *Then) RunCli(args []string, block func(t *testing.T, output string, err error)) *Then {
output, err := runCli(t.diagnostics, args)
block(t.t, output, err)
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/argoproj/pkg/humanize"
"k8s.io/client-go/kubernetes"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,6 +31,7 @@ type When struct {
workflowName string
wfTemplateNames []string
cronWorkflowName string
kubeClient kubernetes.Interface
}

func (w *When) SubmitWorkflow() *When {
Expand Down Expand Up @@ -162,5 +164,6 @@ func (w *When) Then() *Then {
client: w.client,
cronClient: w.cronClient,
offloadNodeStatusRepo: w.offloadNodeStatusRepo,
kubeClient: w.kubeClient,
}
}
13 changes: 13 additions & 0 deletions test/e2e/functional/success-event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# e2e fixture: checks that the controller publishes
# an audit event marking the success
# when a workflow completes successfully.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: success-event-
spec:
entrypoint: exit
templates:
- name: exit
container:
# No command is set here, so the image's default command is what runs.
image: docker/whalesay:latest
70 changes: 70 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"strings"
"testing"
"time"

Expand All @@ -10,6 +11,9 @@ import (

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
"github.com/argoproj/argo/util/argo"

apiv1 "k8s.io/api/core/v1"
)

type FunctionalSuite struct {
Expand Down Expand Up @@ -93,6 +97,72 @@ func (s *FunctionalSuite) TestFastFailOnPodTermination() {
})
}

// TestEventOnNodeFail submits a workflow whose only step exits with code 1 and
// checks that a WorkflowFailed event carrying the expected message was emitted.
func (s *FunctionalSuite) TestEventOnNodeFail() {
	// verify scans the listed events for a failure event about the submitted workflow.
	verify := func(t *testing.T, events *apiv1.EventList) {
		matched := false
		for i := range events.Items {
			ev := &events.Items[i]
			if !strings.HasPrefix(ev.InvolvedObject.Name, "failed-step-event-") {
				continue
			}
			if ev.Reason != argo.EventReasonWorkflowFailed {
				continue
			}
			matched = true
			assert.Equal(t, "failed with exit code 1", ev.Message)
		}
		assert.True(t, matched, "event not found")
	}

	s.Given().
		Workflow("@expectedfailures/failed-step-event.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(30 * time.Second).
		Then().
		ExpectAuditEvents(verify)
}

// TestEventOnWorkflowSuccess submits a workflow that completes successfully and
// checks that a success event with the message "Workflow completed" was emitted.
func (s *FunctionalSuite) TestEventOnWorkflowSuccess() {
	// verify scans the listed events for a success event about the submitted workflow.
	verify := func(t *testing.T, events *apiv1.EventList) {
		matched := false
		for i := range events.Items {
			ev := &events.Items[i]
			if !strings.HasPrefix(ev.InvolvedObject.Name, "success-event-") {
				continue
			}
			if ev.Reason != argo.EventReasonWorkflowSucceded {
				continue
			}
			matched = true
			assert.Equal(t, "Workflow completed", ev.Message)
		}
		assert.True(t, matched, "event not found")
	}

	s.Given().
		Workflow("@functional/success-event.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(60 * time.Second).
		Then().
		ExpectAuditEvents(verify)
}

// TestEventOnPVCFail submits a workflow whose volume claim cannot be created and
// checks that a WorkflowFailed event mentioning "pvc create error" was emitted.
func (s *FunctionalSuite) TestEventOnPVCFail() {
	// verify scans the listed events for a failure event about the submitted workflow.
	verify := func(t *testing.T, events *apiv1.EventList) {
		matched := false
		for i := range events.Items {
			ev := &events.Items[i]
			if !strings.HasPrefix(ev.InvolvedObject.Name, "volumes-pvc-fail-event-") {
				continue
			}
			if ev.Reason != argo.EventReasonWorkflowFailed {
				continue
			}
			matched = true
			mentionsPVCError := strings.Contains(ev.Message, "pvc create error")
			assert.True(t, mentionsPVCError, "event should contain \"pvc create error\"")
		}
		assert.True(t, matched, "event not found")
	}

	s.Given().
		Workflow("@expectedfailures/volumes-pvc-fail-event.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(120 * time.Second).
		Then().
		ExpectAuditEvents(verify)
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
Loading

0 comments on commit 5c98a14

Please sign in to comment.