Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Emit audit events for workflow event binding errors #3704

Merged
merged 2 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions pkg/apis/workflow/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,5 @@ const (
ClusterWorkflowTemplatePlural string = "clusterworkflowtemplates"
ClusterWorkflowTemplateShortName string = "cwftmpl"
ClusterWorkflowTemplateFullName string = ClusterWorkflowTemplatePlural + "." + Group
WorkflowEventBindingKind string = "WorkflowEventBinding"
)

type CRD struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that we needed to add each new CRD here for CRD generation. Is that no longer needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused code

Kind, Singular, Plural, ShortName, FullName string
}

// CRDs lists the naming metadata (kind, singular, plural, short name and full name)
// for the ClusterWorkflowTemplate, CronWorkflow, Workflow and WorkflowTemplate resources.
var CRDs = []CRD{
{
Kind: ClusterWorkflowTemplateKind,
Singular: ClusterWorkflowTemplateSingular,
Plural: ClusterWorkflowTemplatePlural,
ShortName: ClusterWorkflowTemplateShortName,
FullName: ClusterWorkflowTemplateFullName,
},
{
Kind: CronWorkflowKind,
Singular: CronWorkflowSingular,
Plural: CronWorkflowPlural,
ShortName: CronWorkflowShortName,
FullName: CronWorkflowFullName,
},
{
Kind: WorkflowKind,
Singular: WorkflowSingular,
Plural: WorkflowPlural,
ShortName: WorkflowShortName,
FullName: WorkflowFullName,
},
{
// NOTE(review): unlike the three entries above, this one sets no Plural, so the
// field is left as the empty string — presumably an omission; confirm.
Kind: WorkflowTemplateKind,
Singular: WorkflowTemplateSingular,
ShortName: WorkflowTemplateShortName,
FullName: WorkflowTemplateFullName,
},
}
4 changes: 3 additions & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
grpcutil "github.com/argoproj/argo/util/grpc"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/json"
"github.com/argoproj/argo/workflow/events"
"github.com/argoproj/argo/workflow/hydrator"
)

Expand Down Expand Up @@ -156,8 +157,9 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
}
eventRecorderManager := events.NewEventRecorderManager(as.kubeClientset)
artifactServer := artifacts.NewArtifactServer(as.authenticator, hydrator.New(offloadRepo), wfArchive, instanceIDService)
eventServer := event.NewController(instanceIDService, as.eventQueueSize, as.eventWorkerCount)
eventServer := event.NewController(instanceIDService, eventRecorderManager, as.eventQueueSize, as.eventWorkerCount)
grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, eventServer, configMap.Links)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
7 changes: 6 additions & 1 deletion server/event/dispatch/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/antonmedv/expr"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand All @@ -26,18 +28,20 @@ import (

// Operation bundles the state needed to dispatch a received event against a list of
// workflow event bindings. Instances are created with NewOperation.
type Operation struct {
// ctx is the context that was supplied to NewOperation.
ctx context.Context
// eventRecorder is used by Dispatch to record a Warning event with reason
// "WorkflowEventBindingError" when dispatching from an event binding fails.
eventRecorder record.EventRecorder
// instanceIDService is the instance ID service that was supplied to NewOperation.
instanceIDService instanceid.Service
// events holds the workflow event bindings that were supplied to NewOperation.
events []wfv1.WorkflowEventBinding
// env is the expression environment that NewOperation builds via expressionEnvironment
// from the context, namespace, discriminator and payload.
env map[string]interface{}
}

func NewOperation(ctx context.Context, instanceIDService instanceid.Service, events []wfv1.WorkflowEventBinding, namespace, discriminator string, payload *wfv1.Item) (*Operation, error) {
func NewOperation(ctx context.Context, instanceIDService instanceid.Service, eventRecorder record.EventRecorder, events []wfv1.WorkflowEventBinding, namespace, discriminator string, payload *wfv1.Item) (*Operation, error) {
env, err := expressionEnvironment(ctx, namespace, discriminator, payload)
if err != nil {
return nil, fmt.Errorf("failed to create workflow template expression environment: %w", err)
}
return &Operation{
ctx: ctx,
eventRecorder: eventRecorder,
instanceIDService: instanceIDService,
events: events,
env: env,
Expand All @@ -60,6 +64,7 @@ func (o *Operation) Dispatch() {
})
if err != nil {
log.WithError(err).WithFields(log.Fields{"namespace": event.Namespace, "event": event.Name}).Error("failed to dispatch from event")
o.eventRecorder.Event(&event, corev1.EventTypeWarning, "WorkflowEventBindingError", "failed to dispatch event: "+err.Error())
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions server/event/dispatch/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/grpc/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -45,9 +46,13 @@ func TestNewOperation(t *testing.T) {
},
)
ctx := context.WithValue(context.WithValue(context.Background(), auth.WfKey, client), auth.ClaimSetKey, &jws.ClaimSet{Sub: "my-sub"})
recorder := record.NewFakeRecorder(1)

// act
operation, err := NewOperation(ctx, instanceid.NewService("my-instanceid"), []wfv1.WorkflowEventBinding{
operation, err := NewOperation(ctx, instanceid.NewService("my-instanceid"), recorder, []wfv1.WorkflowEventBinding{
{
ObjectMeta: metav1.ObjectMeta{Name: "malformed", Namespace: "my-ns"},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "my-wfeb-1", Namespace: "my-ns"},
Spec: wfv1.WorkflowEventBindingSpec{
Expand Down Expand Up @@ -81,9 +86,9 @@ func TestNewOperation(t *testing.T) {
assert.Contains(t, wf.Labels, common.LabelKeyWorkflowEventBinding)
fromString := intstr.FromString(`foo`)
assert.Equal(t, []wfv1.Parameter{{Name: "my-param", Value: &fromString}}, wf.Spec.Arguments.Parameters)

}
}
assert.Equal(t, "Warning WorkflowEventBindingError failed to dispatch event: failed to evaluate workflow template expression: unexpected token EOF (1:1)", <-recorder.Events)
}

func Test_expressionEnvironment(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions server/event/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@ import (
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/server/event/dispatch"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/workflow/events"
)

type Controller struct {
instanceIDService instanceid.Service
instanceIDService instanceid.Service
eventRecorderManager events.EventRecorderManager
// a channel for operations to be executed async on
operationQueue chan dispatch.Operation
workerCount int
}

var _ eventpkg.EventServiceServer = &Controller{}

func NewController(instanceIDService instanceid.Service, operationQueueSize, workerCount int) *Controller {
func NewController(instanceIDService instanceid.Service, eventRecorderManager events.EventRecorderManager, operationQueueSize, workerCount int) *Controller {
log.WithFields(log.Fields{"workerCount": workerCount, "operationQueueSize": operationQueueSize}).Info("Creating event controller")

return &Controller{
instanceIDService: instanceIDService,
instanceIDService: instanceIDService,
eventRecorderManager: eventRecorderManager,
// so we can have `operationQueueSize` operations outstanding before we start putting back pressure on the senders
operationQueue: make(chan dispatch.Operation, operationQueueSize),
workerCount: workerCount,
Expand Down Expand Up @@ -70,7 +73,7 @@ func (s *Controller) ReceiveEvent(ctx context.Context, req *eventpkg.EventReques
return nil, err
}

operation, err := dispatch.NewOperation(ctx, s.instanceIDService, list.Items, req.Namespace, req.Discriminator, req.Payload)
operation, err := dispatch.NewOperation(ctx, s.instanceIDService, s.eventRecorderManager.Get(req.Namespace), list.Items, req.Namespace, req.Discriminator, req.Payload)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions server/event/event_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
fakekube "k8s.io/client-go/kubernetes/fake"

eventpkg "github.com/argoproj/argo/pkg/apiclient/event"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/workflow/events"
)

func TestController(t *testing.T) {
clientset := fake.NewSimpleClientset()
s := NewController(instanceid.NewService("my-instanceid"), 1, 1)
s := NewController(instanceid.NewService("my-instanceid"), events.NewEventRecorderManager(fakekube.NewSimpleClientset()), 1, 1)

ctx := context.WithValue(context.TODO(), auth.WfKey, clientset)
_, err := s.ReceiveEvent(ctx, &eventpkg.EventRequest{Namespace: "my-ns", Payload: &wfv1.Item{}})
Expand All @@ -31,5 +33,4 @@ func TestController(t *testing.T) {
s.Run(stopCh)

assert.Len(t, s.operationQueue, 0, "all events were processed")

}
31 changes: 31 additions & 0 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
"github.com/argoproj/argo/workflow/common"
Expand Down Expand Up @@ -276,6 +277,36 @@ spec:
})
}

// TestEventOnMalformedWorkflowEventBinding creates a WorkflowEventBinding that has
// metadata but no spec, posts an empty event to the "argo" namespace, and then
// expects an audit event to be recorded against that binding describing the
// dispatch failure.
func (s *ArgoServerSuite) TestEventOnMalformedWorkflowEventBinding() {
	s.Given().
		// The name and labels must be nested under metadata; the audit-event filter
		// below matches on the binding's name.
		WorkflowEventBinding(`
metadata:
  name: malformed
  labels:
    argo-e2e: true
`).
		When().
		CreateWorkflowEventBinding().
		And(func() {
			// The request itself is expected to succeed; the failure is only
			// reported through the audit event asserted below.
			s.e().
				POST("/api/v1/events/argo/").
				WithBytes([]byte(`{}`)).
				Expect().
				Status(200)
		}).
		Then().
		ExpectAuditEvents(
			// Only consider events that involve the malformed binding created above.
			func(event corev1.Event) bool {
				return event.InvolvedObject.Name == "malformed" && event.InvolvedObject.Kind == workflow.WorkflowEventBindingKind
			},
			func(t *testing.T, event corev1.Event) {
				assert.Equal(t, "argo", event.InvolvedObject.Namespace)
				assert.Equal(t, "WorkflowEventBindingError", event.Reason)
				assert.Equal(t, "failed to dispatch event: failed to evaluate workflow template expression: unexpected token EOF (1:1)", event.Message)
			},
		)
}

func (s *ArgoServerSuite) TestGetUserInfo() {
s.e().GET("/api/v1/userinfo").
Expect().
Expand Down
5 changes: 2 additions & 3 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/hydrator"
Expand Down Expand Up @@ -90,7 +89,7 @@ func (t *Then) ExpectWorkflowList(listOptions metav1.ListOptions, block func(t *
return t
}

func (t *Then) ExpectAuditEvents(blocks ...func(*testing.T, apiv1.Event)) *Then {
func (t *Then) ExpectAuditEvents(filter func(event apiv1.Event) bool, blocks ...func(*testing.T, apiv1.Event)) *Then {
t.t.Helper()
eventList, err := t.kubeClient.CoreV1().Events(Namespace).Watch(metav1.ListOptions{})
if err != nil {
Expand All @@ -107,7 +106,7 @@ func (t *Then) ExpectAuditEvents(blocks ...func(*testing.T, apiv1.Event)) *Then
if !ok {
t.t.Fatal("event is not an event")
}
if e.InvolvedObject.Name == t.workflowName && e.Namespace == Namespace && e.InvolvedObject.Kind == workflow.WorkflowKind {
if filter(*e) {
blocks[0](t.t, *e)
blocks = blocks[1:]
if t.t.Failed() {
Expand Down
10 changes: 10 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
)
Expand Down Expand Up @@ -190,6 +191,9 @@ func (s *FunctionalSuite) TestEventOnNodeFail() {
WaitForWorkflow(30*time.Second).
Then().
ExpectAuditEvents(
func(event corev1.Event) bool {
return strings.HasPrefix(event.InvolvedObject.Name, "failed-step-event-") && event.InvolvedObject.Kind == workflow.WorkflowKind
},
func(t *testing.T, e corev1.Event) {
assert.Equal(t, "WorkflowRunning", e.Reason)
},
Expand All @@ -215,6 +219,9 @@ func (s *FunctionalSuite) TestEventOnWorkflowSuccess() {
WaitForWorkflow(60*time.Second).
Then().
ExpectAuditEvents(
func(event corev1.Event) bool {
return strings.HasPrefix(event.InvolvedObject.Name, "success-event-") && event.InvolvedObject.Kind == workflow.WorkflowKind
},
func(t *testing.T, e corev1.Event) {
assert.Equal(t, "WorkflowRunning", e.Reason)
},
Expand All @@ -240,6 +247,9 @@ func (s *FunctionalSuite) TestEventOnPVCFail() {
WaitForWorkflow(120*time.Second).
Then().
ExpectAuditEvents(
func(event corev1.Event) bool {
return strings.HasPrefix(event.InvolvedObject.Name, "volumes-pvc-fail-event-") && event.InvolvedObject.Kind == workflow.WorkflowKind
},
func(t *testing.T, e corev1.Event) {
assert.Equal(t, "WorkflowRunning", e.Reason)
},
Expand Down
5 changes: 3 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
controllercache "github.com/argoproj/argo/workflow/controller/cache"
"github.com/argoproj/argo/workflow/controller/pod"
"github.com/argoproj/argo/workflow/cron"
"github.com/argoproj/argo/workflow/events"
"github.com/argoproj/argo/workflow/hydrator"
"github.com/argoproj/argo/workflow/metrics"
"github.com/argoproj/argo/workflow/sync"
Expand Down Expand Up @@ -85,7 +86,7 @@ type WorkflowController struct {
wfArchive sqldb.WorkflowArchive
syncManager *sync.SyncManager
metrics *metrics.Metrics
eventRecorderManager EventRecorderManager
eventRecorderManager events.EventRecorderManager
archiveLabelSelector labels.Selector
cacheFactory controllercache.CacheFactory
}
Expand All @@ -112,7 +113,7 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
completedPods: make(chan string, 512),
gcPods: make(chan string, 512),
cacheFactory: controllercache.NewCacheFactory(kubeclientset, namespace),
eventRecorderManager: newEventRecorderManager(kubeclientset),
eventRecorderManager: events.NewEventRecorderManager(kubeclientset),
}

wfc.UpdateConfig()
Expand Down
3 changes: 2 additions & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions"
"github.com/argoproj/argo/test"
controllercache "github.com/argoproj/argo/workflow/controller/cache"
"github.com/argoproj/argo/workflow/events"
hydratorfake "github.com/argoproj/argo/workflow/hydrator/fake"
"github.com/argoproj/argo/workflow/metrics"
)
Expand Down Expand Up @@ -112,7 +113,7 @@ func (t testEventRecorderManager) Get(string) record.EventRecorder {
return t.eventRecorder
}

var _ EventRecorderManager = &testEventRecorderManager{}
var _ events.EventRecorderManager = &testEventRecorderManager{}

func newController(objects ...runtime.Object) (context.CancelFunc, *WorkflowController) {
wfclientset := fakewfclientset.NewSimpleClientset(objects...)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package controller
package events

import (
"sync"
Expand Down Expand Up @@ -36,7 +36,7 @@ func (m *eventRecorderManager) Get(namespace string) record.EventRecorder {

}

func newEventRecorderManager(kubernetes kubernetes.Interface) EventRecorderManager {
func NewEventRecorderManager(kubernetes kubernetes.Interface) EventRecorderManager {
return &eventRecorderManager{
kubernetes: kubernetes,
lock: sync.Mutex{},
Expand Down