Skip to content

Commit

Permalink
New Feature: provide failFast flag, allow a DAG to run all branches o…
Browse files Browse the repository at this point in the history
…f the DAG (either success or failure) (#1443)

* Fix bug:   dag will missing some nodes when another branch node fails

* Add test file

* New Feature:   provide failFast  flag, allow a DAG to run all branches of the DAG (either success or failure)

* Move failFast flag to DAG template spec

* * Move test case file to test/e2e/expectedfailures since it is expected to fail
* Remove unused check code
  • Loading branch information
xianlubird authored and sarabala1979 committed Jul 1, 2019
1 parent b9b87b7 commit 7a21adf
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 0 deletions.
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@
"tasks"
],
"properties": {
"failFast": {
"description": "This flag is for DAG logic. The DAG logic has a built-in \"fail fast\" feature to stop scheduling new steps, as soon as it detects that one of the DAG nodes is failed. Then it waits until all DAG nodes are completed before failing the DAG itself. The FailFast flag default is true, if set to false, it will allow a DAG to run all branches of the DAG to completion (either success or failure), regardless of the failed outcomes of branches in the DAG. More info and example about this feature at https://github.com/argoproj/argo/issues/1442",
"type": "boolean"
},
"target": {
"description": "Target are one or more names of targets to execute in a DAG",
"type": "string"
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,14 @@ type DAGTemplate struct {

// Tasks are a list of DAG tasks
Tasks []DAGTask `json:"tasks"`

// This flag is for DAG logic. The DAG logic has a built-in "fail fast" feature to stop scheduling new steps,
// as soon as it detects that one of the DAG nodes is failed. Then it waits until all DAG nodes are completed
// before failing the DAG itself.
// The FailFast flag default is true, if set to false, it will allow a DAG to run all branches of the DAG to
// completion (either success or failure), regardless of the failed outcomes of branches in the DAG.
// More info and example about this feature at https://github.com/argoproj/argo/issues/1442
FailFast *bool `json:"failFast,omitempty"`
}

// DAGTask represents a node in the graph during DAG execution
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions test/e2e/expectedfailures/dag-disable-failFast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-primay-branch-
spec:
entrypoint: statis
templates:
- name: a
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: b
retryStrategy:
limit: 2
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 30; echo haha"]
- name: c
retryStrategy:
limit: 3
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 2"]
- name: d
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: statis
dag:
failFast: false
tasks:
- name: A
template: a
- name: B
dependencies: [A]
template: b
- name: C
dependencies: [A]
template: c
- name: D
dependencies: [B]
template: d
- name: E
dependencies: [D]
template: d
48 changes: 48 additions & 0 deletions test/e2e/expectedfailures/dag-noroot-branch-failed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-primay-branch-
spec:
entrypoint: statis
templates:
- name: a
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: b
retryStrategy:
limit: 2
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 30; echo haha"]
- name: c
retryStrategy:
limit: 3
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 2"]
- name: d
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
- name: statis
dag:
tasks:
- name: A
template: a
- name: B
dependencies: [A]
template: b
- name: C
dependencies: [A]
template: c
- name: D
dependencies: [B]
template: d
- name: E
dependencies: [D]
template: d
22 changes: 22 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,29 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
retriesExhausted = false
}
}

if unsuccessfulPhase != "" {
// If failFast set to false, we should return Running to continue this workflow for other DAG branch
if d.tmpl.DAG.FailFast != nil && !*d.tmpl.DAG.FailFast {
tmpOverAllFinished := true
// If all the nodes have finished, we should mark the failed node to finish overall workflow
// So we should check all the targetTasks have finished
for _, tmpDepName := range targetTasks {
tmpDepNode := d.getTaskNode(tmpDepName)
if tmpDepNode == nil {
tmpOverAllFinished = false
break
}
if tmpDepNode.Type == wfv1.NodeTypeRetry && hasMoreRetries(tmpDepNode, d.wf) {
tmpOverAllFinished = false
break
}
}
if !tmpOverAllFinished {
return wfv1.NodeRunning
}
}

// if we were unsuccessful, we can return *only* if all retry nodes have ben exhausted.
if retriesExhausted {
return unsuccessfulPhase
Expand Down
8 changes: 8 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,11 @@ func TestDagRetryExhaustedXfail(t *testing.T) {
woc.operate()
assert.Equal(t, string(wfv1.NodeFailed), string(woc.wf.Status.Phase))
}

// TestDagDisableFailFast test disable fail fast function
func TestDagDisableFailFast(t *testing.T) {
wf := test.LoadTestWorkflow("testdata/dag-disable-fail-fast.yaml")
woc := newWoc(*wf)
woc.operate()
assert.Equal(t, string(wfv1.NodeFailed), string(woc.wf.Status.Phase))
}
Loading

0 comments on commit 7a21adf

Please sign in to comment.