Skip to content

Commit

Permalink
WIP Handle dag in pipelineresoultion
Browse files Browse the repository at this point in the history
In the pipelinerun controller, today we follow this logic:
- build the dag from the spec, using the dag module
- resolve the pipelinerun state using the spec and status, using the
  resources module
- get a list of candidate next tasks from the dag module, passing part
  of the state
- get a list of next tasks from the resources module, using the list
  of candidates and the pipeline run status

The separation of concerns between the dag, resources and reconciler
modules feels a bit mixed up.

This is just a PoC that resolves part of the issue, by moving the
invocation of the dag building as well as obtaining the list of
candidates to the dag module, and aggregating the dag to the pipeline
state struct.

I did not update tests yet, this is for discussion purposes.
  • Loading branch information
afrittoli committed Jun 15, 2020
1 parent f291efc commit 0c7d850
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
20 changes: 7 additions & 13 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
pr.ObjectMeta.Annotations[key] = value
}

d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks))
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
"PipelineRun %s/%s's Pipeline DAG is invalid: %s",
pr.Namespace, pr.Name, err)
return nil
}

if err := pipelineSpec.Validate(ctx); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonFailedValidation,
Expand Down Expand Up @@ -352,7 +343,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
pipelineSpec = resources.ApplyParameters(pipelineSpec, pr)

pipelineState, err := resources.ResolvePipelineRun(ctx,
*pr,
pipelineSpec, *pr,
func(name string) (v1beta1.TaskInterface, error) {
return c.taskLister.Tasks(pr.Namespace).Get(name)
},
Expand All @@ -365,7 +356,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
func(name string) (*v1alpha1.Condition, error) {
return c.conditionLister.Conditions(pr.Namespace).Get(name)
},
pipelineSpec.Tasks, providedResources,
providedResources,
)

if err != nil {
Expand All @@ -379,6 +370,10 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
pr.Status.MarkFailed(ReasonCouldntGetCondition,
"PipelineRun %s/%s can't be Run; it contains Conditions that don't exist: %s",
pipelineMeta.Namespace, pr.Name, err)
case *resource.InvalidDagError:
pr.Status.MarkFailed(ReasonInvalidGraph,
"PipelineRun %s/%s's Pipeline DAG is invalid: %s",
pr.Namespace, pr.Name, err)
default:
pr.Status.MarkFailed(ReasonFailedValidation,
"PipelineRun %s/%s can't be Run; couldn't resolve all references: %s",
Expand Down Expand Up @@ -467,13 +462,12 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...)
nextRprts, err := pipelineState.GetNextTasks(candidateTasks)
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return nil
}

nextRprts := pipelineState.GetNextTasks(candidateTasks)
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState, nextRprts)
if err != nil {
logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
Expand Down
43 changes: 34 additions & 9 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (e *TaskNotFoundError) Error() string {
return fmt.Sprintf("Couldn't retrieve Task %q: %s", e.Name, e.Msg)
}


// ConditionNotFoundError is used to track failures to the
type ConditionNotFoundError struct {
Name string
Expand All @@ -62,6 +63,11 @@ func (e *ConditionNotFoundError) Error() string {
return fmt.Sprintf("Couldn't retrieve Condition %q: %s", e.Name, e.Msg)
}

// InvalidDagError indicates that it was not possible to build a valid dag from the pipeline spec
type InvalidDagError struct {
err error
}

// ResolvedPipelineRunTask contains a Task and its associated TaskRun, if it
// exists. TaskRun can be nil to represent there being no TaskRun.
type ResolvedPipelineRunTask struct {
Expand All @@ -73,9 +79,13 @@ type ResolvedPipelineRunTask struct {
ResolvedConditionChecks TaskConditionCheckState // Could also be a TaskRun or maybe just a Pod?
}

// PipelineRunState is a slice of ResolvedPipelineRunTasks the represents the current execution
// PipelineRunState is the combination of the Pipeline dag and
// a slice of ResolvedPipelineRunTasks the represents the current execution
// state of the PipelineRun.
type PipelineRunState []*ResolvedPipelineRunTask
type PipelineRunState struct {
Dag *dag.Graph
State []*ResolvedPipelineRunTask
}

func (t ResolvedPipelineRunTask) IsDone() (isDone bool) {
if t.TaskRun == nil || t.PipelineTask == nil {
Expand Down Expand Up @@ -144,15 +154,15 @@ func (t ResolvedPipelineRunTask) IsStarted() bool {
// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
m := make(map[string]*ResolvedPipelineRunTask)
for _, rprt := range state {
for _, rprt := range state.State {
m[rprt.PipelineTask.Name] = rprt
}
return m
}

func (state PipelineRunState) IsDone() (isDone bool) {
isDone = true
for _, t := range state {
for _, t := range state.State {
if t.TaskRun == nil || t.PipelineTask == nil {
return false
}
Expand All @@ -166,7 +176,7 @@ func (state PipelineRunState) IsDone() (isDone bool) {

// IsBeforeFirstTaskRun returns true if the PipelineRun has not yet started its first TaskRun
func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
for _, t := range state {
for _, t := range state.State {
if t.TaskRun != nil {
return false
}
Expand All @@ -177,7 +187,7 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
// IsStopping returns true if the PipelineRun won't be scheduling any new Task because
// at least one task already failed or was cancelled
func (state PipelineRunState) IsStopping() bool {
for _, t := range state {
for _, t := range state.State {
if t.IsCancelled() {
return true
}
Expand All @@ -190,7 +200,13 @@ func (state PipelineRunState) IsStopping() bool {

// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) ([]*ResolvedPipelineRunTask, error) {

candidateTasks, err := dag.GetSchedulable(state.Dag, state.SuccessfulPipelineTaskNames()...)
if err != nil {
return nil, err
}

tasks := []*ResolvedPipelineRunTask{}
for _, t := range state {
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun == nil {
Expand All @@ -214,7 +230,7 @@ func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) [
// which have successfully completed.
func (state PipelineRunState) SuccessfulPipelineTaskNames() []string {
done := []string{}
for _, t := range state {
for _, t := range state.State {
if t.TaskRun != nil {
c := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
if c.IsTrue() {
Expand Down Expand Up @@ -307,17 +323,26 @@ func ValidateServiceaccountMapping(p *v1beta1.PipelineSpec, pr *v1beta1.Pipeline
// instances from getTask. If it is unable to retrieve an instance of a referenced Task, it
// will return an error, otherwise it returns a list of all of the Tasks retrieved.
// It will retrieve the Resources needed for the TaskRun using the mapping of providedResources.
// It builds the dag based on the specified pipelineSpec, and stores it in the state
func ResolvePipelineRun(
ctx context.Context,
pipelineSpec v1beta1.PipelineSpec,
pipelineRun v1beta1.PipelineRun,
getTask resources.GetTask,
getTaskRun resources.GetTaskRun,
getClusterTask resources.GetClusterTask,
getCondition GetCondition,
tasks []v1beta1.PipelineTask,
providedResources map[string]*resourcev1alpha1.PipelineResource,
) (PipelineRunState, error) {

d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks))
if err != nil {
return &InvalidDagError{
err: err,
}
}
tasks := pipelineSpec.tasks

state := []*ResolvedPipelineRunTask{}
for i := range tasks {
pt := tasks[i]
Expand Down

0 comments on commit 0c7d850

Please sign in to comment.