Skip to content

Commit

Permalink
refactor GetPipelineConditionStatus
Browse files Browse the repository at this point in the history
Adding an attribute to PipelineRunState to tell whether a PipelineTask
will not be run because (1) its condition checks failed or (2) one of
the parents conditions failed or (3) pipeline is in stopping state

Instead of explicitly checking (3) in GetPipelineConditionStatus, moved this
check to the IsSkipped function which is used by both the scheduler and
while determining pipeline status.
  • Loading branch information
pritidesai committed Jun 29, 2020
1 parent 851a266 commit 2f8539c
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 76 deletions.
122 changes: 53 additions & 69 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,43 @@ func (t ResolvedPipelineRunTask) IsStarted() bool {
return true
}

// IsSkipped returns true if a PipelineTask will not be run because
// (1) its Condition Checks failed or
// (2) one of the parent task's conditions failed or
// (3) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means IsSkipped returns false if a conditionCheck is in progress
func (t ResolvedPipelineRunTask) IsSkipped(state PipelineRunState, d *dag.Graph) bool {
// it already has TaskRun associated with it - PipelineTask not skipped
if t.IsStarted() {
return false
}

// Check if conditionChecks have failed, if so task is skipped
if len(t.ResolvedConditionChecks) > 0 {
if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() {
return true
}
}

// Skip the PipelineTask if pipeline is in stopping state
if isTaskInGraph(t.PipelineTask.Name, d) && state.IsStopping(d) {
return true
}

stateMap := state.ToMap()
// Recursively look at parent tasks to see if they have been skipped,
// if any of the parents have been skipped, skip as well
node := d.Nodes[t.PipelineTask.Name]
if isTaskInGraph(t.PipelineTask.Name, d) {
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].IsSkipped(state, d) {
return true
}
}
}
return false
}

// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
m := make(map[string]*ResolvedPipelineRunTask)
Expand Down Expand Up @@ -174,7 +211,7 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
// at least one task already failed or was cancelled in the specified dag
func (state PipelineRunState) IsStopping(d *dag.Graph) bool {
for _, t := range state {
if _, ok := d.Nodes[t.PipelineTask.Name]; ok {
if isTaskInGraph(t.PipelineTask.Name, d) {
if t.IsCancelled() {
return true
}
Expand Down Expand Up @@ -213,45 +250,26 @@ func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) [
// which have successfully completed or skipped
func (state PipelineRunState) SuccessfulOrSkippedDAGTasks(d *dag.Graph) []string {
tasks := []string{}
stateMap := state.ToMap()
for _, t := range state {
if _, ok := d.Nodes[t.PipelineTask.Name]; ok {
if t.IsSuccessful() || isSkipped(t, stateMap, d) {
if isTaskInGraph(t.PipelineTask.Name, d) {
if t.IsSuccessful() || t.IsSkipped(state, d) {
tasks = append(tasks, t.PipelineTask.Name)
}
}
}
return tasks
}

// isDAGTasksStopped returns true if any of the pipeline task has failed
// and none of the pipeline task are still running
func (state PipelineRunState) isDAGTasksStopped(d *dag.Graph) bool {
failed := false
for _, t := range state {
if t.IsFailure() {
failed = true
continue
}
if t.IsStarted() && !t.IsDone() {
failed = false
break
}
}
return failed
}

// checkTasksDone returns true if all tasks from the specified graph are finished executing
// a task is considered done if it has failed/succeeded/skipped
func (state PipelineRunState) checkTasksDone(d *dag.Graph) bool {
stateMap := state.ToMap()
for _, t := range state {
if _, ok := d.Nodes[t.PipelineTask.Name]; ok {
if isTaskInGraph(t.PipelineTask.Name, d) {
if t.TaskRun == nil {
// this task might have skipped if taskRun is nil
// continue and ignore if this task was skipped
// skipped task is considered part of done
if isSkipped(t, stateMap, d) {
if t.IsSkipped(state, d) {
continue
}
return false
Expand All @@ -272,10 +290,10 @@ func (state PipelineRunState) GetFinalTasks(d *dag.Graph, dfinally *dag.Graph) [
finalCandidates := map[string]struct{}{}
// check either pipeline has finished executing all DAG pipelineTasks
// or any one of the DAG pipelineTask has failed
if state.isDAGTasksStopped(d) || state.checkTasksDone(d) {
if state.checkTasksDone(d) {
// return list of tasks with all final tasks
for _, t := range state {
if _, ok := dfinally.Nodes[t.PipelineTask.Name]; ok && !t.IsSuccessful() {
if isTaskInGraph(t.PipelineTask.Name, dfinally) && !t.IsSuccessful() {
finalCandidates[t.PipelineTask.Name] = struct{}{}
}
}
Expand All @@ -284,6 +302,14 @@ func (state PipelineRunState) GetFinalTasks(d *dag.Graph, dfinally *dag.Graph) [
return tasks
}

// Check if a PipelineTask belongs to the specified Graph
func isTaskInGraph(pipelineTaskName string, d *dag.Graph) bool {
if _, ok := d.Nodes[pipelineTaskName]; ok {
return true
}
return false
}

// GetTaskRun is a function that will retrieve the TaskRun name.
type GetTaskRun func(name string) (*v1beta1.TaskRun, error)

Expand Down Expand Up @@ -494,8 +520,6 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
failedTasks := int(0)
cancelledTasks := int(0)
reason := v1beta1.PipelineRunReasonSuccessful.String()
stateAsMap := state.ToMap()
isStopping := state.IsStopping(dag)

// Check to see if all tasks are success or skipped
//
Expand All @@ -512,18 +536,9 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
for _, rprt := range state {
allTasks = append(allTasks, rprt.PipelineTask.Name)
switch {
case !rprt.IsStarted() && isStopping:
// If the pipeline is in stopping mode, all tasks that are not running
// already will be skipped. Otherwise these tasks end up in the
// incomplete count.
// this should never be the case for final task
if _, ok := dag.Nodes[rprt.PipelineTask.Name]; ok {
skipTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
}
case rprt.IsSuccessful():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case isSkipped(rprt, stateAsMap, dag):
case rprt.IsSkipped(state, dag):
skipTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
// At least one is skipped and no failure yet, mark as completed
Expand Down Expand Up @@ -576,37 +591,6 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
}
}

// isSkipped returns true if a Task in a TaskRun will not be run either because
// its Condition Checks failed or because one of the parent tasks' conditions failed
// Note that this means isSkipped returns false if a conditionCheck is in progress
func isSkipped(rprt *ResolvedPipelineRunTask, stateMap map[string]*ResolvedPipelineRunTask, d *dag.Graph) bool {
// Taskrun not skipped if it already exists
if rprt.TaskRun != nil {
return false
}

// Check if conditionChecks have failed, if so task is skipped
if len(rprt.ResolvedConditionChecks) > 0 {
// isSkipped is only true iof
if rprt.ResolvedConditionChecks.IsDone() && !rprt.ResolvedConditionChecks.IsSuccess() {
return true
}
}

// Recursively look at parent tasks to see if they have been skipped,
// if any of the parents have been skipped, skip as well
// continue if the task does not belong to the specified Graph
if node, ok := d.Nodes[rprt.PipelineTask.Name]; ok {
for _, p := range node.Prev {
skip := isSkipped(stateMap[p.Task.HashKey()], stateMap, d)
if skip {
return true
}
}
}
return false
}

func resolveConditionChecks(pt *v1beta1.PipelineTask, taskRunStatus map[string]*v1beta1.PipelineRunTaskRunStatus, taskRunName string, getTaskRun resources.GetTaskRun, getCondition GetCondition, providedResources map[string]*resourcev1alpha1.PipelineResource) ([]*ResolvedConditionCheck, error) {
rccs := []*ResolvedConditionCheck{}
for i := range pt.Conditions {
Expand Down
53 changes: 46 additions & 7 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func TestIsSkipped(t *testing.T) {
TaskSpec: &task.Spec,
},
}},
expected: false,
expected: true,
}, {
name: "tasks-parent-cancelled",
taskName: "mytask7",
Expand All @@ -1099,7 +1099,7 @@ func TestIsSkipped(t *testing.T) {
TaskSpec: &task.Spec,
},
}},
expected: false,
expected: true,
}, {
name: "tasks-grandparent-failed",
taskName: "mytask10",
Expand Down Expand Up @@ -1129,7 +1129,7 @@ func TestIsSkipped(t *testing.T) {
TaskSpec: &task.Spec,
},
}},
expected: false,
expected: true,
}, {
name: "tasks-parents-failed-passed",
taskName: "mytask8",
Expand All @@ -1155,7 +1155,33 @@ func TestIsSkipped(t *testing.T) {
TaskSpec: &task.Spec,
},
}},
expected: false,
expected: true,
}, {
name: "task-failed-pipeline-stopping",
taskName: "mytask7",
state: PipelineRunState{{
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeFailed(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &pts[5],
TaskRunName: "pipelinerun-mytask2",
TaskRun: makeStarted(trs[1]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &pts[6], // mytask7 runAfter mytask6
TaskRunName: "pipelinerun-mytask3",
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}},
expected: true,
}}

for _, tc := range tcs {
Expand All @@ -1169,7 +1195,7 @@ func TestIsSkipped(t *testing.T) {
if rprt == nil {
t.Fatalf("Could not get task %s from the state: %v", tc.taskName, tc.state)
}
isSkipped := isSkipped(rprt, stateMap, dag)
isSkipped := rprt.IsSkipped(tc.state, dag)
if d := cmp.Diff(isSkipped, tc.expected); d != "" {
t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d))
}
Expand Down Expand Up @@ -1197,7 +1223,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
}, {
name: "one-task-failed",
state: oneFailedState,
expectedNames: []string{},
expectedNames: []string{pts[1].Name},
}, {
name: "all-finished",
state: allFinishedState,
Expand Down Expand Up @@ -1301,6 +1327,9 @@ func TestGetPipelineConditionStatus(t *testing.T) {
TaskRun: makeFailed(trs[0]),
}}

var taskMultipleFailuresOneCancel = taskMultipleFailuresSkipRunning
taskMultipleFailuresOneCancel = append(taskMultipleFailuresOneCancel, cancelledTask[0])

var taskNotRunningWithSuccesfulParentsOneFailed = PipelineRunState{{
TaskRunName: "task0taskrun",
PipelineTask: &pts[5],
Expand Down Expand Up @@ -1439,7 +1468,7 @@ func TestGetPipelineConditionStatus(t *testing.T) {
expectedStatus: corev1.ConditionFalse,
expectedCancelled: 1,
}, {
name: "task with multiple failures; one cancelled",
name: "task with multiple failures",
state: taskMultipleFailuresSkipRunning,
expectedReason: v1beta1.PipelineRunReasonStopping.String(),
expectedStatus: corev1.ConditionUnknown,
Expand All @@ -1448,6 +1477,16 @@ func TestGetPipelineConditionStatus(t *testing.T) {
expectedIncomplete: 1,
expectedCancelled: 0,
expectedSkipped: 0,
}, {
name: "task with multiple failures; one cancelled",
state: taskMultipleFailuresOneCancel,
expectedReason: v1beta1.PipelineRunReasonStopping.String(),
expectedStatus: corev1.ConditionUnknown,
expectedSucceeded: 1,
expectedFailed: 1,
expectedIncomplete: 1,
expectedCancelled: 1,
expectedSkipped: 0,
}, {
name: "task not started with passed parent; one failed",
state: taskNotRunningWithSuccesfulParentsOneFailed,
Expand Down

0 comments on commit 2f8539c

Please sign in to comment.