Skip to content

Commit

Permalink
fix: Don't attempt to resolve artifacts if task is going to be skipped (
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 authored Apr 10, 2020
1 parent 2caf570 commit 4c452d5
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -5036,4 +5036,4 @@ HTTPHeader describes a custom header to be used in HTTP probes
| Field Name | Field Type | Description |
|:----------:|:----------:|---------------|
|`name`|`string`|The header field name|
|`value`|`string`|The header field value|
|`value`|`string`|The header field value|
8 changes: 8 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ func (step *WorkflowStep) GetTemplateRef() *TemplateRef {
return step.TemplateRef
}

func (step *WorkflowStep) ShouldExpand() bool {
return len(step.WithItems) != 0 || step.WithParam != "" || step.WithSequence != nil
}

// Sequence expands a workflow step into numeric range
type Sequence struct {
// Count is number of elements in the sequence (default: 0). Not to be used with end
Expand Down Expand Up @@ -1590,6 +1594,10 @@ func (t *DAGTask) GetTemplateRef() *TemplateRef {
return t.TemplateRef
}

func (t *DAGTask) ShouldExpand() bool {
return len(t.WithItems) != 0 || t.WithParam != "" || t.WithSequence != nil
}

// SuspendTemplate is a template subtype to suspend a workflow at a predetermined point in time
type SuspendTemplate struct {
// Duration is the seconds to wait before automatically resuming a template
Expand Down
18 changes: 18 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,24 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
return nil, errors.InternalWrapError(err)
}

// If we are not executing, don't attempt to resolve any artifact references. We only check if we are executing after
// the initial parameter resolution, since it's likely that the "when" clause will contain parameter references.
proceed, err := shouldExecute(newTask.When)
if err != nil {
// If we got an error, it might be because our "when" clause contains a task-expansion parameter (e.g. {{item}}).
// Since we don't perform task-expansion until later and task-expansion parameters won't get resolved here,
// we continue execution as normal
if newTask.ShouldExpand() {
proceed = true
} else {
return nil, err
}
}
if !proceed {
// We can simply return here; the fact that this task won't execute will be reconciled later on in execution
return &newTask, nil
}

// replace all artifact references
for j, art := range newTask.Arguments.Artifacts {
if art.From == "" {
Expand Down
60 changes: 60 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,63 @@ func TestDagDisableFailFast(t *testing.T) {
woc.operate()
assert.Equal(t, string(wfv1.NodeFailed), string(woc.wf.Status.Phase))
}

var artifactResolutionWhenSkippedDAG = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: conditional-artifact-passing-
spec:
entrypoint: artifact-example
templates:
- name: artifact-example
dag:
tasks:
- name: generate-artifact
template: whalesay
when: "false"
- name: consume-artifact
dependencies: [generate-artifact]
template: print-message
when: "false"
arguments:
artifacts:
- name: message
from: "{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["sleep 1; cowsay hello world | tee /tmp/hello_world.txt"]
outputs:
artifacts:
- name: hello-art
path: /tmp/hello_world.txt
- name: print-message
inputs:
artifacts:
- name: message
path: /tmp/message
container:
image: alpine:latest
command: [sh, -c]
args: ["cat /tmp/message"]
`

// Tests ability to reference workflow parameters from within top level spec fields (e.g. spec.volumes)
func TestArtifactResolutionWhenSkippedDAG(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

wf := unmarshalWF(artifactResolutionWhenSkippedDAG)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate()
woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
}
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1863,7 +1863,7 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out
log.Warnf("Optional artifact '%s' was not found; it won't be available as an output", art.Name)
continue
}
return nil, err
return nil, fmt.Errorf("unable to resolve outputs from scope: %s", err)
}
resolvedArt.Name = art.Name
outputs.Artifacts = append(outputs.Artifacts, *resolvedArt)
Expand Down
23 changes: 21 additions & 2 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,33 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
return nil, errors.InternalWrapError(err)
}

// If we are not executing, don't attempt to resolve any artifact references. We only check if we are executing after
// the initial parameter resolution, since it's likely that the "when" clause will contain parameter references.
proceed, err := shouldExecute(newStep.When)
if err != nil {
// If we got an error, it might be because our "when" clause contains a task-expansion parameter (e.g. {{item}}).
// Since we don't perform task-expansion until later and task-expansion parameters won't get resolved here,
// we continue execution as normal
if newStep.ShouldExpand() {
proceed = true
} else {
return nil, err
}
}
if !proceed {
// We can simply return this WorkflowStep; the fact that it won't execute will be reconciled later on in execution
newStepGroup[i] = newStep
continue
}

// Step 2: replace all artifact references
for j, art := range newStep.Arguments.Artifacts {
if art.From == "" {
continue
}
resolvedArt, err := scope.resolveArtifact(art.From)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to resolve references: %s", err)
}
resolvedArt.Name = art.Name
newStep.Arguments.Artifacts[j] = *resolvedArt
Expand All @@ -375,7 +394,7 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
func (woc *wfOperationCtx) expandStepGroup(sgNodeName string, stepGroup []wfv1.WorkflowStep, stepsCtx *stepsContext) ([]wfv1.WorkflowStep, error) {
newStepGroup := make([]wfv1.WorkflowStep, 0)
for _, step := range stepGroup {
if len(step.WithItems) == 0 && step.WithParam == "" && step.WithSequence == nil {
if !step.ShouldExpand() {
newStepGroup = append(newStepGroup, step)
continue
}
Expand Down
57 changes: 57 additions & 0 deletions workflow/controller/steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,60 @@ func TestStepsFailedRetries(t *testing.T) {
woc.operate()
assert.Equal(t, string(wfv1.NodeFailed), string(woc.wf.Status.Phase))
}

var artifactResolutionWhenSkipped = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: conditional-artifact-passing-
spec:
entrypoint: artifact-example
templates:
- name: artifact-example
steps:
- - name: generate-artifact
template: whalesay
when: "false"
- - name: consume-artifact
template: print-message
when: "false"
arguments:
artifacts:
- name: message
from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}"
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["sleep 1; cowsay hello world | tee /tmp/hello_world.txt"]
outputs:
artifacts:
- name: hello-art
path: /tmp/hello_world.txt
- name: print-message
inputs:
artifacts:
- name: message
path: /tmp/message
container:
image: alpine:latest
command: [sh, -c]
args: ["cat /tmp/message"]
`

// Tests ability to reference workflow parameters from within top level spec fields (e.g. spec.volumes)
func TestArtifactResolutionWhenSkipped(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

wf := unmarshalWF(artifactResolutionWhenSkipped)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate()
assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase)
}

0 comments on commit 4c452d5

Please sign in to comment.