Skip to content

Commit

Permalink
Implement Skipping
Browse files Browse the repository at this point in the history
When `WhenExpressions` evaluate to False, the guarded `Task` and
its branch (dependent `Tasks`) are skipped

A `Task` is dependent on and in the branch of another `Task` as specified
by ordering using `runAfter` or by resources using `Results`, `Workspaces`
 and `Resources`

In some use cases, when `WhenExpressions` evaluate to `False`,
users need to skip the guarded `Task` only and allow ordering-dependent
`Tasks` to execute

When  `WhenExpressions` evaluate to `False`, it is possible to allow for
execution of ordering-dependent `Tasks` as specified by `runAfter` using
the `continueAfterSkip` field by setting it to `true` (`continueAfterSkip`
defaults to `false`).

However, setting `continueAfterSkip` in `Tasks` without `WhenExpressions`
or `Tasks` with resource dependencies is invalid, and will cause
`Pipeline` validation errors.

`continueAfterSkip` is only supported in `Tasks` with `WhenExpressions`;
there will be a validation error if it is specified in `Tasks` without
`WhenExpressions`
  • Loading branch information
jerop committed Sep 9, 2020
1 parent 079a6c8 commit c280a08
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 7 deletions.
21 changes: 21 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,27 @@ tasks:
name: echo-file-exists
```

When `WhenExpressions` evaluate to `False`, it is possible to allow for execution of ordering-dependent `Tasks` as specified by [`runAfter`](#using-the-runafter-parameter) using the `continueAfterSkip` field by setting it to `true` (`continueAfterSkip` defaults to `false`). However, setting `continueAfterSkip` in `Tasks` without `WhenExpressions` or `Tasks` with resource dependencies is invalid, and will cause `Pipeline` validation errors.

In this example, `task-should-be-skipped` will be skipped and `task-should-be-executed` will be executed.

```yaml
tasks:
- name: task-should-be-skipped
when:
- input: "foo"
operator: in
values: ["bar"]
continueAfterSkip: true
taskRef:
name: exit-1
- name: task-should-be-executed
runAfter:
- task-should-be-skipped
taskRef:
name: exit-0
```

For an end-to-end example, see [PipelineRun with WhenExpressions](../examples/v1beta1/pipelineruns/pipelinerun-with-when-expressions.yaml).

When `WhenExpressions` are specified in a `Task`, [`Conditions`](#guard-task-execution-using-conditions) should not be specified in the same `Task`. The `Pipeline` will be rejected as invalid if both `WhenExpressions` and `Conditions` are included.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,24 @@ spec:
- input: "$(params.path)"
operator: notin
values: ["README.md"]
continueAfterSkip: true
taskSpec:
steps:
- name: echo
image: ubuntu
script: exit 1
- name: task-should-be-executed-after-skipped-parent-task
runAfter:
- task-should-be-skipped
when:
- input: "$(params.path)"
operator: in
values: ["README.md"]
taskSpec:
steps:
- name: echo
image: ubuntu
script: 'echo created README.md'
---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
Expand Down
9 changes: 9 additions & 0 deletions internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,15 @@ func PipelineTaskWhenExpression(input string, operator selection.Operator, value
}
}

// PipelineTaskContinueAfterSkip adds a boolean indicating whether the ordering-dependent
// Tasks, as specified by runAfter, should be executed after the parent Task has been
// skipped due to its When Expressions evaluating to false.
func PipelineTaskContinueAfterSkip(continueAfterSkip bool) PipelineTaskOp {
return func(pt *v1beta1.PipelineTask) {
pt.ContinueAfterSkip = continueAfterSkip
}
}

// PipelineTaskWorkspaceBinding adds a workspace with the specified name, workspace and subpath on a PipelineTask.
func PipelineTaskWorkspaceBinding(name, workspace, subPath string) PipelineTaskOp {
return func(pt *v1beta1.PipelineTask) {
Expand Down
12 changes: 7 additions & 5 deletions internal/builder/v1beta1/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestPipeline(t *testing.T) {
),
tb.PipelineTask("never-gonna", "give-you-up",
tb.PipelineTaskWhenExpression("foo", selection.In, []string{"foo", "bar"}),
tb.PipelineTaskContinueAfterSkip(true),
tb.RunAfter("foo"),
tb.PipelineTaskTimeout(5*time.Second),
),
Expand Down Expand Up @@ -135,11 +136,12 @@ func TestPipeline(t *testing.T) {
}},
},
}, {
Name: "never-gonna",
TaskRef: &v1beta1.TaskRef{Name: "give-you-up"},
WhenExpressions: []v1beta1.WhenExpression{{Input: "foo", Operator: selection.In, Values: []string{"foo", "bar"}}},
RunAfter: []string{"foo"},
Timeout: &metav1.Duration{Duration: 5 * time.Second},
Name: "never-gonna",
TaskRef: &v1beta1.TaskRef{Name: "give-you-up"},
WhenExpressions: []v1beta1.WhenExpression{{Input: "foo", Operator: selection.In, Values: []string{"foo", "bar"}}},
ContinueAfterSkip: true,
RunAfter: []string{"foo"},
Timeout: &metav1.Duration{Duration: 5 * time.Second},
}, {
Name: "foo",
TaskSpec: &v1beta1.EmbeddedTask{
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ type PipelineTask struct {
// +optional
WhenExpressions WhenExpressions `json:"when,omitempty"`

// ContinueAfterSkip is a bool used to indicate whether ordering-dependent tasks should be executed
// when the task is skipped due to its WhenExpressions evaluating to False
// +optional
ContinueAfterSkip bool `json:"continueAfterSkip,omitempty"`

// Retries represents how many times this task should be retried in case of task failure: ConditionSucceeded set to False
// +optional
Retries int `json:"retries,omitempty"`
Expand Down
57 changes: 57 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (ps *PipelineSpec) Validate(ctx context.Context) *apis.FieldError {
return err
}

if err := validateContinueAfterSkip(ps.Tasks); err != nil {
return apis.ErrInvalidValue(err.Error(), "spec.tasks.continueAfterSkip")
}

return nil
}

Expand Down Expand Up @@ -533,3 +537,56 @@ func validateOneOfWhenExpressionsOrConditions(i int, t PipelineTask) *apis.Field
}
return nil
}

func validateContinueAfterSkip(tasks []PipelineTask) error {
d, err := dag.Build(PipelineTaskList(tasks))
if err != nil {
return err
}
taskMap := toMap(tasks)
for i, t := range tasks {
if t.ContinueAfterSkip {
if t.WhenExpressions == nil {
return apis.ErrDisallowedFields("continueAfterSkip not allowed in tasks without WhenExpressions", fmt.Sprintf("spec.tasks[%d].continueAfterSkip", i))
}
if hasResourceDependencies(t, taskMap, d) {
return apis.ErrDisallowedFields("continueAfterSkip not allowed in tasks with resource dependencies", fmt.Sprintf("spec.tasks[%d].continueAfterSkip", i))
}
}
}
return nil
}

func toMap(tasks []PipelineTask) map[string]PipelineTask {
m := make(map[string]PipelineTask)
for _, t := range tasks {
m[t.Name] = t
}
return m
}

func hasResourceDependencies(parentTask PipelineTask, taskMap map[string]PipelineTask, d *dag.Graph) bool {
if node, ok := d.Nodes[parentTask.Name]; ok {
for _, childNode := range node.Next {
childTaskName := childNode.Task.HashKey()
childTask := taskMap[childTaskName]
if isResourceDependent(parentTask, childTask) {
return true
}
}
}
return false
}

func isOrderingDependent(parentTask PipelineTask, childTask PipelineTask) bool {
for _, orderingParent := range childTask.RunAfter {
if orderingParent == parentTask.Name {
return true
}
}
return false
}

func isResourceDependent(parentTask PipelineTask, childTask PipelineTask) bool {
return !isOrderingDependent(parentTask, childTask)
}
36 changes: 36 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,42 @@ func TestPipelineSpec_Validate_Failure(t *testing.T) {
WhenExpressions: []WhenExpression{{}},
}},
},
}, {
name: "invalid pipeline with one pipeline task having continueAfterSkip without WhenExpressions",
ps: &PipelineSpec{
Description: "this is an invalid pipeline with invalid pipeline task",
Tasks: []PipelineTask{{
Name: "valid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
}, {
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
ContinueAfterSkip: true,
}},
},
}, {
name: "invalid pipeline with one pipeline task having continueAfterSkip with resource dependencies",
ps: &PipelineSpec{
Description: "this is an invalid pipeline with invalid pipeline task",
Tasks: []PipelineTask{{
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "bar-task"},
WhenExpressions: []WhenExpression{{
Input: "foo",
Operator: selection.In,
Values: []string{"foo"},
}},
ContinueAfterSkip: true,
}, {
Name: "valid-pipeline-task",
TaskRef: &TaskRef{Name: "bar-task"},
WhenExpressions: []WhenExpression{{
Input: "$(tasks.invalid-pipeline-task.results.foo)",
Operator: selection.In,
Values: []string{"foo"},
}},
}},
},
}, {
name: "invalid pipeline with pipeline task having reference to resources which does not exist",
ps: &PipelineSpec{
Expand Down
78 changes: 78 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,84 @@ func TestReconcileWithWhenExpressionsWithTaskResults(t *testing.T) {
}
}

func TestReconcileWithWhenExpressionsContinueAfterSkip(t *testing.T) {
names.TestingSeed()
ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec(
tb.PipelineTask("a-task", "a-task",
tb.PipelineTaskWhenExpression("foo", selection.In, []string{"bar"}),
tb.PipelineTaskContinueAfterSkip(true),
),
tb.PipelineTask("b-task", "b-task",
tb.RunAfter("a-task"),
),
))}
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-different-service-accs", tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunServiceAccountName("test-sa-0"),
),
)}
ts := []*v1beta1.Task{
tb.Task("a-task", tb.TaskNamespace("foo")),
tb.Task("b-task", tb.TaskNamespace("foo")),
}
trs := []*v1beta1.TaskRun{}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

wantEvents := []string{
"Normal Started",
"Normal Running Tasks Completed: 0 \\(Failed: 0, Cancelled 0\\), Incomplete: 1, Skipped: 1",
}
pipelineRun, clients := prt.reconcileRun("foo", "test-pipeline-run-different-service-accs", wantEvents, false)

expectedTaskRunName := "test-pipeline-run-different-service-accs-b-task-mz4c7"
expectedTaskRun := tb.TaskRun(expectedTaskRunName,
tb.TaskRunNamespace("foo"),
tb.TaskRunOwnerReference("PipelineRun", "test-pipeline-run-different-service-accs",
tb.OwnerReferenceAPIVersion("tekton.dev/v1beta1"),
tb.Controller, tb.BlockOwnerDeletion,
),
tb.TaskRunLabel("tekton.dev/pipeline", "test-pipeline"),
tb.TaskRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-different-service-accs"),
tb.TaskRunLabel("tekton.dev/pipelineTask", "b-task"),
tb.TaskRunSpec(
tb.TaskRunTaskRef("b-task"),
tb.TaskRunServiceAccountName("test-sa-0"),
),
)
// Check that the expected TaskRun was created
actual, err := clients.Pipeline.TektonV1beta1().TaskRuns("foo").List(metav1.ListOptions{
LabelSelector: "tekton.dev/pipelineTask=b-task,tekton.dev/pipelineRun=test-pipeline-run-different-service-accs",
Limit: 1,
})

if err != nil {
t.Fatalf("Failure to list TaskRun's %s", err)
}
if len(actual.Items) != 1 {
t.Fatalf("Expected 1 TaskRuns got %d", len(actual.Items))
}
actualTaskRun := actual.Items[0]
if d := cmp.Diff(&actualTaskRun, expectedTaskRun, ignoreResourceVersion); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRunName, diff.PrintWantGot(d))
}

actualSkippedTasks := pipelineRun.Status.SkippedTasks
expectedSkippedTasks := []v1beta1.SkippedTask{{
Name: "a-task",
}}
if d := cmp.Diff(actualSkippedTasks, expectedSkippedTasks); d != "" {
t.Errorf("expected to find Skipped Tasks %v. Diff %s", expectedSkippedTasks, diff.PrintWantGot(d))
}
}

// TestReconcileWithAffinityAssistantStatefulSet tests that given a pipelineRun with workspaces,
// an Affinity Assistant StatefulSet is created for each PVC workspace and
// that the Affinity Assistant names is propagated to TaskRuns.
Expand Down
7 changes: 5 additions & 2 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
node := d.Nodes[t.PipelineTask.Name]
if isTaskInGraph(t.PipelineTask.Name, d) {
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(state, d) {
return true
parentTask := stateMap[p.Task.HashKey()]
if parentTask.Skip(state, d) {
if !parentTask.PipelineTask.ContinueAfterSkip {
return true
}
}
}
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ var pts = []v1beta1.PipelineTask{{
Operator: selection.NotIn,
Values: []string{"foo", "bar"},
}},
}, {
Name: "mytask12",
TaskRef: &v1beta1.TaskRef{Name: "taskWithWhenExpressions"},
WhenExpressions: []v1beta1.WhenExpression{{
Input: "foo",
Operator: selection.NotIn,
Values: []string{"foo", "bar"},
}},
ContinueAfterSkip: true,
}}

var p = &v1beta1.Pipeline{
Expand Down Expand Up @@ -1175,6 +1184,52 @@ func TestIsSkipped(t *testing.T) {
},
}},
expected: true,
}, {
name: "tasks-when-expression-skip-from-parent",
taskName: "mytask13",
state: PipelineRunState{{
PipelineTask: &pts[10],
TaskRunName: "pipelinerun-guardedtask",
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &v1beta1.PipelineTask{
Name: "mytask13",
TaskRef: &v1beta1.TaskRef{Name: "task"},
RunAfter: []string{"mytask11"},
}, // mytask13 runAfter mytask11
TaskRunName: "ordering-dependent-task",
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}},
expected: true,
}, {
name: "tasks-when-expression-continue-after-skip",
taskName: "mytask13",
state: PipelineRunState{{
PipelineTask: &pts[11],
TaskRunName: "pipelinerun-guardedtask",
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &v1beta1.PipelineTask{
Name: "mytask13",
TaskRef: &v1beta1.TaskRef{Name: "task"},
RunAfter: []string{"mytask12"},
}, // mytask13 runAfter mytask12
TaskRunName: "ordering-dependent-task",
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}},
expected: false,
}}

for _, tc := range tcs {
Expand Down

0 comments on commit c280a08

Please sign in to comment.