diff --git a/pkg/apis/workflow/register.go b/pkg/apis/workflow/register.go index 066bf15938ed..b432dd755f51 100644 --- a/pkg/apis/workflow/register.go +++ b/pkg/apis/workflow/register.go @@ -23,38 +23,5 @@ const ( ClusterWorkflowTemplatePlural string = "clusterworkflowtemplates" ClusterWorkflowTemplateShortName string = "cwftmpl" ClusterWorkflowTemplateFullName string = ClusterWorkflowTemplatePlural + "." + Group + WorkflowEventBindingKind string = "WorkflowEventBinding" ) - -type CRD struct { - Kind, Singular, Plural, ShortName, FullName string -} - -var CRDs = []CRD{ - { - Kind: ClusterWorkflowTemplateKind, - Singular: ClusterWorkflowTemplateSingular, - Plural: ClusterWorkflowTemplatePlural, - ShortName: ClusterWorkflowTemplateShortName, - FullName: ClusterWorkflowTemplateFullName, - }, - { - Kind: CronWorkflowKind, - Singular: CronWorkflowSingular, - Plural: CronWorkflowPlural, - ShortName: CronWorkflowShortName, - FullName: CronWorkflowFullName, - }, - { - Kind: WorkflowKind, - Singular: WorkflowSingular, - Plural: WorkflowPlural, - ShortName: WorkflowShortName, - FullName: WorkflowFullName, - }, - { - Kind: WorkflowTemplateKind, - Singular: WorkflowTemplateSingular, - ShortName: WorkflowTemplateShortName, - FullName: WorkflowTemplateFullName, - }, -} diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 2ef246776ddc..9d78de967b3e 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -46,6 +46,7 @@ import ( grpcutil "github.com/argoproj/argo/util/grpc" "github.com/argoproj/argo/util/instanceid" "github.com/argoproj/argo/util/json" + "github.com/argoproj/argo/workflow/events" "github.com/argoproj/argo/workflow/hydrator" ) @@ -156,8 +157,9 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st // disable the archiving - and still read old records wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService) } + eventRecorderManager := events.NewEventRecorderManager(as.kubeClientset) artifactServer := artifacts.NewArtifactServer(as.authenticator, hydrator.New(offloadRepo), wfArchive, instanceIDService) - eventServer := event.NewController(instanceIDService, as.eventQueueSize, as.eventWorkerCount) + eventServer := event.NewController(instanceIDService, eventRecorderManager, as.eventQueueSize, as.eventWorkerCount) grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, eventServer, configMap.Links) httpServer := as.newHTTPServer(ctx, port, artifactServer) diff --git a/server/event/dispatch/operation.go b/server/event/dispatch/operation.go index 720fc2e4e689..7308514b4845 100644 --- a/server/event/dispatch/operation.go +++ b/server/event/dispatch/operation.go @@ -11,9 +11,11 @@ import ( "github.com/antonmedv/expr" log "github.com/sirupsen/logrus" "google.golang.org/grpc/metadata" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -26,18 +28,20 @@ import ( type Operation struct { ctx context.Context + eventRecorder record.EventRecorder instanceIDService instanceid.Service events []wfv1.WorkflowEventBinding env map[string]interface{} } -func NewOperation(ctx context.Context, instanceIDService instanceid.Service, events []wfv1.WorkflowEventBinding, namespace, discriminator string, payload *wfv1.Item) (*Operation, error) { +func NewOperation(ctx context.Context, instanceIDService instanceid.Service, eventRecorder record.EventRecorder, events []wfv1.WorkflowEventBinding, namespace, discriminator string, payload *wfv1.Item) (*Operation, error) { env, err := expressionEnvironment(ctx, namespace, discriminator, payload) if err != nil { return nil, fmt.Errorf("failed to create workflow template expression environment: %w", err) } return &Operation{ ctx: ctx, + eventRecorder: eventRecorder, instanceIDService: instanceIDService, events: events, env: env, @@ -60,6 +64,7 @@ func (o *Operation) Dispatch() { }) if err != nil { log.WithError(err).WithFields(log.Fields{"namespace": event.Namespace, "event": event.Name}).Error("failed to dispatch from event") + o.eventRecorder.Event(&event, corev1.EventTypeWarning, "WorkflowEventBindingError", "failed to dispatch event: "+err.Error()) } } } diff --git a/server/event/dispatch/operation_test.go b/server/event/dispatch/operation_test.go index b9ff76a60bdd..342b6ec725b0 100644 --- a/server/event/dispatch/operation_test.go +++ b/server/event/dispatch/operation_test.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc/metadata" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/record" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" @@ -45,9 +46,13 @@ func TestNewOperation(t *testing.T) { }, ) ctx := context.WithValue(context.WithValue(context.Background(), auth.WfKey, client), auth.ClaimSetKey, &jws.ClaimSet{Sub: "my-sub"}) + recorder := record.NewFakeRecorder(1) // act - operation, err := NewOperation(ctx, instanceid.NewService("my-instanceid"), []wfv1.WorkflowEventBinding{ + operation, err := NewOperation(ctx, instanceid.NewService("my-instanceid"), recorder, []wfv1.WorkflowEventBinding{ + { + ObjectMeta: metav1.ObjectMeta{Name: "malformed", Namespace: "my-ns"}, + }, { ObjectMeta: metav1.ObjectMeta{Name: "my-wfeb-1", Namespace: "my-ns"}, Spec: wfv1.WorkflowEventBindingSpec{ @@ -81,9 +86,9 @@ func TestNewOperation(t *testing.T) { assert.Contains(t, wf.Labels, common.LabelKeyWorkflowEventBinding) fromString := intstr.FromString(`foo`) assert.Equal(t, []wfv1.Parameter{{Name: "my-param", Value: &fromString}}, wf.Spec.Arguments.Parameters) - } } + assert.Equal(t, "Warning WorkflowEventBindingError failed to dispatch event: failed to evaluate workflow template expression: unexpected token EOF (1:1)", <-recorder.Events) } func Test_expressionEnvironment(t *testing.T) { diff --git a/server/event/event_server.go b/server/event/event_server.go index d1d59815d774..5adcdfd91089 100644 --- a/server/event/event_server.go +++ b/server/event/event_server.go @@ -12,10 +12,12 @@ import ( "github.com/argoproj/argo/server/auth" "github.com/argoproj/argo/server/event/dispatch" "github.com/argoproj/argo/util/instanceid" + "github.com/argoproj/argo/workflow/events" ) type Controller struct { - instanceIDService instanceid.Service + instanceIDService instanceid.Service + eventRecorderManager events.EventRecorderManager // a channel for operations to be executed async on operationQueue chan dispatch.Operation workerCount int @@ -23,11 +25,12 @@ type Controller struct { var _ eventpkg.EventServiceServer = &Controller{} -func NewController(instanceIDService instanceid.Service, operationQueueSize, workerCount int) *Controller { +func NewController(instanceIDService instanceid.Service, eventRecorderManager events.EventRecorderManager, operationQueueSize, workerCount int) *Controller { log.WithFields(log.Fields{"workerCount": workerCount, "operationQueueSize": operationQueueSize}).Info("Creating event controller") return &Controller{ - instanceIDService: instanceIDService, + instanceIDService: instanceIDService, + eventRecorderManager: eventRecorderManager, // so we can have `operationQueueSize` operations outstanding before we start putting back pressure on the senders operationQueue: make(chan dispatch.Operation, operationQueueSize), workerCount: workerCount, @@ -70,7 +73,7 @@ func (s *Controller) ReceiveEvent(ctx context.Context, req *eventpkg.EventReques return nil, err } - operation, err := dispatch.NewOperation(ctx, s.instanceIDService, list.Items, req.Namespace, req.Discriminator, req.Payload) + operation, err := dispatch.NewOperation(ctx, s.instanceIDService, s.eventRecorderManager.Get(req.Namespace), list.Items, req.Namespace, req.Discriminator, req.Payload) if err != nil { return nil, err } diff --git a/server/event/event_server_test.go b/server/event/event_server_test.go index 41de46fdac52..42dfebb85e31 100644 --- a/server/event/event_server_test.go +++ b/server/event/event_server_test.go @@ -5,17 +5,19 @@ import ( "github.com/stretchr/testify/assert" "golang.org/x/net/context" + fakekube "k8s.io/client-go/kubernetes/fake" eventpkg "github.com/argoproj/argo/pkg/apiclient/event" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo/server/auth" "github.com/argoproj/argo/util/instanceid" + "github.com/argoproj/argo/workflow/events" ) func TestController(t *testing.T) { clientset := fake.NewSimpleClientset() - s := NewController(instanceid.NewService("my-instanceid"), 1, 1) + s := NewController(instanceid.NewService("my-instanceid"), events.NewEventRecorderManager(fakekube.NewSimpleClientset()), 1, 1) ctx := context.WithValue(context.TODO(), auth.WfKey, clientset) _, err := s.ReceiveEvent(ctx, &eventpkg.EventRequest{Namespace: "my-ns", Payload: &wfv1.Item{}}) @@ -31,5 +33,4 @@ func TestController(t *testing.T) { s.Run(stopCh) assert.Len(t, s.operationQueue, 0, "all events were processed") - } diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index 2d720fdeae38..d3f23e53e93d 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -20,6 +20,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/test/e2e/fixtures" "github.com/argoproj/argo/workflow/common" @@ -276,6 +277,36 @@ spec: }) } +func (s *ArgoServerSuite) TestEventOnMalformedWorkflowEventBinding() { + s.Given(). + WorkflowEventBinding(` +metadata: + name: malformed + labels: + argo-e2e: true +`). + When(). + CreateWorkflowEventBinding(). + And(func() { + s.e(). + POST("/api/v1/events/argo/"). + WithBytes([]byte(`{}`)). + Expect(). + Status(200) + }). + Then(). + ExpectAuditEvents( + func(event corev1.Event) bool { + return event.InvolvedObject.Name=="malformed" && event.InvolvedObject.Kind == workflow.WorkflowEventBindingKind + }, + func(t *testing.T, event corev1.Event) { + assert.Equal(t, "argo", event.InvolvedObject.Namespace) + assert.Equal(t, "WorkflowEventBindingError", event.Reason) + assert.Equal(t, "failed to dispatch event: failed to evaluate workflow template expression: unexpected token EOF (1:1)", event.Message) + }, + ) +} + func (s *ArgoServerSuite) TestGetUserInfo() { s.e().GET("/api/v1/userinfo"). Expect(). diff --git a/test/e2e/fixtures/then.go b/test/e2e/fixtures/then.go index 001c184c262f..8cd85737f3ac 100644 --- a/test/e2e/fixtures/then.go +++ b/test/e2e/fixtures/then.go @@ -9,7 +9,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/argoproj/argo/workflow/hydrator" @@ -90,7 +89,7 @@ func (t *Then) ExpectWorkflowList(listOptions metav1.ListOptions, block func(t * return t } -func (t *Then) ExpectAuditEvents(blocks ...func(*testing.T, apiv1.Event)) *Then { +func (t *Then) ExpectAuditEvents(filter func(event apiv1.Event) bool, blocks ...func(*testing.T, apiv1.Event)) *Then { t.t.Helper() eventList, err := t.kubeClient.CoreV1().Events(Namespace).Watch(metav1.ListOptions{}) if err != nil { @@ -107,7 +106,7 @@ func (t *Then) ExpectAuditEvents(blocks ...func(*testing.T, apiv1.Event)) *Then if !ok { t.t.Fatal("event is not an event") } - if e.InvolvedObject.Name == t.workflowName && e.Namespace == Namespace && e.InvolvedObject.Kind == workflow.WorkflowKind { + if filter(*e) { blocks[0](t.t, *e) blocks = blocks[1:] if t.t.Failed() { diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index e04de5a21394..ad1527bad3ac 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/argoproj/argo/pkg/apis/workflow" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/test/e2e/fixtures" ) @@ -190,6 +191,9 @@ func (s *FunctionalSuite) TestEventOnNodeFail() { WaitForWorkflow(30*time.Second). Then(). ExpectAuditEvents( + func(event corev1.Event) bool { + return strings.HasPrefix(event.InvolvedObject.Name, "failed-step-event-") && event.InvolvedObject.Kind == workflow.WorkflowKind + }, func(t *testing.T, e corev1.Event) { assert.Equal(t, "WorkflowRunning", e.Reason) }, @@ -215,6 +219,9 @@ func (s *FunctionalSuite) TestEventOnWorkflowSuccess() { WaitForWorkflow(60*time.Second). Then(). ExpectAuditEvents( + func(event corev1.Event) bool { + return strings.HasPrefix(event.InvolvedObject.Name, "success-event-") && event.InvolvedObject.Kind == workflow.WorkflowKind + }, func(t *testing.T, e corev1.Event) { assert.Equal(t, "WorkflowRunning", e.Reason) }, @@ -240,6 +247,9 @@ func (s *FunctionalSuite) TestEventOnPVCFail() { WaitForWorkflow(120*time.Second). Then(). ExpectAuditEvents( + func(event corev1.Event) bool { + return strings.HasPrefix(event.InvolvedObject.Name, "volumes-pvc-fail-event-") && event.InvolvedObject.Kind == workflow.WorkflowKind + }, func(t *testing.T, e corev1.Event) { assert.Equal(t, "WorkflowRunning", e.Reason) }, diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 0aec779a7fc0..2329e3fe6243 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -40,6 +40,7 @@ import ( controllercache "github.com/argoproj/argo/workflow/controller/cache" "github.com/argoproj/argo/workflow/controller/pod" "github.com/argoproj/argo/workflow/cron" + "github.com/argoproj/argo/workflow/events" "github.com/argoproj/argo/workflow/hydrator" "github.com/argoproj/argo/workflow/metrics" "github.com/argoproj/argo/workflow/sync" @@ -85,7 +86,7 @@ type WorkflowController struct { wfArchive sqldb.WorkflowArchive syncManager *sync.SyncManager metrics *metrics.Metrics - eventRecorderManager EventRecorderManager + eventRecorderManager events.EventRecorderManager archiveLabelSelector labels.Selector cacheFactory controllercache.CacheFactory } @@ -112,7 +113,7 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int completedPods: make(chan string, 512), gcPods: make(chan string, 512), cacheFactory: controllercache.NewCacheFactory(kubeclientset, namespace), - eventRecorderManager: newEventRecorderManager(kubeclientset), + eventRecorderManager: events.NewEventRecorderManager(kubeclientset), } wfc.UpdateConfig() diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 4bb31d323960..61f192e432d4 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -27,6 +27,7 @@ import ( wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions" "github.com/argoproj/argo/test" controllercache "github.com/argoproj/argo/workflow/controller/cache" + "github.com/argoproj/argo/workflow/events" hydratorfake "github.com/argoproj/argo/workflow/hydrator/fake" "github.com/argoproj/argo/workflow/metrics" ) @@ -112,7 +113,7 @@ func (t testEventRecorderManager) Get(string) record.EventRecorder { return t.eventRecorder } -var _ EventRecorderManager = &testEventRecorderManager{} +var _ events.EventRecorderManager = &testEventRecorderManager{} func newController(objects ...runtime.Object) (context.CancelFunc, *WorkflowController) { wfclientset := fakewfclientset.NewSimpleClientset(objects...) diff --git a/workflow/controller/event_recorder_manager.go b/workflow/events/event_recorder_manager.go similarity index 93% rename from workflow/controller/event_recorder_manager.go rename to workflow/events/event_recorder_manager.go index 4988bb093f13..5e6ed30c0805 100644 --- a/workflow/controller/event_recorder_manager.go +++ b/workflow/events/event_recorder_manager.go @@ -1,4 +1,4 @@ -package controller +package events import ( "sync" @@ -36,7 +36,7 @@ func (m *eventRecorderManager) Get(namespace string) record.EventRecorder { } -func newEventRecorderManager(kubernetes kubernetes.Interface) EventRecorderManager { +func NewEventRecorderManager(kubernetes kubernetes.Interface) EventRecorderManager { return &eventRecorderManager{ kubernetes: kubernetes, lock: sync.Mutex{},