From 44b6abf82b3f5a7d71d3a807f805b1d9b6685696 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Fri, 14 Aug 2020 11:26:03 -0700 Subject: [PATCH 1/8] fix(controller): Failure tolerant workflow archiving. Fixes #3786 --- workflow/common/common.go | 3 + workflow/controller/archive/handler.go | 83 ++++++++++++++++++++++++++ workflow/controller/controller.go | 2 + workflow/controller/operator.go | 8 +-- 4 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 workflow/controller/archive/handler.go diff --git a/workflow/common/common.go b/workflow/common/common.go index 1d5e927726be..370ac1477829 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -69,6 +69,9 @@ const ( LabelKeyWorkflowEventBinding = workflow.WorkflowFullName + "/workflow-event-binding" // LabelKeyWorkflowTemplate is a label applied to Workflows that are submitted from ClusterWorkflowtemplate LabelKeyClusterWorkflowTemplate = workflow.WorkflowFullName + "/cluster-workflow-template" + // LabelKeyArchiveStatus is a label applied to workflows to indicate its status + // Pending/Error/Succeeded + LabelKeyArchiveStatus = workflow.WorkflowFullName + "/archive-status" // LabelKeyOnExit is a label applied to Pods that are run from onExit nodes, so that they are not shut down when stopping a Workflow LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit" diff --git a/workflow/controller/archive/handler.go b/workflow/controller/archive/handler.go new file mode 100644 index 000000000000..7e9da3426c85 --- /dev/null +++ b/workflow/controller/archive/handler.go @@ -0,0 +1,83 @@ +package archive + +import ( + "encoding/json" + + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + "github.com/argoproj/argo/persist/sqldb" + "github.com/argoproj/argo/pkg/client/clientset/versioned" + "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/hydrator" + "github.com/argoproj/argo/workflow/util" +) + +type handler struct { + hydrator hydrator.Interface + workflowArchive sqldb.WorkflowArchive + workflowInterface versioned.Interface +} + +func (h handler) OnAdd(obj interface{}) { + h.archive(obj) +} + +func (h handler) OnUpdate(_, newObj interface{}) { + h.archive(newObj) +} + +func (h handler) OnDelete(obj interface{}) { + h.archive(obj) +} + +func (h handler) archive(obj interface{}) { + un, ok := obj.(*unstructured.Unstructured) + if ok && un.GetLabels()[common.LabelKeyArchiveStatus] == "Pending" { + logCtx := log.WithFields(log.Fields{"namespace": un.GetNamespace(), "name": un.GetName()}) + archiveStatus := "Error" + wf, err := util.FromUnstructured(un) + if err != nil { + logCtx.WithError(err).Error("Failed to convert from unstructured to workflow") + } else { + err := h.hydrator.Hydrate(wf) + if err != nil { + logCtx.WithError(err).Error("Failed to hydrate workflow") + } else { + err := h.workflowArchive.ArchiveWorkflow(wf) + if err != nil { + logCtx.WithError(err).Error("Failed to archive workflow") + } else { + archiveStatus = "Succeeded" + } + } + } + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]string{ + common.LabelKeyArchiveStatus: archiveStatus, + }, + }, + }) + logCtx.WithField("archiveStatus", archiveStatus).Info("Updating workflow archive status") + _, err = h.workflowInterface.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Patch(un.GetName(), types.MergePatchType, patch) + if err != nil { + logCtx.WithError(err).Error("Failed to mark workflow as archived") + } + + } +} + +func NewHandler(hydrator hydrator.Interface, workflowArchive sqldb.WorkflowArchive, workflowInterface versioned.Interface) cache.FilteringResourceEventHandler { + return cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + un, ok := obj.(*unstructured.Unstructured) + return ok && un.GetLabels()[common.LabelKeyArchiveStatus] != "" + }, + Handler: &handler{hydrator, workflowArchive, workflowInterface}, + } +} + +var _ cache.ResourceEventHandler = &handler{} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 34a00ebec64c..57bddb290a56 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -37,6 +37,7 @@ import ( wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1" authutil "github.com/argoproj/argo/util/auth" "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/controller/archive" controllercache "github.com/argoproj/argo/workflow/controller/cache" "github.com/argoproj/argo/workflow/controller/informer" "github.com/argoproj/argo/workflow/controller/pod" @@ -697,6 +698,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() { } }, }) + wfc.wfInformer.AddEventHandler(archive.NewHandler(wfc.hydrator, wfc.wfArchive, wfc.wfclientset)) } func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 519eed3afbd2..82c80107f806 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1726,12 +1726,10 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted } if woc.controller.isArchivable(woc.wf) { - err = woc.controller.wfArchive.ArchiveWorkflow(woc.wf) - if err != nil { - woc.log.WithField("err", err).Error("Failed to archive workflow") - } + woc.wf.Labels[common.LabelKeyArchiveStatus] = "Pending" + woc.log.Infof("Labelled workflow as pending archiving") } else { - woc.log.Infof("Does't match with archive label selector. Skipping Archive") + woc.log.Infof("Doesn't match with archive label selector. Skipping Archive") } woc.updated = true } From afcf09e133218a72003c0703445d8e9b558e635a Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 20 Aug 2020 17:41:46 -0700 Subject: [PATCH 2/8] fix-3786 --- workflow/hydrator/hydrator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflow/hydrator/hydrator.go b/workflow/hydrator/hydrator.go index 5772d3c50f96..bc29da3921f4 100644 --- a/workflow/hydrator/hydrator.go +++ b/workflow/hydrator/hydrator.go @@ -4,6 +4,7 @@ import ( "os" log "github.com/sirupsen/logrus" + "k8s.io/client-go/util/retry" "github.com/argoproj/argo/persist/sqldb" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -51,6 +52,7 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error { return err } if wf.Status.IsOffloadNodeStatus() { + retry.OnError() offloadedNodes, err := h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion()) if err != nil { return err From 1d862b253d23cbf84a3bb49810825e090cef5815 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 20 Aug 2020 18:04:41 -0700 Subject: [PATCH 3/8] fix-3786 --- Makefile | 4 +- workflow/common/common.go | 2 +- workflow/controller/archive/handler.go | 76 ++++++++++++++------------ workflow/hydrator/hydrator.go | 14 ++++- 4 files changed, 55 insertions(+), 41 deletions(-) diff --git a/Makefile b/Makefile index 6d8ed79639a9..9ff8ebada1fc 100644 --- a/Makefile +++ b/Makefile @@ -283,11 +283,9 @@ proto: $(GOPATH)/bin/go-to-protobuf $(GOPATH)/bin/protoc-gen-gogo $(GOPATH)/bin/ ./hack/generate-proto.sh ./hack/update-codegen.sh -dist/install_kustomize.sh: +/usr/local/bin/kustomize: mkdir -p dist ./hack/recurl.sh dist/install_kustomize.sh https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh - -/usr/local/bin/kustomize: dist/install_kustomize.sh chmod +x ./dist/install_kustomize.sh ./dist/install_kustomize.sh sudo mv kustomize /usr/local/bin/ diff --git a/workflow/common/common.go b/workflow/common/common.go index 370ac1477829..f8308fadac68 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -70,7 +70,7 @@ const ( // LabelKeyWorkflowTemplate is a label applied to Workflows that are submitted from ClusterWorkflowtemplate LabelKeyClusterWorkflowTemplate = workflow.WorkflowFullName + "/cluster-workflow-template" // LabelKeyArchiveStatus is a label applied to workflows to indicate its status - // Pending/Error/Succeeded + // Pending/Succeeded LabelKeyArchiveStatus = workflow.WorkflowFullName + "/archive-status" // LabelKeyOnExit is a label applied to Pods that are run from onExit nodes, so that they are not shut down when stopping a Workflow LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit" diff --git a/workflow/controller/archive/handler.go b/workflow/controller/archive/handler.go index 7e9da3426c85..0ce5333d69dc 100644 --- a/workflow/controller/archive/handler.go +++ b/workflow/controller/archive/handler.go @@ -6,7 +6,9 @@ import ( log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" "github.com/argoproj/argo/persist/sqldb" "github.com/argoproj/argo/pkg/client/clientset/versioned" @@ -15,6 +17,10 @@ import ( "github.com/argoproj/argo/workflow/util" ) +// This handle will make many attempts to archive a workflow if there is a problem. +// Firstly, when it is marked as pending. +// Secondly, when it is re-synced. +// Finally, when it is deleted. type handler struct { hydrator hydrator.Interface workflowArchive sqldb.WorkflowArchive @@ -25,8 +31,8 @@ func (h handler) OnAdd(obj interface{}) { h.archive(obj) } -func (h handler) OnUpdate(_, newObj interface{}) { - h.archive(newObj) +func (h handler) OnUpdate(_, obj interface{}) { + h.archive(obj) } func (h handler) OnDelete(obj interface{}) { @@ -35,38 +41,40 @@ func (h handler) OnDelete(obj interface{}) { func (h handler) archive(obj interface{}) { un, ok := obj.(*unstructured.Unstructured) - if ok && un.GetLabels()[common.LabelKeyArchiveStatus] == "Pending" { - logCtx := log.WithFields(log.Fields{"namespace": un.GetNamespace(), "name": un.GetName()}) - archiveStatus := "Error" - wf, err := util.FromUnstructured(un) - if err != nil { - logCtx.WithError(err).Error("Failed to convert from unstructured to workflow") - } else { - err := h.hydrator.Hydrate(wf) - if err != nil { - logCtx.WithError(err).Error("Failed to hydrate workflow") - } else { - err := h.workflowArchive.ArchiveWorkflow(wf) - if err != nil { - logCtx.WithError(err).Error("Failed to archive workflow") - } else { - archiveStatus = "Succeeded" - } - } - } - patch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]string{ - common.LabelKeyArchiveStatus: archiveStatus, - }, + if !ok && un.GetLabels()[common.LabelKeyArchiveStatus] != "Pending" { + return + } + logCtx := log.WithFields(log.Fields{"namespace": un.GetNamespace(), "name": un.GetName()}) + wf, err := util.FromUnstructured(un) + if err != nil { + logCtx.WithError(err).Error("failed to convert from unstructured to workflow") + return + } + err = h.hydrator.Hydrate(wf) + if err != nil { + logCtx.WithError(err).Error("failed to hydrate workflow") + return + } + err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + err := h.workflowArchive.ArchiveWorkflow(wf) + return err == nil, err + }) + if err != nil { + logCtx.WithError(err).Error("failed to archive workflow") + return + } + patch, _ := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]string{ + common.LabelKeyArchiveStatus: "Succeeded", }, - }) - logCtx.WithField("archiveStatus", archiveStatus).Info("Updating workflow archive status") - _, err = h.workflowInterface.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Patch(un.GetName(), types.MergePatchType, patch) - if err != nil { - logCtx.WithError(err).Error("Failed to mark workflow as archived") - } - + }, + }) + logCtx.Info("Marking workflow archiving as succeeded") + _, err = h.workflowInterface.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Patch(un.GetName(), types.MergePatchType, patch) + if err != nil { + logCtx.WithError(err).Error("failed to mark workflow as archived") + return } } @@ -74,7 +82,7 @@ func NewHandler(hydrator hydrator.Interface, workflowArchive sqldb.WorkflowArchi return cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { un, ok := obj.(*unstructured.Unstructured) - return ok && un.GetLabels()[common.LabelKeyArchiveStatus] != "" + return ok && un.GetLabels()[common.LabelKeyArchiveStatus] == "Pending" }, Handler: &handler{hydrator, workflowArchive, workflowInterface}, } diff --git a/workflow/hydrator/hydrator.go b/workflow/hydrator/hydrator.go index bc29da3921f4..74554a543b64 100644 --- a/workflow/hydrator/hydrator.go +++ b/workflow/hydrator/hydrator.go @@ -4,6 +4,7 @@ import ( "os" log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" "github.com/argoproj/argo/persist/sqldb" @@ -52,8 +53,11 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error { return err } if wf.Status.IsOffloadNodeStatus() { - retry.OnError() - offloadedNodes, err := h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion()) + var offloadedNodes wfv1.Nodes + err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion()) + return err == nil, err + }) if err != nil { return err } @@ -75,7 +79,11 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error { } } if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus { - offloadVersion, err := h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes) + var offloadVersion string + err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes) + return err == nil, err + }) if err != nil { return err } From df8f54bca4705a4b2501324938cd37bc71c9e615 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 20 Aug 2020 18:43:42 -0700 Subject: [PATCH 4/8] fix-3786 --- test/e2e/argo_server_test.go | 23 ++++++++--------------- test/e2e/cli_with_server_test.go | 1 + test/e2e/fixtures/when.go | 14 +++++++++++--- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index 9ac8e5fa19a6..9dd55539cea8 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -451,9 +451,7 @@ func (s *ArgoServerSuite) TestPermission() { { "name": "run-workflow", "container": { - "image": "argoproj/argosay:v2", - "command": ["sh"], - "args": ["-c", "sleep 1"] + "image": "argoproj/argosay:v2" } } ], @@ -499,10 +497,7 @@ func (s *ArgoServerSuite) TestPermission() { { "name": "run-workflow", "container": { - "image": "argoproj/argosay:v2", - "imagePullPolicy": "IfNotPresent", - "command": ["sh"], - "args": ["-c", "sleep 1"] + "image": "argoproj/argosay:v2" } } ], @@ -528,7 +523,8 @@ func (s *ArgoServerSuite) TestPermission() { s.Given(). WorkflowName("test-wf-good"). When(). - WaitForWorkflow(30 * time.Second) + WaitForWorkflow(30 * time.Second). + WaitForWorkflowToBeArchived(5 * time.Second) // Test delete workflow with bad token s.bearerToken = badToken @@ -1095,9 +1091,7 @@ spec: templates: - name: run-archie container: - image: argoproj/argosay:v2 - command: [cowsay, ":) Hello Argo!"] - imagePullPolicy: IfNotPresent`). + image: argoproj/argosay:v2`). When(). SubmitWorkflow(). WaitForWorkflow(20 * time.Second). @@ -1117,12 +1111,11 @@ spec: templates: - name: run-betty container: - image: argoproj/argosay:v2 - command: [cowsay, ":) Hello Argo!"] - imagePullPolicy: IfNotPresent`). + image: argoproj/argosay:v2`). When(). SubmitWorkflow(). - WaitForWorkflow(20 * time.Second) + WaitForWorkflow(20 * time.Second). + WaitForWorkflowToBeArchived(5 * time.Second) for _, tt := range []struct { name string diff --git a/test/e2e/cli_with_server_test.go b/test/e2e/cli_with_server_test.go index 1323e1a028a9..b046486e257e 100644 --- a/test/e2e/cli_with_server_test.go +++ b/test/e2e/cli_with_server_test.go @@ -107,6 +107,7 @@ func (s *CLIWithServerSuite) TestArchive() { When(). SubmitWorkflow(). WaitForWorkflow(30 * time.Second). + WaitForWorkflowToBeArchived(5*time.Second). Then(). ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { uid = metadata.UID diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index 76b82515e4df..ef0d49fd39af 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -12,6 +12,7 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" + "github.com/argoproj/argo/workflow/common" "github.com/argoproj/argo/workflow/hydrator" ) @@ -189,10 +190,17 @@ func (w *When) WaitForWorkflowName(workflowName string, timeout time.Duration) * }, "to finish", timeout) } -func (w *When) Wait(timeout time.Duration) *When { +func (w *When) WaitForWorkflowToBeArchived(duration time.Duration) *When { w.t.Helper() - log.Infof("Waiting for %v", timeout) - time.Sleep(timeout) + return w.WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool { + return wf.Labels[common.LabelKeyArchiveStatus] == "Succeeded" + }, "workflow is archived", duration) +} + +func (w *When) Wait(duration time.Duration) *When { + w.t.Helper() + log.Infof("Waiting for %v", duration) + time.Sleep(duration) log.Infof("Done waiting") return w } From cdcd125e6e1f56804c542963b62a0fc73a934f2e Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 20 Aug 2020 18:59:55 -0700 Subject: [PATCH 5/8] fix-3786 --- test/e2e/argo_server_test.go | 1 + test/e2e/cli_with_server_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index 9dd55539cea8..4a5611573b44 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -961,6 +961,7 @@ func (s *ArgoServerSuite) TestArtifactServer() { When(). SubmitWorkflow(). WaitForWorkflow(20 * time.Second). + WaitForWorkflowToBeArchived(5 * time.Second). Then(). ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { uid = metadata.UID diff --git a/test/e2e/cli_with_server_test.go b/test/e2e/cli_with_server_test.go index b046486e257e..a50c646b9f53 100644 --- a/test/e2e/cli_with_server_test.go +++ b/test/e2e/cli_with_server_test.go @@ -107,7 +107,7 @@ func (s *CLIWithServerSuite) TestArchive() { When(). SubmitWorkflow(). WaitForWorkflow(30 * time.Second). - WaitForWorkflowToBeArchived(5*time.Second). + WaitForWorkflowToBeArchived(5 * time.Second). Then(). ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { uid = metadata.UID From a9cab9f35d6729ab87dc334e375a09f8e80481fd Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 20 Aug 2020 19:51:32 -0700 Subject: [PATCH 6/8] fix-3786 --- workflow/controller/archive/handler.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workflow/controller/archive/handler.go b/workflow/controller/archive/handler.go index 0ce5333d69dc..8637411ecab1 100644 --- a/workflow/controller/archive/handler.go +++ b/workflow/controller/archive/handler.go @@ -41,7 +41,10 @@ func (h handler) OnDelete(obj interface{}) { func (h handler) archive(obj interface{}) { un, ok := obj.(*unstructured.Unstructured) - if !ok && un.GetLabels()[common.LabelKeyArchiveStatus] != "Pending" { + if !ok { + return + } + if un.GetLabels()[common.LabelKeyArchiveStatus] != "Pending" { return } logCtx := log.WithFields(log.Fields{"namespace": un.GetNamespace(), "name": un.GetName()}) From 5b7092197d14430d9cc79dd9fde3cac3dec1f650 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Fri, 21 Aug 2020 10:16:21 -0700 Subject: [PATCH 7/8] fix-3786 --- test/e2e/argo_server_test.go | 16 +++++++++------- test/e2e/cli_with_server_test.go | 1 - test/e2e/fixtures/when.go | 14 +++----------- workflow/common/common.go | 3 --- workflow/controller/controller.go | 2 -- workflow/controller/operator.go | 9 +++++++-- 6 files changed, 19 insertions(+), 26 deletions(-) diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index 4a5611573b44..9d8cb1e1feaa 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -451,7 +451,9 @@ func (s *ArgoServerSuite) TestPermission() { { "name": "run-workflow", "container": { - "image": "argoproj/argosay:v2" + "image": "argoproj/argosay:v2", + "command": ["sh"], + "args": ["-c", "sleep 1"] } } ], @@ -497,7 +499,10 @@ func (s *ArgoServerSuite) TestPermission() { { "name": "run-workflow", "container": { - "image": "argoproj/argosay:v2" + "image": "argoproj/argosay:v2", + "imagePullPolicy": "IfNotPresent", + "command": ["sh"], + "args": ["-c", "sleep 1"] } } ], @@ -523,8 +528,7 @@ func (s *ArgoServerSuite) TestPermission() { s.Given(). WorkflowName("test-wf-good"). When(). - WaitForWorkflow(30 * time.Second). - WaitForWorkflowToBeArchived(5 * time.Second) + WaitForWorkflow(30 * time.Second) // Test delete workflow with bad token s.bearerToken = badToken @@ -961,7 +965,6 @@ func (s *ArgoServerSuite) TestArtifactServer() { When(). SubmitWorkflow(). WaitForWorkflow(20 * time.Second). - WaitForWorkflowToBeArchived(5 * time.Second). Then(). ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { uid = metadata.UID @@ -1115,8 +1118,7 @@ spec: image: argoproj/argosay:v2`). When(). SubmitWorkflow(). - WaitForWorkflow(20 * time.Second). - WaitForWorkflowToBeArchived(5 * time.Second) + WaitForWorkflow(20 * time.Second) for _, tt := range []struct { name string diff --git a/test/e2e/cli_with_server_test.go b/test/e2e/cli_with_server_test.go index a50c646b9f53..1323e1a028a9 100644 --- a/test/e2e/cli_with_server_test.go +++ b/test/e2e/cli_with_server_test.go @@ -107,7 +107,6 @@ func (s *CLIWithServerSuite) TestArchive() { When(). SubmitWorkflow(). WaitForWorkflow(30 * time.Second). - WaitForWorkflowToBeArchived(5 * time.Second). Then(). ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { uid = metadata.UID diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index ef0d49fd39af..76b82515e4df 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -12,7 +12,6 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" - "github.com/argoproj/argo/workflow/common" "github.com/argoproj/argo/workflow/hydrator" ) @@ -190,17 +189,10 @@ func (w *When) WaitForWorkflowName(workflowName string, timeout time.Duration) * }, "to finish", timeout) } -func (w *When) WaitForWorkflowToBeArchived(duration time.Duration) *When { +func (w *When) Wait(timeout time.Duration) *When { w.t.Helper() - return w.WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool { - return wf.Labels[common.LabelKeyArchiveStatus] == "Succeeded" - }, "workflow is archived", duration) -} - -func (w *When) Wait(duration time.Duration) *When { - w.t.Helper() - log.Infof("Waiting for %v", duration) - time.Sleep(duration) + log.Infof("Waiting for %v", timeout) + time.Sleep(timeout) log.Infof("Done waiting") return w } diff --git a/workflow/common/common.go b/workflow/common/common.go index f8308fadac68..1d5e927726be 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -69,9 +69,6 @@ const ( LabelKeyWorkflowEventBinding = workflow.WorkflowFullName + "/workflow-event-binding" // LabelKeyWorkflowTemplate is a label applied to Workflows that are submitted from ClusterWorkflowtemplate LabelKeyClusterWorkflowTemplate = workflow.WorkflowFullName + "/cluster-workflow-template" - // LabelKeyArchiveStatus is a label applied to workflows to indicate its status - // Pending/Succeeded - LabelKeyArchiveStatus = workflow.WorkflowFullName + "/archive-status" // LabelKeyOnExit is a label applied to Pods that are run from onExit nodes, so that they are not shut down when stopping a Workflow LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit" diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1d9b900b7c11..1033f465c7e2 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -37,7 +37,6 @@ import ( wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1" authutil "github.com/argoproj/argo/util/auth" "github.com/argoproj/argo/workflow/common" - "github.com/argoproj/argo/workflow/controller/archive" controllercache "github.com/argoproj/argo/workflow/controller/cache" "github.com/argoproj/argo/workflow/controller/informer" "github.com/argoproj/argo/workflow/controller/pod" @@ -688,7 +687,6 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() { } }, }) - wfc.wfInformer.AddEventHandler(archive.NewHandler(wfc.hydrator, wfc.wfArchive, wfc.wfclientset)) } func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index fa1acc64cc8e..1fc233b61513 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1730,8 +1730,13 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted } if woc.controller.isArchivable(woc.wf) { - woc.wf.Labels[common.LabelKeyArchiveStatus] = "Pending" - woc.log.Infof("Labelled workflow as pending archiving") + err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { + err := woc.controller.wfArchive.ArchiveWorkflow(woc.wf) + return err == nil, err + }) + if err != nil { + woc.log.WithField("err", err).Error("Failed to archive workflow") + } } else { woc.log.Infof("Doesn't match with archive label selector. Skipping Archive") } From bf15e5a09f90073dbb4eb4c6c3225eb777c1fd54 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Fri, 21 Aug 2020 11:22:40 -0700 Subject: [PATCH 8/8] deleted handler.go --- workflow/controller/archive/handler.go | 94 -------------------------- 1 file changed, 94 deletions(-) delete mode 100644 workflow/controller/archive/handler.go diff --git a/workflow/controller/archive/handler.go b/workflow/controller/archive/handler.go deleted file mode 100644 index 8637411ecab1..000000000000 --- a/workflow/controller/archive/handler.go +++ /dev/null @@ -1,94 +0,0 @@ -package archive - -import ( - "encoding/json" - - log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/retry" - - "github.com/argoproj/argo/persist/sqldb" - "github.com/argoproj/argo/pkg/client/clientset/versioned" - "github.com/argoproj/argo/workflow/common" - "github.com/argoproj/argo/workflow/hydrator" - "github.com/argoproj/argo/workflow/util" -) - -// This handle will make many attempts to archive a workflow if there is a problem. -// Firstly, when it is marked as pending. -// Secondly, when it is re-synced. -// Finally, when it is deleted. -type handler struct { - hydrator hydrator.Interface - workflowArchive sqldb.WorkflowArchive - workflowInterface versioned.Interface -} - -func (h handler) OnAdd(obj interface{}) { - h.archive(obj) -} - -func (h handler) OnUpdate(_, obj interface{}) { - h.archive(obj) -} - -func (h handler) OnDelete(obj interface{}) { - h.archive(obj) -} - -func (h handler) archive(obj interface{}) { - un, ok := obj.(*unstructured.Unstructured) - if !ok { - return - } - if un.GetLabels()[common.LabelKeyArchiveStatus] != "Pending" { - return - } - logCtx := log.WithFields(log.Fields{"namespace": un.GetNamespace(), "name": un.GetName()}) - wf, err := util.FromUnstructured(un) - if err != nil { - logCtx.WithError(err).Error("failed to convert from unstructured to workflow") - return - } - err = h.hydrator.Hydrate(wf) - if err != nil { - logCtx.WithError(err).Error("failed to hydrate workflow") - return - } - err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { - err := h.workflowArchive.ArchiveWorkflow(wf) - return err == nil, err - }) - if err != nil { - logCtx.WithError(err).Error("failed to archive workflow") - return - } - patch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]string{ - common.LabelKeyArchiveStatus: "Succeeded", - }, - }, - }) - logCtx.Info("Marking workflow archiving as succeeded") - _, err = h.workflowInterface.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Patch(un.GetName(), types.MergePatchType, patch) - if err != nil { - logCtx.WithError(err).Error("failed to mark workflow as archived") - return - } -} - -func NewHandler(hydrator hydrator.Interface, workflowArchive sqldb.WorkflowArchive, workflowInterface versioned.Interface) cache.FilteringResourceEventHandler { - return cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - un, ok := obj.(*unstructured.Unstructured) - return ok && un.GetLabels()[common.LabelKeyArchiveStatus] == "Pending" - }, - Handler: &handler{hydrator, workflowArchive, workflowInterface}, - } -} - -var _ cache.ResourceEventHandler = &handler{}