Skip to content

Commit

Permalink
fix: Consider expanded tasks in getTaskFromNode (#2756)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Apr 20, 2020
1 parent ca5cdc4 commit e8cd8d7
Show file tree
Hide file tree
Showing 2 changed files with 522 additions and 10 deletions.
37 changes: 27 additions & 10 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,20 @@ func (d *dagContext) taskNodeName(taskName string) string {
return fmt.Sprintf("%s.%s", d.boundaryName, taskName)
}

// nodeTaskName formulates the corresponding task name for a dag node. Note that this is not simply the inverse of
// taskNodeName. A task name might be from an expanded task, in which case it will not have an explicit task defined for it.
// When that is the case, we formulate the name of the original expanded task by removing the fields after "("
func (d *dagContext) taskNameFromNodeName(nodeName string) string {
nodeName = strings.TrimPrefix(nodeName, fmt.Sprintf("%s.", d.boundaryName))
// Check if this nodeName comes from an expanded task. If it does, return the original parent task
if index := strings.Index(nodeName, "("); index != -1 {
nodeName = nodeName[:index]
}
return nodeName
}

func (d *dagContext) getTaskFromNode(node *wfv1.NodeStatus) *wfv1.DAGTask {
for _, task := range d.tasks {
if node.Name == d.taskNodeName(task.Name) {
return &task
}
}
panic("task from node " + node.Name + " does not exist")
return d.getTask(d.taskNameFromNodeName(node.Name))
}

// taskNodeID formulates the node ID for a dag task
Expand Down Expand Up @@ -331,6 +338,10 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
dependenciesCompleted := true
dependenciesSuccessful := true
nodeName := dagCtx.taskNodeName(taskName)
// Ensure that the generated taskNodeName can be reversed into the original (not expanded) task name
if dagCtx.taskNameFromNodeName(nodeName) != task.Name {
panic("unreachable: node name cannot be reversed into task name; please file a bug on GitHub")
}
for _, depName := range task.Dependencies {
depNode := dagCtx.GetTaskNode(depName)
if depNode != nil && depNode.Completed() {
Expand Down Expand Up @@ -399,7 +410,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {

// Next, expand the DAG's withItems/withParams/withSequence (if any). If there was none, then
// expandedTasks will be a single element list of the same task
expandedTasks, err := woc.expandTask(*newTask)
expandedTasks, err := expandTask(*newTask)
if err != nil {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeError, err.Error())
connectDependencies(nodeName)
Expand All @@ -409,16 +420,22 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
// If DAG task has withParam of with withSequence then we need to create virtual node of type TaskGroup.
// For example, if we had task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still
// need to create a node for A.
if len(task.WithItems) > 0 || task.WithParam != "" || task.WithSequence != nil {
if task.ShouldExpand() {
if taskGroupNode == nil {
connectDependencies(nodeName)
taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeRunning, "")
}
}

for _, t := range expandedTasks {
node = dagCtx.GetTaskNode(t.Name)

taskNodeName := dagCtx.taskNodeName(t.Name)
// Ensure that the generated taskNodeName can be reversed into the original (not expanded) task name
if dagCtx.taskNameFromNodeName(taskNodeName) != task.Name {
panic("unreachable: task node name cannot be reversed into tag name; please file a bug on GitHub")
}

node = dagCtx.GetTaskNode(t.Name)
if node == nil {
woc.log.Infof("All of node %s dependencies %s completed", taskNodeName, task.Dependencies)
// Add the child relationship from our dependency's outbound nodes to this node.
Expand Down Expand Up @@ -581,7 +598,7 @@ func findLeafTaskNames(tasks []wfv1.DAGTask) []string {
}

// expandTask expands a single DAG task containing withItems, withParams, withSequence into multiple parallel tasks
func (woc *wfOperationCtx) expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) {
func expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) {
taskBytes, err := json.Marshal(task)
if err != nil {
return nil, errors.InternalWrapError(err)
Expand Down
Loading

0 comments on commit e8cd8d7

Please sign in to comment.