Skip to content

Commit

Permalink
fix(controller): Failure tolerant workflow archiving and offloading. Fixes #3786 and #3837 (#3787)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Aug 28, 2020
1 parent 359ee8d commit 0cf7709
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
8 changes: 2 additions & 6 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,7 @@ spec:
templates:
- name: run-archie
container:
image: argoproj/argosay:v2
command: [cowsay, ":) Hello Argo!"]
imagePullPolicy: IfNotPresent`).
image: argoproj/argosay:v2`).
When().
SubmitWorkflow().
WaitForWorkflow(20 * time.Second).
Expand All @@ -1120,9 +1118,7 @@ spec:
templates:
- name: run-betty
container:
image: argoproj/argosay:v2
command: [cowsay, ":) Hello Argo!"]
imagePullPolicy: IfNotPresent`).
image: argoproj/argosay:v2`).
When().
SubmitWorkflow().
WaitForWorkflow(20 * time.Second)
Expand Down
7 changes: 5 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1738,12 +1738,15 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted
}

if woc.controller.isArchivable(woc.wf) {
err = woc.controller.wfArchive.ArchiveWorkflow(woc.wf)
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
err := woc.controller.wfArchive.ArchiveWorkflow(woc.wf)
return err == nil, err
})
if err != nil {
woc.log.WithField("err", err).Error("Failed to archive workflow")
}
} else {
woc.log.Infof("Does't match with archive label selector. Skipping Archive")
woc.log.Infof("Doesn't match with archive label selector. Skipping Archive")
}
woc.updated = true
}
Expand Down
14 changes: 12 additions & 2 deletions workflow/hydrator/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"os"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -51,7 +53,11 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error {
return err
}
if wf.Status.IsOffloadNodeStatus() {
offloadedNodes, err := h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
var offloadedNodes wfv1.Nodes
err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
return err == nil, err
})
if err != nil {
return err
}
Expand All @@ -73,7 +79,11 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
}
}
if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus {
offloadVersion, err := h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
var offloadVersion string
err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
return err == nil, err
})
if err != nil {
return err
}
Expand Down

0 comments on commit 0cf7709

Please sign in to comment.