Skip to content

Commit

Permalink
fix: Separate global scope processing from local scope building (#2528)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 authored Mar 31, 2020
1 parent 618b6de commit fb74ba1
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
7 changes: 4 additions & 3 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
// Can happen when dag.target was specified
continue
}
woc.processNodeOutputs(&scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)
woc.buildLocalScope(&scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)
woc.addOutputsToGlobalScope(taskNode.Outputs)
}
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
Expand Down Expand Up @@ -448,7 +449,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv
tmpl: dagCtx.tmpl,
scope: make(map[string]interface{}),
}
woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, &scope)
woc.addOutputsToLocalScope("workflow", woc.wf.Status.Outputs, &scope)

ancestors := common.GetTaskAncestry(dagCtx, task.Name, dagCtx.tasks)
for _, ancestor := range ancestors {
Expand All @@ -473,7 +474,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv
return nil, errors.InternalWrapError(err)
}
} else {
woc.processNodeOutputs(&scope, prefix, ancestorNode)
woc.buildLocalScope(&scope, prefix, ancestorNode)
}
}
return &scope, nil
Expand Down
24 changes: 17 additions & 7 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
if newState := woc.assessNodeStatus(pod, &node); newState != nil {
woc.wf.Status.Nodes[nodeID] = *newState
woc.addOutputsToScope("workflow", node.Outputs, nil)
woc.addOutputsToGlobalScope(node.Outputs)
woc.updated = true
}
node := woc.wf.Status.Nodes[pod.ObjectMeta.Name]
Expand Down Expand Up @@ -1895,9 +1895,9 @@ func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string,
return node, err
}

// processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well
// buildLocalScope adds all of a nodes outputs to the local scope with the given prefix, as well
// as the global scope, if specified with a globalName
func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, node *wfv1.NodeStatus) {
func (woc *wfOperationCtx) buildLocalScope(scope *wfScope, prefix string, node *wfv1.NodeStatus) {
if node.PodIP != "" {
key := fmt.Sprintf("%s.ip", prefix)
scope.addParamToScope(key, node.PodIP)
Expand All @@ -1906,10 +1906,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod
key := fmt.Sprintf("%s.status", prefix)
scope.addParamToScope(key, string(node.Phase))
}
woc.addOutputsToScope(prefix, node.Outputs, scope)
woc.addOutputsToLocalScope(prefix, node.Outputs, scope)
}

func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) {
func (woc *wfOperationCtx) addOutputsToLocalScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) {
if outputs == nil {
return
}
Expand All @@ -1924,14 +1924,24 @@ func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Output
if scope != nil {
scope.addParamToScope(key, *param.Value)
}
woc.addParamToGlobalScope(param)
}
for _, art := range outputs.Artifacts {
key := fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name)
if scope != nil {
scope.addArtifactToScope(key, art)
}
woc.addArtifactToGlobalScope(art, scope)
}
}

func (woc *wfOperationCtx) addOutputsToGlobalScope(outputs *wfv1.Outputs) {
if outputs == nil {
return
}
for _, param := range outputs.Parameters {
woc.addParamToGlobalScope(param)
}
for _, art := range outputs.Artifacts {
woc.addArtifactToGlobalScope(art, nil)
}
}

Expand Down
6 changes: 4 additions & 2 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
},
tmplCtx: tmplCtx,
}
woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, stepsCtx.scope)
woc.addOutputsToLocalScope("workflow", woc.wf.Status.Outputs, stepsCtx.scope)

for i, stepGroup := range tmpl.Steps {
sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i)
Expand Down Expand Up @@ -126,7 +126,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
woc.log.Infof("Step '%s' has no expanded child nodes", childNode)
}
} else {
woc.processNodeOutputs(stepsCtx.scope, prefix, childNode)
woc.buildLocalScope(stepsCtx.scope, prefix, childNode)
}
}
}
Expand Down Expand Up @@ -259,6 +259,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
return node
}

woc.addOutputsToGlobalScope(node.Outputs)

// All children completed. Determine step group status as a whole
for _, childNodeID := range node.Children {
childNode := woc.wf.Status.Nodes[childNodeID]
Expand Down

0 comments on commit fb74ba1

Please sign in to comment.