From ee848921348676718a8ab4cef8e8c2f52b86d124 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Fri, 28 Aug 2020 09:16:30 -0700 Subject: [PATCH] fix(controller): Failure tolerant workflow archiving and offloading. Fixes #3786 and #3837 (#3787) --- test/e2e/argo_server_test.go | 8 ++------ workflow/controller/operator.go | 7 +++++-- workflow/hydrator/hydrator.go | 14 ++++++++++++-- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index 654c1e2ad56a..cc0c4bdc0045 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -890,9 +890,7 @@ spec: templates: - name: run-archie container: - image: argoproj/argosay:v2 - command: [cowsay, ":) Hello Argo!"] - imagePullPolicy: IfNotPresent`). + image: argoproj/argosay:v2`). When(). SubmitWorkflow(). WaitForWorkflow(20 * time.Second). @@ -912,9 +910,7 @@ spec: templates: - name: run-betty container: - image: argoproj/argosay:v2 - command: [cowsay, ":) Hello Argo!"] - imagePullPolicy: IfNotPresent`). + image: argoproj/argosay:v2`). When(). SubmitWorkflow(). WaitForWorkflow(20 * time.Second) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index d448c94b6fd3..4193fcd7937f 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1679,12 +1679,15 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted } if woc.controller.isArchivable(woc.wf) { - err = woc.controller.wfArchive.ArchiveWorkflow(woc.wf) + err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { + err := woc.controller.wfArchive.ArchiveWorkflow(woc.wf) + return err == nil, err + }) if err != nil { woc.log.WithField("err", err).Error("Failed to archive workflow") } } else { - woc.log.Infof("Does't match with archive label selector. Skipping Archive") + woc.log.Infof("Doesn't match with archive label selector. Skipping Archive") } woc.updated = true } diff --git a/workflow/hydrator/hydrator.go b/workflow/hydrator/hydrator.go index 5772d3c50f96..74554a543b64 100644 --- a/workflow/hydrator/hydrator.go +++ b/workflow/hydrator/hydrator.go @@ -4,6 +4,8 @@ import ( "os" log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" "github.com/argoproj/argo/persist/sqldb" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -51,7 +53,11 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error { return err } if wf.Status.IsOffloadNodeStatus() { - offloadedNodes, err := h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion()) + var offloadedNodes wfv1.Nodes + err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion()) + return err == nil, err + }) if err != nil { return err } @@ -73,7 +79,11 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error { } } if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus { - offloadVersion, err := h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes) + var offloadVersion string + err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes) + return err == nil, err + }) if err != nil { return err }