feat: Allow retry strategy on non-leaf nodes, e.g. for step groups. Fixes #1891 #1892

Merged (3 commits) on Feb 28, 2020
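
This PR lets a retryStrategy appear on non-leaf templates such as step groups, so the whole group is re-run when any step in it fails. A minimal sketch of what validation now accepts, excerpted from the workflowStepRetry test added below:

  - name: step-retry        # a steps (non-leaf) template
    retryStrategy:
      limit: 1              # previously rejected by validateNonLeaf; now allowed
    steps:
    - - name: whalesay-success
        template: whalesay
    - - name: whalesay-failure
        template: whalesay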
13 changes: 9 additions & 4 deletions workflow/controller/controller_test.go
@@ -83,13 +83,18 @@ func unmarshalWFTmpl(yamlStr string) *wfv1.WorkflowTemplate {
return &wftmpl
}

// makePodsRunning acts like a pod controller and simulates the transition of pods transitioning into a running state
func makePodsRunning(t *testing.T, kubeclientset kubernetes.Interface, namespace string) {
// makePodsPhase acts like a pod controller and simulates pods transitioning into the specified phase
func makePodsPhase(t *testing.T, phase apiv1.PodPhase, kubeclientset kubernetes.Interface, namespace string) {
podcs := kubeclientset.CoreV1().Pods(namespace)
pods, err := podcs.List(metav1.ListOptions{})
assert.NoError(t, err)
for _, pod := range pods.Items {
pod.Status.Phase = apiv1.PodRunning
_, _ = podcs.Update(&pod)
if pod.Status.Phase == "" {
pod.Status.Phase = phase
if phase == apiv1.PodFailed {
pod.Status.Message = "Pod failed"
}
_, _ = podcs.Update(&pod)
}
}
}
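
Note the phase == "" guard above: only pods that have not yet been assigned a phase are transitioned, so repeated calls never overwrite pods that already succeeded or failed. The tests below use the helper in a reconcile-then-transition loop; a sketch of that existing pattern:

// operate once to create pods, then drive the pending ones to a terminal phase
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
makePodsPhase(t, apiv1.PodSucceeded, controller.kubeclientset, wf.ObjectMeta.Namespace)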
43 changes: 25 additions & 18 deletions workflow/controller/operator.go
@@ -613,9 +613,8 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if time.Now().Before(waitingDeadline) {
retryMessage := fmt.Sprintf("Retrying in %s", humanize.Duration(time.Until(waitingDeadline)))
return woc.markNodePhase(node.Name, node.Phase, retryMessage), false, nil
} else {
node = woc.markNodePhase(node.Name, node.Phase, "")
}
node = woc.markNodePhase(node.Name, node.Phase, "")
}

var retryOnFailed bool
@@ -1301,7 +1300,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
// the container. The status of this node should be "Success" if any
// of the retries succeed. Otherwise, it is "Failed".
retryNodeName := ""
if processedTmpl.IsLeaf() && processedTmpl.RetryStrategy != nil {
if processedTmpl.RetryStrategy != nil {
retryNodeName = nodeName
retryParentNode := node
if retryParentNode == nil {
@@ -1326,19 +1325,20 @@
}
if lastChildNode != nil && !lastChildNode.Completed() {
// Last child node is still running.
return retryParentNode, nil
}

// Create a new child node and append it to the retry node.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
woc.addChildNode(retryNodeName, nodeName)
node = nil

// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, map[string]string{common.LocalVarPodName: woc.wf.NodeID(nodeName)})
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, err), err
nodeName = lastChildNode.Name
node = lastChildNode
} else {
// Create a new child node and append it to the retry node.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
woc.addChildNode(retryNodeName, nodeName)
node = nil

// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, map[string]string{common.LocalVarPodName: woc.wf.NodeID(nodeName)})
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, err), err
}
}
}
}
Expand Down Expand Up @@ -1372,9 +1372,16 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}
node = woc.getNodeByName(node.Name)

// Swap the node back to retry node.
// Swap the node back to retry node
if retryNodeName != "" {
node = woc.getNodeByName(retryNodeName)
retryNode := woc.getNodeByName(retryNodeName)
if !retryNode.Completed() && node.Completed() { // if the retry child has completed, we need to update ourselves
node, err = woc.executeTemplate(retryNodeName, orgTmpl, tmplCtx, args, boundaryID)
if err != nil {
return woc.markNodeError(node.Name, err), err
}
}
node = retryNode
}

return node, nil
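Taken together, these hunks drop the IsLeaf() restriction and teach executeTemplate to manage a retry parent for any template: a still-running child attempt is re-entered rather than duplicated, and once a child completes, executeTemplate recurses on the retry node so processNodeRetries can decide whether to spawn the next attempt. A condensed pseudocode sketch of the new flow (helper names like getLastChild and newChildNode are illustrative, not the real code):

if processedTmpl.RetryStrategy != nil { // no longer limited to leaf templates
	lastChild := getLastChild(retryParentNode)
	if lastChild != nil && !lastChild.Completed() {
		node = lastChild // re-enter the attempt that is still running
	} else {
		node = newChildNode(retryParentNode) // start attempt N+1
	}
}
// ... execute the (possibly non-leaf) child template ...
if retryNodeName != "" {
	retryNode := woc.getNodeByName(retryNodeName)
	if !retryNode.Completed() && node.Completed() {
		// the attempt just finished: recurse so processNodeRetries can
		// schedule another attempt or mark the retry node complete
		node, err = woc.executeTemplate(retryNodeName, orgTmpl, tmplCtx, args, boundaryID)
	}
	node = retryNode
}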
82 changes: 78 additions & 4 deletions workflow/controller/operator_test.go
@@ -405,6 +405,80 @@ func TestAssessNodeStatus(t *testing.T) {
}
}

var workflowStepRetry = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: step-retry
spec:
  entrypoint: step-retry
  templates:
  - name: step-retry
    retryStrategy:
      limit: 1
    steps:
    - - name: whalesay-success
        arguments:
          parameters:
          - name: message
            value: success
        template: whalesay
    - - name: whalesay-failure
        arguments:
          parameters:
          - name: message
            value: failure
        template: whalesay

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay {{inputs.parameters.message}}"]
`

// TestWorkflowStepRetry verifies that a steps template with a retryStrategy retries the whole step group on failure.
func TestWorkflowStepRetry(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
wf := unmarshalWF(workflowStepRetry)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.Nil(t, err)
assert.Equal(t, 1, len(pods.Items))

// complete the first pod
makePodsPhase(t, apiv1.PodSucceeded, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()

// fail the second pod
makePodsPhase(t, apiv1.PodFailed, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err = controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.Nil(t, err)
assert.Equal(t, 3, len(pods.Items))
assert.Equal(t, "cowsay success", pods.Items[0].Spec.Containers[1].Args[0])
assert.Equal(t, "cowsay failure", pods.Items[1].Spec.Containers[1].Args[0])

// verify that after the 'cowsay failure' pod fails, the whole step group is retried, starting again from 'cowsay success'
assert.Equal(t, "cowsay success", pods.Items[2].Spec.Containers[1].Args[0])
}

var workflowParallelismLimit = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
@@ -451,7 +525,7 @@ func TestWorkflowParallelismLimit(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(pods.Items))
// operate again and make sure we don't schedule any more pods
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
@@ -510,7 +584,7 @@ func TestStepsTemplateParallelismLimit(t *testing.T) {
assert.Equal(t, 2, len(pods.Items))

// operate again and make sure we don't schedule any more pods
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
@@ -566,7 +640,7 @@ func TestDAGTemplateParallelismLimit(t *testing.T) {
assert.Equal(t, 2, len(pods.Items))

// operate again and make sure we don't schedule any more pods
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
@@ -2135,7 +2209,7 @@ func TestEventTimeout(t *testing.T) {
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
woc = newWorkflowOperationCtx(wf, controller)
3 changes: 0 additions & 3 deletions workflow/validate/validate.go
@@ -457,9 +457,6 @@ func validateNonLeaf(tmpl *wfv1.Template) error {
if tmpl.ActiveDeadlineSeconds != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.activeDeadlineSeconds is only valid for leaf templates", tmpl.Name)
}
if tmpl.RetryStrategy != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.retryStrategy is only valid for container templates", tmpl.Name)
}
return nil
}

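With this check removed, validateNonLeaf no longer rejects retryStrategy on steps and DAG templates, which is exactly what allows the step-retry workflow above to validate. A hypothetical test illustrating the new behavior (not part of this PR; it assumes the workflowStepRetry YAML from operator_test.go is visible to this package):

func TestNonLeafWithRetryStrategyIsValid(t *testing.T) {
	// a steps template carrying retryStrategy should now pass validation
	err := validate(workflowStepRetry)
	assert.NoError(t, err)
}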
28 changes: 0 additions & 28 deletions workflow/validate/validate_test.go
@@ -1066,34 +1066,6 @@ func TestLeafWithParallelism(t *testing.T) {
}
}

var nonLeafWithRetryStrategy = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: non-leaf-with-retry-strategy
spec:
  entrypoint: non-leaf-with-retry-strategy
  templates:
  - name: non-leaf-with-retry-strategy
    retryStrategy:
      limit: 4
    steps:
    - - name: try
        template: try
  - name: try
    container:
      image: debian:9.4
      command: [sh, -c]
      args: ["kubectl version"]
`

func TestNonLeafWithRetryStrategy(t *testing.T) {
err := validate(nonLeafWithRetryStrategy)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "is only valid")
}
}

var invalidStepsArgumentNoFromOrLocation = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow