Skip to content

Commit

Permalink
feat: Allow retry strategy on non-leaf nodes, eg for step groups. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
markterm authored Feb 28, 2020
1 parent 62e6db8 commit e9e13d4
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 57 deletions.
13 changes: 9 additions & 4 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,18 @@ func unmarshalWFTmpl(yamlStr string) *wfv1.WorkflowTemplate {
return &wftmpl
}

// makePodsRunning acts like a pod controller and simulates the transition of pods transitioning into a running state
func makePodsRunning(t *testing.T, kubeclientset kubernetes.Interface, namespace string) {
// makePodsPhase acts like a pod controller and simulates the transition of pods transitioning into a specified state
func makePodsPhase(t *testing.T, phase apiv1.PodPhase, kubeclientset kubernetes.Interface, namespace string) {
podcs := kubeclientset.CoreV1().Pods(namespace)
pods, err := podcs.List(metav1.ListOptions{})
assert.NoError(t, err)
for _, pod := range pods.Items {
pod.Status.Phase = apiv1.PodRunning
_, _ = podcs.Update(&pod)
if pod.Status.Phase == "" {
pod.Status.Phase = phase
if phase == apiv1.PodFailed {
pod.Status.Message = "Pod failed"
}
_, _ = podcs.Update(&pod)
}
}
}
43 changes: 25 additions & 18 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,8 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if time.Now().Before(waitingDeadline) {
retryMessage := fmt.Sprintf("Retrying in %s", humanize.Duration(time.Until(waitingDeadline)))
return woc.markNodePhase(node.Name, node.Phase, retryMessage), false, nil
} else {
node = woc.markNodePhase(node.Name, node.Phase, "")
}
node = woc.markNodePhase(node.Name, node.Phase, "")
}

var retryOnFailed bool
Expand Down Expand Up @@ -1312,7 +1311,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
// the container. The status of this node should be "Success" if any
// of the retries succeed. Otherwise, it is "Failed".
retryNodeName := ""
if processedTmpl.IsLeaf() && processedTmpl.RetryStrategy != nil {
if processedTmpl.RetryStrategy != nil {
retryNodeName = nodeName
retryParentNode := node
if retryParentNode == nil {
Expand All @@ -1337,19 +1336,20 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}
if lastChildNode != nil && !lastChildNode.Completed() {
// Last child node is still running.
return retryParentNode, nil
}

// Create a new child node and append it to the retry node.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
woc.addChildNode(retryNodeName, nodeName)
node = nil

// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, map[string]string{common.LocalVarPodName: woc.wf.NodeID(nodeName)})
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, err), err
nodeName = lastChildNode.Name
node = lastChildNode
} else {
// Create a new child node and append it to the retry node.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
woc.addChildNode(retryNodeName, nodeName)
node = nil

// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, map[string]string{common.LocalVarPodName: woc.wf.NodeID(nodeName)})
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, err), err
}
}
}
}
Expand Down Expand Up @@ -1383,9 +1383,16 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}
node = woc.getNodeByName(node.Name)

// Swap the node back to retry node.
// Swap the node back to retry node
if retryNodeName != "" {
node = woc.getNodeByName(retryNodeName)
retryNode := woc.getNodeByName(retryNodeName)
if !retryNode.Completed() && node.Completed() { //if the retry child has completed we need to update outself
node, err = woc.executeTemplate(retryNodeName, orgTmpl, tmplCtx, args, boundaryID)
if err != nil {
return woc.markNodeError(node.Name, err), err
}
}
node = retryNode
}

return node, nil
Expand Down
82 changes: 78 additions & 4 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,80 @@ func TestAssessNodeStatus(t *testing.T) {
}
}

var workflowStepRetry = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: step-retry
spec:
entrypoint: step-retry
templates:
- name: step-retry
retryStrategy:
limit: 1
steps:
- - name: whalesay-success
arguments:
parameters:
- name: message
value: success
template: whalesay
- - name: whalesay-failure
arguments:
parameters:
- name: message
value: failure
template: whalesay
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay {{inputs.parameters.message}}"]
`

// TestWorkflowParallelismLimit verifies parallelism at a workflow level is honored.
func TestWorkflowStepRetry(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
wf := unmarshalWF(workflowStepRetry)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.Nil(t, err)
assert.Equal(t, 1, len(pods.Items))

//complete the first pod
makePodsPhase(t, apiv1.PodSucceeded, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()

// fail the second pod
makePodsPhase(t, apiv1.PodFailed, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
pods, err = controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
assert.Nil(t, err)
assert.Equal(t, 3, len(pods.Items))
assert.Equal(t, "cowsay success", pods.Items[0].Spec.Containers[1].Args[0])
assert.Equal(t, "cowsay failure", pods.Items[1].Spec.Containers[1].Args[0])

//verify that after the cowsay failure pod failed, we are retrying cowsay success
assert.Equal(t, "cowsay success", pods.Items[2].Spec.Containers[1].Args[0])

}

var workflowParallelismLimit = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down Expand Up @@ -451,7 +525,7 @@ func TestWorkflowParallelismLimit(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(pods.Items))
// operate again and make sure we don't schedule any more pods
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
Expand Down Expand Up @@ -510,7 +584,7 @@ func TestStepsTemplateParallelismLimit(t *testing.T) {
assert.Equal(t, 2, len(pods.Items))

// operate again and make sure we don't schedule any more pods
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
Expand Down Expand Up @@ -566,7 +640,7 @@ func TestDAGTemplateParallelismLimit(t *testing.T) {
assert.Equal(t, 2, len(pods.Items))

// operate again and make sure we don't schedule any more pods
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
// wfBytes, _ := json.MarshalIndent(wf, "", " ")
Expand Down Expand Up @@ -2135,7 +2209,7 @@ func TestEventTimeout(t *testing.T) {
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
makePodsRunning(t, controller.kubeclientset, wf.ObjectMeta.Namespace)
makePodsPhase(t, apiv1.PodRunning, controller.kubeclientset, wf.ObjectMeta.Namespace)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
woc = newWorkflowOperationCtx(wf, controller)
Expand Down
3 changes: 0 additions & 3 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,6 @@ func validateNonLeaf(tmpl *wfv1.Template) error {
if tmpl.ActiveDeadlineSeconds != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.activeDeadlineSeconds is only valid for leaf templates", tmpl.Name)
}
if tmpl.RetryStrategy != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.retryStrategy is only valid for container templates", tmpl.Name)
}
return nil
}

Expand Down
28 changes: 0 additions & 28 deletions workflow/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,34 +1066,6 @@ func TestLeafWithParallelism(t *testing.T) {
}
}

var nonLeafWithRetryStrategy = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: non-leaf-with-retry-strategy
spec:
entrypoint: non-leaf-with-retry-strategy
templates:
- name: non-leaf-with-retry-strategy
retryStrategy:
limit: 4
steps:
- - name: try
template: try
- name: try
container:
image: debian:9.4
command: [sh, -c]
args: ["kubectl version"]
`

func TestNonLeafWithRetryStrategy(t *testing.T) {
err := validate(nonLeafWithRetryStrategy)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "is only valid")
}
}

var invalidStepsArgumentNoFromOrLocation = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down

0 comments on commit e9e13d4

Please sign in to comment.