Skip to content

Commit

Permalink
Add feature to continue workflow on failed/error steps/tasks (#1205)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Schrodi authored and jessesuen committed Feb 27, 2019
1 parent 3f1fb9d commit 94cda3d
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 4 deletions.
19 changes: 19 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,17 @@
}
}
},
"io.argoproj.workflow.v1alpha1.ContinueOn": {
"description": "ContinueOn defines if a workflow should continue even if a task or step fails/errors. It can be specified if the workflow should continue when the pod errors, fails or both.",
"properties": {
"error": {
"type": "boolean"
},
"failed": {
"type": "boolean"
}
}
},
"io.argoproj.workflow.v1alpha1.DAGTask": {
"description": "DAGTask represents a node in the graph during DAG execution",
"required": [
Expand All @@ -174,6 +185,10 @@
"description": "Arguments are the parameter and artifact arguments to the template",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Arguments"
},
"continueOn": {
"description": "ContinueOn makes argo to proceed with the following step even if this step fails. Errors and Failed states can be specified",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ContinueOn"
},
"dependencies": {
"description": "Dependencies are name of other targets which this depends on",
"type": "array",
Expand Down Expand Up @@ -1181,6 +1196,10 @@
"description": "Arguments hold arguments to the template",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Arguments"
},
"continueOn": {
"description": "ContinueOn makes argo to proceed with the following step even if this step fails. Errors and Failed states can be specified",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ContinueOn"
},
"name": {
"description": "Name of the step",
"type": "string"
Expand Down
44 changes: 44 additions & 0 deletions examples/dag-continueOn-fail.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-continue-on-fail-
spec:
entrypoint: workflow
templates:
- name: workflow
dag:
tasks:
- name: A
template: whalesay
- name: B
dependencies: [A]
template: intentional-fail
continueOn:
failed: true
- name: C
dependencies: [A]
template: whalesay
- name: D
dependencies: [B, C]
template: whalesay
- name: E
dependencies: [A]
template: intentional-fail
- name: F
dependencies: [A]
template: whalesay
- name: G
dependencies: [E, F]
template: whalesay

- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]

- name: intentional-fail
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 1"]
67 changes: 67 additions & 0 deletions examples/workflow-continueOn-fail.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Example of using continueOn to let a workflow proceed past a failed step.
#
# In workflow-ignore, step C fails but sets continueOn.failed, so step D still runs.
# In workflow-not-ignore, step G fails without continueOn, so its step group is
# marked failed and step H is not expected to run.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-continue-on-fail-
spec:
entrypoint: workflow
templates:
- name: workflow
steps:
- - name: wf-ignore
template: workflow-ignore
- name: wf-not-ignore
template: workflow-not-ignore

- name: workflow-ignore
steps:
- - name: A
template: whalesay
- - name: B
template: whalesay
- name: C
template: intentional-fail
continueOn:
failed: true
- - name: D
template: whalesay

- name: workflow-not-ignore
steps:
- - name: E
template: whalesay
- - name: F
template: whalesay
- name: G
template: intentional-fail
- - name: H
template: whalesay

# - name: B
# inputs:
# parameters:
# - name: seq-id
# steps:
# - - name: jobs
# template: one-job
# arguments:
# parameters:
# - name: seq-id
# value: "{{inputs.parameters.seq-id}}"
# withParam: "[1, 2]"

- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]

- name: intentional-fail
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 1"]
42 changes: 40 additions & 2 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ type WorkflowStep struct {

// When is an expression in which the step should conditionally execute
When string `json:"when,omitempty"`

// ContinueOn makes argo proceed with the following step even if this step fails.
// Both the Error and Failed states can be specified.
ContinueOn *ContinueOn `json:"continueOn,omitempty"`
}

// Item expands a single workflow step into multiple parallel steps
Expand Down Expand Up @@ -845,6 +849,10 @@ type DAGTask struct {

// When is an expression in which the task should conditionally execute
When string `json:"when,omitempty"`

// ContinueOn makes argo proceed with the following step even if this step fails.
// Both the Error and Failed states can be specified.
ContinueOn *ContinueOn `json:"continueOn,omitempty"`
}

// SuspendTemplate is a template subtype to suspend a workflow at a predetermined point in time
Expand Down Expand Up @@ -940,3 +948,35 @@ func (wf *Workflow) NodeID(name string) string {
_, _ = h.Write([]byte(name))
return fmt.Sprintf("%s-%v", wf.ObjectMeta.Name, h.Sum32())
}

// ContinueOn defines if a workflow should continue even if a task or step fails/errors.
// It can be specified if the workflow should continue when the pod errors, fails or both.
// A nil *ContinueOn is treated as "never continue" by the ContinuesOn helpers.
type ContinueOn struct {
// Error, when true, lets the workflow continue past a node that ended in the NodeError phase.
// +optional
Error bool `json:"error,omitempty"`
// Failed, when true, lets the workflow continue past a node that ended in the NodeFailed phase.
// +optional
Failed bool `json:"failed,omitempty"`
}

// continues reports whether execution should proceed past a node that ended in
// the given phase, according to the ContinueOn setting c.
//
// It returns true only when c is non-nil and either c.Error is set and phase is
// NodeError, or c.Failed is set and phase is NodeFailed. A nil c, or any other
// phase, never continues.
func continues(c *ContinueOn, phase NodePhase) bool {
	if c == nil {
		return false
	}
	switch phase {
	case NodeError:
		return c.Error
	case NodeFailed:
		return c.Failed
	default:
		return false
	}
}

// ContinuesOn reports whether the DAG may proceed past this task when the task
// ended in the given phase, based on the task's ContinueOn setting.
func (t *DAGTask) ContinuesOn(phase NodePhase) bool {
	policy := t.ContinueOn
	return continues(policy, phase)
}

// ContinuesOn reports whether the step group may proceed past this step when
// the step ended in the given phase, based on the step's ContinueOn setting.
func (s *WorkflowStep) ContinuesOn(phase NodePhase) bool {
	policy := s.ContinueOn
	return continues(policy, phase)
}
26 changes: 26 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
depNode := dagCtx.getTaskNode(depName)
if depNode != nil {
if depNode.Completed() {
if !depNode.Successful() {
if !depNode.Successful() && !dagCtx.getTask(depName).ContinuesOn(depNode.Phase) {
dependenciesSuccessful = false
}
continue
Expand Down
8 changes: 7 additions & 1 deletion workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
woc.log.Debugf("Step group node %v already marked completed", node)
return node
}

// First, resolve any references to outputs from previous steps, and perform substitution
stepGroup, err := woc.resolveReferences(stepGroup, stepsCtx.scope)
if err != nil {
Expand All @@ -167,6 +168,9 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
return woc.markNodeError(sgNodeName, err)
}

// Maps nodes to their steps
nodeSteps := make(map[string]wfv1.WorkflowStep)

// Kick off all parallel steps in the group
for _, step := range stepGroup {
childNodeName := fmt.Sprintf("%s.%s", sgNodeName, step.Name)
Expand Down Expand Up @@ -202,6 +206,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
}
}
if childNode != nil {
nodeSteps[childNodeName] = step
woc.addChildNode(sgNodeName, childNodeName)
}
}
Expand All @@ -216,7 +221,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
// All children completed. Determine step group status as a whole
for _, childNodeID := range node.Children {
childNode := woc.wf.Status.Nodes[childNodeID]
if !childNode.Successful() {
step := nodeSteps[childNode.Name]
if !childNode.Successful() && !step.ContinuesOn(childNode.Phase) {
failMessage := fmt.Sprintf("child '%s' failed", childNodeID)
woc.log.Infof("Step group node %s deemed failed: %s", node, failMessage)
return woc.markNodePhase(node.Name, wfv1.NodeFailed, failMessage)
Expand Down

0 comments on commit 94cda3d

Please sign in to comment.